Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:samplefastflowcode

FastFlow sample code

This is the code used during the FastFlow lesson. All code should be compiled with a command such as

 g++ -Idir-where-ff-dir-has-been-saved -lpthread -o file file.cpp
pipe.cpp
// we include the node, pipeline and allocator hpp files 
// as we use only these three features of FastFlow
#include <ff/node.hpp>
#include <ff/pipeline.hpp>
#include <ff/allocator.hpp>
 
// we use the FastFlow namespace 
using namespace ff; 
 
// Global FastFlow memory allocator shared by the stages of this sample:
// GenerateStream::svc allocates the stream items from it and
// DrainStage::svc releases them.
// It must be initialized (allocator.init() in main) before actually using it.
static ff_allocator allocator; 
 
// we implement a pipeline application here:
//
//  Pipeline
//
//    +------+    +------+  +------+  +------+
//    |Stage1| -> |Stage2|->|Stage3|->|Stage4|
//    +------+    +------+  +------+  +------+
//
//  Stage1 -> generate an integer stream
//  StageN -> prints the input stream
//  StageK -> increases integers
//
 
// definition of the generic inc stage
// subclass ff_node (seq code wrapper)
 
class IntIncStage:public ff_node {
 
  // method called on initialization (optional) 
  int svc_init() {
    std::cout << "IntIncStage " << ff_node::get_my_id() 
              << " inited !" << std::endl;
  }
 
  // method wrapping "task computation code"
  // for each task on the input stream, this method is called
  // and it returns the data to be placed onto the output stream
  // in and out data are pointers, as usual
  void * svc (void * task) {
    int * i = (int *) task; 
    std::cout << "Stage " << ff_node::get_my_id() 
              << " got task " << *i ;
    (*i)++; 
    std::cout << " computed " << *i << std::endl;
    return task; 
  }
 
  // this  is called before termination
  // (optional) 
  void svc_end() {
    std::cout << "Stage " << ff_node::get_my_id() 
	      << " terminating " << std::endl;
  }
};
 
// last stage: print in stream contents
// it's sequential => subclass ff_node
class DrainStage:public ff_node {
 
  // print initialization message
  int svc_init() {
    std::cout << "DrainStage " << ff_node::get_my_id() 
              << " inited !" << std::endl;
  }
 
  // stage body: actual wrapping the sequential code
  void * svc (void * task) {
    int * i = (int *) task;
    std::cout << "Stage " << ff_node::get_my_id() 
              << " got result " << *i << std::endl;
    allocator.free(task);
    return task;
  }
};
 
// stage generating the stream
// seq => subclass ff_node
// to output data items on the output stream uses ff_send_out()
// to terminate the stream, outputs a FF_EOS
// to terminate, returns a NULL
class GenerateStream:public ff_node {
 
private:
  int ntasks; 
 
public:
  // constructor: used to pass the stream "size"
  GenerateStream(int ntasks):ntasks(ntasks) { }
 
  int svc_init() {
    std::cout << "GenerateStream(" << ntasks << ") " 
              << ff_node::get_my_id() << " inited !" << std::endl;
  }
 
  // Stage body: output a stream with ntasks, ntasks-1, ntasks-2, ... 
  // down to 0
  // then output the FF_EOS (end of stream)
  // and terminate (return NULL) 
 
  void * svc(void * task) {
    while(ntasks != 0) {
      int * tt = (int *) allocator.malloc(sizeof(int));
      *tt = ntasks;
      ff::ff_node::ff_send_out((void *) tt);
      ntasks--;
      std::cout << "GenerateStream(" << ntasks << ")" << std::endl;
    }
    ff_send_out(EOS); 
    return NULL; 
  }
 
};
 
 
int main(int argc, char * argv[]) {
  int tasks = 10;  // dummy, should be taken from the input line
 
  // allocator must be initialized before using it
  allocator.init();
 
  // declare a pipeline object
  ff_pipeline pipe_a; 
  // add stages in order: 
  //    the first stage added is the first pipeline stage
  //    the i-th stage added is the i-th pipeline stage
  pipe_a.add_stage(new GenerateStream(tasks)); 
  pipe_a.add_stage(new IntIncStage); 
  pipe_a.add_stage(new IntIncStage); 
  pipe_a.add_stage(new DrainStage); 
 
 
  // now we ca run the application: 
  std::cout << "Starting application ..." << std::endl;
  pipe_a.run();
  std::cout << "Application started" << std::endl;
 
  // here more (unrelated) work can be performed ... 
  // when results are needed we should wait for application termination
  pipe_a.wait();
  // alternatively, if we have nothing else to do, we can issue a single 
  // call such as:     pipe_a.run_and_wait_end();
  // in this case the pipeline is started and its termination awaited 
  // synchronously 
  std::cout << "Application terminated" << std::endl;
  return(0); 
}

To parallelize the integer-increment stage with a farm, we can keep the same stage classes and replace main with the following code (which additionally requires #include <ff/farm.hpp> and #include <vector>):

altmain.cpp
int main(int argc, char * argv[]) {
  int tasks = 10;  // dummy, should be taken from the input line
 
  // allocator must be initialized before using it
  allocator.init();
 
  // declare a pipeline object
  ff_pipeline pipe_a; 
  // add stages in order: 
  //    the first stage added is the first pipeline stage
  //    the i-th stage added is the i-th pipeline stage
  pipe_a.add_stage(new GenerateStream(tasks)); 
 
  // we create a farm to process increments in the second stage
  // declare a ff_farm object
  ff_farm<> farm;
  // default collector
  farm.add_collector(NULL); 
  // declare worker vector (string)
  std::vector<ff_node *> workers;
  // two workers
  workers.push_back(new IntIncStage);
  workers.push_back(new IntIncStage);
  // add workers to farm
  farm.add_workers(workers);
  // second pipeline stage is the farm
  pipe_a.add_stage(&farm); 
  // the third one is the print stream
  pipe_a.add_stage(new DrainStage); 
 
 
  // now we ca run the application: 
  std::cout << "Starting application ..." << std::endl;
  pipe_a.run();
  std::cout << "Application started" << std::endl;
 
  // here more (unrelated) work can be performed ... 
  // when results are needed we should wait for application termination
  pipe_a.wait();
  // alternatively, if we have nothing else to do, we can issue a single 
  // call such as:     pipe_a.run_and_wait_end();
  // in this case the pipeline is started and its termination awaited 
  // synchronously 
  std::cout << "Application terminated" << std::endl;
  return(0); 
}

Sample farm code (taken from FF tests/ directory):

farm.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
 
/**
 
 Very basic test for the FastFlow farm.
 
*/
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
 
 
using namespace ff;
 
// generic worker
class Worker: public ff_node {
public:
    // svc_init() and svc_end() are deliberately not overridden:
    // this test does not need them.

    // Logs the worker id together with the integer the task points to,
    // then forwards the very same task pointer.
    void * svc(void * task) {
        const int value = *static_cast<int *>(task);
        std::cout << "Worker " << ff_node::get_my_id();
        std::cout << " received task " << value << "\n";
        return task;
    }
};
 
// the gatherer filter
class Collector: public ff_node {
public:
    // Passes every task through unchanged, except a task whose integer
    // value is -1, for which NULL is returned instead.
    void * svc(void * task) {
        const int value = *static_cast<int *>(task);
        if (value != -1) {
            return task;
        }
        return NULL;
    }
};
 
// the load-balancer filter
class Emitter: public ff_node {
public:
    // max_task: number of tasks to emit; the emitted values are
    // max_task, max_task-1, ..., 1.
    Emitter(int max_task):ntask(max_task) {}
 
    // Produces the next task as a heap-allocated int, or NULL once all
    // tasks have been emitted (or if max_task was not positive).
    // The check comes before the allocation so that the final call does
    // not allocate an int that is never handed out.
    void * svc(void *) {        
        if (ntask <= 0) return NULL;
        return new int(ntask--);
    }
private:
    int ntask;   // value of the next task to emit; counts down to 0
};
 
 
int main(int argc, char * argv[]) {
 
    if (argc<3) {
        std::cerr << "use: " 
                  << argv[0] 
                  << " nworkers streamlen\n";
        return -1;
    }
 
    int nworkers=atoi(argv[1]);
    int streamlen=atoi(argv[2]);
 
    if (!nworkers || !streamlen) {
        std::cerr << "Wrong parameters values\n";
        return -1;
    }
 
    ff_farm<> farm; // farm object
 
    Emitter E(streamlen);
    farm.add_emitter(&E);
 
    std::vector<ff_node *> w;
    for(int i=0;i<nworkers;++i) w.push_back(new Worker);
    farm.add_workers(w); // add all workers to the farm
 
    Collector C;
    farm.add_collector(&C);
 
    if (farm.run_and_wait_end()<0) {
        error("running farm\n");
        return -1;
    }
    std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n";
    farm.ffStats(std::cerr);
 
    return 0;
}

Sample accelerator code:

accelerator.cpp
#include <iostream>
#include <math.h>
 
#include <ff/farm.hpp>
#include <ff/node.hpp>
 
using namespace std;
using namespace ff; 
 
 
// Shared state: kept global so that it is accessible from the workers.
// MAX is the size of x and y, and the number of tasks offloaded by main.
#define MAX 1024
double x[MAX];   // per-task value, repeatedly replaced by its sine in Worker::svc
double y[MAX];   // per-task accumulator, increased by the final x value in Worker::svc
 
// Default number of sin() iterations applied to each element;
// iterno holds the value actually used and may be overridden in main.
#define ITERNO 100000
int iterno = ITERNO; 
 
class Worker: public ff_node {
  int svc_init() {
    cout << "Worker initialized" << endl; 
    return 0; 
  }
 
  void * svc(void * in) {
    int i = * ((int *) in);
    // cout << "Computing " << i << endl;
    for(int j=0; j<iterno; j++) 
       x[i] = sin(x[i]);
    y[i] += x[i];
    return in; 
  }
};
 
 
#define NW 2 
 
int main(int argc, char * argv[]) 
{
  ffTime(START_TIME);
 
  cout << "init " << argc  << endl; 
  int nw = (argc==1 ? NW : atoi(argv[1]));
  iterno = (argc<=2 ? ITERNO : atoi(argv[2]));
 
  cout << "using " << nw << " workers iterating " << iterno << " times "<< endl; 
 
  // init input (fake)
  for(int i=0; i<MAX; i++) {
    x[i] = (double)(i*10);
    y[i] = (double)(-i*13);
  }
  cout << "Setting up farm" << endl; 
  ff_farm<> farm(true,nw);    // true for offloading, NW for the number of workers ... 
 
  std::vector<ff_node *> w;   // prepare workers
  for(int i=0;i<nw;++i) 
    w.push_back(new Worker);
  farm.add_workers(w);        // add them to the farm
  farm.run_then_freeze();     // run farm asynchronously
 
  cout << "Sending tasks ..." << endl; 
  int tasks[MAX]; 
  for(int i=0; i<MAX; i++) {
     tasks[i]=i;
     farm.offload((void *) &tasks[i]);
  }
  farm.offload((void *) FF_EOS);
 
  cout << "Waiting termination" << endl;
  farm.wait();
 
  cout << "Farm terminated after computing for " << farm.ffTime() << endl; 
 
  ffTime(STOP_TIME);
  cout << "Spent overall " << ffTime(GET_TIME) << endl;
 
}
magistraleinformaticanetworking/spm/samplefastflowcode.txt · Ultima modifica: 19/05/2011 alle 14:33 (13 anni fa) da Marco Danelutto