1 /*******************************************************************************
2 
3     This module provides the standard task scheduler and a singleton object
4     instance of it. The developer must call the `initScheduler` function to get
5     the singleton into a usable state.
6 
7     Usage example:
8         See the documented unittest of the `Scheduler` class
9 
10     Copyright:
11         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
12         All rights reserved.
13 
14     License:
15         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
16         Alternatively, this file may be distributed under the terms of the Tango
17         3-Clause BSD License (see LICENSE_BSD.txt for details).
18 
19 *******************************************************************************/
20 
21 module ocean.task.Scheduler;
22 
23 
24 import core.thread;
25 
26 import ocean.meta.types.Qualifiers;
27 import ocean.core.Enforce;
28 import ocean.core.Verify;
29 import ocean.core.TypeConvert;
30 import ocean.core.Optional;
31 import ocean.io.select.EpollSelectDispatcher;
32 import ocean.util.container.queue.FixedRingQueue;
33 import ocean.meta.traits.Indirections;
34 
35 import ocean.task.Task;
36 import ocean.task.IScheduler;
37 import ocean.task.internal.FiberPoolWithQueue;
38 import ocean.task.internal.SpecializedPools;
39 import ocean.task.util.Timer;
40 
41 version (unittest)
42 {
43     import ocean.core.Test;
44     import ocean.io.Stdout;
45     import core.stdc.stdlib : abort;
46 }
47 
48 debug (TaskScheduler)
49 {
50     import ocean.io.Stdout;
51 }
52 
53 public alias IScheduler.Configuration SchedulerConfiguration;
54 public alias IScheduler.Stats SchedulerStats;
55 
56 /*******************************************************************************
57 
58     Implementation of the IScheduler API
59 
60 *******************************************************************************/
61 
62 final class Scheduler : IScheduler
63 {
64     /***************************************************************************
65 
66         Tracks scheduler internal state to safeguard against harmful operations
67 
68     ***************************************************************************/
69 
70     private enum State
71     {
72         /// Initial configured state
73         Initial,
74         /// Set when `eventLoop` method starts
75         Running,
76         /// Set when shutdown sequence is initiated and normal mode of operations
77         /// has to be prevented.
78         Shutdown
79     }
80 
81     /// ditto
82     private State state = State.Initial;
83 
84     /***************************************************************************
85 
86         Worker fiber pool, allows reusing fiber memory to run different tasks
87 
88     ***************************************************************************/
89 
90     private FiberPoolWithQueue fiber_pool;
91 
92     /***************************************************************************
93 
94         Indicates if this.selectCycleHook is already registered for the next
95         epoll cycle.
96 
97     ***************************************************************************/
98 
99     bool select_cycle_hook_registered;
100 
101     /***************************************************************************
102 
103         Used internally by scheduler to do all event handling
104 
105     ***************************************************************************/
106 
107     private EpollSelectDispatcher _epoll;
108 
109     /***************************************************************************
110 
111         Optional mapping from some Task ClasInfo's to dedicated worker
112         fiber pools.
113 
114     ***************************************************************************/
115 
116     private SpecializedPools specialized_pools;
117 
118     /***************************************************************************
119 
120         Tracks how many tasks are currently pending resume via epoll
121         cycle callbacks.
122 
123     ***************************************************************************/
124 
125     private size_t cycle_pending_task_count;
126 
127     /***************************************************************************
128 
129         Getter for scheduler epoll instance. Necessary for integration with
130         `ISelectClient` utilities so that new select client can be registered.
131 
132         Returns:
133             internally used epoll instance
134 
135     ***************************************************************************/
136 
137     public EpollSelectDispatcher epoll ( )
138     {
139         assert (this._epoll !is null);
140         return this._epoll;
141     }
142 
143     /***************************************************************************
144 
145         Set delegate to call each time task is attempted to be queue but size
146         limit is reached. Both the queue and the task will be supplied as
147         arguments.
148 
149     ***************************************************************************/
150 
151     public void task_queue_full_cb ( scope TaskQueueFullCB dg )
152     {
153         this.fiber_pool.task_queue_full_cb = dg;
154     }
155 
156     /***************************************************************************
157 
158         Gets current callback for case of full queue
159 
160     ***************************************************************************/
161 
162     public TaskQueueFullCB task_queue_full_cb ( )
163     {
164         return this.fiber_pool.task_queue_full_cb;
165     }
166 
167     /***************************************************************************
168 
169         Called each time task terminates with an exception when being run in
170         context of the scheduler or the event loop.
171 
172         NB: the task reference will be null when the delegate is called from the
173         EpollSelectDispatcher context (i.e. if task threw after resuming from
174         an event callback)
175 
176     ***************************************************************************/
177 
178     public void delegate ( Task, Exception ) exception_handler;
179 
180     /***************************************************************************
181 
182         Temporary storage used to pass currently used worker into the
183         fiber function so that it can make copy of it on stack and handle the
184         recycling in the end of fiber function.
185 
186         Content is undefined in all situation but starting new task.
187 
188     ***************************************************************************/
189 
190     private WorkerFiber last_used_worker;
191 
192     /***************************************************************************
193 
194         Temporary storage used to pass currently used worker into the
195         fiber function so that it can make copy of it on stack and handle the
196         recycling in the end of fiber function.
197 
198         Content is undefined in all situation but starting new task.
199 
200     ***************************************************************************/
201 
202     private Task last_scheduled_task;
203 
204     /***************************************************************************
205 
206         Constructor
207 
208         Params:
209             config = see `.SchedulerConfiguration`
210             epoll  = epoll instance to use for internal event loop. If null,
211                 Scheduler will create new instance with default arguments
212 
213     ***************************************************************************/
214 
215     private this ( SchedulerConfiguration config,
216         EpollSelectDispatcher epoll = null )
217     {
218         debug_trace(
219             "Creating new Scheduler with following configuration:\n" ~
220                 "\tworker_fiber_stack_size = {}\n" ~
221                 "\tworker_fiber_limit = {}\n" ~
222                 "\ttask_queue_limit = {}\n",
223             config.worker_fiber_stack_size,
224             config.worker_fiber_limit,
225             config.task_queue_limit
226         );
227 
228         verify(
229             config.task_queue_limit >= config.worker_fiber_limit,
230             "Must configure task queue size at least equal to worker fiber " ~
231                 "count for optimal task scheduler performance."
232         );
233 
234         if (epoll is null)
235             this._epoll = new EpollSelectDispatcher;
236         else
237             this._epoll = epoll;
238 
239         this.fiber_pool = new FiberPoolWithQueue(
240             config.task_queue_limit,
241             config.worker_fiber_stack_size,
242             config.worker_fiber_limit
243         );
244 
245         this.specialized_pools = new SpecializedPools(config.specialized_pools);
246     }
247 
248     /***************************************************************************
249 
250         Forces early termination of all active tasks and shuts down
251         event loop.
252 
253         After this method has been called any attempt to interact with
254         the scheduler will kill the calling task.
255 
256     ***************************************************************************/
257 
258     public void shutdown ( )
259     {
260         // no-op if already shutting down
261         if (this.state == State.Shutdown)
262             return;
263 
264         debug_trace(
265             "Shutting down initiated. {} queued tasks will be discarded",
266             this.fiber_pool.queued_tasks.length()
267         );
268 
269         this.state = State.Shutdown;
270         this.fiber_pool.queued_tasks.clear();
271         this.epoll.shutdown();
272 
273         auto task = Task.getThis();
274         if (task !is null)
275             task.kill();
276     }
277 
278     /***************************************************************************
279 
280         Provides load stats for the scheduler
281 
282         Common usage examples would be load throttling and load stats recording.
283 
284         Returns:
285             struct instance which aggregates all stats
286 
287     ***************************************************************************/
288 
289     public SchedulerStats getStats ( )
290     {
291         SchedulerStats stats =  {
292             task_queue_busy : this.fiber_pool.queued_tasks.length(),
293             task_queue_total : this.fiber_pool.queued_tasks.maxItems(),
294             suspended_tasks : this.cycle_pending_task_count,
295             worker_fiber_busy : this.fiber_pool.num_busy(),
296             worker_fiber_total : this.fiber_pool.limit()
297         };
298         return stats;
299     }
300 
301     /***************************************************************************
302 
303         Returns:
304             Stats struct for a specialized pool defined by `name` if there is
305             such pool. Empty Optional otherwise.
306 
307     ***************************************************************************/
308 
309     public Optional!(SpecializedPoolStats) getSpecializedPoolStats ( cstring name )
310     {
311         Optional!(SpecializedPoolStats) result;
312 
313         this.specialized_pools.findPool(name).visit(
314             ( ) { },
315             (ref SpecializedPools.SpecializedPool descr) {
316                 result = optional(SpecializedPoolStats(
317                     descr.pool.num_busy(),
318                     descr.pool.length()
319                 ));
320             }
321         );
322 
323         return result;
324     }
325 
326     /***************************************************************************
327 
328         Method used to execute new task.
329 
330         If there are idle worker fibers, the task will be executed immediately
331         and this method will only return when that task first calls `suspend`.
332 
333         If all workers are busy, the task will be added to the queue and this
334         method will return immediately.
335 
336         Params:
337             task = derivative from `ocean.task.Task` defining some application
338                 task to execute
339 
340         Throws:
341             TaskQueueFullException if task queue is at full capacity
342 
343     ***************************************************************************/
344 
345     public void schedule ( Task task )
346     {
347         if (this.state == State.Shutdown)
348         {
349             // Simply returning here would be generally sufficient to make sure
350             // no new tasks get added after shutdown. However, it is of some
351             // merit to try to kill everything as soon as possible thus
352             // scheduler kills the caller tasks on any attempt to schedule a new
353             // one.
354             auto caller_task = Task.getThis();
355             if (caller_task !is null)
356                 caller_task.kill();
357             return;
358         }
359 
360         try
361         {
362             if (!this.specialized_pools.run(task))
363             {
364                 this.fiber_pool.runOrQueue(task);
365                 this.registerCycleCallback();
366             }
367         }
368         catch (Exception e)
369         {
370             if (this.exception_handler !is null)
371                 this.exception_handler(task, e);
372             else
373                 throw e;
374         }
375     }
376 
377     /***************************************************************************
378 
379         Method used to queue the task for later execution.
380 
381         Will always put the task into the queue, even if there are idle worker
382         fibers. This method is mostly useful when implementing advanced library
383         facilities to ensure that no immediate execution takes place.
384 
385         Will result in starting the task in the next event loop cycle at the
386         earliest.
387 
388         Params:
389             task = derivative from `ocean.task.Task` defining some application
390                 task to execute
391 
392         Throws:
393             TaskQueueFullException if task queue is at full capacity AND
394             if no custom `task_queue_full_cb` is set.
395 
396     ***************************************************************************/
397 
398     public void queue ( Task task )
399     {
400         this.fiber_pool.queue(task);
401         this.registerCycleCallback();
402     }
403 
404     /***************************************************************************
405 
406         Schedules the argument and suspends calling task until the argument
407         finishes.
408 
409         This method must not be called outside of a task.
410 
411         If `task` is already scheduled, it will not be re-scheduled again but
412         awaiting will still occur.
413 
414         Params:
415             task = task to schedule and wait for
416             finished_dg = optional delegate called after task finishes but
417                 before it gets recycled, can be used to copy some data into
418                 caller fiber context
419 
420     ***************************************************************************/
421 
422     public void await ( Task task, scope void delegate (Task) finished_dg = null )
423     {
424         auto context = Task.getThis();
425         assert (context !is null);
426         assert (context !is task);
427 
428         // this methods stack is guaranteed to still be valid by the time
429         // task finishes, so we can reference `task` from delegate
430         task.terminationHook({
431             if (context.suspended())
432                 theScheduler.delayedResume(context);
433         });
434 
435         if (finished_dg !is null)
436             task.terminationHook({ finished_dg(task); });
437 
438         if (!task.suspended())
439             this.schedule(task);
440 
441         if (!task.finished())
442             context.suspend();
443     }
444 
445     ///
446     unittest
447     {
448         void example ( )
449         {
450             static class ExampleTask : Task
451             {
452                 mstring data;
453 
454                 override void run ( )
455                 {
456                     // do things that may result in suspending ...
457                     data = "abcd".dup;
458                 }
459 
460                 override void recycle ( )
461                 {
462                     this.data = null;
463                 }
464             }
465 
466             mstring data;
467 
468             theScheduler.await(
469                 new ExampleTask,
470                 (Task t) {
471                     // copy required data before tasks gets recycled
472                     auto task = cast(ExampleTask) t;
473                     data = task.data.dup;
474                 }
475             );
476 
477             test!("==")(data, "abcd");
478         }
479     }
480 
481     /***************************************************************************
482 
483         Convenience shortcut on top of `await` to await for a task and return
484         some value type as a result.
485 
486         If `task` is already scheduled, it will not be re-scheduled again but
487         awaiting will still occur.
488 
489         Params:
490             task = any task that defines `result` public field  of type with no
491                 indirections
492 
493         Returns:
494             content of `result` field of the task read right after that task
495             finishes
496 
497     ***************************************************************************/
498 
499     public typeof(TaskT.result) awaitResult ( TaskT : Task ) ( TaskT task )
500     {
501         static assert (
502             !hasIndirections!(typeof(task.result)),
503             "'awaitResult' can only work with result types with no indirection"
504         );
505         typeof(task.result) result;
506         this.await(task, (Task t) { result = (cast(TaskT) t).result; });
507         return result;
508     }
509 
510     ///
511     unittest
512     {
513         void example ( )
514         {
515             static class ExampleTask : Task
516             {
517                 int result;
518 
519                 override void run ( )
520                 {
521                     // do things that may result in suspending ...
522                     this.result = 42;
523                 }
524 
525                 override void recycle ( )
526                 {
527                     this.result = 43;
528                 }
529             }
530 
531             auto data = theScheduler.awaitResult(new ExampleTask);
532             test!("==")(data, 42);
533         }
534     }
535 
536     /***************************************************************************
537 
538         Similar to `await` but also has waiting timeout. Calling task will be
539         resumed either if awaited task finished or timeout is hit, whichever
540         happens first.
541 
542         If `task` is already scheduled, it will not be re-scheduled again but
543         awaiting will still occur.
544 
545         Params:
546             task = task to await
547             micro_seconds = timeout duration
548 
549         Returns:
550             'true' if resumed via timeout, 'false' otherwise
551 
552     ***************************************************************************/
553 
554     public bool awaitOrTimeout ( Task task, uint micro_seconds )
555     {
556         return ocean.task.util.Timer.awaitOrTimeout(task, micro_seconds);
557     }
558 
559     /***************************************************************************
560 
561         Starts pseudo-infinite event loop. Event loop will keep running as long
562         as there is at least one event registered.
563 
564         Throws:
565             SanityException if there are some active worker fibers
566             left in the pool by the time there are not events left
567 
568     ***************************************************************************/
569 
570     public void eventLoop ( )
571     {
572         assert (this.state != State.Shutdown);
573         this.state = State.Running;
574 
575         debug_trace("Starting scheduler event loop");
576 
577         do
578         {
579             this.epoll.eventLoop(
580                 null,
581                 this.exception_handler is null ? null :
582                     &this.exceptionHandlerForEpoll
583             );
584 
585             debug_trace(
586                 "end of scheduler internal event loop cycle " ~
587                     "({} worker fibers still suspended, " ~
588                     "{} pending tasks to resume)",
589                 this.fiber_pool.num_busy(),
590                 this.cycle_pending_task_count
591             );
592 
593         }
594         while ((this.fiber_pool.queued_tasks.length ||
595             this.cycle_pending_task_count) && this.state != State.Shutdown);
596 
597         // cleans up any stalled worker fibers left after deregistration
598         // of all events.
599         scope iterator = this.fiber_pool..new BusyItemsIterator;
600         foreach (ref fiber; iterator)
601             fiber.active_task.kill();
602         this.specialized_pools.kill();
603 
604         verify(this.fiber_pool.num_busy() == 0);
605         verify(this.fiber_pool.queued_tasks.length() == 0);
606     }
607 
608     /***************************************************************************
609 
610         Orders scheduler to resume given task unconditionally after current
611         epoll cycle. Must be used instead of plain `Task.resume` from
612         termination hooks of other tasks.
613 
614         Params:
615             task = task object to resume on next cycle
616 
617     ***************************************************************************/
618 
619     public void delayedResume ( Task task )
620     {
621         static void resumer ( void* task_ )
622         {
623             auto task = cast(Task) task_;
624             theScheduler.cycle_pending_task_count--;
625 
626             try
627             {
628                 task.resume();
629             }
630             catch (Exception e)
631             {
632                 if (theScheduler.exception_handler !is null)
633                     theScheduler.exception_handler(task, e);
634                 else
635                     throw e;
636             }
637         }
638 
639         auto cb = toContextDg!(resumer)(cast(void*) task);
640         this.cycle_pending_task_count++;
641         this.epoll.onCycleEnd(cb);
642 
643         debug_trace("task <{}> will be resumed after current epoll cycle",
644             cast(void*) task);
645     }
646 
647     /***************************************************************************
648 
649         Suspends current fiber temporarily, allowing pending events to be
650         processed. Current fiber will be resumed as soon as no immediate events
651         are left.
652 
653         Throws:
654             SuspendQueueFullException if suspending is not possible because
655             resuming queue is full
656 
657     ***************************************************************************/
658 
659     public void processEvents ( )
660     {
661         auto task = Task.getThis();
662         if (this.state == State.Shutdown)
663             task.kill();
664 
665         this.delayedResume(task);
666         task.suspend();
667     }
668 
669     /***************************************************************************
670 
671         Registers cycle callback if it is not already present (to avoid
672         duplicates)
673 
674     ***************************************************************************/
675 
676     private void registerCycleCallback ( )
677     {
678         if (!this.select_cycle_hook_registered)
679         {
680             this.select_cycle_hook_registered = true;
681             this.epoll.onCycleEnd(&this.selectCycleHook);
682         }
683     }
684 
685     /***************************************************************************
686 
687         This method gets called each time `this._epoll.select()` cycle
688         finishes. It takes care of suspended task queue
689         ensuring everything will get resumed/run eventually.
690 
691     ***************************************************************************/
692 
693     private void selectCycleHook ( )
694     {
695         // if there are tasks in the queue AND free worker fibers, process some
696 
697         while (this.fiber_pool.num_busy() < this.fiber_pool.limit()
698             && this.fiber_pool.queued_tasks.length)
699         {
700             Task task;
701             auto success = this.fiber_pool.queued_tasks.pop(task);
702             assert(success);
703             this.schedule(task);
704         }
705 
706         if (this.fiber_pool.queued_tasks.length)
707             this.epoll.onCycleEnd(&this.selectCycleHook);
708         else
709             this.select_cycle_hook_registered = false;
710     }
711 
712     /***************************************************************************
713 
714         Wraps configured `exception_handler` into API that doesn't refer to
715         tasks and thus is usable by EpollSelectDispatcher
716 
717         Params:
718             e = unhandled exception instance
719 
720         Returns:
721             'true` if 'this.exception_handler' is not null, 'false' otherwise
722 
723     ***************************************************************************/
724 
725     private bool exceptionHandlerForEpoll ( Exception e )
726     {
727         if (this.exception_handler !is null)
728         {
729             this.exception_handler(null, e);
730             return true;
731         }
732         else
733             return false;
734     }
735 }
736 
737 ///
738 unittest
739 {
740     // mandatory scheduler initialziation, should happen only once in the app
741     SchedulerConfiguration config;
742     initScheduler(config);
743 
744     // example custom task type - its `run` method will be executed
745     // within the context of the worker fiber to which it is assigned
746     class TestTask : Task
747     {
748         static size_t started;
749         static size_t recycled;
750 
751         override public void run ( )
752         {
753             ++TestTask.started;
754 
755             static immutable very_long_loop = 5;
756 
757             for (int i = 0; i < very_long_loop; ++i)
758             {
759                 // sometimes a task has to do lengthy computations
760                 // without any I/O like requests to remote servers. To avoid
761                 // completely blocking all other tasks, it should call the
762                 // `processEvents` method to pause briefly:
763                 theScheduler.processEvents();
764             }
765         }
766 
767         override public void recycle ( )
768         {
769             ++TestTask.recycled;
770         }
771     }
772 
773     // a task that can be processed by an idle worker fiber from the scheduler's
774     // pool will be run immediately upon scheduling and yield on a call to
775     // `processEvents`, keeping their assigned worker fibers busy:
776     for (int i = 0; i < config.worker_fiber_limit; ++i)
777         theScheduler.schedule(new TestTask);
778 
779     // new tasks scheduled while all worker fibers are busy will get pushed into
780     // the scheduler's task queue to be run later on, when a worker fiber
781     // finishes its current job:
782     for (int i = 0; i < config.task_queue_limit; ++i)
783         theScheduler.schedule(new TestTask);
784 
785     // when the task queue is full, any new scheduling attempt will result
786     // in a TaskQueueFullException being thrown. However, it is possible to
787     // specify a custom callback to handle the situation instead:
788     theScheduler.task_queue_full_cb = ( Task, FixedRingQueue!(Task) ) { };
789     theScheduler.schedule(new TestTask); // will do nothing now
790 
791     // all worker fibers are still waiting, suspended, now as the scheduler loop
792     // isn't running, so there was nothing to resume them
793     test!("==")(TestTask.started, config.worker_fiber_limit);
794     test!("==")(TestTask.recycled, 0);
795 
796     // in a real application, this call will block the main thread for the
797     // duration of the application and most resuming will be done by epoll
798     // events
799     theScheduler.eventLoop();
800     test!("==")(TestTask.recycled,
801         config.worker_fiber_limit + config.task_queue_limit);
802 }
803 
804 unittest
805 {
806     SchedulerConfiguration config;
807     config.worker_fiber_limit = 1;
808     config.task_queue_limit = 1;
809     initScheduler(config);
810 
811     int queue_full_hits = 0;
812     theScheduler.exception_handler = (Task t, Exception e) {
813         if (cast(TaskQueueFullException) e)
814             queue_full_hits++;
815     };
816 
817     class DummyTask : Task
818     {
819         override public void run ( ) { theScheduler.processEvents(); }
820     }
821 
822     // goes to worker fiber ..
823     theScheduler.schedule(new DummyTask);
824     // goes to queue ..
825     theScheduler.schedule(new DummyTask);
826     // boom!
827     test!("==")(queue_full_hits, 0);
828     theScheduler.schedule(new DummyTask);
829     test!("==")(queue_full_hits, 1);
830 
831     // cleanup remaining state before proceeding to other tests
832     theScheduler.eventLoop();
833 }
834 
835 unittest
836 {
837     initScheduler(SchedulerConfiguration.init);
838 
839     class DummyTask : Task
840     {
841         override public void run ( ) { theScheduler.processEvents(); }
842     }
843 
844     int result;
845     auto task = new DummyTask;
846 
847     // use dummy dg to pre-allocate memory in hook array
848     void delegate() dummy = { };
849 
850     task.terminationHook(dummy);
851     task.terminationHook(dummy);
852     task.removeTerminationHook(dummy);
853 
854     // test with real delegates, make sure closure is not allocated in D2
855     testNoAlloc({
856         task.terminationHook({ result = 1; });
857         task.terminationHook({ result = 2; });
858     }());
859 
860     theScheduler.schedule(task);
861     theScheduler.eventLoop();
862 
863     test!("==")(result, 1);
864 }
865 
866 /*******************************************************************************
867 
868     Singleton scheduler instance.
869 
870     `initScheduler` must be called before the singleton instance can be used.
871 
872     Returns:
873         the global scheduler instance
874 
875 *******************************************************************************/
876 
877 public Scheduler theScheduler ( )
878 {
879     assert(_scheduler !is null, "Scheduler is null, initScheduler must be called before using it");
880 
881     return _scheduler;
882 }
883 
884 /*******************************************************************************
885 
886     Returns:
887         'true' if scheduler system was initialized, 'false' otherwise
888 
889 *******************************************************************************/
890 
891 public bool isSchedulerUsed ( )
892 {
893     return _scheduler !is null;
894 }
895 
896 /*******************************************************************************
897 
898     Creates or re-creates the scheduler instance.
899 
900     Re-creating of scheduler is only allowed if previous one doesn't have
901     any work remaining and is intended exclusively for ease of writing
902     unittest blocks.
903 
904     Params:
905         config = see `.SchedulerConfiguration`
906         epoll  = existing epoll instance to use, if set to null, scheduler
907             will create a new one
908 
909 *******************************************************************************/
910 
911 public void initScheduler ( SchedulerConfiguration config,
912     EpollSelectDispatcher epoll = null )
913 {
914     static bool is_scheduler_unused ( )
915     {
916         return _scheduler.fiber_pool.num_busy() == 0
917             && _scheduler.fiber_pool.queued_tasks.length() == 0;
918     }
919 
920     if (_scheduler !is null)
921         assert (is_scheduler_unused());
922 
923     _scheduler = new Scheduler(config, epoll);
924 
925     version (unittest)
926     {
927         _scheduler.exception_handler = (Task t, Exception e) {
928             if (t !is null)
929             {
930                 Stderr.formatln(
931                     "Unhandled exception in task {} ({})",
932                     cast(void*) t, t.classinfo.name
933                 );
934             }
935             else
936             {
937                 Stderr.formatln("Unhandled exception in epoll/scheduler");
938             }
939 
940             Stderr.formatln("\t{} ({}:{})", e.message(), e.file, e.line)
941                 .flush();
942 
943             abort();
944         };
945     }
946 
947     // set interface-based global scheduler getter in IScheduler module:
948     ocean.task.IScheduler._scheduler = _scheduler;
949 }
950 
951 /*******************************************************************************
952 
953     Private variable that stores the singleton object
954 
955 *******************************************************************************/
956 
957 private Scheduler _scheduler;
958 
959 public alias ocean.task.IScheduler.TaskQueueFullException
960     TaskQueueFullException;
961 
962 private void debug_trace ( T... ) ( cstring format, T args )
963 {
964     debug ( TaskScheduler )
965     {
966         Stdout.formatln( "[ocean.task.Scheduler] " ~ format, args ).flush();
967     }
968 }