1 /******************************************************************************* 2 3 Adds functionality to suspend/resume registered ISuspendable instances 4 based on the number of active tasks in the task pool. 5 6 Copyright: 7 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 8 All rights reserved. 9 10 License: 11 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 12 Alternatively, this file may be distributed under the terms of the Tango 13 3-Clause BSD License (see LICENSE_BSD.txt for details). 14 15 *******************************************************************************/ 16 17 module ocean.task.ThrottledTaskPool; 18 19 import ocean.task.TaskPool; 20 import ocean.task.Task; 21 import ocean.task.IScheduler; 22 import ocean.text.convert.Formatter; 23 24 import ocean.io.model.ISuspendableThrottler; 25 import ocean.util.container.pool.model.IPoolInfo; 26 import ocean.util.container.pool.model.ILimitable; 27 28 import ocean.meta.traits.Aggregates /* : hasMethod */; 29 import ocean.meta.types.Function /* ParametersOf */; 30 31 import ocean.meta.types.Qualifiers; 32 import ocean.core.Verify; 33 34 debug (TaskScheduler) 35 import ocean.io.Stdout; 36 37 /******************************************************************************* 38 39 Special modified version of task pool to enhance `outer` context of task 40 with reference to throttler. 41 42 *******************************************************************************/ 43 44 public class ThrottledTaskPool ( TaskT ) : TaskPool!(TaskT) 45 { 46 import ocean.core.Enforce; 47 48 /*************************************************************************** 49 50 Indicates that throttling hook has already been registered for a next 51 epoll cycle. 52 53 ***************************************************************************/ 54 55 private bool hook_registered; 56 57 /*************************************************************************** 58 59 Throttler used to control tempo of data consumption from streams. By 60 default internally defined PoolThrottler is used which is bound by 61 task pool size limit. 62 63 ***************************************************************************/ 64 65 public ISuspendableThrottler throttler; 66 67 /*************************************************************************** 68 69 Constructor 70 71 If this constructor is used, one must call `useThrottler` method before 72 actually using the pool itself. Typical use case for that is deriving 73 from the default `PoolThrottler` class (defined in this module) which 74 requires reference to the pool as its constructor argument. 75 76 ***************************************************************************/ 77 78 public this ( ) 79 { 80 this.throttler = null; 81 } 82 83 /*************************************************************************** 84 85 Constructor 86 87 Params: 88 throttler = custom throttler to use. 89 90 ***************************************************************************/ 91 92 public this ( ISuspendableThrottler throttler ) 93 { 94 assert(throttler !is null); 95 this.throttler = throttler; 96 } 97 98 /*************************************************************************** 99 100 Constructor 101 102 Params: 103 suspend_point = when number of busy tasks reaches this count, 104 processing will get suspended 105 resume_point = when number of busy tasks reaches this count, 106 processing will get resumed 107 108 ***************************************************************************/ 109 110 public this ( size_t suspend_point, size_t resume_point ) 111 { 112 auto total = theScheduler.getStats().task_queue_total; 113 114 enforce(suspend_point < total, format( 115 "Trying to configure ThrottledTaskPool with suspend point ({}) " ~ 116 "larger or equal to task queue size {}", 117 suspend_point, total)); 118 119 enforce(resume_point < total, format( 120 "Trying to configure ThrottledTaskPool with suspend point ({}) " ~ 121 "larger or equal to task queue size {}", 122 suspend_point, total)); 123 124 auto name = TaskT.classinfo.name; 125 126 if (theScheduler.getSpecializedPoolStats(name).isDefined()) 127 { 128 this.throttler = new SpecializedPoolThrottler(this, 129 suspend_point, resume_point, name); 130 } 131 else 132 { 133 this.throttler = new PoolThrottler(this, suspend_point, resume_point); 134 } 135 } 136 137 /*************************************************************************** 138 139 Sets or replaces current throttler instance 140 141 Params: 142 throttler = throttler to use 143 144 ***************************************************************************/ 145 146 public void useThrottler ( ISuspendableThrottler throttler ) 147 { 148 this.throttler = throttler; 149 } 150 151 /*************************************************************************** 152 153 Register throttling hook and check for suspend when starting a task. 154 155 Params: 156 task = The task being started from the throttled task pool. 157 158 ***************************************************************************/ 159 160 override protected void startImpl ( Task task ) 161 { 162 this.registerThrottlingHook(); 163 super.startImpl(task); 164 this.throttler.throttledSuspend(); 165 } 166 167 /*************************************************************************** 168 169 Registers throttling hook if not already present 170 171 ***************************************************************************/ 172 173 void registerThrottlingHook ( ) 174 { 175 if (!this.hook_registered) 176 { 177 this.hook_registered = true; 178 theScheduler.epoll.onCycleEnd(&this.throttlingHook); 179 } 180 } 181 182 /*************************************************************************** 183 184 Called upon owned task termination 185 186 ***************************************************************************/ 187 188 private void throttlingHook ( ) 189 { 190 this.throttler.throttledResume(); 191 192 if (this.num_busy() > 0) 193 theScheduler.epoll.onCycleEnd(&this.throttlingHook); 194 else 195 this.hook_registered = false; 196 } 197 } 198 199 /******************************************************************************* 200 201 Default throttler implementation used if no external one is supplied 202 via constructor. It throttles on amount of busy tasks in internal 203 task pool. 204 205 *******************************************************************************/ 206 207 public class PoolThrottler : ISuspendableThrottler 208 { 209 /*************************************************************************** 210 211 Reference to the throttled pool 212 213 ***************************************************************************/ 214 215 protected IPoolInfo pool; 216 217 /*************************************************************************** 218 219 When amount of total queued tasks is >= this value, the input 220 will be suspended. 221 222 ***************************************************************************/ 223 224 protected size_t suspend_point; 225 226 /*************************************************************************** 227 228 When amount of total queued tasks is <= this value, the input 229 will be resumed. 230 231 ***************************************************************************/ 232 233 protected size_t resume_point; 234 235 /*************************************************************************** 236 237 Constructor 238 239 Params: 240 pool = pool to base throttling decision on 241 suspend_point = when number of busy tasks reaches this count, 242 processing will get suspended 243 resume_point = when number of busy tasks reaches this count, 244 processing will get resumed 245 246 ***************************************************************************/ 247 248 public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point ) 249 { 250 assert(suspend_point > resume_point); 251 assert(suspend_point < pool.limit()); 252 253 this.pool = pool; 254 this.suspend_point = suspend_point; 255 this.resume_point = resume_point; 256 } 257 258 /*************************************************************************** 259 260 Check if the total number of active tasks has reached the desired 261 limit to suspend. 262 263 Checks both amount of unused tasks in this pool and amount of unused 264 tasks in global scheduler queue. 265 266 ***************************************************************************/ 267 268 override protected bool suspend ( ) 269 { 270 auto stats = theScheduler.getStats(); 271 auto total = stats.task_queue_total; 272 auto used = stats.task_queue_busy; 273 274 auto result = used >= this.suspend_point 275 || (this.pool.num_busy() >= this.pool.limit() - 1); 276 277 debug_trace("Throttler.suspend -> {}", result); 278 279 return result; 280 } 281 282 /*************************************************************************** 283 284 Check if the total number of active tasks is below the desired 285 limit to resume. 286 287 Checks both amount of unused tasks in this pool and amount of unused 288 tasks in global scheduler queue. 289 290 ***************************************************************************/ 291 292 override protected bool resume ( ) 293 { 294 auto stats = theScheduler.getStats(); 295 auto total = stats.task_queue_total; 296 auto used = stats.task_queue_busy; 297 298 auto result = used <= this.resume_point 299 && (this.pool.num_busy() < this.pool.limit()); 300 301 debug_trace("Throttler.resume -> {}", result); 302 303 return result; 304 } 305 } 306 307 /******************************************************************************* 308 309 Throttler implementation intended to be used with a specialized task 310 pools. 311 312 *******************************************************************************/ 313 314 public class SpecializedPoolThrottler : ISuspendableThrottler 315 { 316 /*************************************************************************** 317 318 Reference to the throttled pool 319 320 ***************************************************************************/ 321 322 protected IPoolInfo pool; 323 324 /*************************************************************************** 325 326 When amount of total queued tasks is >= this value, the input 327 will be suspended. 328 329 ***************************************************************************/ 330 331 protected size_t suspend_point; 332 333 /*************************************************************************** 334 335 When amount of total queued tasks is <= this value, the input 336 will be resumed. 337 338 ***************************************************************************/ 339 340 protected size_t resume_point; 341 342 /*************************************************************************** 343 344 String representation of the class name of the task handled by the host 345 task pool. 346 347 ***************************************************************************/ 348 349 protected istring task_class_name; 350 351 /*************************************************************************** 352 353 Constructor 354 355 Params: 356 pool = pool to base throttling decision on 357 suspend_point = when number of busy tasks reaches this count, 358 processing will get suspended 359 resume_point = when number of busy tasks reaches this count, 360 processing will get resumed 361 name = class name for the task type handled by the `pool` 362 363 ***************************************************************************/ 364 365 public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point, 366 istring name ) 367 { 368 assert(suspend_point > resume_point); 369 assert(suspend_point < pool.limit()); 370 371 this.pool = pool; 372 this.suspend_point = suspend_point; 373 this.resume_point = resume_point; 374 this.task_class_name = name; 375 } 376 377 /*************************************************************************** 378 379 Check if the total number of active tasks has reached the desired 380 limit to suspend. 381 382 Checks both amount of unused tasks in this pool and amount of unused 383 tasks in global scheduler queue. 384 385 ***************************************************************************/ 386 387 override protected bool suspend ( ) 388 { 389 bool result; 390 391 auto stats = theScheduler.getSpecializedPoolStats(this.task_class_name); 392 stats.visit( 393 ( ) { 394 verify(false, "Specialized task pool throttler initalized " ~ 395 "with missing task class name"); 396 }, 397 (ref IScheduler.SpecializedPoolStats s) { 398 result = s.used_fibers >= this.suspend_point; 399 } 400 ); 401 402 debug_trace("Throttler.suspend -> {}", result); 403 return result; 404 } 405 406 /*************************************************************************** 407 408 Check if the total number of active tasks is below the desired 409 limit to resume. 410 411 Checks both amount of unused tasks in this pool and amount of unused 412 tasks in global scheduler queue. 413 414 ***************************************************************************/ 415 416 override protected bool resume ( ) 417 { 418 bool result; 419 420 auto stats = theScheduler.getSpecializedPoolStats(this.task_class_name); 421 stats.visit( 422 ( ) { 423 verify(false, "Specialized task pool throttler initalized " ~ 424 "with missing task class name"); 425 }, 426 (ref IScheduler.SpecializedPoolStats s) { 427 result = s.used_fibers <= this.resume_point; 428 } 429 ); 430 431 debug_trace("Throttler.resume -> {}", result); 432 return result; 433 } 434 } 435 436 /******************************************************************************* 437 438 Debug trace output when builing with the TaskScheduler debug flag. 439 440 Params: 441 format = Format for variadic argument output. 442 args = Variadic arguments for output. 443 444 *******************************************************************************/ 445 446 private void debug_trace ( T... ) ( cstring format, T args ) 447 { 448 debug ( TaskScheduler ) 449 { 450 Stdout.formatln( "[ocean.task.ThrottledTaskPool] " ~ format, args ) 451 .flush(); 452 } 453 }