/*******************************************************************************

    This module provides the standard task scheduler and a singleton object
    instance of it. The developer must call the `initScheduler` function to get
    the singleton into a usable state.

    Usage example:
        See the documented unittest of the `Scheduler` class

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.Scheduler;


import core.thread;

import ocean.meta.types.Qualifiers;
import ocean.core.Enforce;
import ocean.core.Verify;
import ocean.core.TypeConvert;
import ocean.core.Optional;
import ocean.io.select.EpollSelectDispatcher;
import ocean.util.container.queue.FixedRingQueue;
import ocean.meta.traits.Indirections;

import ocean.task.Task;
import ocean.task.IScheduler;
import ocean.task.internal.FiberPoolWithQueue;
import ocean.task.internal.SpecializedPools;
import ocean.task.util.Timer;

version (unittest)
{
    import ocean.core.Test;
    import ocean.io.Stdout;
    import core.stdc.stdlib : abort;
}

debug (TaskScheduler)
{
    import ocean.io.Stdout;
}

public alias IScheduler.Configuration SchedulerConfiguration;
public alias IScheduler.Stats SchedulerStats;

/*******************************************************************************

    Implementation of the IScheduler API

*******************************************************************************/

final class Scheduler : IScheduler
{
    /***************************************************************************

        Tracks scheduler internal state to safeguard against harmful operations

    ***************************************************************************/

    private enum State
    {
        /// Initial configured state
        Initial,
        /// Set when `eventLoop` method starts
        Running,
        /// Set when shutdown sequence is initiated and normal mode of operations
        /// has to be prevented.
        Shutdown
    }

    /// ditto
    private State state = State.Initial;

    /***************************************************************************

        Worker fiber pool, allows reusing fiber memory to run different tasks

    ***************************************************************************/

    private FiberPoolWithQueue fiber_pool;

    /***************************************************************************

        Indicates if this.selectCycleHook is already registered for the next
        epoll cycle.

    ***************************************************************************/

    bool select_cycle_hook_registered;

    /***************************************************************************

        Used internally by scheduler to do all event handling

    ***************************************************************************/

    private EpollSelectDispatcher _epoll;

    /***************************************************************************

        Optional mapping from some Task ClassInfo's to dedicated worker
        fiber pools.

    ***************************************************************************/

    private SpecializedPools specialized_pools;

    /***************************************************************************

        Tracks how many tasks are currently pending resume via epoll
        cycle callbacks.

    ***************************************************************************/

    private size_t cycle_pending_task_count;

    /***************************************************************************

        Getter for scheduler epoll instance. Necessary for integration with
        `ISelectClient` utilities so that new select client can be registered.

        Returns:
            internally used epoll instance

    ***************************************************************************/

    public EpollSelectDispatcher epoll ( )
    {
        assert (this._epoll !is null);
        return this._epoll;
    }

    /***************************************************************************

        Set delegate to call each time task is attempted to be queued but size
        limit is reached. Both the queue and the task will be supplied as
        arguments.

    ***************************************************************************/

    public void task_queue_full_cb ( scope TaskQueueFullCB dg )
    {
        this.fiber_pool.task_queue_full_cb = dg;
    }

    /***************************************************************************

        Gets current callback for case of full queue

    ***************************************************************************/

    public TaskQueueFullCB task_queue_full_cb ( )
    {
        return this.fiber_pool.task_queue_full_cb;
    }

    /***************************************************************************

        Called each time task terminates with an exception when being run in
        context of the scheduler or the event loop.

        NB: the task reference will be null when the delegate is called from the
        EpollSelectDispatcher context (i.e. if task threw after resuming from
        an event callback)

    ***************************************************************************/

    public void delegate ( Task, Exception ) exception_handler;

    /***************************************************************************

        Temporary storage used to pass currently used worker into the
        fiber function so that it can make copy of it on stack and handle the
        recycling in the end of fiber function.

        Content is undefined in all situations except when starting a new task.

    ***************************************************************************/

    private WorkerFiber last_used_worker;

    /***************************************************************************

        Temporary storage for a task reference.

        NOTE(review): the previous text here was a verbatim copy of the
        `last_used_worker` documentation. Presumably this field carries the
        most recently scheduled task into the fiber function in the same
        manner - confirm against the code that reads it (the field is not
        referenced anywhere else in this module).

        Content is undefined in all situations except when starting a new task.

    ***************************************************************************/

    private Task last_scheduled_task;

    /***************************************************************************

        Constructor

        Params:
            config = see `.SchedulerConfiguration`
            epoll = epoll instance to use for internal event loop. If null,
                Scheduler will create new instance with default arguments

    ***************************************************************************/

    private this ( SchedulerConfiguration config,
        EpollSelectDispatcher epoll = null )
    {
        debug_trace(
            "Creating new Scheduler with following configuration:\n" ~
                "\tworker_fiber_stack_size = {}\n" ~
                "\tworker_fiber_limit = {}\n" ~
                "\ttask_queue_limit = {}\n",
            config.worker_fiber_stack_size,
            config.worker_fiber_limit,
            config.task_queue_limit
        );

        verify(
            config.task_queue_limit >= config.worker_fiber_limit,
            "Must configure task queue size at least equal to worker fiber " ~
                "count for optimal task scheduler performance."
        );

        if (epoll is null)
            this._epoll = new EpollSelectDispatcher;
        else
            this._epoll = epoll;

        this.fiber_pool = new FiberPoolWithQueue(
            config.task_queue_limit,
            config.worker_fiber_stack_size,
            config.worker_fiber_limit
        );

        this.specialized_pools = new SpecializedPools(config.specialized_pools);
    }

    /***************************************************************************

        Forces early termination of all active tasks and shuts down
        event loop.

        After this method has been called any attempt to interact with
        the scheduler will kill the calling task.

    ***************************************************************************/

    public void shutdown ( )
    {
        // no-op if already shutting down
        if (this.state == State.Shutdown)
            return;

        debug_trace(
            "Shutting down initiated. {} queued tasks will be discarded",
            this.fiber_pool.queued_tasks.length()
        );

        this.state = State.Shutdown;
        this.fiber_pool.queued_tasks.clear();
        this.epoll.shutdown();

        // if called from within a task, that task is terminated as well
        auto task = Task.getThis();
        if (task !is null)
            task.kill();
    }

    /***************************************************************************

        Provides load stats for the scheduler

        Common usage examples would be load throttling and load stats recording.

        Returns:
            struct instance which aggregates all stats

    ***************************************************************************/

    public SchedulerStats getStats ( )
    {
        SchedulerStats stats = {
            task_queue_busy : this.fiber_pool.queued_tasks.length(),
            task_queue_total : this.fiber_pool.queued_tasks.maxItems(),
            suspended_tasks : this.cycle_pending_task_count,
            worker_fiber_busy : this.fiber_pool.num_busy(),
            worker_fiber_total : this.fiber_pool.limit()
        };
        return stats;
    }

    /***************************************************************************

        Returns:
            Stats struct for a specialized pool defined by `name` if there is
            such pool. Empty Optional otherwise.

    ***************************************************************************/

    public Optional!(SpecializedPoolStats) getSpecializedPoolStats ( cstring name )
    {
        Optional!(SpecializedPoolStats) result;

        this.specialized_pools.findPool(name).visit(
            ( ) { },
            (ref SpecializedPools.SpecializedPool descr) {
                result = optional(SpecializedPoolStats(
                    descr.pool.num_busy(),
                    descr.pool.length()
                ));
            }
        );

        return result;
    }

    /***************************************************************************

        Method used to execute new task.

        If there are idle worker fibers, the task will be executed immediately
        and this method will only return when that task first calls `suspend`.

        If all workers are busy, the task will be added to the queue and this
        method will return immediately.

        Params:
            task = derivative from `ocean.task.Task` defining some application
                task to execute

        Throws:
            TaskQueueFullException if task queue is at full capacity

    ***************************************************************************/

    public void schedule ( Task task )
    {
        if (this.state == State.Shutdown)
        {
            // Simply returning here would be generally sufficient to make sure
            // no new tasks get added after shutdown. However, it is of some
            // merit to try to kill everything as soon as possible thus
            // scheduler kills the caller tasks on any attempt to schedule a new
            // one.
            auto caller_task = Task.getThis();
            if (caller_task !is null)
                caller_task.kill();
            return;
        }

        try
        {
            if (!this.specialized_pools.run(task))
            {
                this.fiber_pool.runOrQueue(task);
                this.registerCycleCallback();
            }
        }
        catch (Exception e)
        {
            // a configured handler takes precedence over propagating
            if (this.exception_handler !is null)
                this.exception_handler(task, e);
            else
                throw e;
        }
    }

    /***************************************************************************

        Method used to queue the task for later execution.

        Will always put the task into the queue, even if there are idle worker
        fibers. This method is mostly useful when implementing advanced library
        facilities to ensure that no immediate execution takes place.

        Will result in starting the task in the next event loop cycle at the
        earliest.

        Params:
            task = derivative from `ocean.task.Task` defining some application
                task to execute

        Throws:
            TaskQueueFullException if task queue is at full capacity AND
            if no custom `task_queue_full_cb` is set.

    ***************************************************************************/

    public void queue ( Task task )
    {
        this.fiber_pool.queue(task);
        this.registerCycleCallback();
    }

    /***************************************************************************

        Schedules the argument and suspends calling task until the argument
        finishes.

        This method must not be called outside of a task.

        If `task` is already scheduled, it will not be re-scheduled again but
        awaiting will still occur.

        Params:
            task = task to schedule and wait for
            finished_dg = optional delegate called after task finishes but
                before it gets recycled, can be used to copy some data into
                caller fiber context

    ***************************************************************************/

    public void await ( Task task, scope void delegate (Task) finished_dg = null )
    {
        auto context = Task.getThis();
        assert (context !is null);
        assert (context !is task);

        // this method's stack is guaranteed to still be valid by the time
        // task finishes, so we can reference `task` from delegate
        task.terminationHook({
            if (context.suspended())
                theScheduler.delayedResume(context);
        });

        if (finished_dg !is null)
            task.terminationHook({ finished_dg(task); });

        if (!task.suspended())
            this.schedule(task);

        if (!task.finished())
            context.suspend();
    }

    ///
    unittest
    {
        void example ( )
        {
            static class ExampleTask : Task
            {
                mstring data;

                override void run ( )
                {
                    // do things that may result in suspending ...
                    data = "abcd".dup;
                }

                override void recycle ( )
                {
                    this.data = null;
                }
            }

            mstring data;

            theScheduler.await(
                new ExampleTask,
                (Task t) {
                    // copy required data before tasks gets recycled
                    auto task = cast(ExampleTask) t;
                    data = task.data.dup;
                }
            );

            test!("==")(data, "abcd");
        }
    }

    /***************************************************************************

        Convenience shortcut on top of `await` to await for a task and return
        some value type as a result.

        If `task` is already scheduled, it will not be re-scheduled again but
        awaiting will still occur.

        Params:
            task = any task that defines `result` public field of type with no
                indirections

        Returns:
            content of `result` field of the task read right after that task
            finishes

    ***************************************************************************/

    public typeof(TaskT.result) awaitResult ( TaskT : Task ) ( TaskT task )
    {
        static assert (
            !hasIndirections!(typeof(task.result)),
            "'awaitResult' can only work with result types with no indirection"
        );
        typeof(task.result) result;
        this.await(task, (Task t) { result = (cast(TaskT) t).result; });
        return result;
    }

    ///
    unittest
    {
        void example ( )
        {
            static class ExampleTask : Task
            {
                int result;

                override void run ( )
                {
                    // do things that may result in suspending ...
                    this.result = 42;
                }

                override void recycle ( )
                {
                    this.result = 43;
                }
            }

            auto data = theScheduler.awaitResult(new ExampleTask);
            test!("==")(data, 42);
        }
    }

    /***************************************************************************

        Similar to `await` but also has waiting timeout. Calling task will be
        resumed either if awaited task finished or timeout is hit, whichever
        happens first.

        If `task` is already scheduled, it will not be re-scheduled again but
        awaiting will still occur.

        Params:
            task = task to await
            micro_seconds = timeout duration

        Returns:
            'true' if resumed via timeout, 'false' otherwise

    ***************************************************************************/

    public bool awaitOrTimeout ( Task task, uint micro_seconds )
    {
        return ocean.task.util.Timer.awaitOrTimeout(task, micro_seconds);
    }

    /***************************************************************************

        Starts pseudo-infinite event loop. Event loop will keep running as long
        as there is at least one event registered.

        Throws:
            SanityException if there are some active worker fibers
            left in the pool by the time there are no events left

    ***************************************************************************/

    public void eventLoop ( )
    {
        assert (this.state != State.Shutdown);
        this.state = State.Running;

        debug_trace("Starting scheduler event loop");

        do
        {
            this.epoll.eventLoop(
                null,
                this.exception_handler is null ? null :
                    &this.exceptionHandlerForEpoll
            );

            debug_trace(
                "end of scheduler internal event loop cycle " ~
                    "({} worker fibers still suspended, " ~
                    "{} pending tasks to resume)",
                this.fiber_pool.num_busy(),
                this.cycle_pending_task_count
            );

        }
        while ((this.fiber_pool.queued_tasks.length ||
            this.cycle_pending_task_count) && this.state != State.Shutdown);

        // cleans up any stalled worker fibers left after deregistration
        // of all events.
        // `outer.new Inner` instantiates the nested iterator class bound to
        // this specific pool instance.
        scope iterator = this.fiber_pool.new BusyItemsIterator;
        foreach (ref fiber; iterator)
            fiber.active_task.kill();
        this.specialized_pools.kill();

        verify(this.fiber_pool.num_busy() == 0);
        verify(this.fiber_pool.queued_tasks.length() == 0);
    }

    /***************************************************************************

        Orders scheduler to resume given task unconditionally after current
        epoll cycle. Must be used instead of plain `Task.resume` from
        termination hooks of other tasks.

        Params:
            task = task object to resume on next cycle

    ***************************************************************************/

    public void delayedResume ( Task task )
    {
        static void resumer ( void* task_ )
        {
            auto task = cast(Task) task_;
            // decremented before resuming so the counter is correct even if
            // `resume` throws
            theScheduler.cycle_pending_task_count--;

            try
            {
                task.resume();
            }
            catch (Exception e)
            {
                if (theScheduler.exception_handler !is null)
                    theScheduler.exception_handler(task, e);
                else
                    throw e;
            }
        }

        auto cb = toContextDg!(resumer)(cast(void*) task);
        this.cycle_pending_task_count++;
        this.epoll.onCycleEnd(cb);

        debug_trace("task <{}> will be resumed after current epoll cycle",
            cast(void*) task);
    }

    /***************************************************************************

        Suspends current fiber temporarily, allowing pending events to be
        processed. Current fiber will be resumed as soon as no immediate events
        are left.

        This method must not be called outside of a task.

        Throws:
            SuspendQueueFullException if suspending is not possible because
            resuming queue is full

    ***************************************************************************/

    public void processEvents ( )
    {
        auto task = Task.getThis();
        // same precondition as `await`: without a current task there is
        // nothing to suspend and the calls below would dereference null
        assert (task !is null,
            "processEvents must be called from within a task");

        if (this.state == State.Shutdown)
            task.kill();

        this.delayedResume(task);
        task.suspend();
    }

    /***************************************************************************

        Registers cycle callback if it is not already present (to avoid
        duplicates)

    ***************************************************************************/

    private void registerCycleCallback ( )
    {
        if (!this.select_cycle_hook_registered)
        {
            this.select_cycle_hook_registered = true;
            this.epoll.onCycleEnd(&this.selectCycleHook);
        }
    }

    /***************************************************************************

        This method gets called each time `this._epoll.select()` cycle
        finishes. It takes care of suspended task queue
        ensuring everything will get resumed/run eventually.

    ***************************************************************************/

    private void selectCycleHook ( )
    {
        // if there are tasks in the queue AND free worker fibers, process some

        while (this.fiber_pool.num_busy() < this.fiber_pool.limit()
            && this.fiber_pool.queued_tasks.length)
        {
            Task task;
            auto success = this.fiber_pool.queued_tasks.pop(task);
            assert(success);
            this.schedule(task);
        }

        // keep the hook registered for as long as the queue is not drained
        if (this.fiber_pool.queued_tasks.length)
            this.epoll.onCycleEnd(&this.selectCycleHook);
        else
            this.select_cycle_hook_registered = false;
    }

    /***************************************************************************

        Wraps configured `exception_handler` into API that doesn't refer to
        tasks and thus is usable by EpollSelectDispatcher

        Params:
            e = unhandled exception instance

        Returns:
            'true' if 'this.exception_handler' is not null, 'false' otherwise

    ***************************************************************************/

    private bool exceptionHandlerForEpoll ( Exception e )
    {
        if (this.exception_handler !is null)
        {
            this.exception_handler(null, e);
            return true;
        }
        else
            return false;
    }
}

///
unittest
{
    // mandatory scheduler initialization, should happen only once in the app
    SchedulerConfiguration config;
    initScheduler(config);

    // example custom task type - its `run` method will be executed
    // within the context of the worker fiber to which it is assigned
    class TestTask : Task
    {
        static size_t started;
        static size_t recycled;

        override public void run ( )
        {
            ++TestTask.started;

            static immutable very_long_loop = 5;

            for (int i = 0; i < very_long_loop; ++i)
            {
                // sometimes a task has to do lengthy computations
                // without any I/O like requests to remote servers. To avoid
                // completely blocking all other tasks, it should call the
                // `processEvents` method to pause briefly:
                theScheduler.processEvents();
            }
        }

        override public void recycle ( )
        {
            ++TestTask.recycled;
        }
    }

    // a task that can be processed by an idle worker fiber from the scheduler's
    // pool will be run immediately upon scheduling and yield on a call to
    // `processEvents`, keeping their assigned worker fibers busy:
    for (int i = 0; i < config.worker_fiber_limit; ++i)
        theScheduler.schedule(new TestTask);

    // new tasks scheduled while all worker fibers are busy will get pushed into
    // the scheduler's task queue to be run later on, when a worker fiber
    // finishes its current job:
    for (int i = 0; i < config.task_queue_limit; ++i)
        theScheduler.schedule(new TestTask);

    // when the task queue is full, any new scheduling attempt will result
    // in a TaskQueueFullException being thrown. However, it is possible to
    // specify a custom callback to handle the situation instead:
    theScheduler.task_queue_full_cb = ( Task, FixedRingQueue!(Task) ) { };
    theScheduler.schedule(new TestTask); // will do nothing now

    // all worker fibers are still waiting, suspended, now as the scheduler loop
    // isn't running, so there was nothing to resume them
    test!("==")(TestTask.started, config.worker_fiber_limit);
    test!("==")(TestTask.recycled, 0);

    // in a real application, this call will block the main thread for the
    // duration of the application and most resuming will be done by epoll
    // events
    theScheduler.eventLoop();
    test!("==")(TestTask.recycled,
        config.worker_fiber_limit + config.task_queue_limit);
}

unittest
{
    SchedulerConfiguration config;
    config.worker_fiber_limit = 1;
    config.task_queue_limit = 1;
    initScheduler(config);

    int queue_full_hits = 0;
    theScheduler.exception_handler = (Task t, Exception e) {
        if (cast(TaskQueueFullException) e)
            queue_full_hits++;
    };

    class DummyTask : Task
    {
        override public void run ( ) { theScheduler.processEvents(); }
    }

    // goes to worker fiber ..
    theScheduler.schedule(new DummyTask);
    // goes to queue ..
    theScheduler.schedule(new DummyTask);
    // boom!
    test!("==")(queue_full_hits, 0);
    theScheduler.schedule(new DummyTask);
    test!("==")(queue_full_hits, 1);

    // cleanup remaining state before proceeding to other tests
    theScheduler.eventLoop();
}

unittest
{
    initScheduler(SchedulerConfiguration.init);

    class DummyTask : Task
    {
        override public void run ( ) { theScheduler.processEvents(); }
    }

    int result;
    auto task = new DummyTask;

    // use dummy dg to pre-allocate memory in hook array
    void delegate() dummy = { };

    task.terminationHook(dummy);
    task.terminationHook(dummy);
    task.removeTerminationHook(dummy);

    // test with real delegates, make sure closure is not allocated in D2
    testNoAlloc({
        task.terminationHook({ result = 1; });
        task.terminationHook({ result = 2; });
    }());

    theScheduler.schedule(task);
    theScheduler.eventLoop();

    test!("==")(result, 1);
}

/*******************************************************************************

    Singleton scheduler instance.

    `initScheduler` must be called before the singleton instance can be used.

    Returns:
        the global scheduler instance

*******************************************************************************/

public Scheduler theScheduler ( )
{
    assert(_scheduler !is null, "Scheduler is null, initScheduler must be called before using it");

    return _scheduler;
}

/*******************************************************************************

    Returns:
        'true' if scheduler system was initialized, 'false' otherwise

*******************************************************************************/

public bool isSchedulerUsed ( )
{
    return _scheduler !is null;
}

/*******************************************************************************

    Creates or re-creates the scheduler instance.

    Re-creating of scheduler is only allowed if previous one doesn't have
    any work remaining and is intended exclusively for ease of writing
    unittest blocks.

    Params:
        config = see `.SchedulerConfiguration`
        epoll = existing epoll instance to use, if set to null, scheduler
            will create a new one

*******************************************************************************/

public void initScheduler ( SchedulerConfiguration config,
    EpollSelectDispatcher epoll = null )
{
    static bool is_scheduler_unused ( )
    {
        return _scheduler.fiber_pool.num_busy() == 0
            && _scheduler.fiber_pool.queued_tasks.length() == 0;
    }

    if (_scheduler !is null)
        assert (is_scheduler_unused());

    _scheduler = new Scheduler(config, epoll);

    version (unittest)
    {
        // in unittest builds any unhandled exception is reported and aborts
        // the process
        _scheduler.exception_handler = (Task t, Exception e) {
            if (t !is null)
            {
                Stderr.formatln(
                    "Unhandled exception in task {} ({})",
                    cast(void*) t, t.classinfo.name
                );
            }
            else
            {
                Stderr.formatln("Unhandled exception in epoll/scheduler");
            }

            Stderr.formatln("\t{} ({}:{})", e.message(), e.file, e.line)
                .flush();

            abort();
        };
    }

    // set interface-based global scheduler getter in IScheduler module:
    ocean.task.IScheduler._scheduler = _scheduler;
}

/*******************************************************************************

    Private variable that stores the singleton object

*******************************************************************************/

private Scheduler _scheduler;

public alias ocean.task.IScheduler.TaskQueueFullException
    TaskQueueFullException;

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        Stdout.formatln( "[ocean.task.Scheduler] " ~ format, args ).flush();
    }
}