1 /******************************************************************************* 2 3 Adds functionality to suspend/resume registered ISuspendable instances 4 based on the number of active tasks in the task pool. 5 6 Copyright: 7 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 8 All rights reserved. 9 10 License: 11 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 12 Alternatively, this file may be distributed under the terms of the Tango 13 3-Clause BSD License (see LICENSE_BSD.txt for details). 14 15 *******************************************************************************/ 16 17 module ocean.task.ThrottledTaskPool; 18 19 import ocean.task.TaskPool; 20 import ocean.task.Task; 21 import ocean.task.IScheduler; 22 import ocean.text.convert.Formatter; 23 24 import ocean.io.model.ISuspendableThrottler; 25 import ocean.util.container.pool.model.IPoolInfo; 26 import ocean.util.container.pool.model.ILimitable; 27 28 import ocean.meta.traits.Aggregates /* : hasMethod */; 29 import ocean.meta.types.Function /* ParametersOf */; 30 31 import ocean.transition; 32 import ocean.core.Verify; 33 34 debug (TaskScheduler) 35 import ocean.io.Stdout; 36 37 /******************************************************************************* 38 39 Special modified version of task pool to enhance `outer` context of task 40 with reference to throttler. 41 42 *******************************************************************************/ 43 44 public class ThrottledTaskPool ( TaskT ) : TaskPool!(TaskT) 45 { 46 import ocean.core.Enforce; 47 48 /*************************************************************************** 49 50 Indicates that throttling hook has already been registered for a next 51 epoll cycle. 52 53 ***************************************************************************/ 54 55 private bool hook_registered; 56 57 /*************************************************************************** 58 59 Throttler used to control tempo of data consumption from streams. By 60 default internally defined PoolThrottler is used which is bound by 61 task pool size limit. 62 63 ***************************************************************************/ 64 65 public ISuspendableThrottler throttler; 66 67 /*************************************************************************** 68 69 Constructor 70 71 If this constructor is used, one must call `useThrottler` method before 72 actually using the pool itself. Typical use case for that is deriving 73 from the default `PoolThrottler` class (defined in this module) which 74 requires reference to the pool as its constructor argument. 75 76 ***************************************************************************/ 77 78 public this ( ) 79 { 80 this.throttler = null; 81 } 82 83 /*************************************************************************** 84 85 Constructor 86 87 Params: 88 throttler = custom throttler to use. 89 90 ***************************************************************************/ 91 92 public this ( ISuspendableThrottler throttler ) 93 { 94 assert(throttler !is null); 95 this.throttler = throttler; 96 } 97 98 /*************************************************************************** 99 100 Constructor 101 102 Params: 103 suspend_point = when number of busy tasks reaches this count, 104 processing will get suspended 105 resume_point = when number of busy tasks reaches this count, 106 processing will get resumed 107 108 ***************************************************************************/ 109 110 public this ( size_t suspend_point, size_t resume_point ) 111 { 112 auto total = theScheduler.getStats().task_queue_total; 113 114 enforce(suspend_point < total, format( 115 "Trying to configure ThrottledTaskPool with suspend point ({}) " ~ 116 "larger or equal to task queue size {}", 117 suspend_point, total)); 118 119 enforce(resume_point < total, format( 120 "Trying to configure ThrottledTaskPool with suspend point ({}) " ~ 121 "larger or equal to task queue size {}", 122 suspend_point, total)); 123 124 auto name = TaskT.classinfo.name; 125 126 if (theScheduler.getSpecializedPoolStats(name).isDefined()) 127 { 128 this.throttler = new SpecializedPoolThrottler(this, 129 suspend_point, resume_point, name); 130 } 131 else 132 { 133 this.throttler = new PoolThrottler(this, suspend_point, resume_point); 134 } 135 } 136 137 /*************************************************************************** 138 139 Sets or replaces current throttler instance 140 141 Params: 142 throttler = throttler to use 143 144 ***************************************************************************/ 145 146 public void useThrottler ( ISuspendableThrottler throttler ) 147 { 148 this.throttler = throttler; 149 } 150 151 /*************************************************************************** 152 153 Rewrite of TaskPool.start changed to use `ProcessingTask` as actual 154 task type instead of plain OwnedTask. Right now it is done by dumb 155 copy-paste, if that pattern will appear more often, TaskPool base 156 class may need a slight refactoring to support it. 157 158 Params: 159 args = same set of args as defined by `copyArguments` method of 160 user-supplied task class, will be forwarded to it. 161 162 Returns: 163 False if the pool is at maximum capacity; 164 165 ***************************************************************************/ 166 167 override public bool start ( ParametersOf!(TaskT.copyArguments) args ) 168 { 169 assert (this.throttler !is null); 170 171 if (this.num_busy() >= this.limit()) 172 return false; 173 174 auto task = cast(TaskT) this.get(new TaskT); 175 assert (task !is null); 176 177 task.copyArguments(args); 178 this.registerThrottlingHook(); 179 this.startImpl(task); 180 181 this.throttler.throttledSuspend(); 182 183 return true; 184 } 185 186 static if (hasMethod!(TaskT, "deserialize", void delegate(void[]))) 187 { 188 /*********************************************************************** 189 190 Starts a task in the same manner as `start` but instead calls 191 a restore method on the derived task with a serialized buffer of the 192 state. This is to support dumping and loading tasks from disk. 193 194 Params: 195 serialized = same set of args as defined by `serialized` method 196 of user-supplied task class, will be forwarded to it. 197 198 Returns: 199 'false' if new task can't be started because pool limit is 200 reached for now, 'true' otherwise 201 202 ***********************************************************************/ 203 204 override public bool restore ( void[] serialized ) 205 { 206 assert (this.throttler !is null); 207 208 if (this.num_busy() >= this.limit()) 209 return false; 210 211 auto task = cast(TaskT) this.get(new TaskT); 212 assert (task !is null); 213 214 task.deserialize(serialized); 215 this.registerThrottlingHook(); 216 this.startImpl(task); 217 218 this.throttler.throttledSuspend(); 219 220 return true; 221 } 222 } 223 224 /*************************************************************************** 225 226 Registers throttling hook if not already present 227 228 ***************************************************************************/ 229 230 void registerThrottlingHook ( ) 231 { 232 if (!this.hook_registered) 233 { 234 this.hook_registered = true; 235 theScheduler.epoll.onCycleEnd(&this.throttlingHook); 236 } 237 } 238 239 /*************************************************************************** 240 241 Called upon owned task termination 242 243 ***************************************************************************/ 244 245 private void throttlingHook ( ) 246 { 247 this.throttler.throttledResume(); 248 249 if (this.num_busy() > 0) 250 theScheduler.epoll.onCycleEnd(&this.throttlingHook); 251 else 252 this.hook_registered = false; 253 } 254 } 255 256 /******************************************************************************* 257 258 Default throttler implementation used if no external one is supplied 259 via constructor. It throttles on amount of busy tasks in internal 260 task pool. 261 262 *******************************************************************************/ 263 264 public class PoolThrottler : ISuspendableThrottler 265 { 266 /*************************************************************************** 267 268 Reference to the throttled pool 269 270 ***************************************************************************/ 271 272 protected IPoolInfo pool; 273 274 /*************************************************************************** 275 276 When amount of total queued tasks is >= this value, the input 277 will be suspended. 278 279 ***************************************************************************/ 280 281 protected size_t suspend_point; 282 283 /*************************************************************************** 284 285 When amount of total queued tasks is <= this value, the input 286 will be resumed. 287 288 ***************************************************************************/ 289 290 protected size_t resume_point; 291 292 /*************************************************************************** 293 294 Constructor 295 296 Params: 297 pool = pool to base throttling decision on 298 suspend_point = when number of busy tasks reaches this count, 299 processing will get suspended 300 resume_point = when number of busy tasks reaches this count, 301 processing will get resumed 302 303 ***************************************************************************/ 304 305 public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point ) 306 { 307 assert(suspend_point > resume_point); 308 assert(suspend_point < pool.limit()); 309 310 this.pool = pool; 311 this.suspend_point = suspend_point; 312 this.resume_point = resume_point; 313 } 314 315 /*************************************************************************** 316 317 Check if the total number of active tasks has reached the desired 318 limit to suspend. 319 320 Checks both amount of unused tasks in this pool and amount of unused 321 tasks in global scheduler queue. 322 323 ***************************************************************************/ 324 325 override protected bool suspend ( ) 326 { 327 auto stats = theScheduler.getStats(); 328 auto total = stats.task_queue_total; 329 auto used = stats.task_queue_busy; 330 331 auto result = used >= this.suspend_point 332 || (this.pool.num_busy() >= this.pool.limit() - 1); 333 334 debug_trace("Throttler.suspend -> {}", result); 335 336 return result; 337 } 338 339 /*************************************************************************** 340 341 Check if the total number of active tasks is below the desired 342 limit to resume. 343 344 Checks both amount of unused tasks in this pool and amount of unused 345 tasks in global scheduler queue. 346 347 ***************************************************************************/ 348 349 override protected bool resume ( ) 350 { 351 auto stats = theScheduler.getStats(); 352 auto total = stats.task_queue_total; 353 auto used = stats.task_queue_busy; 354 355 auto result = used <= this.resume_point 356 && (this.pool.num_busy() < this.pool.limit()); 357 358 debug_trace("Throttler.resume -> {}", result); 359 360 return result; 361 } 362 } 363 364 /******************************************************************************* 365 366 Throttler implementation intended to be used with a specialized task 367 pools. 368 369 *******************************************************************************/ 370 371 public class SpecializedPoolThrottler : ISuspendableThrottler 372 { 373 /*************************************************************************** 374 375 Reference to the throttled pool 376 377 ***************************************************************************/ 378 379 protected IPoolInfo pool; 380 381 /*************************************************************************** 382 383 When amount of total queued tasks is >= this value, the input 384 will be suspended. 385 386 ***************************************************************************/ 387 388 protected size_t suspend_point; 389 390 /*************************************************************************** 391 392 When amount of total queued tasks is <= this value, the input 393 will be resumed. 394 395 ***************************************************************************/ 396 397 protected size_t resume_point; 398 399 /*************************************************************************** 400 401 String representation of the class name of the task handled by the host 402 task pool. 403 404 ***************************************************************************/ 405 406 protected istring task_class_name; 407 408 /*************************************************************************** 409 410 Constructor 411 412 Params: 413 pool = pool to base throttling decision on 414 suspend_point = when number of busy tasks reaches this count, 415 processing will get suspended 416 resume_point = when number of busy tasks reaches this count, 417 processing will get resumed 418 name = class name for the task type handled by the `pool` 419 420 ***************************************************************************/ 421 422 public this ( IPoolInfo pool, size_t suspend_point, size_t resume_point, 423 istring name ) 424 { 425 assert(suspend_point > resume_point); 426 assert(suspend_point < pool.limit()); 427 428 this.pool = pool; 429 this.suspend_point = suspend_point; 430 this.resume_point = resume_point; 431 this.task_class_name = name; 432 } 433 434 /*************************************************************************** 435 436 Check if the total number of active tasks has reached the desired 437 limit to suspend. 438 439 Checks both amount of unused tasks in this pool and amount of unused 440 tasks in global scheduler queue. 441 442 ***************************************************************************/ 443 444 override protected bool suspend ( ) 445 { 446 bool result; 447 448 auto stats = theScheduler.getSpecializedPoolStats(this.task_class_name); 449 stats.visit( 450 ( ) { 451 verify(false, "Specialized task pool throttler initalized " ~ 452 "with missing task class name"); 453 }, 454 (ref IScheduler.SpecializedPoolStats s) { 455 result = s.used_fibers >= this.suspend_point; 456 } 457 ); 458 459 debug_trace("Throttler.suspend -> {}", result); 460 return result; 461 } 462 463 /*************************************************************************** 464 465 Check if the total number of active tasks is below the desired 466 limit to resume. 467 468 Checks both amount of unused tasks in this pool and amount of unused 469 tasks in global scheduler queue. 470 471 ***************************************************************************/ 472 473 override protected bool resume ( ) 474 { 475 bool result; 476 477 auto stats = theScheduler.getSpecializedPoolStats(this.task_class_name); 478 stats.visit( 479 ( ) { 480 verify(false, "Specialized task pool throttler initalized " ~ 481 "with missing task class name"); 482 }, 483 (ref IScheduler.SpecializedPoolStats s) { 484 result = s.used_fibers <= this.resume_point; 485 } 486 ); 487 488 debug_trace("Throttler.resume -> {}", result); 489 return result; 490 } 491 } 492 493 /******************************************************************************* 494 495 Debug trace output when builing with the TaskScheduler debug flag. 496 497 Params: 498 format = Format for variadic argument output. 499 args = Variadic arguments for output. 500 501 *******************************************************************************/ 502 503 private void debug_trace ( T... ) ( cstring format, T args ) 504 { 505 debug ( TaskScheduler ) 506 { 507 Stdout.formatln( "[ocean.task.ThrottledTaskPool] " ~ format, args ) 508 .flush(); 509 } 510 }