StreamProcessor

Class that handles distribution of data read from streams over a set of tasks in a task pool. The developer must define their own task class to do the work required to handle one record read from the streams; the stream processor takes care of throttling the input streams and distributing the resulting work over the pools of tasks / fibers.

Constructors

this
this(ISuspendableThrottler throttler)

Constructor which accepts a custom throttler. (For standard throttling behaviour, based on the number of busy tasks, use the other ctor.)

this
this(ThrottlerConfig throttler_config)

Constructor

Members

Functions

addStream
void addStream(ISuspendable s)

Adds an input stream (which must implement ISuspendable) to the set of streams which are to be throttled. If it is already in the set, nothing happens.

getTaskPool
ThrottledTaskPool!(TaskT) getTaskPool()

Get the task pool.

process
void process(ParametersOf!(TaskT.copyArguments) args)

Method to be called to start processing a record newly received from a stream.

removeStream
void removeStream(ISuspendable s)

Removes an input stream (which must implement ISuspendable) from the set of streams which are be throttled. If it is not in the set, nothing happens.

Variables

task_pool
ThrottledTaskPool!(TaskT) task_pool;

Pool of tasks used for processing stream data. Size of the pool is configured from StreamProcessor constructor and defines how fast the data can be consumed from streams (together with system-wide worker fiber count configured in scheduler).

Parameters

TaskT

application-defined task class. Must have a copyArguments method whose arguments == the set of parameters expected from a data item read from the throttled stream(s)

Examples

void example ( )
{
    static class MyProcessingTask : Task
    {
        import ocean.core.Array;

        // The task requires a single array as context, which is copied from
        // the outside by `copyArguments()`
        ubyte[] buffer;

        public void copyArguments ( ubyte[] data )
        {
            this.buffer.copy(data);
        }

        override public void run ( )
        {
            // Do something with the context and return when the task is
            // finished. Use `this.resume()` and `this.suspend()` to
            // control the execution of the bound worker fiber, if required
        }

        override public void recycle ( )
        {
            this.buffer.length = 0;
            assumeSafeAppend(this.buffer);
        }
    }

    auto throttler_config = ThrottlerConfig(5, 1);
    auto stream_processor = new StreamProcessor!(MyProcessingTask)(throttler_config);

    // Set of input streams. In this example there are none. In your
    // application there should be more than none.
    ISuspendable[] input_streams; foreach ( input_stream; input_streams )
        stream_processor.addStream(input_stream);

    // An imaginary record arrives from one of the input streams and is
    // passed to the process() method. Arguments expected by `process`
    // method are identical to arguments expected by `copyArguments` method
    // of your task class
    ubyte[] record = [ 1, 2, 3 ]; stream_processor.process(record);

    theScheduler.eventLoop();
}

Meta