1 /*******************************************************************************
2 
3     Defines the fundamental task abstraction.
4 
5     `Task`s are responsible for:
6     - defining the actual function to execute as a task
7     - defining suspend/resume semantics on top of the core Fiber semantics
8 
9     The `TaskWith` class, derived from `Task`, provides the facility of tasks
10     with customised suspend/resume semantics, as specified by one or more
11     extensions.
12 
13     Copyright:
14         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
15         All rights reserved.
16 
17     License:
18         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
19         Alternatively, this file may be distributed under the terms of the Tango
20         3-Clause BSD License (see LICENSE_BSD.txt for details).
21 
22 *******************************************************************************/
23 
24 module ocean.task.Task;
25 
26 
27 static import core.thread;
28 
29 import ocean.meta.types.Qualifiers;
30 import ocean.core.array.Mutation : moveToEnd, reverse;
31 import ocean.core.Buffer;
32 import ocean.core.Verify;
33 import ocean.task.internal.TaskExtensionMixins;
34 import ocean.task.IScheduler;
35 
36 version (unittest)
37 {
38     import ocean.core.Test;
39 }
40 
41 debug (TaskScheduler)
42 {
43     import ocean.io.Stdout;
44 }
45 
46 /*******************************************************************************
47 
48     Fiber sub-class used by the scheduler to run tasks in. In addition to the
49     functionality of the base fiber, it also:
50         1. stores a reference to the task currently being executed
51         2. can be stored in an object pool
52 
53 *******************************************************************************/
54 
public class WorkerFiber : core.thread.Fiber
{
    /***************************************************************************

        Index slot which allows smooth integration of WorkerFiber with an
        object pool

    ***************************************************************************/

    public size_t object_pool_index;

    /***************************************************************************

        If not null, refers to the Task object currently being executed in this
        fiber. Assigned by `Task.assignTo`.

    ***************************************************************************/

    package Task active_task;

    /***************************************************************************

        Returns:
            the task object currently being executed in this fiber or null if
            there isn't one

    ***************************************************************************/

    public Task activeTask ( )
    {
        return this.active_task;
    }

    /***************************************************************************

        Terminates itself by killing the underlying task.

        A task must have been assigned to this fiber before calling this
        method; this precondition is checked with `verify` so that a fiber
        without a task fails with a clear error instead of dereferencing a
        null reference.

    ***************************************************************************/

    public void kill ( )
    {
        verify(
            this.active_task !is null,
            "Can't kill a worker fiber which has no task assigned to it"
        );
        this.active_task.kill();
    }

    /***************************************************************************

        Constructor

        Worker fibers are always created with a dummy empty function as an
        entry point and are intended to be reset later to the real task to
        be executed.

        Params:
            stack_size = stack size of the new fiber to allocate

    ***************************************************************************/

    public this ( size_t stack_size )
    {
        super(() {} , stack_size);
        // Calls itself once to get into the TERM state. The D2 runtime doesn't
        // allow creating a fiber with no function attached and neither runtime
        // allows resetting a fiber which is not in the TERM state
        this.call();
        assert (this.state() == core.thread.Fiber.State.TERM);
    }
}
121 
122 /*******************************************************************************
123 
124     Exception class that indicates that current task must be terminated. It
125     can be used to forcefully kill the task while still properly cleaning
126     current stack frame.
127 
128 *******************************************************************************/
129 
public class TaskKillException : Exception
{
    /***************************************************************************

        Constructor. The exception always carries the same fixed message.

        Params:
            file = file name to report as the origin of the exception
            line = line number to report as the origin of the exception

    ***************************************************************************/

    public this ( istring file = __FILE__, int line = __LINE__ )
    {
        enum message = "Task was killed";
        super(message, file, line);
    }
}
137 
138 /*******************************************************************************
139 
140     Minimal usable Task class.
141 
142     Serves as a base for all other task classes. Provides the following
143     functionality:
144       * assigning a task to an arbitrary fiber (`assignTo()`)
145       * wraps the abstract `run()` method in a try/catch statement which
146         rethrows any unhandled exceptions
147 
148 *******************************************************************************/
149 
public abstract class Task
{
    /***************************************************************************

        Thrown from within the task to force early termination. One instance
        is used by all tasks as this exception must never be caught.

    ***************************************************************************/

    private static TaskKillException kill_exception;

    /***************************************************************************

        Static constructor. Initializes the exception that indicates the task
        must be terminated.

    ***************************************************************************/

    public static this ( )
    {
        Task.kill_exception = new TaskKillException;
    }

    /***************************************************************************

        Bitmask representing internal task state (see `TaskState`)

    ***************************************************************************/

    private TaskState state_bitmask;

    /***************************************************************************

        List of hooks that need to be fired after the Task has been
        terminated.

        Delegates will be called both if the task terminates routinely and if
        it terminates due to an unhandled exception / gets killed.

    ***************************************************************************/

    private Buffer!(void delegate()) termination_hooks;

    /***************************************************************************

        Reserved index field which ensures that any Task derivative can be
        used with ObjectPool. That comes at minor cost of one unused size_t
        per Task instance if not needed which is not a problem.

    ***************************************************************************/

    public size_t object_pool_index;

    /***************************************************************************

        Fiber this task executes in

        This field is declared as public because qualified package protection
        is only available in D2. Please don't use it in applications
        directly.

    ***************************************************************************/

    /* package(ocean.task) */
    public WorkerFiber fiber;

    /***************************************************************************

        Returns:
            current task reference if there is one running, null otherwise
            (including the case when the current fiber is not a
            `WorkerFiber`)

    ***************************************************************************/

    public static Task getThis ( )
    {
        auto worker = cast(WorkerFiber) core.thread.Fiber.getThis();
        if (worker !is null)
            return worker.activeTask();
        else
            return null;
    }

    /***************************************************************************

        Constructor. Used only to insert debug trace message.

    ***************************************************************************/

    public this ( )
    {
        debug_trace("'{}' <{}> has been created", this.classinfo.name,
            cast(void*) this);
    }

    /***************************************************************************

        Assigns the task to a fiber. In most cases you need to use
        `Scheduler.schedule` instead.

        In simple applications there tends to be 1-to-1 relation between task
        and fiber it executes in. However in highly concurrent server apps
        it may be necessary to maintain a separate task queue because of
        memory consumption reasons (fiber has to allocate a stack for itself
        which doesn't allow having too many of them). Such functionality
        is provided by `ocean.task.Scheduler`.

        The internal task state is reset to `TaskState.None`. The fiber's
        entry point is replaced only if the fiber is in the TERM state;
        otherwise only the task <-> fiber references are updated.

        Params:
            fiber = the fiber to assign the task to
            entry_point = optional custom entry point to replace
                `this.entryPoint`

    ***************************************************************************/

    public void assignTo ( WorkerFiber fiber,
        scope void delegate() entry_point = null )
    {
        this.state_bitmask = TaskState.None;

        this.fiber = fiber;
        this.fiber.active_task = this;
        if (fiber.state == fiber.state.TERM)
        {
            // cast to ignore return value
            this.fiber.reset(entry_point ? entry_point
                : cast(void delegate()) &this.entryPoint);
        }
    }

    /***************************************************************************

        Suspends execution of this task.

        Must be called from within the task's own fiber while it is
        executing. All preconditions are checked with `verify` so that they
        are enforced in release builds too.

        Throws:
            the shared `TaskKillException` instance if the task was marked to
            be killed while it was suspended

    ***************************************************************************/

    public void suspend ( )
    {
        // `verify` rather than `assert`: the checks below dereference the
        // fiber, which must not happen with a null reference in release mode
        verify (this.fiber !is null);
        verify (this.fiber is core.thread.Fiber.getThis());
        verify (this.fiber.state == this.fiber.state.EXEC);

        debug_trace("<{}> is suspending itself", cast(void*) this);
        this.fiber.yield();

        // execution continues here after the task has been resumed
        if (this.state_bitmask & TaskState.ToKill)
            throw Task.kill_exception;
    }

    /***************************************************************************

        Resumes execution of this task. If task has not been started yet,
        starts it.

        Must not be called by the task itself, nor from a termination hook of
        another (already finished) task.

    ***************************************************************************/

    public void resume ( )
    {
        assert (this.fiber !is null);
        assert (this.fiber.state != this.fiber.state.EXEC);

        auto resumer = Task.getThis();
        assert (resumer !is this);
        verify(
            resumer is null || !resumer.finished(),
            "Use `theScheduler.resume(task)` to resume a task from termination"
                ~ " hook of another task"
        );

        debug (TaskScheduler)
        {
            if (resumer is null)
            {
                debug_trace("<{}> has been resumed by main thread or event loop",
                    cast(void*) this);
            }
            else
            {
                debug_trace("<{}> has been resumed by <{}>",
                    cast(void*) this, cast(void*) resumer);
            }
        }

        this.fiber.call();
    }

    /***************************************************************************

        Registers a termination hook that will be executed when the Task
        terminates, no matter whether it finishes routinely, terminates due
        to an unhandled exception or gets killed.

        Registered hooks are run by `entryPoint` in reverse order and the
        list of hooks is cleared before they run. Registering new hooks from
        within a running hook is not supported.

        Params:
            hook = delegate to be called after the task terminates

    ***************************************************************************/

    public void terminationHook (scope void delegate() hook)
    {
        this.termination_hooks ~= hook;
    }

    /***************************************************************************

        Unregisters a termination hook that would be executed when the Task
        terminates.

        Params:
            hook = delegate that would be called when the task terminates

    ***************************************************************************/

    public void removeTerminationHook (scope void delegate() hook)
    {
        // NOTE(review): relies on `moveToEnd` returning the amount of
        // elements that do not match `hook` - confirm against
        // ocean.core.array.Mutation
        this.termination_hooks.length = .moveToEnd(this.termination_hooks[], hook);
    }

    /***************************************************************************

        Returns:
            true if the fiber is suspended, false otherwise (including the
            case when no fiber has been assigned yet)

    ***************************************************************************/

    final public bool suspended ( )
    {
        if (this.fiber is null)
            return false;
        return this.fiber.state() == core.thread.Fiber.State.HOLD;
    }

    /***************************************************************************

        Returns:
            true if this task was started and reached end of `run` method

    ***************************************************************************/

    final public bool finished ( )
    {
        return (this.state_bitmask & TaskState.Finished) > 0;
    }

    /***************************************************************************

        Forces abnormal termination for the task by throwing special
        exception instance.

        If called from within the task itself, the exception is thrown
        immediately. Otherwise the task is marked to be killed and resumed,
        which makes the exception to be thrown from `suspend` inside the
        task's own fiber.

        Params:
            file = file name stored in the thrown exception
            line = line number stored in the thrown exception

    ***************************************************************************/

    public void kill ( istring file = __FILE__, int line = __LINE__ )
    {
        // the exception instance is shared, only its origin is updated
        Task.kill_exception.file = file;
        Task.kill_exception.line = line;

        if (this is Task.getThis())
            throw Task.kill_exception;
        else
        {
            this.state_bitmask |= TaskState.ToKill;
            this.resume();
        }
    }

    /***************************************************************************

        Method that will be run by scheduler when task finishes. Must be
        overridden by specific task class to reset reusable resources.

        It is public so that both scheduler can access it and derivatives can
        override it. No one but scheduler must call this method.

    ***************************************************************************/

    public void recycle ( ) { }

    /***************************************************************************

        Method that must be overridden in actual application/library task
        classes to provide entry point.

    ***************************************************************************/

    protected abstract void run ( );

    /***************************************************************************

        Internal wrapper around `this.run()` which is used as primary fiber
        entry point and ensures any uncaught exception propagates to the
        context that has started this task.

        Regardless of how the task terminates, marks it as finished, runs
        the registered termination hooks (in reverse order) and calls
        `this.recycle()` right before returning.

        Returns:
            'true' if the task terminated with unhandled exception, 'false'
            otherwise

    ***************************************************************************/

    /* package(ocean.task) */
    public final bool entryPoint ( )
    {
        debug_trace("<{}> start of main function", cast(void*) this);

        scope(exit)
        {
            this.state_bitmask |= TaskState.Finished;

            if (this.termination_hooks.length)
            {
                debug_trace("Calling {} termination_hooks for task <{}>",
                    this.termination_hooks.length, cast(void*) this);

                // the list is emptied before calling the hooks so that any
                // attempt to register a new one while they run is detected
                auto hooks = reverse(this.termination_hooks[]);
                this.termination_hooks.reset();

                foreach (hook; hooks)
                {
                    hook();
                    assert(
                        this.termination_hooks.length == 0,
                        "Adding new hooks while running existing " ~
                            "ones is not supported"
                    );
                }
            }

            // allow task to recycle any shared resources it may have
            // (or recycle task instance itself)
            //
            // NB: this must be the final part of the method and relies
            // on assumption that no other fiber will have any chance
            // to start executing this task until it actually reaches
            // end of the method
            debug_trace("Recycling task <{}>", cast(void*) this);
            this.recycle();
        }

        try
        {
            assert (this.fiber is core.thread.Fiber.getThis());
            assert (this       is Task.getThis());
            this.run();
        }
        catch (TaskKillException)
        {
            debug_trace("<{}> termination (killed)", cast(void*) this);
            this.state_bitmask &= ~cast(int) TaskState.ToKill;
            return false;
        }
        catch (Exception e)
        {
            debug_trace("<{}> termination (uncaught exception): {} ({}:{})",
                cast(void*) this, e.message(), e.file, e.line);

            // After yielding to rethrow exception this task has to be resumed
            // by someone to finish cleanup and keep reusing its worker fiber.
            // If scheduler is available, it can do it automatically, otherwise
            // it becomes responsibility of the caller.

            if (isSchedulerUsed())
                theScheduler.delayedResume(this);
            this.fiber.yieldAndThrow(e);

            // if task was resumed with `kill` from 'pending cleanup' state
            // just quit the method as if it was originally terminated by kill
            if (this.state_bitmask & TaskState.ToKill)
            {
                debug_trace("<{}> termination (killed)", cast(void*) this);
                // clear the flag the same way the regular kill path does
                this.state_bitmask &= ~cast(int) TaskState.ToKill;
                return false;
            }
            else
                return true;
        }

        debug_trace("<{}> termination (end of main function)", cast(void*) this);
        return false;
    }
}
523 
524 ///
unittest
{
    // Stand-in for some scarce resource the task depends on (for example
    // memory or a file handle)
    class LimitedResourceHandle { }

    LimitedResourceHandle getResource ( )
    {
        return null;
    }

    void releaseResource ( LimitedResourceHandle )
    {
    }

    // A custom task: acquires the resource when run and gives it back when
    // the task gets recycled
    class MyTask : Task
    {
        LimitedResourceHandle resource;

        override public void run ( )
        {
            this.resource = getResource();
        }

        override public void recycle ( )
        {
            releaseResource(this.resource);
        }
    }

    // Runs the task by manually spawning a worker fiber for it. In real
    // applications this is more commonly done via ocean.task.Scheduler.
    auto example = new MyTask;
    // the stack has to be large enough for debug traces to not crash tests
    auto example_fiber = new WorkerFiber(10240);
    example.assignTo(example_fiber);
    example.resume();
}
556 
unittest
{
    // Killing a suspended task must terminate it without ever running the
    // code that follows the suspension point

    class MyTask : Task
    {
        bool clean_finish = false;

        override protected void run ( )
        {
            this.suspend();
            // only reachable if the task gets resumed normally
            this.clean_finish = true;
        }
    }

    auto victim = new MyTask;
    auto victim_fiber = new WorkerFiber(10240);
    victim.assignTo(victim_fiber);

    // runs the task up to the `suspend` call
    victim.resume();

    victim.kill();

    test(!victim.clean_finish);
    test!("==")(victim.fiber.state, core.thread.Fiber.State.TERM);
    test(victim.finished());
}
581 
unittest
{
    // Checks that the "current fiber" / "current task" getters report the
    // expected objects from inside of a running task

    class TestTask : Task
    {
        // values observed from within `run`
        Task task;
        WorkerFiber fiber;

        override public void run ( )
        {
            this.fiber = cast(WorkerFiber) WorkerFiber.getThis();
            this.suspend();
            this.task = Task.getThis();
        }
    }

    // not running inside of any task yet
    test(Task.getThis() is null);

    auto probe = new TestTask;
    auto probe_fiber = new WorkerFiber(10240);

    probe.assignTo(probe_fiber);
    test(probe_fiber.activeTask() is probe);

    // first half of `run`: only the fiber has been recorded so far
    probe.resume();
    test(probe.fiber is probe_fiber);
    test(probe.task is null);

    // second half of `run`: the task has been recorded too
    probe.resume();
    test(probe.task is probe);
}
614 
unittest
{
    // Checks how an exception escaping `run` is forwarded to the resumer

    // A scheduler initialized earlier would be holding one pending task
    // because of the thrown exception, so get rid of it first.
    dropScheduler();

    class ExceptionInternal : Exception
    {
        this ( )
        {
            super("internal");
        }
    }

    class TestTask : Task
    {
        override public void run ( )
        {
            throw new ExceptionInternal;
        }
    }

    auto thrower = new TestTask;
    auto thrower_fiber = new WorkerFiber(10240);
    thrower.assignTo(thrower_fiber);

    // the exception surfaces in the resuming context and leaves the task
    // suspended, waiting for its cleanup to be completed
    testThrown!(ExceptionInternal)(thrower.resume());
    test!("==")(thrower.fiber.state, core.thread.Fiber.State.HOLD);

    // the second resume lets the task finish for real
    thrower.resume();
    test!("==")(thrower.fiber.state, core.thread.Fiber.State.TERM);
}
649 
unittest
{
    // A TaskKillException thrown directly from `run` must terminate the
    // task without being forwarded to the resumer

    class TestTask : Task
    {
        override public void run ( )
        {
            throw new TaskKillException;
        }
    }

    auto self_killer = new TestTask;
    auto self_killer_fiber = new WorkerFiber(10240);
    self_killer.assignTo(self_killer_fiber);

    self_killer.resume();
    test!("==")(self_killer.fiber.state, core.thread.Fiber.State.TERM);
}
669 
670 /*******************************************************************************
671 
672     `Task` descendant which supports extensions that alter the semantics of
673     suspending and resuming the task. An arbitrary number of extensions may be
674     specified (see Params).
675 
676     Each extension must be a struct which defines one or more of the following
677     methods:
678     - void onBeforeSuspend ( )
679     - void onBeforeResume ( )
680     - void onResumed ( ) // called right after execution gets back to task
681 
682     There is no `onSuspended` hook because it would be executed in the context
683     of the caller fiber, right after the current task yields. Such a context
684     tends to be neither well-defined nor useful in practice.
685 
686     The relevant extension methods are called before `this.suspend` /
687     `this.resume` in the same order as they are supplied via the template
688     argument list.
689 
690     The relevant extension methods are called after `this.suspend` /
691     `this.resume` in the reverse order that they are supplied via the template
692     argument list.
693 
694     Params:
695         Extensions = variadic template argument list of extensions to use
696 
697 *******************************************************************************/
698 
public class TaskWith ( Extensions... ) : Task
{
    // NOTE(review): presumably declares the `extensions` aggregate used by
    // all methods below - confirm against TaskExtensionMixins
    mixin (genExtensionAggregate!(Extensions)());

    /***************************************************************************

        Constructor

        Hands the host task over to every extension which defines a matching
        `reset` method, other extensions are left untouched.

    ***************************************************************************/

    this ( )
    {
        foreach (ref ext; this.extensions.tupleof)
        {
            static if (is(typeof(ext.reset(this))))
            {
                ext.reset(this);
            }
        }
    }

    /***************************************************************************

        Suspends this task. Extensions which define `onBeforeSuspend` are
        notified (in declaration order) before suspending, extensions which
        define `onResumed` are notified (in reverse declaration order) once
        `super.suspend()` has returned.

    ***************************************************************************/

    override public void suspend ( )
    {
        foreach (ref ext; this.extensions.tupleof)
        {
            static if (is(typeof(ext.onBeforeSuspend())))
            {
                ext.onBeforeSuspend();
            }
        }

        super.suspend();

        foreach_reverse (ref ext; this.extensions.tupleof)
        {
            static if (is(typeof(ext.onResumed())))
            {
                ext.onResumed();
            }
        }
    }

    /***************************************************************************

        Resumes this task. Extensions which define `onBeforeResume` are
        notified (in declaration order) before resuming.

    ***************************************************************************/

    override public void resume ( )
    {
        foreach (ref ext; this.extensions.tupleof)
        {
            static if (is(typeof(ext.onBeforeResume())))
            {
                ext.onBeforeResume();
            }
        }

        super.resume();
    }

    /***************************************************************************

        Assigns the task to a worker fiber and brings all extensions back to
        their initial state: via `reset` if the extension defines a matching
        method, by overwriting it with its `.init` value otherwise.

        Params:
            fiber = the fiber to assign the task to
            entry_point = optional custom entry point, forwarded to
                `Task.assignTo`

    ***************************************************************************/

    override public void assignTo ( WorkerFiber fiber,
        scope void delegate() entry_point = null )
    {
        super.assignTo(fiber, entry_point);

        foreach (ref ext; this.extensions.tupleof)
        {
            static if (!is(typeof(ext.reset(this))))
            {
                ext = ext.init;
            }
            else
            {
                ext.reset(this);
            }
        }
    }
}
784 
785 ///
unittest
{
    // Intentionally left empty, nothing is exercised here:
    // see tests/examples in ocean.task.extensions.*
}
790 
791 /// Values for task state bitmask
private enum TaskState : ubyte
{
    /// No flags are set
    None = 0,

    /// If this flag is set, the task will try to kill itself as soon as it
    /// is resumed, by throwing TaskKillException.
    ToKill = 1 << 0,

    /// This flag is set when the `run` method of the task finishes
    Finished = 1 << 1,
}
802 
/// Prints a formatted trace line, prefixed with the module name, to Stdout
/// and flushes it. Does nothing unless compiled with `debug=TaskScheduler`.
private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        cstring prefixed_format = "[ocean.task.Task] " ~ format;
        auto output = Stdout.formatln(prefixed_format, args);
        output.flush();
    }
}