Sample sources

C version with timings (to be compiled with gcc -pthread -lm)

farm.c
#include <math.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
 
// Lock taken around every access to the shared task list below.
pthread_mutex_t mutex;

// One unit of work, stored as a node of a singly linked list.
struct __task {
  long n;                // how many times sin() is applied to x
  float x;               // input value; overwritten with the computed result
  struct __task * next;  // following node in the list, NULL on the last one
};
typedef struct __task TASK;

// Head of the list of tasks still to be processed (NULL when empty).
TASK * tasks = NULL;
 
// Value type returned by getticks().
typedef unsigned long long ticks;

/*
 * Reads a free-running hardware counter, for coarse "how many ticks did
 * this take" measurements (take the difference of two readings).
 *
 *  - x86 / x86-64: the time stamp counter, read with RDTSC (low half in
 *    EAX, high half in EDX).
 *  - AArch64: the virtual counter register CNTVCT_EL0. It is a
 *    fixed-frequency counter, so readings are not CPU clock cycles.
 *
 * The __asm__/__volatile__ spellings are used instead of plain "asm" so
 * that the function also compiles in strict ISO modes (e.g. -std=c11),
 * where "asm" is not a keyword.
 */
static inline ticks getticks(void)
{
#if defined(__x86_64__) || defined(__i386__)
  unsigned a, d;
  __asm__ __volatile__("rdtsc" : "=a" (a), "=d" (d));
  return ((ticks)a) | (((ticks)d) << 32);
#elif defined(__aarch64__)
  ticks v;
  __asm__ __volatile__("mrs %0, cntvct_el0" : "=r" (v));
  return v;
#else
#error "getticks(): no hardware counter support for this architecture"
#endif
}
 
 
 
void * worker(void * x) {
  int wno = *((int *) x);
  int * taskno = (int *) calloc(1, sizeof(int));
 
  *taskno = 0;
 
#ifdef DEBUG
  printf("Worker no %d created\n", wno);
#endif
 
  while(1==1) {
    // get a task from the list
    pthread_mutex_lock(&mutex);
    TASK * t = tasks;
    if(tasks!=NULL) {
      tasks = tasks->next;
    }
    pthread_mutex_unlock(&mutex);
    // process
    if(t == NULL)
      break; // nothing else to process
#ifdef DEBUG
    printf("Thread %ld processing task <%ld,%f>\n",pthread_self(), t->n, t->x);
#endif
    {
      long l;
      for(l=0; l<t->n; l++)
        t->x = sin(t->x);
    }
    // write results
#ifdef DEBUG
    printf("Thread %ld computed %f\n",pthread_self(),t->x);
#endif
    (*taskno)++;
  }
  return ((void *) taskno);
}
 
/*
 * Farm driver.
 *
 * Usage: farm -n pardegree -m streamlength
 *
 * Builds a list of m tasks (task i applies sin() i*100000 times to the
 * value i), starts nw worker threads that consume the list, waits for
 * them, and reports the time spent measured in several ways:
 * gettimeofday(), clock(), clock_gettime() (process and calling-thread
 * CPU time) and the hardware tick counter.
 *
 * Returns 0 on success (and after printing the usage text for an unknown
 * option), -1 on invalid/missing parameters or on a setup failure.
 */
int main(int argc, char * argv[]) {
  int opt, i;
  int nw = 0;            // number of workers (-n), must be > 0
  int m = -1;            // number of tasks (-m), must be >= 0
  pthread_t * w = NULL;  // worker thread handles
  int * wid = NULL;      // per-worker id, one stable slot per thread
  TASK * pool = NULL;    // storage for all the task nodes
  struct timeval t0, t1;
  clock_t before, after;
  struct timespec c0, c1, p0, p1;
  ticks tck0, tck1;

  while((opt = getopt(argc, argv, "n:m:"))!= (-1)) {
    switch(opt) {
    case 'n': {
      nw = atoi(optarg);
      break;
    }
    case 'm': {
      m = atoi(optarg);
      break;
    }
    default: {
      printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]);
      return(0);
    }
    }
  }

  // both options are mandatory: without this check nw/m would be used
  // uninitialised
  if(nw <= 0 || m < 0) {
    printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]);
    return(-1);
  }

#ifdef DEBUG
  printf("%d workers and %d tasks\n", nw, m);
#endif

  if((pthread_mutex_init(&mutex,NULL)!=0)) {
    printf("Failed to initialize the mutex\n");
    return(-1);
  }

  w = calloc((size_t) nw, sizeof *w);
  wid = calloc((size_t) nw, sizeof *wid);
  if(m > 0)
    pool = calloc((size_t) m, sizeof *pool);
  if(w == NULL || wid == NULL || (m > 0 && pool == NULL)) {
    printf("Failed to allocate memory\n");
    free(pool);
    free(wid);
    free(w);
    pthread_mutex_destroy(&mutex);
    return(-1);
  }

  // All task nodes live in one array so that they can be released with a
  // single free() once the workers are done. Each node is pushed at the
  // head of the list, so the last task created is the first one served.
  for(i=0; i<m; i++) {
    TASK * t = &pool[i];
    t->x = (float)i ;
    t->n = (long) i*(100000L);
    pthread_mutex_lock(&mutex);   // may be in principle concurrent with task consuming
    t->next = tasks;
    tasks = t;
    pthread_mutex_unlock(&mutex);
  }

  before = clock();
  clock_gettime(CLOCK_THREAD_CPUTIME_ID,&c0);
  clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&p0);
  gettimeofday(&t0,NULL);
  tck0 = getticks();
#ifdef DEBUG
  printf("Start %ld %ld \n", (long) t0.tv_sec, (long) t0.tv_usec);
#endif
  for(i=0; i<nw; i++) {
    // create the workers: they'll start working immediately.
    // Each worker gets a pointer to its own slot: passing &i would let the
    // thread read the loop counter while it is being modified here.
    wid[i] = i;
    if(pthread_create(&w[i],NULL, worker, &wid[i])!=0) {
      printf("Error creating workers\n");
      // workers already started still reference the arrays above, so
      // nothing is freed here; leaving main ends the whole process
      return(-1);
    }
  }

  // workers running: wait termination
  for(i=0; i<nw; i++) {
    void * retcode = NULL;
    if(pthread_join(w[i],&retcode) != 0) {
      printf("Error joining worker %d\n", i);
    } else if(retcode == NULL) {
      printf("Worker %d terminated without a result\n", i);
    } else {
      printf("Worker %d terminated computing %d tasks\n",
             i, *((int *) retcode));
      free(retcode);   // the counter was allocated by the worker for us
    }
    fflush(stdout);
  }
  gettimeofday(&t1,NULL);
  clock_gettime(CLOCK_THREAD_CPUTIME_ID,&c1);
  clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&p1);
  after = clock();
  tck1 = getticks();
#ifdef DEBUG
  printf("End %ld %ld \n", (long) t1.tv_sec, (long) t1.tv_usec);
#endif
  printf("Elapsed %f msecs\n",
         (t1.tv_sec-t0.tv_sec)*1000.0 + (t1.tv_usec - t0.tv_usec)/1000.0);
  // clock() counts in units of 1/CLOCKS_PER_SEC seconds: convert in floating
  // point (an integer quotient must not be passed to %f) and scale to msecs
  printf("Clock cycles %ld (%f msecs)\n", (long) (after-before),
         ((double) (after-before)) * 1000.0 / CLOCKS_PER_SEC);
  {
    double tproc = (p1.tv_sec - p0.tv_sec)*1000.0 + (p1.tv_nsec-p0.tv_nsec)/1000000.0;
    double tthread = (c1.tv_sec - c0.tv_sec)*1000.0 + (c1.tv_nsec-c0.tv_nsec)/1000000.0;
    printf("clock_gettime (per process) %f msecs\n", tproc);
    printf("clock_gettime (per thread) %f msecs\n", tthread);
    // process CPU time minus the CPU time of this (main) thread, shared
    // evenly among the workers
    printf("clock_gettime: average per worker %f\n", (tproc - tthread)/nw);
  }
  printf("Ticks %llu\n", tck1-tck0);

  printf("Workers terminated\n");

  pthread_mutex_destroy(&mutex);
  free(pool);
  free(wid);
  free(w);
  return(0);
}

C++ version with timings (to be compiled with g++ -std=c++0x -pthread -lm -lrt)

Farm.cpp
#include <iostream>
#include <vector>
#include <list>
#include <stdlib.h>
#include <unistd.h>
#include <thread>
#include <cmath>
#include <mutex>
#include <sys/time.h>
 
using namespace std;
 
// One unit of work for the farm.
struct __task {
  float          x;   // value that sin() is repeatedly applied to
  long           n;   // number of sin() applications
};
typedef struct __task farm_task_t;

// Lock taken by the workers while they extract a task from the list.
std::mutex locche;
// Tasks still to be processed, in creation order.
std::list<farm_task_t *> tasks;
 
void worker(int no) {
  int taskno = 0;
  thread::id mythreadid = this_thread::get_id();
 
#ifdef DEBUG
  cout << "Worker No. " << no << " started (on thread " << mythreadid << ")" << endl;
#endif
 
  while(true) {
    farm_task_t t;
 
    locche.lock();
    if(!tasks.empty()) {
      t = *(tasks.front());
      tasks.pop_front();
      locche.unlock();
    } else {
      locche.unlock();
      break; // TODO
    }
#ifdef DEBUG
    cout << "Worker no. " << no << " got task " << t.x <<  " " << t.n << endl;
#endif
    taskno++;
    for(long i=0; i<t.n; i++) {
      t.x = sin(t.x);
    }
  }
#ifdef DEBUG
  cout << "Worker " << no << " computed " << taskno << " tasks " << endl;
#endif
  return;
}
 
 
int main(int argc, char * argv[]) {
 
  int opt, nw, ntasks;
  struct timeval t0, t1;
 
  while((opt = getopt(argc, argv, "n:m:"))!= (-1)) {
    switch(opt) {
    case 'n': {
      nw = atoi(optarg);
      break;
    }
    case 'm': {
      ntasks = atoi(optarg);
      break;
    }
    default: {
      printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]);
      return(0);
    }
    }
  }
 
  cout << "Initializing ..." << endl;
 
  for(int i=0; i<ntasks; i++) {
    farm_task_t * t = (farm_task_t *) calloc(1, sizeof(farm_task_t));
    t->x = (float) i;
    t->n = i*100000L;
    tasks.push_back(t);
  }
  cout << "Tasks created ..." << endl;
 
  vector<thread> workers;
 
  gettimeofday(&t0,NULL);
  for(int i=0; i<nw; i++) {
    workers.push_back(thread(bind(worker,i)));
  }
 #ifdef DEBUG
  cout << "Workers created ..." << endl;
#endif
 
#ifdef DEBUG
  cout << "Awaiting worker termination ..." << endl;
#endif
  for(auto &t : workers) {
    t.join();
  }
  gettimeofday(&t1,NULL);
  cout << "All workers terminated " << endl;
  cout << "Elapsed time " <<   (t1.tv_sec-t0.tv_sec)*1000.0 + ((float) (t1.tv_usec - t0.tv_usec))/1000.0 << " msecs " << endl;
  return(0);
}

C++/Pthread version with the synchronization wrapped into a new Queue<T> type:

FarmQueue.cpp
#include <iostream>
#include <queue>
#include <pthread.h>
 
using namespace std; 
 
// A task: a length together with a pointer to a float array.
// The class only stores the pointer; it neither copies nor releases the
// array. A negative length is used by the worker body as the
// end-of-stream marker.
class Task {
private:
  int n;       // value reported by length()
  float * v;   // value reported by get(); not owned

public:
  Task(int n, float * v):n(n),v(v) {}

  // Pointer given at construction. const so that it can also be called
  // on a const Task or through a const reference.
  float * get() const {
    return v;
  }

  // Length given at construction (const for the same reason).
  int length() const {
    return n;
  }
};
 
// FIFO channel that can be shared among pthreads.
//
// send() appends an item; receive() removes and returns the oldest item,
// blocking the calling thread while the queue is empty. All accesses to
// the underlying std::queue happen with `lock` held.
//
// The template parameter is the item type (it is named T so that it does
// not shadow the Task class). Items are stored and returned by value.
template <class T> class Queue {
private:
  std::queue<T> tasks;
  pthread_mutex_t lock;       // protects tasks
  pthread_cond_t  nonempty;   // signalled by send() after each insertion

  // A queue owns a mutex and a condition variable, which must not be
  // copied: copy operations are declared private and never defined.
  Queue(const Queue &);
  Queue & operator=(const Queue &);

public:
  Queue() {
    // PTHREAD_MUTEX_INITIALIZER is meant for static initialisation only;
    // objects created at run time use the init functions
    pthread_mutex_init(&lock, NULL);
    pthread_cond_init(&nonempty, NULL);
  }

  ~Queue() {
    // no thread may still be using the queue at this point
    pthread_cond_destroy(&nonempty);
    pthread_mutex_destroy(&lock);
  }

  // Appends m to the queue and wakes up one thread waiting in receive().
  void send(T m) {
    pthread_mutex_lock(&lock);
    tasks.push(m);
    pthread_cond_signal(&nonempty);
    pthread_mutex_unlock(&lock);
    return;
  }

  // Removes and returns the oldest item, waiting until one is available.
  T receive() {
    pthread_mutex_lock(&lock);
    // loop: the condition must be re-checked after every wake-up
    // (spurious wake-ups, or another receiver got the item first)
    while(tasks.empty())
      pthread_cond_wait(&nonempty, &lock);
    T t = tasks.front();
    tasks.pop();
    pthread_mutex_unlock(&lock);
    return t;
  }
};
 
// The pair of channels handed to a worker thread: it receives its tasks
// from `in` and sends its results to `out`. Only pointers are stored;
// the queues are owned by whoever fills in this structure.
struct __comms {
  Queue<Task> * in;    // input channel
  Queue<Task> * out;   // output channel
};
typedef struct __comms COMMS;
 
// Function computed by the workers on each task.
// Currently the identity: the result is a copy of the input task.
Task f(Task x) {
  Task result(x);
  return result;
}
 
void * body(void * x) {
  COMMS * q = (COMMS *) x; 
  int * i = new int();
 
  while(true) {
    Task t = (q->in)->receive();
    if(t.length() < 0) 
      break; // EOS
    Task r = f(t); 
    (*i)++;
    q->out->send(r);
  }
  return ((void *) i);
}
 
int main(int argc, char * argv []) {
 
  Queue<Task> tasks;
  Queue<Task> ress; 
  COMMS c; 
  c.in = &tasks;
  c.out = &ress;
 
  int nw = atoi(argv[1]);
  pthread_t * tid = new pthread_t[nw];
 
  for(int i=0; i<nw; i++) {
    int ret = pthread_create(&tid[i], NULL, body, (void *) &c);
    if(ret != 0) {
      // TBD
      cerr << "error creating thread " << i << endl; 
    }
  }
  return(0);
}