1 /*******************************************************************************
2 
3     Adds functionality to suspend/resume registered ISuspendable instances
4     based on the number of active tasks in the task pool.
5 
6     Copyright:
7         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
8         All rights reserved.
9 
10     License:
11         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
12         Alternatively, this file may be distributed under the terms of the Tango
13         3-Clause BSD License (see LICENSE_BSD.txt for details).
14 
15 *******************************************************************************/
16 
17 module ocean.task.ThrottledTaskPool;
18 
19 import ocean.task.TaskPool;
20 import ocean.task.Task;
21 import ocean.task.IScheduler;
22 import ocean.text.convert.Formatter;
23 
24 import ocean.io.model.ISuspendableThrottler;
25 import ocean.util.container.pool.model.IPoolInfo;
26 import ocean.util.container.pool.model.ILimitable;
27 
28 import ocean.meta.traits.Aggregates /* : hasMethod */;
29 import ocean.meta.types.Function /* ParametersOf */;
30 
31 import ocean.transition;
32 import ocean.core.Verify;
33 
34 debug (TaskScheduler)
35     import ocean.io.Stdout;
36 
37 /*******************************************************************************
38 
39     Special modified version of task pool to enhance `outer` context of task
40     with reference to throttler.
41 
42 *******************************************************************************/
43 
public class ThrottledTaskPool ( TaskT ) : TaskPool!(TaskT)
{
    import ocean.core.Enforce;

    /***************************************************************************

        Set while `throttlingHook` is registered with the epoll cycle-end
        callbacks, so that it does not get registered more than once at a time.

    ***************************************************************************/

    private bool hook_registered;

    /***************************************************************************

        Throttler used to control tempo of data consumption from streams.

        The `(suspend_point, resume_point)` constructor creates one of the
        throttlers defined in this module (`PoolThrottler` or
        `SpecializedPoolThrottler`). The default constructor leaves it `null`
        until `useThrottler` is called.

    ***************************************************************************/

    public ISuspendableThrottler throttler;

    /***************************************************************************

        Constructor

        If this constructor is used, one must call `useThrottler` method before
        actually using the pool itself (`start` and `restore` assert that a
        throttler is set). Typical use case for that is deriving from the
        default `PoolThrottler` class (defined in this module) which requires
        reference to the pool as its constructor argument.

    ***************************************************************************/

    public this ( )
    {
        this.throttler = null;
    }

    /***************************************************************************

        Constructor

        Params:
            throttler = custom throttler to use, must not be `null`

    ***************************************************************************/

    public this ( ISuspendableThrottler throttler )
    {
        assert(throttler !is null);
        this.throttler = throttler;
    }

    /***************************************************************************

        Constructor

        Creates a `SpecializedPoolThrottler` if the scheduler reports
        specialized pool stats for the class name of `TaskT`, and a plain
        `PoolThrottler` otherwise.

        Params:
            suspend_point = when number of busy tasks reaches this count,
                processing will get suspended
            resume_point = when number of busy tasks reaches this count,
                processing will get resumed

        Throws:
            via `enforce` if either point is not smaller than the total size
            of the scheduler task queue

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        auto total = theScheduler.getStats().task_queue_total;

        enforce(suspend_point < total, format(
            "Trying to configure ThrottledTaskPool with suspend point ({}) " ~
                "larger or equal to task queue size {}",
            suspend_point, total));

        // Report the value that actually failed the check: the resume point,
        // not the suspend point.
        enforce(resume_point < total, format(
            "Trying to configure ThrottledTaskPool with resume point ({}) " ~
                "larger or equal to task queue size {}",
            resume_point, total));

        auto name = TaskT.classinfo.name;

        if (theScheduler.getSpecializedPoolStats(name).isDefined())
        {
            this.throttler = new SpecializedPoolThrottler(this,
                suspend_point, resume_point, name);
        }
        else
        {
            this.throttler = new PoolThrottler(this, suspend_point, resume_point);
        }
    }

    /***************************************************************************

        Sets or replaces current throttler instance

        Params:
            throttler = throttler to use

    ***************************************************************************/

    public void useThrottler ( ISuspendableThrottler throttler )
    {
        this.throttler = throttler;
    }

    /***************************************************************************

        Rewrite of TaskPool.start changed to use `ProcessingTask` as actual
        task type instead of plain OwnedTask. Right now it is done by dumb
        copy-paste, if that pattern will appear more often, TaskPool base
        class may need a slight refactoring to support it.

        After the task has been started, the throttler is asked to suspend
        the registered suspendables if its suspend condition is met.

        Params:
            args = same set of args as defined by `copyArguments` method of
                user-supplied task class, will be forwarded to it.

        Returns:
            `false` if the pool is at maximum capacity (no task is started),
            `true` otherwise

    ***************************************************************************/

    override public bool start ( ParametersOf!(TaskT.copyArguments) args )
    {
        assert (this.throttler !is null);

        if (this.num_busy() >= this.limit())
            return false;

        auto task = cast(TaskT) this.get(new TaskT);
        assert (task !is null);

        task.copyArguments(args);
        // The hook is registered before the task starts running.
        this.registerThrottlingHook();
        this.startImpl(task);

        this.throttler.throttledSuspend();

        return true;
    }

    static if (hasMethod!(TaskT, "deserialize", void delegate(void[])))
    {
        /***********************************************************************

            Starts a task in the same manner as `start` but instead calls
            `deserialize` on the derived task with a serialized buffer of the
            state. This is to support dumping and loading tasks from disk.

            Only defined if `TaskT` has a `deserialize(void[])` method.

            Params:
                serialized = buffer with the serialized task state, forwarded
                    to the `deserialize` method of user-supplied task class

            Returns:
                'false' if new task can't be started because pool limit is
                reached for now, 'true' otherwise

        ***********************************************************************/

        override public bool restore ( void[] serialized )
        {
            assert (this.throttler !is null);

            if (this.num_busy() >= this.limit())
                return false;

            auto task = cast(TaskT) this.get(new TaskT);
            assert (task !is null);

            task.deserialize(serialized);
            this.registerThrottlingHook();
            this.startImpl(task);

            this.throttler.throttledSuspend();

            return true;
        }
    }

    /***************************************************************************

        Registers throttling hook with `theScheduler.epoll.onCycleEnd` if not
        already present

    ***************************************************************************/

    void registerThrottlingHook ( )
    {
        if (!this.hook_registered)
        {
            this.hook_registered = true;
            theScheduler.epoll.onCycleEnd(&this.throttlingHook);
        }
    }

    /***************************************************************************

        Callback registered with `theScheduler.epoll.onCycleEnd`.

        Asks the throttler to resume the suspendables if its resume condition
        is met. Re-registers itself for as long as the pool has busy tasks;
        once there are none, clears `hook_registered` so that the next
        `start`/`restore` registers the hook again.

    ***************************************************************************/

    private void throttlingHook ( )
    {
        this.throttler.throttledResume();

        if (this.num_busy() > 0)
            theScheduler.epoll.onCycleEnd(&this.throttlingHook);
        else
            this.hook_registered = false;
    }
}
255 
256 /*******************************************************************************
257 
258     Default throttler implementation used if no external one is supplied
259     via constructor. It throttles on amount of busy tasks in internal
260     task pool.
261 
262 *******************************************************************************/
263 
public class PoolThrottler : ISuspendableThrottler
{
    /***************************************************************************

        Reference to the throttled pool

    ***************************************************************************/

    protected IPoolInfo pool;

    /***************************************************************************

      When amount of busy tasks in the scheduler task queue is >= this value,
      the input will be suspended.

    ***************************************************************************/

    protected size_t suspend_point;

    /***************************************************************************

      When amount of busy tasks in the scheduler task queue is <= this value,
      the input will be resumed.

    ***************************************************************************/

    protected size_t resume_point;

    /***************************************************************************

        Constructor

        Params:
            pool = pool to base throttling decision on
            suspend_point = when number of busy tasks reaches this count,
                processing will get suspended; must be greater than
                `resume_point` and smaller than the current pool limit
            resume_point = when number of busy tasks reaches this count,
                processing will get resumed

    ***************************************************************************/

    public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point )
    {
        assert(suspend_point > resume_point);
        assert(suspend_point < pool.limit());

        this.pool = pool;
        this.suspend_point = suspend_point;
        this.resume_point = resume_point;
    }

    /***************************************************************************

        Check if the total number of active tasks has reached the desired
        limit to suspend.

        Suspends when either the number of busy tasks in the global scheduler
        queue has reached `suspend_point`, or this pool has at most one free
        slot left.

        Returns:
            `true` if the input should be suspended

    ***************************************************************************/

    override protected bool suspend ( )
    {
        auto used = theScheduler.getStats().task_queue_busy;

        // `num_busy() + 1 >= limit()` is the same test as
        // `num_busy() >= limit() - 1` for any limit >= 1, but does not wrap
        // around to size_t.max when the pool limit is 0.
        auto result = used >= this.suspend_point
            || (this.pool.num_busy() + 1 >= this.pool.limit());

        debug_trace("Throttler.suspend -> {}", result);

        return result;
    }

    /***************************************************************************

        Check if the total number of active tasks is below the desired
        limit to resume.

        Resumes only when the number of busy tasks in the global scheduler
        queue is at or below `resume_point` and this pool is below its limit.

        Returns:
            `true` if the input should be resumed

    ***************************************************************************/

    override protected bool resume ( )
    {
        auto used = theScheduler.getStats().task_queue_busy;

        auto result = used <= this.resume_point
            && (this.pool.num_busy() < this.pool.limit());

        debug_trace("Throttler.resume -> {}", result);

        return result;
    }
}
363 
364 /*******************************************************************************
365 
366     Throttler implementation intended to be used with a specialized task
367     pools.
368 
369 *******************************************************************************/
370 
public class SpecializedPoolThrottler : ISuspendableThrottler
{
    /***************************************************************************

        Pool this throttler was created for

    ***************************************************************************/

    protected IPoolInfo pool;

    /***************************************************************************

      Input gets suspended once the number of used fibers of the specialized
      pool is >= this value.

    ***************************************************************************/

    protected size_t suspend_point;

    /***************************************************************************

      Input gets resumed once the number of used fibers of the specialized
      pool is <= this value.

    ***************************************************************************/

    protected size_t resume_point;

    /***************************************************************************

        Class name of the task type handled by the host task pool, used to
        look up the specialized pool stats in the scheduler.

    ***************************************************************************/

    protected istring task_class_name;

    /***************************************************************************

        Constructor

        Params:
            pool = pool to base throttling decision on
            suspend_point = used fiber count at which processing gets
                suspended; must be greater than `resume_point` and smaller
                than the current pool limit
            resume_point = used fiber count at which processing gets resumed
            name = class name for the task type handled by the `pool`

    ***************************************************************************/

    public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point,
        istring name )
    {
        assert(suspend_point > resume_point);
        assert(suspend_point < pool.limit());

        this.task_class_name = name;
        this.resume_point = resume_point;
        this.suspend_point = suspend_point;
        this.pool = pool;
    }

    /***************************************************************************

        Tells whether the specialized pool has reached the suspend point.

        Returns:
            `true` if the number of used fibers reported by the scheduler for
            `task_class_name` is >= `suspend_point`

    ***************************************************************************/

    override protected bool suspend ( )
    {
        bool should_suspend = this.usedFibers() >= this.suspend_point;
        debug_trace("Throttler.suspend -> {}", should_suspend);
        return should_suspend;
    }

    /***************************************************************************

        Tells whether the specialized pool has dropped to the resume point.

        Returns:
            `true` if the number of used fibers reported by the scheduler for
            `task_class_name` is <= `resume_point`

    ***************************************************************************/

    override protected bool resume ( )
    {
        bool should_resume = this.usedFibers() <= this.resume_point;
        debug_trace("Throttler.resume -> {}", should_resume);
        return should_resume;
    }

    /***************************************************************************

        Queries the scheduler for the stats of the specialized pool registered
        under `task_class_name`. Fails verification if no such stats exist.

        Returns:
            `used_fibers` value of the specialized pool stats

    ***************************************************************************/

    private size_t usedFibers ( )
    {
        size_t used;

        auto pool_stats =
            theScheduler.getSpecializedPoolStats(this.task_class_name);

        pool_stats.visit(
            ( ) {
                verify(false, "Specialized task pool throttler initalized " ~
                    "with missing task class name");
            },
            (ref IScheduler.SpecializedPoolStats s) {
                used = s.used_fibers;
            }
        );

        return used;
    }
}
492 
493 /*******************************************************************************
494 
    Debug trace output when building with the TaskScheduler debug flag.
496 
497     Params:
498         format = Format for variadic argument output.
499         args = Variadic arguments for output.
500 
501 *******************************************************************************/
502 
private void debug_trace ( T... ) ( cstring format, T args )
{
    // Body is compiled in only when the `TaskScheduler` debug identifier is
    // set; otherwise this function does nothing.
    debug ( TaskScheduler )
    {
        // Prefix every message with the module name, then print and flush.
        auto line_format = "[ocean.task.ThrottledTaskPool] " ~ format;
        Stdout.formatln(line_format, args).flush();
    }
}