Constructor which accepts a custom throttler. (For standard throttling behaviour, based on the number of busy tasks, use the other ctor.)
Constructor
Adds an input stream (which must implement ISuspendable) to the set of streams which are to be throttled. If it is already in the set, nothing happens.
Get the task pool.
Method to be called to start processing a record newly received from a stream.
Removes an input stream (which must implement ISuspendable) from the set of streams which are be throttled. If it is not in the set, nothing happens.
Pool of tasks used for processing stream data. Size of the pool is configured from StreamProcessor constructor and defines how fast the data can be consumed from streams (together with system-wide worker fiber count configured in scheduler).
application-defined task class. Must have a copyArguments method whose arguments == the set of parameters expected from a data item read from the throttled stream(s)
void example ( ) { static class MyProcessingTask : Task { import ocean.core.Array; // The task requires a single array as context, which is copied from // the outside by `copyArguments()` ubyte[] buffer; public void copyArguments ( ubyte[] data ) { this.buffer.copy(data); } override public void run ( ) { // Do something with the context and return when the task is // finished. Use `this.resume()` and `this.suspend()` to // control the execution of the bound worker fiber, if required } override public void recycle ( ) { this.buffer.length = 0; assumeSafeAppend(this.buffer); } } auto throttler_config = ThrottlerConfig(5, 1); auto stream_processor = new StreamProcessor!(MyProcessingTask)(throttler_config); // Set of input streams. In this example there are none. In your // application there should be more than none. ISuspendable[] input_streams; foreach ( input_stream; input_streams ) stream_processor.addStream(input_stream); // An imaginary record arrives from one of the input streams and is // passed to the process() method. Arguments expected by `process` // method are identical to arguments expected by `copyArguments` method // of your task class ubyte[] record = [ 1, 2, 3 ]; stream_processor.process(record); theScheduler.eventLoop(); }
Class that handles distribution of data read from streams over a set of tasks in a task pool. The developer must define their own task class to do the work required to handle one record read from the streams; the stream processor takes care of throttling the input streams and distributing the resulting work over the pools of tasks / fibers.