1 /******************************************************************************* 2 3 This module defines public API for `ocean.task.Scheduler` together with 4 all involved data types. It is intended to be used in other modules instead 5 of direct `ocean.task.Scheduler` import to untangle complex dependency chain 6 the latter brings. 7 8 Copyright: 9 Copyright (c) 2017 dunnhumby Germany GmbH. All rights reserved. 10 11 License: 12 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 13 Alternatively, this file may be distributed under the terms of the Tango 14 3-Clause BSD License (see LICENSE_BSD.txt for details). 15 16 *******************************************************************************/ 17 18 module ocean.task.IScheduler; 19 20 import core.thread; 21 22 import ocean.task.Task; 23 import ocean.meta.traits.Indirections : hasIndirections; 24 import ocean.meta.types.Qualifiers; 25 import ocean.io.select.EpollSelectDispatcher; 26 import ocean.core.Optional; 27 28 /******************************************************************************* 29 30 Scheduler handles concurrent execution of all application tasks, 31 assigning them to limited amount of worker fibers. Scheduler functionality 32 can be split into 3 major blocks: 33 34 1. executing arbitrary `ocean.task.Task` using a worker fiber from a pool 35 2. keeping queue of tasks to execute in cases when all worker fibers are 36 busy 37 3. allowing any task to suspend to allow event processing by epoll in a way 38 that it will be automatically resumed by scheduler again when there are 39 no more immediate pending events 40 41 *******************************************************************************/ 42 43 public interface IScheduler 44 { 45 /*************************************************************************** 46 47 Aggregation of various size limits used internally by scheduler. 48 Instance of this struct is used as `initScheduler` argument. 49 50 Default values of scheduler configuration are picked for convenient 51 usage of `SchedulerConfiguration.init` in tests - large stack size but 52 small queue limits. 53 54 ***************************************************************************/ 55 56 public struct Configuration 57 { 58 /// stack size allocated for all worker fibers created by the scheduler 59 size_t worker_fiber_stack_size = 102400; 60 61 /// maximum amount of simultaneous worker fibers in the scheduler pool 62 size_t worker_fiber_limit = 5; 63 64 /// maximum amount of tasks awaiting scheduling in the queue while all 65 /// worker fibers are busy 66 size_t task_queue_limit = 10; 67 68 /// optional array that defines specialized worker fiber pools to be 69 /// used for handling specific task kinds. Scheduled task is checked 70 /// against this array every time thus it is not recommended to configure 71 /// it to more than a few dedicated extra pools 72 PoolDescription[] specialized_pools; 73 74 /// Defines single mapping of `ClassInfo` to worker fiber pool in 75 /// configuration 76 public struct PoolDescription 77 { 78 /// fully qualified name (same as `Task.classinfo.name()`) for task 79 /// type which is to be handled by this pool 80 istring task_name; 81 82 /// worker fiber allocated stack size 83 size_t stack_size; 84 } 85 } 86 87 /*************************************************************************** 88 89 Aggregate of various statistics that indicate overall scheduler load 90 and performance. 91 92 ***************************************************************************/ 93 94 public struct Stats 95 { 96 size_t task_queue_busy; 97 size_t task_queue_total; 98 size_t suspended_tasks; 99 size_t worker_fiber_busy; 100 size_t worker_fiber_total; 101 } 102 103 /*************************************************************************** 104 105 Usage stats of a single specialized task pool 106 107 ***************************************************************************/ 108 109 public struct SpecializedPoolStats 110 { 111 size_t used_fibers; 112 size_t total_fibers; 113 } 114 115 /*************************************************************************** 116 117 Getter for scheduler epoll instance. Necessary for integration with 118 `ISelectClient` utilities so that new select client can be registered. 119 120 Returns: 121 internally used epoll instance 122 123 ***************************************************************************/ 124 125 public EpollSelectDispatcher epoll ( ); 126 127 /*************************************************************************** 128 129 Forces early termination of all active tasks and shuts down 130 event loop. 131 132 After this method has been called any attempt to interact with 133 the scheduler will kill the calling task. 134 135 ***************************************************************************/ 136 137 public void shutdown ( ); 138 139 /*************************************************************************** 140 141 Provides load stats for the scheduler 142 143 Common usage examples would be load throttling and load stats recording. 144 145 Returns: 146 struct instance which aggregates all stats 147 148 ***************************************************************************/ 149 150 public Stats getStats ( ); 151 152 /*************************************************************************** 153 154 Returns: 155 Stats struct for a specialized pool defined by `name` if there is 156 such pool. Empty Optional otherwise. 157 158 ***************************************************************************/ 159 160 public Optional!(SpecializedPoolStats) getSpecializedPoolStats ( cstring name ); 161 162 /*************************************************************************** 163 164 Method used to execute new task. 165 166 If there are idle worker fibers, the task will be executed immediately 167 and this method will only return when that task first calls `suspend`. 168 169 If all workers are busy, the task will be added to the queue and this 170 method will return immediately. 171 172 Params: 173 task = derivative from `ocean.task.Task` defining some application 174 task to execute 175 176 Throws: 177 TaskQueueFullException if task queue is at full capacity 178 179 ***************************************************************************/ 180 181 public void schedule ( Task task ); 182 183 /*************************************************************************** 184 185 Method used to queue the task for later execution. 186 187 Will always put the task into the queue, even if there are idle worker 188 fibers. This method is mostly useful when implementing advanced library 189 facilities to ensure that no immediate execution takes place. 190 191 Will result in starting the task in the next event loop cycle at the 192 earliest. 193 194 Params: 195 task = derivative from `ocean.task.Task` defining some application 196 task to execute 197 198 Throws: 199 TaskQueueFullException if task queue is at full capacity AND 200 if no custom `task_queue_full_cb` is set. 201 202 ***************************************************************************/ 203 204 public void queue ( Task task ); 205 206 /*************************************************************************** 207 208 Schedules the argument and suspends calling task until the argument 209 finishes. 210 211 This method must not be called outside of a task. 212 213 Because of how termination hooks are implemented, by the time `await` 214 returns, the task object is not yet completely recycled - it will only 215 happen during next context switch. Caller of `await` must either ensure 216 that the task object lives long enough for that or call 217 `theScheduler.processEvents` right after `await` to ensure immediate 218 recycle (at the performance cost of an extra context switch). 219 220 Params: 221 task = task to schedule and wait for 222 finished_dg = optional delegate called after task finishes but 223 before it gets recycled, can be used to copy some data into 224 caller fiber context 225 226 ***************************************************************************/ 227 228 public void await ( Task task, scope void delegate (Task) finished_dg = null ); 229 230 /*************************************************************************** 231 232 Convenience shortcut on top of `await` to await for a task and return 233 some value type as a result. 234 235 Params: 236 task = any task that defines `result` public field of type with no 237 indirections 238 239 Returns: 240 content of `result` field of the task read right after that task 241 finishes 242 243 ***************************************************************************/ 244 245 public final typeof(TaskT.result) awaitResult ( TaskT : Task ) ( TaskT task ) 246 { 247 static assert ( 248 !hasIndirections!(typeof(task.result)), 249 "'awaitResult' can only work with result types with no indirection" 250 ); 251 typeof(task.result) result; 252 this.await(task, (Task t) { result = (cast(TaskT) t).result; }); 253 return result; 254 } 255 256 /*************************************************************************** 257 258 Similar to `await` but also has waiting timeout. Calling task will be 259 resumed either if awaited task finished or timeout is hit, whichever 260 happens first. 261 262 Params: 263 task = task to await 264 micro_seconds = timeout duration 265 266 Returns: 267 'true' if resumed via timeout, 'false' otherwise 268 269 ***************************************************************************/ 270 271 public bool awaitOrTimeout ( Task task, uint micro_seconds ); 272 273 /*************************************************************************** 274 275 Orders scheduler to resume given task unconditionally after current 276 epoll cycle. Must be used instead of plain `Task.resume` from 277 termination hooks of other tasks. 278 279 Params: 280 task = task object to resume on next cycle 281 282 Throws: 283 SuspendQueueFullException if resuming queue is full 284 285 ***************************************************************************/ 286 287 public void delayedResume ( Task task ); 288 289 /*************************************************************************** 290 291 Suspends current fiber temporarily, allowing pending events to be 292 processed. Current fiber will be resumed as soon as no immediate events 293 are left. 294 295 Throws: 296 SuspendQueueFullException if suspending is not possible because 297 resuming queue is full 298 299 ***************************************************************************/ 300 301 public void processEvents ( ); 302 303 /*************************************************************************** 304 305 Starts pseudo-infinite event loop. Event loop will keep running as long 306 as there is at least one event registered. 307 308 Throws: 309 SanityException if there are some active worker fibers 310 left in the pool by the time there are not events left 311 312 ***************************************************************************/ 313 314 public void eventLoop ( ); 315 } 316 317 /******************************************************************************* 318 319 Singleton scheduler instance, same as `ocean.task.Scheduler.theScheduler` 320 but returns that object as `IScheduler` interface. 321 322 Returns: 323 the global scheduler instance 324 325 *******************************************************************************/ 326 327 public IScheduler theScheduler ( ) 328 { 329 assert(_scheduler !is null, "Scheduler is null, initScheduler must be called before using it"); 330 331 return _scheduler; 332 } 333 334 /******************************************************************************* 335 336 Returns: 337 'true' if scheduler system was initialized, 'false' otherwise 338 339 *******************************************************************************/ 340 341 public bool isSchedulerUsed ( ) 342 { 343 return _scheduler !is null; 344 } 345 346 version (unittest) 347 { 348 /*************************************************************************** 349 350 Occasionally useful in tests to drop reference to already initialized 351 scheduler and test some code as if scheduler is not present. 352 353 ***************************************************************************/ 354 355 public void dropScheduler ( ) 356 { 357 _scheduler = null; 358 } 359 } 360 361 /******************************************************************************* 362 363 Initialized externally from `ocean.task.Scheduler` to reference the same 364 singleton object. 365 366 *******************************************************************************/ 367 368 package IScheduler _scheduler; 369 370 /****************************************************************************** 371 372 Exception thrown when scheduled task queue overflows 373 374 ******************************************************************************/ 375 376 public class TaskQueueFullException : Exception 377 { 378 this ( ) 379 { 380 super("Attempt to schedule a task when all worker fibers are busy " 381 ~ " and delayed execution task queue is full"); 382 } 383 }