#include #include #include using namespace ff; static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; class Sieve: public ff_node { public: Sieve() { filter = 0; } void * svc(void * task) { unsigned int * t = (unsigned int *)task; if (filter == 0) { filter = *t; return GO_ON; } else { if(*t % filter == 0) return GO_ON; else return task; } } void svc_end() { pthread_mutex_lock(&lock); std::cout << "Prime(" << filter << ")\n"; pthread_mutex_unlock(&lock); return; } private: int filter; }; class Generate: public ff_node { public: Generate(int n) { streamlen = n; task = 2; pthread_mutex_lock(&lock); std::cout << "Generate object created" << std::endl; pthread_mutex_unlock(&lock); return; } int svc_init() { pthread_mutex_lock(&lock); std::cout << "Sieve started. Generating a stream of " << streamlen << " elements, starting with " << task << std::endl; pthread_mutex_unlock(&lock); return 0; } void * svc(void * tt) { unsigned int * t = (unsigned int *)tt; if(task < streamlen) { int * xi = (int *) calloc(1, sizeof(int)); *xi = task++; return xi; } else { return NULL; } } private: int streamlen; int task; }; class Printer: public ff_node { int svc_init() { pthread_mutex_lock(&lock); std::cout << "Printer started " << std::endl; pthread_mutex_unlock(&lock); first = 0; } void * svc(void *t) { int * xi = (int *) t; if (first == 0) { first = *xi; } return GO_ON; } void svc_end() { pthread_mutex_lock(&lock); std::cout << "Sieve terminating, prime numbers found up to " << first << std::endl; pthread_mutex_unlock(&lock); } private: int first; }; int main(int argc, char * argv[]) { if (argc!=3) { std::cerr << "use: " << argv[0] << " nstages streamlen\n"; return -1; } ff_pipeline pipe; int nstages = atoi(argv[1]); pipe.add_stage(new Generate(atoi(argv[2]))); for(int j=0; j