1 /*******************************************************************************
2 
3     This module defines public API for `ocean.task.Scheduler` together with
4     all involved data types. It is intended to be used in other modules instead
5     of direct `ocean.task.Scheduler` import to untangle complex dependency chain
6     the latter brings.
7 
8     Copyright:
9         Copyright (c) 2017 dunnhumby Germany GmbH. All rights reserved.
10 
11     License:
12         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
13         Alternatively, this file may be distributed under the terms of the Tango
14         3-Clause BSD License (see LICENSE_BSD.txt for details).
15 
16 *******************************************************************************/
17 
18 module ocean.task.IScheduler;
19 
20 import core.thread;
21 
22 import ocean.task.Task;
23 import ocean.meta.traits.Indirections : hasIndirections;
24 import ocean.meta.types.Qualifiers;
25 import ocean.io.select.EpollSelectDispatcher;
26 import ocean.core.Optional;
27 
28 /*******************************************************************************
29 
30     Scheduler handles concurrent execution of all application tasks,
31     assigning them to limited amount of worker fibers. Scheduler functionality
32     can be split into 3 major blocks:
33 
34     1. executing arbitrary `ocean.task.Task` using a worker fiber from a pool
35     2. keeping queue of tasks to execute in cases when all worker fibers are
36        busy
37     3. allowing any task to suspend to allow event processing by epoll in a way
38        that it will be automatically resumed by scheduler again when there are
39        no more immediate pending events
40 
41 *******************************************************************************/
42 
43 public interface IScheduler
44 {
45     /***************************************************************************
46 
47         Aggregation of various size limits used internally by scheduler.
48         Instance of this struct is used as `initScheduler` argument.
49 
50         Default values of scheduler configuration are picked for convenient
51         usage of `SchedulerConfiguration.init` in tests - large stack size but
52         small queue limits.
53 
54     ***************************************************************************/
55 
56     public struct Configuration
57     {
58         /// stack size allocated for all worker fibers created by the scheduler
59         size_t worker_fiber_stack_size = 102400;
60 
61         /// maximum amount of simultaneous worker fibers in the scheduler pool
62         size_t worker_fiber_limit = 5;
63 
64         /// maximum amount of tasks awaiting scheduling in the queue while all
65         /// worker fibers are busy
66         size_t task_queue_limit = 10;
67 
68         /// optional array that defines specialized worker fiber pools to be
69         /// used for handling specific task kinds. Scheduled task is checked
70         /// against this array every time thus it is not recommended to configure
71         /// it to more than a few dedicated extra pools
72         PoolDescription[] specialized_pools;
73 
74         /// Defines single mapping of `ClassInfo` to worker fiber pool in
75         /// configuration
76         public struct PoolDescription
77         {
78             /// fully qualified name (same as `Task.classinfo.name()`) for task
79             /// type which is to be handled by this pool
80             istring task_name;
81 
82             /// worker fiber allocated stack size
83             size_t stack_size;
84         }
85     }
86 
87     /***************************************************************************
88 
89         Aggregate of various statistics that indicate overall scheduler load
90         and performance.
91 
92     ***************************************************************************/
93 
94     public struct Stats
95     {
96         size_t task_queue_busy;
97         size_t task_queue_total;
98         size_t suspended_tasks;
99         size_t worker_fiber_busy;
100         size_t worker_fiber_total;
101     }
102 
103     /***************************************************************************
104 
105         Usage stats of a single specialized task pool
106 
107     ***************************************************************************/
108 
109     public struct SpecializedPoolStats
110     {
111         size_t used_fibers;
112         size_t total_fibers;
113     }
114 
115     /***************************************************************************
116 
117         Getter for scheduler epoll instance. Necessary for integration with
118         `ISelectClient` utilities so that new select client can be registered.
119 
120         Returns:
121             internally used epoll instance
122 
123     ***************************************************************************/
124 
125     public EpollSelectDispatcher epoll ( );
126 
127     /***************************************************************************
128 
129         Forces early termination of all active tasks and shuts down
130         event loop.
131 
132         After this method has been called any attempt to interact with
133         the scheduler will kill the calling task.
134 
135     ***************************************************************************/
136 
137     public void shutdown ( );
138 
139     /***************************************************************************
140 
141         Provides load stats for the scheduler
142 
143         Common usage examples would be load throttling and load stats recording.
144 
145         Returns:
146             struct instance which aggregates all stats
147 
148     ***************************************************************************/
149 
150     public Stats getStats ( );
151 
152     /***************************************************************************
153 
154         Returns:
155             Stats struct for a specialized pool defined by `name` if there is
156             such pool. Empty Optional otherwise.
157 
158     ***************************************************************************/
159 
160     public Optional!(SpecializedPoolStats) getSpecializedPoolStats ( cstring name );
161 
162     /***************************************************************************
163 
164         Method used to execute new task.
165 
166         If there are idle worker fibers, the task will be executed immediately
167         and this method will only return when that task first calls `suspend`.
168 
169         If all workers are busy, the task will be added to the queue and this
170         method will return immediately.
171 
172         Params:
173             task = derivative from `ocean.task.Task` defining some application
174                 task to execute
175 
176         Throws:
177             TaskQueueFullException if task queue is at full capacity
178 
179     ***************************************************************************/
180 
181     public void schedule ( Task task );
182 
183     /***************************************************************************
184 
185         Method used to queue the task for later execution.
186 
187         Will always put the task into the queue, even if there are idle worker
188         fibers. This method is mostly useful when implementing advanced library
189         facilities to ensure that no immediate execution takes place.
190 
191         Will result in starting the task in the next event loop cycle at the
192         earliest.
193 
194         Params:
195             task = derivative from `ocean.task.Task` defining some application
196                 task to execute
197 
198         Throws:
199             TaskQueueFullException if task queue is at full capacity AND
200             if no custom `task_queue_full_cb` is set.
201 
202     ***************************************************************************/
203 
204     public void queue ( Task task );
205 
206     /***************************************************************************
207 
208         Schedules the argument and suspends calling task until the argument
209         finishes.
210 
211         This method must not be called outside of a task.
212 
213         Because of how termination hooks are implemented, by the time `await`
214         returns, the task object is not yet completely recycled - it will only
215         happen during next context switch. Caller of `await` must either ensure
216         that the task object lives long enough for that or call
217         `theScheduler.processEvents` right after `await` to ensure immediate
218         recycle (at the performance cost of an extra context switch).
219 
220         Params:
221             task = task to schedule and wait for
222             finished_dg = optional delegate called after task finishes but
223                 before it gets recycled, can be used to copy some data into
224                 caller fiber context
225 
226     ***************************************************************************/
227 
228     public void await ( Task task, scope void delegate (Task) finished_dg = null );
229 
230     /***************************************************************************
231 
232         Convenience shortcut on top of `await` to await for a task and return
233         some value type as a result.
234 
235         Params:
236             task = any task that defines `result` public field  of type with no
237                 indirections
238 
239         Returns:
240             content of `result` field of the task read right after that task
241             finishes
242 
243     ***************************************************************************/
244 
245     public final typeof(TaskT.result) awaitResult ( TaskT : Task ) ( TaskT task )
246     {
247         static assert (
248             !hasIndirections!(typeof(task.result)),
249             "'awaitResult' can only work with result types with no indirection"
250         );
251         typeof(task.result) result;
252         this.await(task, (Task t) { result = (cast(TaskT) t).result; });
253         return result;
254     }
255 
256     /***************************************************************************
257 
258         Similar to `await` but also has waiting timeout. Calling task will be
259         resumed either if awaited task finished or timeout is hit, whichever
260         happens first.
261 
262         Params:
263             task = task to await
264             micro_seconds = timeout duration
265 
266         Returns:
267             'true' if resumed via timeout, 'false' otherwise
268 
269     ***************************************************************************/
270 
271     public bool awaitOrTimeout ( Task task, uint micro_seconds );
272 
273     /***************************************************************************
274 
275         Orders scheduler to resume given task unconditionally after current
276         epoll cycle. Must be used instead of plain `Task.resume` from
277         termination hooks of other tasks.
278 
279         Params:
280             task = task object to resume on next cycle
281 
282         Throws:
283             SuspendQueueFullException if resuming queue is full
284 
285     ***************************************************************************/
286 
287     public void delayedResume ( Task task );
288 
289     /***************************************************************************
290 
291         Suspends current fiber temporarily, allowing pending events to be
292         processed. Current fiber will be resumed as soon as no immediate events
293         are left.
294 
295         Throws:
296             SuspendQueueFullException if suspending is not possible because
297             resuming queue is full
298 
299     ***************************************************************************/
300 
301     public void processEvents ( );
302 
303     /***************************************************************************
304 
305         Starts pseudo-infinite event loop. Event loop will keep running as long
306         as there is at least one event registered.
307 
308         Throws:
309             SanityException if there are some active worker fibers
310             left in the pool by the time there are not events left
311 
312     ***************************************************************************/
313 
314     public void eventLoop ( );
315 }
316 
317 /*******************************************************************************
318 
319     Singleton scheduler instance, same as `ocean.task.Scheduler.theScheduler`
320     but returns that object as `IScheduler` interface.
321 
322     Returns:
323         the global scheduler instance
324 
325 *******************************************************************************/
326 
327 public IScheduler theScheduler ( )
328 {
329     assert(_scheduler !is null, "Scheduler is null, initScheduler must be called before using it");
330 
331     return _scheduler;
332 }
333 
334 /*******************************************************************************
335 
336     Returns:
337         'true' if scheduler system was initialized, 'false' otherwise
338 
339 *******************************************************************************/
340 
341 public bool isSchedulerUsed ( )
342 {
343     return _scheduler !is null;
344 }
345 
346 version (unittest)
347 {
348     /***************************************************************************
349 
350         Occasionally useful in tests to drop reference to already initialized
351         scheduler and test some code as if scheduler is not present.
352 
353     ***************************************************************************/
354 
355     public void dropScheduler ( )
356     {
357         _scheduler = null;
358     }
359 }
360 
361 /*******************************************************************************
362 
363     Initialized externally from `ocean.task.Scheduler` to reference the same
364     singleton object.
365 
366 *******************************************************************************/
367 
368 package IScheduler _scheduler;
369 
370 /******************************************************************************
371 
372     Exception thrown when scheduled task queue overflows
373 
374 ******************************************************************************/
375 
376 public class TaskQueueFullException : Exception
377 {
378     this ( )
379     {
380         super("Attempt to schedule a task when all worker fibers are busy "
381             ~ " and delayed execution task queue is full");
382     }
383 }