# Using pipelines
A GLADOS pipeline is a chain of connected stages. Each stage runs in its own thread; all of these threads are managed by the pipeline.
There are two types of pipelines:

- The simple `pipeline`. Use this if all of the data is processed at once.
- The `task_pipeline`. Use this if the data has to be divided into tasks due to constraints on memory, the number of GPUs, or scalability.
To create a pipeline in your program, define a variable as follows:
```cpp
#include <glados/pipeline/pipeline.h>
// ...
auto pipeline = glados::pipeline::pipeline{};
```
Before creating a `task_pipeline` you need to define an arbitrary `task_type` in your program and generate a number of tasks according to your needs. These tasks need to be placed into a `std::queue`. This queue is then transformed into a `task_queue`, which in turn is passed to the `task_pipeline`:
```cpp
#include <glados/pipeline/pipeline.h>
// ...
auto tasks = generate_tasks(); // returns a std::queue
// note the '&&'
auto&& task_queue = glados::pipeline::task_queue<task_type>{tasks};
auto pipeline = glados::pipeline::task_pipeline<task_type>{task_queue};
```
All pipeline types expose the same behaviour with regard to stage management. Stages are created by calling `make_stage`:
```cpp
auto source = pipeline.make_stage<source_stage>(/* params */);
auto h2d = pipeline.make_stage<host2device>(/* params */);
auto filter = pipeline.make_stage<filter_stage>(/* params */);
auto d2h = pipeline.make_stage<device2host>(/* params */);
auto sink = pipeline.make_stage<sink_stage>(/* params */);
```
If you want to impose an input limit on one or more stages (once the limit is reached, feeding further input blocks the calling thread), define a variable of type `std::size_t` and pass it as the first parameter to `make_stage`:
```cpp
auto limit = std::size_t{5};
auto filter = pipeline.make_stage<filter_stage>(limit, /* params */);
```
Connecting stages is as trivial as passing the individual stages in the right order:
```cpp
// this will establish the connection 'source -> h2d -> filter -> d2h -> sink'
pipeline.connect(source, h2d, filter, d2h, sink);
```
To run the stages, replace the call to `connect` with `run`:
```cpp
pipeline.run(source, h2d, filter, d2h, sink);
```
After launching the stage threads, the `run` function immediately returns to the caller; its behaviour is thus asynchronous. To synchronize, call the `wait` function:
```cpp
// wait until all stages have completed their tasks
pipeline.wait();
```