1 /*******************************************************************************
2 
3     Adds functionality to suspend/resume registered ISuspendable instances
4     based on the number of active tasks in the task pool.
5 
6     Copyright:
7         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
8         All rights reserved.
9 
10     License:
11         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
12         Alternatively, this file may be distributed under the terms of the Tango
13         3-Clause BSD License (see LICENSE_BSD.txt for details).
14 
15 *******************************************************************************/
16 
17 module ocean.task.ThrottledTaskPool;
18 
19 import ocean.task.TaskPool;
20 import ocean.task.Task;
21 import ocean.task.IScheduler;
22 import ocean.text.convert.Formatter;
23 
24 import ocean.io.model.ISuspendableThrottler;
25 import ocean.util.container.pool.model.IPoolInfo;
26 import ocean.util.container.pool.model.ILimitable;
27 
28 import ocean.meta.traits.Aggregates /* : hasMethod */;
29 import ocean.meta.types.Function /* ParametersOf */;
30 
31 import ocean.meta.types.Qualifiers;
32 import ocean.core.Verify;
33 
34 debug (TaskScheduler)
35     import ocean.io.Stdout;
36 
37 /*******************************************************************************
38 
39     Special modified version of task pool to enhance `outer` context of task
40     with reference to throttler.
41 
42 *******************************************************************************/
43 
public class ThrottledTaskPool ( TaskT ) : TaskPool!(TaskT)
{
    import ocean.core.Enforce;

    /***************************************************************************

        Set while `throttlingHook` is registered as an epoll cycle-end
        callback, so that the hook is never registered twice at the same time.

    ***************************************************************************/

    private bool hook_registered;

    /***************************************************************************

        Throttler asked to suspend after each task start and to resume from
        the cycle-end hook. It is `null` after the default constructor until
        `useThrottler` is called; the `(suspend_point, resume_point)`
        constructor creates one of the throttlers defined in this module.

    ***************************************************************************/

    public ISuspendableThrottler throttler;

    /***************************************************************************

        Constructor

        Leaves the throttler unset. If this constructor is used, one must call
        `useThrottler` before starting any task through the pool. Typical use
        case is a throttler deriving from `PoolThrottler` (defined in this
        module), which needs a reference to the pool as constructor argument
        and thus can only be created after the pool itself.

    ***************************************************************************/

    public this ( )
    {
        this.throttler = null;
    }

    /***************************************************************************

        Constructor

        Params:
            throttler = custom throttler to use, must not be `null`

    ***************************************************************************/

    public this ( ISuspendableThrottler throttler )
    {
        assert(throttler !is null);
        this.throttler = throttler;
    }

    /***************************************************************************

        Constructor

        Creates a `SpecializedPoolThrottler` if the scheduler reports
        specialized pool stats for the class name of `TaskT`, and a
        `PoolThrottler` otherwise.

        Params:
            suspend_point = when number of busy tasks reaches this count,
                processing will get suspended
            resume_point = when number of busy tasks reaches this count,
                processing will get resumed

        Throws:
            via `enforce` if either point is not smaller than the total size
            of the scheduler task queue

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        auto total = theScheduler.getStats().task_queue_total;

        enforce(suspend_point < total, format(
            "Trying to configure ThrottledTaskPool with suspend point ({}) " ~
                "larger or equal to task queue size {}",
            suspend_point, total));

        // Report the resume point (not the suspend point) when it is the
        // value that failed validation.
        enforce(resume_point < total, format(
            "Trying to configure ThrottledTaskPool with resume point ({}) " ~
                "larger or equal to task queue size {}",
            resume_point, total));

        auto name = TaskT.classinfo.name;

        if (theScheduler.getSpecializedPoolStats(name).isDefined())
        {
            this.throttler = new SpecializedPoolThrottler(this,
                suspend_point, resume_point, name);
        }
        else
        {
            this.throttler = new PoolThrottler(this, suspend_point, resume_point);
        }
    }

    /***************************************************************************

        Sets or replaces current throttler instance

        Params:
            throttler = throttler to use

    ***************************************************************************/

    public void useThrottler ( ISuspendableThrottler throttler )
    {
        this.throttler = throttler;
    }

    /***************************************************************************

        Registers the throttling hook (if needed), starts the task via the
        base pool implementation and then asks the throttler to check whether
        processing has to be suspended.

        Params:
            task = The task being started from the throttled task pool.

    ***************************************************************************/

    override protected void startImpl ( Task task )
    {
        // The default constructor leaves the throttler unset; fail with a
        // clear message instead of dereferencing `null` further below.
        verify(this.throttler !is null,
            "ThrottledTaskPool has no throttler set, call `useThrottler` " ~
                "before starting tasks");

        this.registerThrottlingHook();
        super.startImpl(task);
        this.throttler.throttledSuspend();
    }

    /***************************************************************************

        Registers throttling hook as an epoll cycle-end callback if it is not
        already registered.

    ***************************************************************************/

    void registerThrottlingHook ( )
    {
        if (!this.hook_registered)
        {
            this.hook_registered = true;
            theScheduler.epoll.onCycleEnd(&this.throttlingHook);
        }
    }

    /***************************************************************************

        Epoll cycle-end callback: asks the throttler to check whether
        processing can be resumed. Registers itself again for as long as the
        pool has busy tasks; once the pool is idle the registration flag is
        cleared so that the next started task registers the hook anew.

    ***************************************************************************/

    private void throttlingHook ( )
    {
        this.throttler.throttledResume();

        if (this.num_busy() > 0)
            theScheduler.epoll.onCycleEnd(&this.throttlingHook);
        else
            this.hook_registered = false;
    }
}
198 
199 /*******************************************************************************
200 
201     Default throttler implementation used if no external one is supplied
202     via constructor. It throttles on amount of busy tasks in internal
203     task pool.
204 
205 *******************************************************************************/
206 
public class PoolThrottler : ISuspendableThrottler
{
    /***************************************************************************

        Reference to the throttled pool

    ***************************************************************************/

    protected IPoolInfo pool;

    /***************************************************************************

      When the number of busy entries in the scheduler task queue
      (`task_queue_busy`) is >= this value, the input will be suspended.

    ***************************************************************************/

    protected size_t suspend_point;

    /***************************************************************************

      When the number of busy entries in the scheduler task queue
      (`task_queue_busy`) is <= this value, the input will be resumed
      (provided the pool itself is below its limit).

    ***************************************************************************/

    protected size_t resume_point;

    /***************************************************************************

        Constructor

        Params:
            pool = pool to base throttling decision on
            suspend_point = when number of busy tasks reaches this count,
                processing will get suspended; must be larger than
                `resume_point` and smaller than the pool limit
            resume_point = when number of busy tasks reaches this count,
                processing will get resumed

    ***************************************************************************/

    public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point )
    {
        assert(suspend_point > resume_point);
        assert(suspend_point < pool.limit());

        this.pool = pool;
        this.suspend_point = suspend_point;
        this.resume_point = resume_point;
    }

    /***************************************************************************

        Check if the total number of active tasks has reached the desired
        limit to suspend.

        Suspension is requested when either the scheduler task queue has at
        least `suspend_point` busy entries, or the pool has at most one slot
        left before hitting its own limit.

        Returns:
            `true` if processing should be suspended

    ***************************************************************************/

    override protected bool suspend ( )
    {
        auto used = theScheduler.getStats().task_queue_busy;

        auto result = used >= this.suspend_point
            || (this.pool.num_busy() >= this.pool.limit() - 1);

        debug_trace("Throttler.suspend -> {}", result);

        return result;
    }

    /***************************************************************************

        Check if the total number of active tasks is below the desired
        limit to resume.

        Resuming is allowed only when both the scheduler task queue has at
        most `resume_point` busy entries and the pool is below its own limit.

        Returns:
            `true` if processing should be resumed

    ***************************************************************************/

    override protected bool resume ( )
    {
        auto used = theScheduler.getStats().task_queue_busy;

        auto result = used <= this.resume_point
            && (this.pool.num_busy() < this.pool.limit());

        debug_trace("Throttler.resume -> {}", result);

        return result;
    }
}
306 
307 /*******************************************************************************
308 
    Throttler implementation intended to be used with specialized task
    pools.
311 
312 *******************************************************************************/
313 
public class SpecializedPoolThrottler : ISuspendableThrottler
{
    /***************************************************************************

        The pool this throttler was created for. Stored by the constructor;
        the methods of this class do not read it.

    ***************************************************************************/

    protected IPoolInfo pool;

    /***************************************************************************

      Suspend threshold: input gets suspended once the specialized pool
      reports `used_fibers` >= this value.

    ***************************************************************************/

    protected size_t suspend_point;

    /***************************************************************************

      Resume threshold: input gets resumed once the specialized pool
      reports `used_fibers` <= this value.

    ***************************************************************************/

    protected size_t resume_point;

    /***************************************************************************

        Class name of the task type handled by the host task pool, used as
        the key when querying the scheduler for specialized pool stats.

    ***************************************************************************/

    protected istring task_class_name;

    /***************************************************************************

        Constructor

        Params:
            pool = pool being throttled
            suspend_point = used fiber count at which processing gets
                suspended; must be larger than `resume_point` and smaller
                than the pool limit
            resume_point = used fiber count at which processing gets resumed
            name = class name for the task type handled by the `pool`

    ***************************************************************************/

    public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point,
        istring name )
    {
        assert(suspend_point > resume_point);
        assert(suspend_point < pool.limit());

        this.task_class_name = name;
        this.resume_point = resume_point;
        this.suspend_point = suspend_point;
        this.pool = pool;
    }

    /***************************************************************************

        Decides whether processing has to be suspended, based on the number
        of used fibers the scheduler reports for the specialized pool
        registered under `task_class_name`.

        Returns:
            `true` if the used fiber count has reached `suspend_point`

    ***************************************************************************/

    override protected bool suspend ( )
    {
        bool over_threshold = false;

        auto pool_stats =
            theScheduler.getSpecializedPoolStats(this.task_class_name);

        pool_stats.visit(
            ( ) {
                // No stats known for this class name: the throttler was
                // set up for a pool the scheduler does not know about.
                verify(false, "Specialized task pool throttler initalized " ~
                    "with missing task class name");
            },
            (ref IScheduler.SpecializedPoolStats found) {
                over_threshold = found.used_fibers >= this.suspend_point;
            }
        );

        debug_trace("Throttler.suspend -> {}", over_threshold);
        return over_threshold;
    }

    /***************************************************************************

        Decides whether processing can be resumed, based on the number of
        used fibers the scheduler reports for the specialized pool
        registered under `task_class_name`.

        Returns:
            `true` if the used fiber count has dropped to `resume_point` or
            below

    ***************************************************************************/

    override protected bool resume ( )
    {
        bool under_threshold = false;

        auto pool_stats =
            theScheduler.getSpecializedPoolStats(this.task_class_name);

        pool_stats.visit(
            ( ) {
                // No stats known for this class name: the throttler was
                // set up for a pool the scheduler does not know about.
                verify(false, "Specialized task pool throttler initalized " ~
                    "with missing task class name");
            },
            (ref IScheduler.SpecializedPoolStats found) {
                under_threshold = found.used_fibers <= this.resume_point;
            }
        );

        debug_trace("Throttler.resume -> {}", under_threshold);
        return under_threshold;
    }
}
435 
436 /*******************************************************************************
437 
    Debug trace output when building with the TaskScheduler debug flag.
439 
440     Params:
441         format = Format for variadic argument output.
442         args = Variadic arguments for output.
443 
444 *******************************************************************************/
445 
private void debug_trace ( T... ) ( cstring format, T args )
{
    // Compiles to an empty function unless built with -debug=TaskScheduler.
    debug ( TaskScheduler )
    {
        // Prefix every trace line with the module name before printing.
        auto prefixed = "[ocean.task.ThrottledTaskPool] " ~ format;
        Stdout.formatln(prefixed, args).flush();
    }
}