Heratosthene Sieve in FastFlow

This computes the prime numbers using a variable number of stages. Printing is protected through a Pthread lock.

sievelock.cpp
#include <iostream>
#include <ff/pipeline.hpp>
#include <pthread.h>
 
using namespace ff;
 
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; 
 
class Sieve: public ff_node {
public:
 
  Sieve() { filter = 0; }
 
  void * svc(void * task) {
    unsigned int * t = (unsigned int *)task;
 
    if (filter == 0) {
      filter = *t; 
      return GO_ON;
    } else {
      if(*t % filter == 0) 
	return GO_ON;
      else 
	return task;
    }
  }
 
  void svc_end() { 
    pthread_mutex_lock(&lock);
    std::cout << "Prime(" << filter << ")\n";
    pthread_mutex_unlock(&lock);
    return; 
  }
 
 
private:
  int filter; 
};
 
class Generate: public ff_node {
public:
 
  Generate(int n) {
    streamlen = n; 
    task = 2;
    pthread_mutex_lock(&lock);
    std::cout << "Generate object created" << std::endl; 
    pthread_mutex_unlock(&lock);
    return;
  }
 
 
  int svc_init() {
    pthread_mutex_lock(&lock);
    std::cout << "Sieve started. Generating a stream of " << streamlen << 
      " elements, starting with " << task << std::endl;
    pthread_mutex_unlock(&lock);
    return 0; 
  }
 
  void * svc(void * tt) {
    unsigned int * t = (unsigned int *)tt;
 
    if(task < streamlen) {
      int * xi = (int *) calloc(1, sizeof(int));
      *xi = task++;
      return xi;
    } else {
      return NULL; 
    } 
  }
private: 
  int streamlen;
  int task;
};
 
class Printer: public ff_node {
 
  int svc_init() {
    pthread_mutex_lock(&lock);
    std::cout << "Printer started " << std::endl;
    pthread_mutex_unlock(&lock);
    first = 0; 
  }
 
  void * svc(void *t) {
    int * xi = (int *) t;
    if (first == 0) {
      first = *xi; 
    }
    return GO_ON;
  }
 
  void svc_end() {
    pthread_mutex_lock(&lock);
    std::cout << "Sieve terminating, prime numbers found up to " << first 
	      << std::endl;
    pthread_mutex_unlock(&lock);
  }
 
private: 
  int first;
};
 
int main(int argc, char * argv[]) {
  if (argc!=3) {
    std::cerr << "use: "  << argv[0] << " nstages streamlen\n";
    return -1;
  }
 
  ff_pipeline pipe;
  int nstages = atoi(argv[1]);  
  pipe.add_stage(new Generate(atoi(argv[2])));
  for(int j=0; j<nstages; j++)
    pipe.add_stage(new Sieve());
  pipe.add_stage(new Printer());
 
  ffTime(START_TIME);
  if (pipe.run_and_wait_end()<0) {
    error("running pipeline\n");
    return -1;
  }
  ffTime(STOP_TIME);
 
  std::cerr << "DONE, pipe  time= " << pipe.ffTime() << " (ms)\n";
  std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
  pipe.ffStats(std::cerr);
  return 0;
}