1 /*******************************************************************************
2
3 Collection of task waiting / timer utilities wrapped in an easy to use,
4 pseudo-blocking API.
5
    Uses a module-level `ocean.io.select.client.TimerSet` instance (`timer`,
    package visibility) for fiber resuming.
8
9 Usage example:
10 See the documented unittest of the `wait()` function
11
12 Copyright:
13 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
14 All rights reserved.
15
16 License:
17 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
18 Alternatively, this file may be distributed under the terms of the Tango
19 3-Clause BSD License (see LICENSE_BSD.txt for details).
20
21 *******************************************************************************/
22
23 module ocean.task.util.Timer;
24
25 import ocean.meta.types.Qualifiers;
26 import ocean.io.select.client.TimerSet;
27 import ocean.util.container.pool.ObjectPool;
28 import ocean.task.Task;
29 import ocean.task.IScheduler;
30
31 debug (TaskScheduler)
32 {
33 import ocean.io.Stdout;
34 }
35
// In unittest builds, set the `TaskTimerInvariant` debug identifier so that the
// `debug (TaskTimerInvariant)` invariant of `TaskTimerSet` is compiled in.
version (unittest)
{
    debug = TaskTimerInvariant;
}
40
/*******************************************************************************

    Suspends the calling task and registers a timer event which resumes it
    again once `micro_seconds` microseconds have passed.

    A duration of 0 is a no-op: the function returns straight away without
    suspending. Must be called from inside a task, i.e. `Task.getThis()` must
    not be null.

    Params:
        micro_seconds = amount of microseconds to suspend for

*******************************************************************************/

public void wait ( uint micro_seconds )
{
    if (micro_seconds == 0)
    {
        return;
    }

    auto current = Task.getThis();
    assert (current !is null);

    auto timer_event = registerResumeEvent(current, micro_seconds);

    // The very same delegate is used to install the hook and to remove it
    auto cancel_timer = &timer_event.unregister;

    // Drop the pending timer event if the task terminates before it fires
    current.terminationHook(cancel_timer);

    debug_trace("Suspending task <{}> for {} microseconds",
        cast(void*) current, micro_seconds);
    current.suspend();

    // Execution continues here after the task was resumed; the hook is not
    // needed any more
    current.removeTerminationHook(cancel_timer);
}
68
unittest
{
    // Usage example only: `example` is declared but never called from this
    // unittest block.
    void example ( )
    {
        class SimpleTask : Task
        {
            override public void run ( )
            {
                // ten consecutive pauses of 10 microseconds each
                foreach (i; 0 .. 10)
                {
                    .wait(10);
                }
            }
        }

        auto simple_task = new SimpleTask;
        theScheduler.schedule(simple_task);
        theScheduler.eventLoop();
    }
}
87
/*******************************************************************************

    Similar to `theScheduler.await` but with an upper bound on the waiting
    time. The calling task is resumed as soon as either the awaited task has
    finished or the timeout has expired, whichever comes first.

    If `task` is already scheduled (currently suspended), it is not scheduled
    a second time, but the caller still waits for it.

    On timeout the awaited task is left untouched; only the termination hooks
    installed by this function are removed from it.

    Params:
        task = task to await
        micro_seconds = timeout duration

    Returns:
        'true' if resumed via timeout, 'false' otherwise

*******************************************************************************/

public bool awaitOrTimeout ( Task task, uint micro_seconds )
{
    auto caller = Task.getThis();
    assert (caller !is null);

    // Timer which resumes the caller when the timeout expires; it gets
    // unregistered if the awaited task terminates first
    auto timer_event = registerResumeEvent(caller, micro_seconds);
    auto cancel_timer = &timer_event.unregister;
    task.terminationHook(cancel_timer);

    // Second hook: wake the caller up when the awaited task terminates
    scope wake_caller = {
        if (caller.suspended())
            theScheduler.delayedResume(caller);
    };
    task.terminationHook(wake_caller);

    if (!task.suspended())
    {
        theScheduler.schedule(task);
    }

    // Only suspend if the awaited task has not finished immediately
    if (!task.finished())
    {
        caller.suspend();
    }

    // An unfinished task at this point means the caller was resumed because
    // of the timeout
    immutable timed_out = !task.finished();

    if (timed_out)
    {
        // Clean up termination hooks of the awaited task to avoid a double
        // resume. In the other case the timer was already unregistered by
        // the termination hook and nothing is left to do.
        task.removeTerminationHook(wake_caller);
        task.removeTerminationHook(cancel_timer);
    }

    return timed_out;
}
141
unittest
{
    // Usage example only: `example` is declared but never called from this
    // unittest block.
    void example ( )
    {
        // Task which never finishes on its own
        static class InfiniteTask : Task
        {
            override public void run ( )
            {
                while (true)
                {
                    .wait(100);
                }
            }
        }

        static class RootTask : Task
        {
            Task to_wait_for;

            override public void run ( )
            {
                auto timed_out = .awaitOrTimeout(this.to_wait_for, 200);

                // `awaitOrTimeout` does not terminate the awaited task when
                // the timeout is hit, it only "detaches" it from the current
                // context. If termination is desired, it can be trivially
                // done at the call site:
                if (timed_out)
                {
                    this.to_wait_for.kill();
                }
            }
        }

        auto root_task = new RootTask;
        root_task.to_wait_for = new InfiniteTask;

        theScheduler.schedule(root_task);
        theScheduler.eventLoop();
    }
}
178
179
/*******************************************************************************

    Timer set used by the functions of this module to resume suspended tasks.

    Implements a timer event pool together with the logic to handle an
    arbitrary amount of events using a single file descriptor.

    Starts out as null; `registerResumeEvent` creates the instance the first
    time it is called.

*******************************************************************************/

package TaskTimerSet timer;
189
/*******************************************************************************

    Event data to be used with the timer scheduler. Holds a reference to the
    task which is to be resumed when the associated timer event fires.

*******************************************************************************/

private struct EventData
{
    // Set by `registerResumeEvent` when the event is scheduled and resumed
    // from the callback of the fired timer
    Task to_resume;
}
202
/*******************************************************************************

    Derivative from the generic ocean TimerSet which augments it with
    task-specific sanity checks.

    The checks live in a class invariant which is only compiled in when the
    `TaskTimerInvariant` debug identifier is set.

*******************************************************************************/

private class TaskTimerSet : TimerSet!(EventData)
{
    debug (TaskTimerInvariant)
    {
        /***********************************************************************

            Verifies that every busy event of the set refers to a task whose
            `fiber` is not null.

        ***********************************************************************/

        invariant ( )
        {
            // NOTE(review): the cast presumably strips the qualifier `this`
            // has inside an invariant so that the iterator can be created
            auto _this = cast(TaskTimerSet) this;

            // `BusyItemsIterator` is instantiated with `events` as its outer
            // object (`outer.new Inner` syntax, a single dot)
            scope iterator = _this.events.new BusyItemsIterator;

            foreach (event; iterator)
            {
                assert (event.data.to_resume.fiber !is null);
            }
        }
    }
}
225
/*******************************************************************************

    Shared helper which schedules a new event in the global `.timer` timer
    set. When the event fires, `to_resume` is resumed.

    Creates the global `.timer` instance first if it does not exist yet.

    Params:
        to_resume = task to resume when event fires, must not be null
        micro_seconds = time to wait before event fires

    Returns:
        registered event interface

*******************************************************************************/

private TimerSet!(EventData).IEvent registerResumeEvent (
    Task to_resume, uint micro_seconds )
{
    if (.timer is null)
    {
        .timer = new TaskTimerSet;
    }

    assert (to_resume !is null);

    return .timer.schedule(
        // The setup delegate is run from the same fiber, which is why it may
        // refer to a variable living on the stack of this function
        ( ref EventData data )
        {
            data.to_resume = to_resume;
        },
        // The callback of a fired timer is run from epoll context; only data
        // stored in the EventData (or other heap allocated data) may be used
        // in here
        ( ref EventData fired )
        {
            debug_trace("Resuming task <{}> by timer event",
                cast(void*) fired.to_resume);
            fired.to_resume.resume();
        },
        micro_seconds
    );
}
269
/*******************************************************************************

    Prints a formatted trace line, prefixed with the name of this module, to
    `Stdout` and flushes it. Does nothing unless the `TaskScheduler` debug
    identifier is set.

    Params:
        format = format string of the message
        args = arguments referred to by `format`

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        auto prefixed_format = "[ocean.task.util.Timer] " ~ format;
        Stdout.formatln( prefixed_format, args ).flush();
    }
}