1 /*******************************************************************************
2 
3     Framework for reading from a stream and throttling based on the progress of
4     processing the received data.
5 
6     It is a relatively simple utility built on top of a task pool, the
7     scheduler, and `ISuspendable`, to provide "ready to go" functionality to be
8     used in applications.
9 
10     Usage example:
11         See the documented unittest of the `StreamProcessor` class
12 
13     Copyright:
14         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
15         All rights reserved.
16 
17     License:
18         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
19         Alternatively, this file may be distributed under the terms of the Tango
20         3-Clause BSD License (see LICENSE_BSD.txt for details).
21 
22 *******************************************************************************/
23 
24 module ocean.task.util.StreamProcessor;
25 
26 
27 import ocean.meta.types.Qualifiers;
28 
29 import ocean.task.Task;
30 import ocean.task.ThrottledTaskPool;
31 import ocean.task.IScheduler;
32 
33 import ocean.meta.types.Function /* : ParametersOf */;
34 import ocean.core.Enforce;
35 import ocean.text.convert.Formatter;
36 import ocean.io.model.ISuspendable;
37 import ocean.io.model.ISuspendableThrottler;
38 
39 debug (TaskScheduler)
40 {
41     import ocean.io.Stdout;
42 }
43 
44 version (unittest)
45 {
46     import ocean.core.Test;
47 }
48 
49 /*******************************************************************************
50 
51     Config struct to use when creating a stream processor that should use
52     the default PoolThrottler for throttling.
53 
54 *******************************************************************************/
55 
public struct ThrottlerConfig
{
    /***************************************************************************

        Busy-task count at which the input streams get suspended.

        Leaving this at `size_t.max` asks the `StreamProcessor` constructor to
        derive the value itself: it then uses two thirds of the scheduler's
        total task queue size.

    ***************************************************************************/

    size_t suspend_point = size_t.max;

    /***************************************************************************

        Busy-task count at which suspended input streams get resumed.

        Leaving this at `size_t.max` asks the `StreamProcessor` constructor to
        derive the value itself: it then uses one fifth of the scheduler's
        total task queue size.

    ***************************************************************************/

    size_t resume_point = size_t.max;
}
80 
81 /*******************************************************************************
82 
83     Class that handles distribution of data read from streams over a set of
84     tasks in a task pool. The developer must define their own task class to do
85     the work required to handle one record read from the streams; the stream
86     processor takes care of throttling the input streams and distributing the
87     resulting work over the pools of tasks / fibers.
88 
89     Params:
90         TaskT = application-defined task class. Must have a `copyArguments`
91             method whose arguments == the set of parameters expected from a data
92             item read from the throttled stream(s)
93 
94 *******************************************************************************/
95 
class StreamProcessor ( TaskT : Task )
{
    /***************************************************************************

        Reusable exception instance. It is thrown (through `enforce`) when the
        processor is configured with invalid throttling points, or when a
        record arrives while the task pool cannot start another task - a
        situation that a correctly working throttler is expected to prevent.

    ***************************************************************************/

    private ThrottlerFailureException throttler_failure_e;

    /***************************************************************************

        Throttled task pool to which every record passed to `process` is
        handed over. Input streams registered via `addStream` are attached to
        this pool's throttler.

    ***************************************************************************/

    protected ThrottledTaskPool!(TaskT) task_pool;

    /***************************************************************************

        Shared initialisation used by both public constructors: allocates the
        reusable exception instance.

    ***************************************************************************/

    private this ( )
    {
        this.throttler_failure_e = new ThrottlerFailureException;
    }

    /***************************************************************************

        Creates a stream processor driven by an application-supplied
        throttler. Use the `ThrottlerConfig` constructor instead to get the
        standard throttling based on the number of busy tasks.

        Params:
            throttler = custom throttler to use.

    ***************************************************************************/

    public this ( ISuspendableThrottler throttler )
    {
        this();
        this.task_pool = new ThrottledTaskPool!(TaskT)(throttler);
    }

    /***************************************************************************

        Creates a stream processor that uses the default throttling of the
        task pool, with suspend / resume points taken from the supplied
        configuration.

        A point left at `size_t.max` is derived from the scheduler's total
        task queue size: 2/3 of it for the suspend point and 1/5 of it for the
        resume point. Both points must be strictly smaller than the total
        task queue size.

        NB: the suspend point should leave at least one spare task in the
            pool once the limit is hit, because throttling is evaluated at
            the end of a task, i.e. before that task has finished and been
            recycled.

        Params:
            throttler_config = The throttler configuration

        Throws:
            ThrottlerFailureException if either point is not smaller than
            the scheduler's total task queue size.

    ***************************************************************************/

    public this ( ThrottlerConfig throttler_config )
    {
        this();

        auto queue_size = theScheduler.getStats().task_queue_total;

        // Resolve and validate the suspend point first ...
        size_t suspend_at = throttler_config.suspend_point;

        if (suspend_at == size_t.max)
        {
            suspend_at = queue_size / 3 * 2;
        }

        enforce(
            this.throttler_failure_e,
            suspend_at < queue_size,
            format(
                "Trying to configure StreamProcessor with suspend point ({}) " ~
                    "larger or equal to task queue size {}",
                suspend_at, queue_size
            )
        );

        // ... then do the same for the resume point.
        size_t resume_at = throttler_config.resume_point;

        if (resume_at == size_t.max)
        {
            resume_at = queue_size / 5;
        }

        enforce(
            this.throttler_failure_e,
            resume_at < queue_size,
            format(
                "Trying to configure StreamProcessor with resume point ({}) " ~
                    "larger or equal to task queue size {}",
                resume_at, queue_size
            )
        );

        this.task_pool = new ThrottledTaskPool!(TaskT)(suspend_at, resume_at);
    }

    /***************************************************************************

        Entry point for a record that has just been read from a stream: the
        arguments are forwarded to the task pool, which starts a processing
        task for them.

        Params:
            args = set of arguments to supply to processing task (the same
                parameter list as `TaskT.copyArguments`)

        Throws:
            ThrottlerFailureException if the task pool refuses to start a
            task for the record.

    ***************************************************************************/

    public void process ( ParametersOf!(TaskT.copyArguments) args )
    {
        if (this.task_pool.start(args))
        {
            return;
        }

        // The pool rejected the record; report it as a throttling failure.
        enforce(this.throttler_failure_e, false,
            "Throttler failure resulted in an attempt to process record " ~
            "with task pool full");
    }

    /***************************************************************************

        Registers an input stream (which must implement ISuspendable) with the
        throttler, so that it takes part in throttling. Registering a stream
        that is already in the set has no effect.

        Params:
            s = suspendable input stream to be throttled

    ***************************************************************************/

    public void addStream ( ISuspendable s )
    {
        this.task_pool.throttler.addSuspendable(s);
    }

    /***************************************************************************

        Unregisters an input stream (which must implement ISuspendable) from
        the set of streams which are to be throttled. Removing a stream that
        is not in the set has no effect.

        Params:
            s = suspendable input stream to stop throttling

    ***************************************************************************/

    public void removeStream ( ISuspendable s )
    {
        this.task_pool.throttler.removeSuspendable(s);
    }

    /***************************************************************************

        Accessor for the underlying task pool.

        Returns:
            The task pool used by this stream processor

    ***************************************************************************/

    public ThrottledTaskPool!(TaskT) getTaskPool ( )
    {
        return this.task_pool;
    }
}
262 
///
unittest
{
    // Usage example only: `example` is compiled but intentionally never
    // called from this unittest block.
    void example ( )
    {
        static class MyProcessingTask : Task
        {
            import ocean.core.Array;

            // Per-task context: a private copy of the record, filled in by
            // `copyArguments()` before the task is run
            ubyte[] buffer;

            public void copyArguments ( ubyte[] data )
            {
                this.buffer.copy(data);
            }

            override public void run ( )
            {
                // Process `this.buffer` here and return once done. If the
                // bound worker fiber has to wait for something, drive it
                // with `this.suspend()` and `this.resume()`
            }

            override public void recycle ( )
            {
                // Drop the content but keep the allocation for reuse
                this.buffer.length = 0;
                assumeSafeAppend(this.buffer);
            }
        }

        // Suspend the streams at 5 busy tasks, resume them at 1
        auto processor =
            new StreamProcessor!(MyProcessingTask)(ThrottlerConfig(5, 1));

        // Set of input streams. This example has none; a real application
        // would register at least one.
        ISuspendable[] streams;

        foreach ( stream; streams )
        {
            processor.addStream(stream);
        }

        // An imaginary record arrives from one of the input streams and is
        // handed to `process()`, whose parameters are identical to the ones
        // of the task class' `copyArguments` method
        ubyte[] incoming = [ 1, 2, 3 ];
        processor.process(incoming);

        theScheduler.eventLoop();
    }
}
312 
313 
314 /*******************************************************************************
315 
    Exception indicating that the throttler in use doesn't work as intended
317 
318 *******************************************************************************/
319 
public class ThrottlerFailureException : Exception
{
    /***************************************************************************

        Constructor. Creates the exception with an empty message, an empty
        file name and line 0; the actual information is expected to be
        filled in by `enforce` at the point where the exception is thrown.

        It is private, so instances can only be created from within this
        module.

    ***************************************************************************/

    private this ( )
    {
        super(
            "", // message
            "", // file
            0   // line
        );
    }
}
334 
/*******************************************************************************

    Debug tracing helper. Only produces output when compiled with
    `-debug=TaskScheduler`; otherwise the body is empty.

    Params:
        format = format string; it is prefixed with the module name before
            being printed
        args = arguments for the format string

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        auto prefixed = "[ocean.task.util.StreamProcessor] " ~ format;
        Stdout.formatln(prefixed, args).flush();
    }
}