1 /*******************************************************************************
2 3 Adds functionality to suspend/resume registered ISuspendable instances
4 based on the number of active tasks in the task pool.
5 6 Copyright:
7 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
8 All rights reserved.
9 10 License:
11 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
12 Alternatively, this file may be distributed under the terms of the Tango
13 3-Clause BSD License (see LICENSE_BSD.txt for details).
14 15 *******************************************************************************/16 17 moduleocean.task.ThrottledTaskPool;
18 19 importocean.task.TaskPool;
20 importocean.task.Task;
21 importocean.task.IScheduler;
22 importocean.text.convert.Formatter;
23 24 importocean.io.model.ISuspendableThrottler;
25 importocean.util.container.pool.model.IPoolInfo;
26 importocean.util.container.pool.model.ILimitable;
27 28 importocean.meta.traits.Aggregates/* : hasMethod */;
29 importocean.meta.types.Function/* ParametersOf */;
30 31 importocean.transition;
32 importocean.core.Verify;
33 34 debug (TaskScheduler)
35 importocean.io.Stdout;
/*******************************************************************************

    Special modified version of task pool to enhance `outer` context of task
    with reference to throttler.

    Every successful `start` / `restore` call asks the throttler to suspend
    if needed, and a hook registered with the scheduler's epoll instance
    asks the throttler to resume at the end of each epoll cycle for as long
    as the pool has busy tasks.

*******************************************************************************/

public class ThrottledTaskPool ( TaskT ) : TaskPool!(TaskT)
{
    import ocean.core.Enforce;

    /***************************************************************************

        Indicates that throttling hook has already been registered for a next
        epoll cycle.

    ***************************************************************************/

    private bool hook_registered;

    /***************************************************************************

        Throttler used to control tempo of data consumption from streams. By
        default internally defined PoolThrottler is used which is bound by
        task pool size limit.

    ***************************************************************************/

    public ISuspendableThrottler throttler;

    /***************************************************************************

        Constructor

        If this constructor is used, one must call `useThrottler` method before
        actually using the pool itself. Typical use case for that is deriving
        from the default `PoolThrottler` class (defined in this module) which
        requires reference to the pool as its constructor argument.

    ***************************************************************************/

    public this ( )
    {
        this.throttler = null;
    }

    /***************************************************************************

        Constructor

        Params:
            throttler = custom throttler to use, must not be null

    ***************************************************************************/

    public this ( ISuspendableThrottler throttler )
    {
        assert(throttler !is null);
        this.throttler = throttler;
    }

    /***************************************************************************

        Constructor

        Creates a `SpecializedPoolThrottler` if the scheduler reports
        specialized pool stats for the class name of `TaskT`, and a plain
        `PoolThrottler` otherwise.

        Params:
            suspend_point = when number of busy tasks reaches this count,
                processing will get suspended
            resume_point = when number of busy tasks reaches this count,
                processing will get resumed

        Throws:
            via `enforce` if either point is not smaller than the total
            task queue size reported by the scheduler

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        auto total = theScheduler.getStats().task_queue_total;

        enforce(suspend_point < total, format(
            "Trying to configure ThrottledTaskPool with suspend point ({}) " ~
                "larger or equal to task queue size {}",
            suspend_point, total));

        // Report the value that actually failed the check (the message used
        // to name and print the suspend point here).
        enforce(resume_point < total, format(
            "Trying to configure ThrottledTaskPool with resume point ({}) " ~
                "larger or equal to task queue size {}",
            resume_point, total));

        auto name = TaskT.classinfo.name;

        if (theScheduler.getSpecializedPoolStats(name).isDefined())
        {
            this.throttler = new SpecializedPoolThrottler(this,
                suspend_point, resume_point, name);
        }
        else
        {
            this.throttler = new PoolThrottler(this, suspend_point, resume_point);
        }
    }

    /***************************************************************************

        Sets or replaces current throttler instance

        Params:
            throttler = throttler to use

    ***************************************************************************/

    public void useThrottler ( ISuspendableThrottler throttler )
    {
        this.throttler = throttler;
    }

    /***************************************************************************

        Rewrite of TaskPool.start changed to use `ProcessingTask` as actual
        task type instead of plain OwnedTask. Right now it is done by dumb
        copy-paste, if that pattern will appear more often, TaskPool base
        class may need a slight refactoring to support it.

        Params:
            args = same set of args as defined by `copyArguments` method of
                user-supplied task class, will be forwarded to it.

        Returns:
            `false` if the pool is at maximum capacity (no task is started),
            `true` otherwise

    ***************************************************************************/

    override public bool start ( ParametersOf!(TaskT.copyArguments) args )
    {
        assert (this.throttler !is null);

        if (this.num_busy() >= this.limit())
            return false;

        auto task = cast(TaskT) this.get(new TaskT);
        assert (task !is null);

        task.copyArguments(args);
        // Make sure the resume check runs at the end of the epoll cycle.
        this.registerThrottlingHook();
        this.startImpl(task);

        this.throttler.throttledSuspend();

        return true;
    }

    static if (hasMethod!(TaskT, "deserialize", void delegate(void[])))
    {
        /***********************************************************************

            Starts a task in the same manner as `start` but instead calls
            the `deserialize` method of the task with a serialized buffer of
            the state. This is to support dumping and loading tasks from disk.

            Params:
                serialized = buffer holding the serialized task state, will
                    be forwarded to the `deserialize` method of the
                    user-supplied task class.

            Returns:
                'false' if new task can't be started because pool limit is
                reached for now, 'true' otherwise

        ***********************************************************************/

        override public bool restore ( void[] serialized )
        {
            assert (this.throttler !is null);

            if (this.num_busy() >= this.limit())
                return false;

            auto task = cast(TaskT) this.get(new TaskT);
            assert (task !is null);

            task.deserialize(serialized);
            // Make sure the resume check runs at the end of the epoll cycle.
            this.registerThrottlingHook();
            this.startImpl(task);

            this.throttler.throttledSuspend();

            return true;
        }
    }

    /***************************************************************************

        Registers throttling hook if not already present

    ***************************************************************************/

    void registerThrottlingHook ( )
    {
        if (!this.hook_registered)
        {
            this.hook_registered = true;
            theScheduler.epoll.onCycleEnd(&this.throttlingHook);
        }
    }

    /***************************************************************************

        Callback registered with `theScheduler.epoll.onCycleEnd`.

        Asks the throttler to resume, then registers itself again for as
        long as the pool has busy tasks. Once no busy tasks remain the hook
        is dropped and `hook_registered` is cleared, so that the next
        `start` / `restore` call registers it again.

    ***************************************************************************/

    private void throttlingHook ( )
    {
        this.throttler.throttledResume();

        if (this.num_busy() > 0)
            theScheduler.epoll.onCycleEnd(&this.throttlingHook);
        else
            this.hook_registered = false;
    }
}
/*******************************************************************************

    Default throttler implementation used if no external one is supplied
    via constructor. It throttles on the amount of busy tasks in the
    scheduler task queue and in the referenced task pool.

*******************************************************************************/

public class PoolThrottler : ISuspendableThrottler
{
    /***************************************************************************

        Reference to the throttled pool

    ***************************************************************************/

    protected IPoolInfo pool;

    /***************************************************************************

        When the number of busy entries in the scheduler task queue is >=
        this value, the input will be suspended.

    ***************************************************************************/

    protected size_t suspend_point;

    /***************************************************************************

        When the number of busy entries in the scheduler task queue is <=
        this value (and the pool is below its limit), the input will be
        resumed.

    ***************************************************************************/

    protected size_t resume_point;

    /***************************************************************************

        Constructor

        Params:
            pool = pool to base throttling decision on
            suspend_point = when number of busy tasks reaches this count,
                processing will get suspended; must be greater than
                `resume_point` and smaller than `pool.limit()`
            resume_point = when number of busy tasks reaches this count,
                processing will get resumed

    ***************************************************************************/

    public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point )
    {
        assert(suspend_point > resume_point);
        assert(suspend_point < pool.limit());

        this.pool = pool;
        this.suspend_point = suspend_point;
        this.resume_point = resume_point;
    }

    /***************************************************************************

        Check if the total number of active tasks has reached the desired
        limit to suspend.

        Checks both the amount of busy tasks in the global scheduler queue
        and the amount of busy tasks in this pool.

        Returns:
            `true` if the scheduler's busy task queue count is at or above
            `suspend_point`, or if the pool's busy count is at or above
            `pool.limit() - 1`

    ***************************************************************************/

    override protected bool suspend ( )
    {
        auto used = theScheduler.getStats().task_queue_busy;

        auto result = used >= this.suspend_point
            || (this.pool.num_busy() >= this.pool.limit() - 1);

        debug_trace("Throttler.suspend -> {}", result);

        return result;
    }

    /***************************************************************************

        Check if the total number of active tasks is below the desired
        limit to resume.

        Checks both the amount of busy tasks in the global scheduler queue
        and the amount of busy tasks in this pool.

        Returns:
            `true` if the scheduler's busy task queue count is at or below
            `resume_point` and the pool's busy count is below `pool.limit()`

    ***************************************************************************/

    override protected bool resume ( )
    {
        auto used = theScheduler.getStats().task_queue_busy;

        auto result = used <= this.resume_point
            && (this.pool.num_busy() < this.pool.limit());

        debug_trace("Throttler.resume -> {}", result);

        return result;
    }
}
/*******************************************************************************

    Throttler implementation intended to be used with specialized task
    pools. Throttling decisions are based on the `used_fibers` value the
    scheduler reports for the specialized pool of the configured task class.

*******************************************************************************/

public class SpecializedPoolThrottler : ISuspendableThrottler
{
    /***************************************************************************

        Reference to the throttled pool

    ***************************************************************************/

    protected IPoolInfo pool;

    /***************************************************************************

        Once the specialized pool reports at least this many used fibers,
        the input will be suspended.

    ***************************************************************************/

    protected size_t suspend_point;

    /***************************************************************************

        Once the specialized pool reports at most this many used fibers,
        the input will be resumed.

    ***************************************************************************/

    protected size_t resume_point;

    /***************************************************************************

        String representation of the class name of the task handled by the host
        task pool. Used to look up the specialized pool stats in the scheduler.

    ***************************************************************************/

    protected istring task_class_name;

    /***************************************************************************

        Constructor

        Params:
            pool = pool to base throttling decision on
            suspend_point = used fiber count at which processing will get
                suspended; must be greater than `resume_point` and smaller
                than `pool.limit()`
            resume_point = used fiber count at which processing will get
                resumed
            name = class name for the task type handled by the `pool`

    ***************************************************************************/

    public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point,
        istring name )
    {
        assert(suspend_point > resume_point);
        assert(suspend_point < pool.limit());

        this.task_class_name = name;
        this.resume_point = resume_point;
        this.suspend_point = suspend_point;
        this.pool = pool;
    }

    /***************************************************************************

        Check if the number of used fibers in the specialized pool has
        reached the suspend point.

        Fails verification if the scheduler has no specialized pool stats
        for the configured task class name.

        Returns:
            `true` if the input should be suspended

    ***************************************************************************/

    override protected bool suspend ( )
    {
        bool should_suspend;

        auto pool_stats =
            theScheduler.getSpecializedPoolStats(this.task_class_name);

        pool_stats.visit(
            ( ) {
                verify(false, "Specialized task pool throttler initalized " ~
                    "with missing task class name");
            },
            (ref IScheduler.SpecializedPoolStats found) {
                should_suspend = this.suspend_point <= found.used_fibers;
            }
        );

        debug_trace("Throttler.suspend -> {}", should_suspend);
        return should_suspend;
    }

    /***************************************************************************

        Check if the number of used fibers in the specialized pool has
        dropped to the resume point or below.

        Fails verification if the scheduler has no specialized pool stats
        for the configured task class name.

        Returns:
            `true` if the input should be resumed

    ***************************************************************************/

    override protected bool resume ( )
    {
        bool should_resume;

        auto pool_stats =
            theScheduler.getSpecializedPoolStats(this.task_class_name);

        pool_stats.visit(
            ( ) {
                verify(false, "Specialized task pool throttler initalized " ~
                    "with missing task class name");
            },
            (ref IScheduler.SpecializedPoolStats found) {
                should_resume = this.resume_point >= found.used_fibers;
            }
        );

        debug_trace("Throttler.resume -> {}", should_resume);
        return should_resume;
    }
}
/*******************************************************************************

    Debug trace output when building with the TaskScheduler debug flag.
    Without that flag the function body is empty.

    Params:
        format = Format for variadic argument output; it is prefixed with
            the module name before printing.
        args = Variadic arguments for output.

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        auto prefixed_format = "[ocean.task.ThrottledTaskPool] " ~ format;
        Stdout.formatln(prefixed_format, args).flush();
    }
}