1 /*******************************************************************************
2 
3     Extends reusable fiber pool with task queue. Used in task scheduler
4     as "default" way to schedule tasks.
5 
6     Copyright:
7         Copyright (c) 2017 dunnhumby Germany GmbH. All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14 *******************************************************************************/
15 
16 module ocean.task.internal.FiberPoolWithQueue;
17 
18 import core.thread : Fiber;
19 
20 import ocean.meta.types.Qualifiers;
21 import ocean.core.Enforce;
22 import ocean.task.IScheduler;
23 import ocean.task.internal.FiberPool;
24 import ocean.task.Task;
25 import ocean.util.container.queue.FixedRingQueue;
26 
27 debug (TaskScheduler)
28     import ocean.io.Stdout;
29 
30 /// Ditto
public class FiberPoolWithQueue : FiberPool
{
    /// Tasks waiting for a worker fiber to become idle so they can be run
    /* package(ocean.task) */
    public FixedRingQueue!(Task) queued_tasks;

    /// Reusable exception instance, thrown when a task can't be pushed to
    /// the full queue and no `task_queue_full_cb` has been supplied.
    /* package(ocean.task) */
    public TaskQueueFullException queue_full_e;

    /***************************************************************************

        Optional handler invoked each time an attempt to queue a task fails
        because the queue size limit is reached. It receives the rejected
        task and the queue itself as arguments. When it is `null`,
        `queue_full_e` is thrown instead.

    ***************************************************************************/

    public TaskQueueFullCB task_queue_full_cb;

    /**************************************************************************

        Constructor

        Params:
            queue_limit = max size of task queue, recommended to be at least
                as large as pool limit
            stack_size = fiber stack size to use in this pool
            limit = limit to pool size. If set to 0, there is no app limit
                and pool growth will be limited only by OS resources

    **************************************************************************/

    public this ( size_t queue_limit, size_t stack_size, size_t limit )
    {
        super(stack_size, limit);
        this.queued_tasks = new FixedRingQueue!(Task)(queue_limit);
        this.queue_full_e = new TaskQueueFullException;
    }

    /***************************************************************************

        Puts the task into the queue for later execution.

        The task is always queued, even if there are idle worker fibers.
        This is mostly useful when implementing advanced library facilities
        that must guarantee no immediate execution takes place.

        The task will be started in the next event loop cycle at the
        earliest.

        Params:
            task = derivative from `ocean.task.Task` defining some application
                task to execute

        Throws:
            TaskQueueFullException if task queue is at full capacity AND
            if no custom `task_queue_full_cb` is set.

    ***************************************************************************/

    public void queue ( Task task )
    {
        if (this.queued_tasks.push(task))
        {
            debug_trace(
                "task '{}' queued for delayed execution",
                cast(void*) task
            );
            return;
        }

        // push was rejected: either delegate to the user supplied handler
        // or report the overflow via the pre-allocated exception
        debug_trace("trying to queue a task while task queue is full");

        if (this.task_queue_full_cb is null)
            enforce(this.queue_full_e, false);
        else
            this.task_queue_full_cb(task, this.queued_tasks);
    }

    /***************************************************************************

        Runs the task right away if possible, queues it otherwise.

        If there are idle worker fibers, the task will be executed immediately
        and this method will only return when that task first calls `suspend`.

        If all workers are busy, the task will be added to the queue and this
        method will return immediately.

        Params:
            task = derivative from `ocean.task.Task` defining some application
                task to execute

        Throws:
            TaskQueueFullException if task queue is at full capacity AND
            if no custom `task_queue_full_cb` is set.

    ***************************************************************************/

    public void runOrQueue ( Task task )
    {
        if (this.num_busy() < this.limit())
        {
            auto worker = this.get();
            debug_trace("running task <{}> via worker fiber <{}>",
                cast(void*) task, cast(void*) worker);
            // `Task.entryPoint` is supposed to be the entry method for a
            // worker fiber, but using it directly does not allow the worker
            // fiber to be reused immediately for tasks waiting in the queue.
            // A custom entry method is installed instead, which calls
            // `Task.entryPoint` internally:
            task.assignTo(worker, &this.workerFiberMethod);
            task.resume();
        }
        else
        {
            this.queue(task);
        }
    }

    /***************************************************************************

        Set in runOrQueue() as the "real" fiber entry method when a task is
        assigned to a worker fiber.

        Takes care of:
        - detaching the task from the fiber after the main task method
            finishes and recycling the worker fiber once there is nothing
            left to run
        - reusing the current worker fiber to run queued tasks if there are
            any

    ***************************************************************************/

    private void workerFiberMethod ( )
    {
        auto worker = cast(WorkerFiber) Fiber.getThis();
        enforce(worker !is null);
        auto task = worker.activeTask();
        enforce(task !is null);

        while (true)
        {
            bool threw = task.entryPoint();

            // in case task was resumed after unhandled exception, delay
            // further execution for one cycle to avoid situation where
            // exception handler calls `Task.continueAfterThrow()` and that
            // throws again
            if (threw)
                theScheduler.processEvents();

            // makes it impossible to use the task by accident in the period
            // between finishing it here and getting it started anew after
            // recycling
            task.fiber = null;

            if (!this.queued_tasks.pop(task))
                break;

            // there are some scheduled tasks in the queue. it is best for
            // latency and performance to start one of those immediately in
            // current fiber instead of going through recycle+get again
            debug_trace("Reusing worker fiber <{}> to run scheduled task <{}>",
                cast(void*) worker, cast(void*) task);

            task.assignTo(worker);
        }

        // there are no scheduled tasks right now, can simply recycle
        // worker fiber for future usage
        debug_trace("Recycling worker fiber <{}>", cast(void*) worker);
        this.recycle(worker);
    }
}
208 
209 /******************************************************************************
210 
211     See `FiberPoolWithQueue.task_queue_full_cb`
212 
213 ******************************************************************************/
214 
public alias TaskQueueFullCB = void delegate(Task, FixedRingQueue!(Task));
216 
217 
/******************************************************************************

    Prints a formatted trace line to `Stdout`, prefixed with this module's
    name, and flushes it. Does nothing unless the module is compiled with
    the `TaskScheduler` debug identifier.

    Params:
        format = format string for the message
        args = arguments referenced by the format string

******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        auto prefixed = "[ocean.task.internal.FiberPoolWithQueue] " ~ format;
        Stdout.formatln(prefixed, args).flush();
    }
}