===== Sample sources ===== C version with timings (to be compiled with **gcc -pthread -lm**) #include #include #include #include #include #include pthread_mutex_t mutex; typedef struct __task { long n; float x; struct __task * next; } TASK; TASK * tasks = NULL; typedef unsigned long long ticks; static __inline__ ticks getticks(void) { unsigned a, d; asm volatile("rdtsc" : "=a" (a), "=d" (d)); return ((ticks)a) | (((ticks)d) << 32); } void * worker(void * x) { int wno = *((int *) x); int * taskno = (int *) calloc(1, sizeof(int)); *taskno = 0; #ifdef DEBUG printf("Worker no %d created\n", wno); #endif while(1==1) { // get a task from the list pthread_mutex_lock(&mutex); TASK * t = tasks; if(tasks!=NULL) { tasks = tasks->next; } pthread_mutex_unlock(&mutex); // process if(t == NULL) break; // nothing else to process #ifdef DEBUG printf("Thread %ld processing task <%ld,%f>\n",pthread_self(), t->n, t->x); #endif { long l; for(l=0; ln; l++) t->x = sin(t->x); } // write results #ifdef DEBUG printf("Thread %ld computed %f\n",pthread_self(),t->x); #endif (*taskno)++; } return ((void *) taskno); } int main(int argc, char * argv[]) { int opt, nw, m, i ; float ta,tb,elapsed; pthread_t *w; struct timeval t0, t1; clock_t before, after; struct timespec c0,c1,p0,p1; ticks tck0, tck1; while((opt = getopt(argc, argv, "n:m:"))!= (-1)) { switch(opt) { case 'n': { nw = atoi(optarg); break; } case 'm': { m = atoi(optarg); break; } default: { printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]); return(0); } } } #ifdef DEBUG printf("%d workers and %d tasks\n", nw, m); #endif w = (pthread_t *) calloc(nw, sizeof(pthread_t)); if((pthread_mutex_init(&mutex,NULL)!=0)) { printf("Failed to initialize the mutex\n"); return(-1); } for(i=0; ix = (float)i ; t->n = (long) i*(100000L); pthread_mutex_lock(&mutex); // may be in principle concurrent with task consuming t->next = tasks; tasks = t; pthread_mutex_unlock(&mutex); } before = clock(); clock_gettime(CLOCK_THREAD_CPUTIME_ID,&c0); 
clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&p0); gettimeofday(&t0,NULL); tck0 = getticks(); #ifdef DEBUG printf("Start %ld %ld \n", t0.tv_sec, t0.tv_usec); #endif for(i=0; i C++ version with timings (to be compiled with **g++ -std=c++0x -pthread -lm -rt**) #include #include #include #include #include #include #include #include #include using namespace std; typedef struct __task { float x; long n; } farm_task_t; mutex locche; list tasks; void worker(int no) { int taskno = 0; thread::id mythreadid = this_thread::get_id(); #ifdef DEBUG cout << "Worker No. " << no << " started (on thread " << mythreadid << ")" << endl; #endif while(true) { farm_task_t t; locche.lock(); if(!tasks.empty()) { t = *(tasks.front()); tasks.pop_front(); locche.unlock(); } else { locche.unlock(); break; // TODO } #ifdef DEBUG cout << "Worker no. " << no << " got task " << t.x << " " << t.n << endl; #endif taskno++; for(long i=0; ix = (float) i; t->n = i*100000L; tasks.push_back(t); } cout << "Tasks created ..." << endl; vector workers; gettimeofday(&t0,NULL); for(int i=0; i
Version C++/Pthread with synchronizations wrapped into a new Queue type:
// NOTE(review): the header names and template arguments of this sample were
// lost in extraction; <queue> and <pthread.h> are required by the code below,
// <iostream> is a guess for the third header -- confirm against the original.
#include <iostream>
#include <queue>
#include <pthread.h>

using namespace std;

// A task: `n` floats starting at `v`. The pointer is stored, not copied.
// body() treats a task whose length is negative as the end-of-stream marker.
class Task {
private:
  int n;
  float * v;
public:
  Task(int n, float * v) : n(n), v(v) {}
  float * get() const { return v; }
  int length() const { return n; }
};

// FIFO queue shared between threads: every operation runs under `lock`.
// receive() blocks while the queue is empty instead of calling front()/pop()
// on an empty std::queue, which is undefined behaviour.
template <class T>
class Queue {
private:
  std::queue<T> tasks;
  pthread_mutex_t lock;
  pthread_cond_t nonempty;   // signalled by send() after each push

  // The queue owns a mutex and a condition variable, so it must not be copied.
  Queue(const Queue &);
  Queue & operator=(const Queue &);

public:
  Queue() {
    // PTHREAD_MUTEX_INITIALIZER is meant for static initialisation only;
    // a member must be set up with pthread_mutex_init.
    pthread_mutex_init(&lock, NULL);
    pthread_cond_init(&nonempty, NULL);
  }

  ~Queue() {
    pthread_cond_destroy(&nonempty);
    pthread_mutex_destroy(&lock);
  }

  // Appends `m` to the queue and wakes one thread waiting in receive().
  void send(T m) {
    pthread_mutex_lock(&lock);
    tasks.push(m);
    pthread_cond_signal(&nonempty);
    pthread_mutex_unlock(&lock);
  }

  // Removes and returns the oldest element, waiting until one is available.
  T receive() {
    pthread_mutex_lock(&lock);
    while(tasks.empty())   // loop: condition waits may wake spuriously
      pthread_cond_wait(&nonempty, &lock);
    T t = tasks.front();
    tasks.pop();
    pthread_mutex_unlock(&lock);
    return t;
  }
};

// Pair of queues handed to each worker thread: input tasks and output results.
typedef struct __comms {
  Queue<Task> * in;
  Queue<Task> * out;
} COMMS;

// The function applied to each task; here it returns the task unchanged.
Task f(Task x) {
  return x;
}

// Thread body: takes tasks from q->in, applies f, and sends results to q->out
// until a task with negative length (end of stream) is received.
//
// x       pointer to the COMMS describing the two queues
// returns pointer to a new'ed int holding the number of tasks processed;
//         it is not deleted here.
void * body(void * x) {
  COMMS * q = static_cast<COMMS *>(x);
  int * i = new int();
  while(true) {
    Task t = q->in->receive();
    if(t.length() < 0)
      break;   // EOS
    Task r = f(t);
    (*i)++;
    q->out->send(r);
  }
  return static_cast<void *>(i);
}
int main(int argc, char * argv []) { Queue tasks; Queue ress; COMMS c; c.in = &tasks; c.out = &ress; int nw = atoi(argv[1]); pthread_t * tid = new pthread_t[nw]; for(int i=0; i