/*******************************************************************************

    Framework for reading from a stream and throttling based on the progress of
    processing the received data.

    It is a relatively simple utility built on top of a task pool, the
    scheduler, and `ISuspendable`, to provide "ready to go" functionality to be
    used in applications.

    Usage example:
        See the documented unittest of the `StreamProcessor` class

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.util.StreamProcessor;


import ocean.meta.types.Qualifiers;

import ocean.task.Task;
import ocean.task.ThrottledTaskPool;
import ocean.task.IScheduler;

import ocean.meta.types.Function /* : ParametersOf */;
import ocean.core.Enforce;
import ocean.text.convert.Formatter;
import ocean.io.model.ISuspendable;
import ocean.io.model.ISuspendableThrottler;

debug (TaskScheduler)
{
    import ocean.io.Stdout;
}

version (unittest)
{
    import ocean.core.Test;
}

/*******************************************************************************

    Settings for a stream processor that relies on the default, busy-task
    based throttling rather than on a custom throttler.

*******************************************************************************/

public struct ThrottlerConfig
{
    /***************************************************************************

        Count of busy tasks at which the input streams get suspended.

        Leaving this at `size_t.max` asks the `StreamProcessor` constructor
        to derive the value from the scheduler's task queue size.

    ***************************************************************************/

    size_t suspend_point = size_t.max;

    /***************************************************************************

        Count of busy tasks at which the input streams get resumed.

        Leaving this at `size_t.max` asks the `StreamProcessor` constructor
        to derive the value from the scheduler's task queue size.

    ***************************************************************************/

    size_t resume_point = size_t.max;
}

/*******************************************************************************

    Distributes records read from streams over the tasks of a throttled task
    pool. The application supplies the task class that handles a single
    record; this class starts one pooled task per record and registers the
    input streams with the pool's throttler.

    Params:
        TaskT = application-defined task class. Must have a `copyArguments`
            method whose parameters match the data passed to `process`

*******************************************************************************/

class StreamProcessor ( TaskT : Task )
{
    /***************************************************************************

        Reusable exception instance, thrown when the configuration is
        rejected or when a record cannot be handed to the task pool.

    ***************************************************************************/

    private ThrottlerFailureException throttler_failure_e;

    /***************************************************************************

        Task pool that runs the record processing tasks. It is created by
        the public constructors, either around a custom throttler or from
        suspend / resume points.

    ***************************************************************************/

    protected ThrottledTaskPool!(TaskT) task_pool;

    /***************************************************************************

        Initialisation shared by all public constructors: allocates the
        reusable exception instance.

    ***************************************************************************/

    private this ( )
    {
        this.throttler_failure_e = new ThrottlerFailureException;
    }

    /***************************************************************************

        Creates a stream processor around a user-supplied throttler. (Use
        the `ThrottlerConfig` constructor for the standard throttling based
        on the number of busy tasks.)

        Params:
            throttler = custom throttler to use.

    ***************************************************************************/

    public this ( ISuspendableThrottler throttler )
    {
        this();
        this.task_pool = new ThrottledTaskPool!(TaskT)(throttler);
    }

    /***************************************************************************

        Creates a stream processor with suspend / resume points.

        Points left at `size_t.max` are derived from the scheduler's total
        task queue size: two thirds of it for suspending, one fifth of it
        for resuming. Both final values must be smaller than that size.

        NB: the suspend point has to leave at least one spare task in the
        pool once the limit is hit, because throttling happens at the end of
        a task, not after it has finished and been recycled.

        Params:
            throttler_config = The throttler configuration

        Throws:
            ThrottlerFailureException if the suspend point or the resume
            point is not smaller than the task queue size.

    ***************************************************************************/

    public this ( ThrottlerConfig throttler_config )
    {
        this();

        auto queue_size = theScheduler.getStats().task_queue_total;

        // Fall back to 2/3 of the queue when no suspend point was given
        size_t suspend_at =
            (throttler_config.suspend_point == size_t.max)
                ? queue_size / 3 * 2
                : throttler_config.suspend_point;

        enforce(
            this.throttler_failure_e,
            suspend_at < queue_size,
            format(
                "Trying to configure StreamProcessor with suspend point ({}) " ~
                    "larger or equal to task queue size {}",
                suspend_at, queue_size
            )
        );

        // Fall back to 1/5 of the queue when no resume point was given
        size_t resume_at =
            (throttler_config.resume_point == size_t.max)
                ? queue_size / 5
                : throttler_config.resume_point;

        enforce(
            this.throttler_failure_e,
            resume_at < queue_size,
            format(
                "Trying to configure StreamProcessor with resume point ({}) " ~
                    "larger or equal to task queue size {}",
                resume_at, queue_size
            )
        );

        this.task_pool = new ThrottledTaskPool!(TaskT)(suspend_at, resume_at);
    }

    /***************************************************************************

        Starts a pooled task for a record that has just been received from
        a stream.

        Params:
            args = set of arguments to supply to processing task

        Throws:
            ThrottlerFailureException if the task pool refuses to start a
            task for the record.

    ***************************************************************************/

    public void process ( ParametersOf!(TaskT.copyArguments) args )
    {
        if (this.task_pool.start(args))
            return;

        // The pool refused the record
        enforce(this.throttler_failure_e, false,
            "Throttler failure resulted in an attempt to process record " ~
            "with task pool full");
    }

    /***************************************************************************

        Registers an input stream with the task pool's throttler so that it
        takes part in throttling.

        Params:
            s = suspendable input stream to be throttled

    ***************************************************************************/

    public void addStream ( ISuspendable s )
    {
        this.task_pool.throttler.addSuspendable(s);
    }

    /***************************************************************************

        Unregisters an input stream from the task pool's throttler so that
        it is no longer throttled.

        Params:
            s = suspendable input stream to stop throttling

    ***************************************************************************/

    public void removeStream ( ISuspendable s )
    {
        this.task_pool.throttler.removeSuspendable(s);
    }

    /***************************************************************************

        Gives access to the underlying task pool.

        Returns:
            The task pool

    ***************************************************************************/

    public ThrottledTaskPool!(TaskT) getTaskPool ( )
    {
        return this.task_pool;
    }
}

///
unittest
{
    void example ( )
    {
        static class MyProcessingTask : Task
        {
            import ocean.core.Array;

            // Per-task context: a private copy of the record, filled in by
            // `copyArguments()` before the task runs
            ubyte[] buffer;

            public void copyArguments ( ubyte[] data )
            {
                this.buffer.copy(data);
            }

            override public void run ( )
            {
                // Process `this.buffer` here and return once done. If needed,
                // `this.suspend()` and `this.resume()` control the execution
                // of the bound worker fiber
            }

            override public void recycle ( )
            {
                this.buffer.length = 0;
                assumeSafeAppend(this.buffer);
            }
        }

        auto throttler_config = ThrottlerConfig(5, 1);
        auto stream_processor =
            new StreamProcessor!(MyProcessingTask)(throttler_config);

        // Input streams to throttle. This example has none; a real
        // application would register at least one.
        ISuspendable[] input_streams;

        foreach ( input_stream; input_streams )
            stream_processor.addStream(input_stream);

        // Pretend a record has arrived from one of the input streams and
        // hand it to process(). The arguments taken by `process` are the
        // same as the ones taken by the task's `copyArguments` method
        ubyte[] record = [ 1, 2, 3 ];
        stream_processor.process(record);

        theScheduler.eventLoop();
    }
}


/*******************************************************************************

    Exception that indicates that used throttler doesn't work as intended

*******************************************************************************/

public class ThrottlerFailureException : Exception
{
    /***************************************************************************

        Constructor. Leaves message, file and line empty; these are expected
        to be filled in by `enforce` when the exception is thrown.

    ***************************************************************************/

    private this ( )
    {
        super("", "", 0);
    }
}

/*******************************************************************************

    Prints a trace line prefixed with this module's name. Produces output
    only in builds with the `TaskScheduler` debug identifier enabled.

    Params:
        format = format string of the message
        args = arguments for the format string

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        Stdout.formatln( "[ocean.task.util.StreamProcessor] " ~ format, args ).flush();
    }
}