Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:ffhwpipe

Sample pipeline applications in FastFlow

The code appearing on this page is commented in the course notes PDF, Appendix A: FastFlow.

This is the code of a single concurrent activity printing an “Hello World”. We use a single stage pipeline without any kind of stream.

hello.cpp
#include <iostream>
#include <ff/pipeline.hpp>
 
using namespace ff;
 
class Stage1: public ff_node {
public:
 
    void * svc(void * task) {
        std::cout << "Hello world" << std::endl;
        return NULL;
    }
};
 
int main(int argc, char * argv[]) {
 
    ff_pipeline pipe;
    pipe.add_stage(new Stage1());
 
    ffTime(START_TIME);
    if (pipe.run_and_wait_end()<0) {
        error("running pipeline\n");
        return -1;
    }
    ffTime(STOP_TIME);
 
    std::cerr << "DONE, pipe  time= " << pipe.ffTime() << " (ms)\n";
    std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
    pipe.ffStats(std::cerr);
    return 0;
}

The second version following uses a two stage pipeline: the first stage prints “Hello” then send “world” to be printed by the second stage. The program does not terminate.

hello2stages.cpp
#include <iostream>
#include <ff/pipeline.hpp>
 
using namespace ff;
 
class Stage1: public ff_node {
public:
 
    void * svc(void * task) {
        std::cout << "Hello " << std::endl;
        char * p = (char *) calloc(sizeof(char),10); 
        strcpy(p,"World");
        sleep(1);
        return ((void *)p);
    }
};
 
class Stage2: public ff_node {
public:
 
    void * svc(void * task) {
      std::cout << ((char *)task) << std::endl;
      free(task);
      return GO_ON;
    }
};
 
int main(int argc, char * argv[]) {
 
    ff_pipeline pipe;
    pipe.add_stage(new Stage1());
    pipe.add_stage(new Stage2());
 
    if (pipe.run_and_wait_end()<0) {
        error("running pipeline\n");
        return -1;
    }
 
    return 0;
}

We add termination here:

hello2terminate.cpp
#include <iostream>
#include <ff/pipeline.hpp>
 
using namespace ff;
 
class Stage1: public ff_node {
public:
 
  Stage1() { first = (1==1); }
 
  void * svc(void * task) {
    if(first) {
      std::cout << "Hello " << std::endl;
      char * p = (char *) calloc(sizeof(char),10); 
      strcpy(p,"World");
      sleep(1);
      first = 0; 
      return ((void *)p);
    } else {
      return NULL; 
    }
  }
private:
  int first; 
};
 
class Stage2: public ff_node {
public:
 
    void * svc(void * task) {
      std::cout << ((char *)task) << std::endl;
      free(task);
      return GO_ON;
    }
};
 
int main(int argc, char * argv[]) {
 
    ff_pipeline pipe;
    pipe.add_stage(new Stage1());
    pipe.add_stage(new Stage2());
 
    if (pipe.run_and_wait_end()<0) {
        error("running pipeline\n");
        return -1;
    }
 
    return 0;
}
magistraleinformaticanetworking/spm/ffhwpipe.txt · Ultima modifica: 21/03/2012 alle 18:53 (13 anni fa) da Marco Danelutto

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki