1 /*******************************************************************************
2 
3     Collection of task waiting / timer utilities wrapped in an easy to use,
4     pseudo-blocking API.
5 
    Uses a module-level, package-visible `TimerSet`-derived instance (see
    `ocean.io.select.client.TimerSet`) for resuming suspended tasks.
8 
9     Usage example:
10         See the documented unittest of the `wait()` function
11 
12     Copyright:
13         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
14         All rights reserved.
15 
16     License:
17         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
18         Alternatively, this file may be distributed under the terms of the Tango
19         3-Clause BSD License (see LICENSE_BSD.txt for details).
20 
21 *******************************************************************************/
22 
23 module ocean.task.util.Timer;
24 
25 import ocean.meta.types.Qualifiers;
26 import ocean.io.select.client.TimerSet;
27 import ocean.util.container.pool.ObjectPool;
28 import ocean.task.Task;
29 import ocean.task.IScheduler;
30 
31 debug (TaskScheduler)
32 {
33     import ocean.io.Stdout;
34 }
35 
// In unittest builds turn on the `TaskTimerInvariant` debug identifier, which
// compiles in the sanity-check invariant guarded by
// `debug (TaskTimerInvariant)` in `TaskTimerSet`.
version (unittest)
{
    debug = TaskTimerInvariant;
}
40 
/*******************************************************************************

    Puts the calling task to sleep for `micro_seconds` microseconds.

    A timer event is registered for the current task, the task is suspended
    and the timer event resumes it once it fires. While the task is suspended
    the `unregister` method of the event is installed as a termination hook of
    the task; the hook is removed again once the task has been resumed.

    Must be called from within a task. Returns immediately, without
    suspending, if `micro_seconds` is 0.

    Params:
        micro_seconds = amount of microseconds to suspend for

*******************************************************************************/

public void wait ( uint micro_seconds )
{
    // nothing to wait for
    if (!micro_seconds)
        return;

    auto current = Task.getThis();
    assert (current !is null);

    auto wakeup = registerResumeEvent(current, micro_seconds);
    auto cancel_wakeup = &wakeup.unregister;

    // if the task terminates while suspended, the hook unregisters the
    // pending timer event
    current.terminationHook(cancel_wakeup);

    debug_trace("Suspending task <{}> for {} microseconds",
        cast(void*) current, micro_seconds);
    current.suspend();

    // resumed again, the hook is not needed anymore
    current.removeTerminationHook(cancel_wakeup);
}
68 
unittest
{
    // Usage example for `wait()`. `example` is only declared inside this
    // unittest block, it is not called from here.
    void example ( )
    {
        // task which sleeps ten times for 10 microseconds each
        class SimpleTask : Task
        {
            override public void run ( )
            {
                foreach (iteration; 0 .. 10)
                    .wait(10);
            }
        }

        theScheduler.schedule(new SimpleTask);
        theScheduler.eventLoop();
    }
}
87 
/*******************************************************************************

    Like `theScheduler.await`, but with an upper bound on the waiting time.
    The calling task is resumed as soon as either the awaited task has
    finished or the timeout has expired, whichever comes first.

    A `task` which is currently suspended is not scheduled again, but it is
    still awaited.

    Must be called from within a task.

    Params:
        task = task to await
        micro_seconds = timeout duration

    Returns:
        'true' if resumed via timeout, 'false' otherwise

*******************************************************************************/

public bool awaitOrTimeout ( Task task, uint micro_seconds )
{
    auto caller = Task.getThis();
    assert (caller !is null);

    // timer event which resumes the caller when the timeout expires; it gets
    // unregistered if the awaited task terminates
    auto timeout_event = registerResumeEvent(caller, micro_seconds);
    auto cancel_timeout = &timeout_event.unregister;
    task.terminationHook(cancel_timeout);

    // resumes the caller when the awaited task terminates, but only if the
    // caller is actually suspended at that point
    scope resumer = {
        if (caller.suspended())
            theScheduler.delayedResume(caller);
    };
    task.terminationHook(resumer);

    if (!task.suspended())
        theScheduler.schedule(task);

    // suspend if not finished immediately
    if (!task.finished())
        caller.suspend();

    // resumed because awaited task has finished: the timer was already
    // unregistered by its termination hook, nothing left to do
    if (task.finished())
        return false;

    // resumed because of timeout: remove the termination hooks from the
    // awaited task to avoid a double resume
    task.removeTerminationHook(resumer);
    task.removeTerminationHook(cancel_timeout);
    return true;
}
141 
unittest
{
    // Usage example for `awaitOrTimeout()`. `example` is only declared inside
    // this unittest block, it is not called from here.
    void example ( )
    {
        // task which never finishes on its own
        static class InfiniteTask : Task
        {
            override public void run ( )
            {
                while (true)
                    .wait(100);
            }
        }

        static class RootTask : Task
        {
            Task to_wait_for;

            override public void run ( )
            {
                // `awaitOrTimeout` itself does not terminate the awaited task
                // when the timeout is hit, it only "detaches" it from the
                // current context. If terminating the awaited task is
                // desired, that is trivially done at the call site:
                if (.awaitOrTimeout(this.to_wait_for, 200))
                    this.to_wait_for.kill();
            }
        }

        auto root = new RootTask;
        root.to_wait_for = new InfiniteTask;

        theScheduler.schedule(root);
        theScheduler.eventLoop();
    }
}
178 
179 
/*******************************************************************************

    Module-level timer set used by the helpers of this module. Implements a
    timer event pool together with the logic to handle an arbitrary number of
    events using a single file descriptor.

    Not allocated up front: `registerResumeEvent` creates the instance the
    first time it finds this variable to be `null`.

*******************************************************************************/

package TaskTimerSet timer;
189 
/*******************************************************************************

    Event data to be used with the timer scheduler. Simply contains a
    reference to the task which is to be resumed when the timer event fires.
    `registerResumeEvent` stores the task here when scheduling the event and
    the fired-event callback reads it back to call `resume()` on it.

*******************************************************************************/

private struct EventData
{
    // task to resume when the timer event fires
    Task to_resume;
}
202 
/*******************************************************************************

    Derivative of the generic ocean TimerSet which augments it with
    task-specific sanity checks.

    The checks are only compiled in if the `TaskTimerInvariant` debug
    identifier is set (this module sets it for unittest builds).

*******************************************************************************/

private class TaskTimerSet : TimerSet!(EventData)
{
    debug (TaskTimerInvariant)
    {
        /***********************************************************************

            Verifies that each task referenced by a currently registered
            (busy) timer event has a non-null fiber.

        ***********************************************************************/

        invariant ( )
        {
            // NOTE(review): the cast presumably drops the const qualification
            // `this` carries inside an invariant, so that the event pool can
            // be iterated -- confirm
            auto _this = cast(TaskTimerSet) this;

            // `BusyItemsIterator` is instantiated with `events` as its outer
            // object, hence the `outer.new Inner` form
            scope iterator = _this.events.new BusyItemsIterator;

            foreach (event; iterator)
            {
                assert (event.data.to_resume.fiber !is null);
            }
        }
    }
}
225 
/*******************************************************************************

    Common code for scheduling a new timer event in the global `.timer`
    timer set. When the event fires, `to_resume` gets resumed.

    The global `.timer` is allocated here if that has not been done yet.

    Params:
        to_resume = task to resume when event fires
        micro_seconds = time to wait before event fires

    Returns:
        registered event interface

*******************************************************************************/

private TimerSet!(EventData).IEvent registerResumeEvent (
    Task to_resume, uint micro_seconds )
{
    // lazily create the shared timer set on first use
    if (.timer is null)
    {
        .timer = new TaskTimerSet;
    }

    assert (to_resume !is null);

    return .timer.schedule(
        // Setup of the event data happens in the same fiber, so referring to
        // a variable from the stack of this function is fine here
        ( ref EventData new_event )
        {
            new_event.to_resume = to_resume;
        },
        // The fired timer callback is run from epoll context; only data
        // stored in the EventData (or other heap allocated data) may be
        // used here
        ( ref EventData fired_event )
        {
            debug_trace("Resuming task <{}> by timer event",
                cast(void*) fired_event.to_resume);
            fired_event.to_resume.resume();
        },
        micro_seconds
    );
}
269 
/*******************************************************************************

    Prints a trace line prefixed with the name of this module to `Stdout` and
    flushes it. Does nothing unless the `TaskScheduler` debug identifier is
    set.

    Params:
        format = format string of the message
        args = arguments for the format string

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        auto prefixed = "[ocean.task.util.Timer] " ~ format;
        auto output = Stdout.formatln(prefixed, args);
        output.flush();
    }
}