1 /******************************************************************************* 2 3 Collection of task waiting / timer utilities wrapped in an easy to use, 4 pseudo-blocking API. 5 6 Uses a private static `ocean.io.select.client.TimerSet` instance for fiber 7 resuming. 8 9 Usage example: 10 See the documented unittest of the `wait()` function 11 12 Copyright: 13 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 14 All rights reserved. 15 16 License: 17 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 18 Alternatively, this file may be distributed under the terms of the Tango 19 3-Clause BSD License (see LICENSE_BSD.txt for details). 20 21 *******************************************************************************/ 22 23 module ocean.task.util.Timer; 24 25 import ocean.meta.types.Qualifiers; 26 import ocean.io.select.client.TimerSet; 27 import ocean.util.container.pool.ObjectPool; 28 import ocean.task.Task; 29 import ocean.task.IScheduler; 30 31 debug (TaskScheduler) 32 { 33 import ocean.io.Stdout; 34 } 35 36 version (unittest) 37 { 38 debug = TaskTimerInvariant; 39 } 40 41 /******************************************************************************* 42 43 Suspends the current fiber/task and resumes it again after `micro_seconds` 44 microseconds. 45 46 Params: 47 micro_seconds = amount of microseconds to suspend for 48 49 *******************************************************************************/ 50 51 public void wait ( uint micro_seconds ) 52 { 53 if (micro_seconds == 0) 54 return; 55 56 auto task = Task.getThis(); 57 assert (task !is null); 58 59 auto scheduled_event = registerResumeEvent(task, micro_seconds); 60 task.terminationHook(&scheduled_event.unregister); 61 62 debug_trace("Suspending task <{}> for {} microseconds", 63 cast(void*) task, micro_seconds); 64 task.suspend(); 65 66 task.removeTerminationHook(&scheduled_event.unregister); 67 } 68 69 unittest 70 { 71 void example ( ) 72 { 73 class SimpleTask : Task 74 { 75 override public void run ( ) 76 { 77 for (int i = 0; i < 10; ++i) 78 .wait(10); 79 } 80 } 81 82 auto task = new SimpleTask; 83 theScheduler.schedule(task); 84 theScheduler.eventLoop(); 85 } 86 } 87 88 /******************************************************************************* 89 90 Similar to `theScheduler.await` but also has waiting timeout. Calling task 91 will be resumed either if awaited task finished or timeout is hit, whichever 92 happens first. 93 94 If `task` is already scheduled, it will not be re-scheduled again but 95 awaiting will still occur. 96 97 Params: 98 task = task to await 99 micro_seconds = timeout duration 100 101 Returns: 102 'true' if resumed via timeout, 'false' otherwise 103 104 *******************************************************************************/ 105 106 public bool awaitOrTimeout ( Task task, uint micro_seconds ) 107 { 108 auto context = Task.getThis(); 109 assert (context !is null); 110 111 auto scheduled_event = registerResumeEvent(context, micro_seconds); 112 task.terminationHook(&scheduled_event.unregister); 113 scope resumer = { 114 if (context.suspended()) 115 theScheduler.delayedResume(context); 116 }; 117 task.terminationHook(resumer); 118 119 if (!task.suspended()) 120 theScheduler.schedule(task); 121 122 // suspend if not finished immediately 123 if (!task.finished()) 124 context.suspend(); 125 126 if (task.finished()) 127 { 128 // resumed because awaited task has finished 129 // timer was already unregistered by its termination hook, just quit 130 return false; 131 } 132 else 133 { 134 // resumed because of timeout, need to clean up termination hooks of 135 // awaited task to avoid double resume 136 task.removeTerminationHook(resumer); 137 task.removeTerminationHook(&scheduled_event.unregister); 138 return true; 139 } 140 } 141 142 unittest 143 { 144 void example ( ) 145 { 146 static class InfiniteTask : Task 147 { 148 override public void run ( ) 149 { 150 for (;;) .wait(100); 151 } 152 } 153 154 static class RootTask : Task 155 { 156 Task to_wait_for; 157 158 override public void run ( ) 159 { 160 bool timeout = .awaitOrTimeout(this.to_wait_for, 200); 161 162 // `awaitOrTimeout` itself won't terminate awaited task on 163 // timeout, it will only "detach" it from the current context. 164 // If former is 165 // desired, it can be trivially done at the call site: 166 if (timeout) 167 this.to_wait_for.kill(); 168 } 169 } 170 171 auto root = new RootTask; 172 root.to_wait_for = new InfiniteTask; 173 174 theScheduler.schedule(root); 175 theScheduler.eventLoop(); 176 } 177 } 178 179 180 /******************************************************************************* 181 182 Implements timer event pool together with logic to handle arbitrary 183 amount of events using single file descriptor. Allocated by 184 `registerResumeEvent` on access. 185 186 *******************************************************************************/ 187 188 package TaskTimerSet timer; 189 190 /******************************************************************************* 191 192 Event data to be used with timer scheduler. Simply contains reference 193 to heap-allocated resumer closure (which is necessary to keep it valid 194 after fiber suspends). 195 196 *******************************************************************************/ 197 198 private struct EventData 199 { 200 Task to_resume; 201 } 202 203 /******************************************************************************* 204 205 Derivative from generic ocean TimerSet which augments it with task-specific 206 sanity checks. 207 208 *******************************************************************************/ 209 210 private class TaskTimerSet : TimerSet!(EventData) 211 { 212 debug (TaskTimerInvariant) 213 { 214 invariant ( ) 215 { 216 auto _this = cast(TaskTimerSet) this; 217 scope iterator = _this.events..new BusyItemsIterator; 218 foreach (event; iterator) 219 { 220 assert (event.data.to_resume.fiber !is null); 221 } 222 } 223 } 224 } 225 226 /******************************************************************************* 227 228 Helper function providing common code to schedule new timer event in a 229 global `.timer` timer set. 230 231 Allocates global `.timer` if not done already. 232 233 Params: 234 to_resume = task to resume when event fires 235 micro_seconds = time to wait before event fires 236 237 Returns: 238 registered event interface 239 240 *******************************************************************************/ 241 242 private TimerSet!(EventData).IEvent registerResumeEvent ( 243 Task to_resume, uint micro_seconds ) 244 { 245 if (timer is null) 246 timer = new typeof(timer); 247 248 assert(to_resume !is null); 249 250 return .timer.schedule( 251 // EventData setup is run from the same fiber so it is ok to reference 252 // variable from this function stack 253 ( ref EventData event ) 254 { 255 event.to_resume = to_resume; 256 }, 257 // Callback of fired timer is run from epoll context and here it is 258 // only legal to use data captured as EventData field (or other heap 259 // allocated data) 260 ( ref EventData event ) 261 { 262 debug_trace("Resuming task <{}> by timer event", 263 cast(void*) event.to_resume); 264 event.to_resume.resume(); 265 }, 266 micro_seconds 267 ); 268 } 269 270 private void debug_trace ( T... ) ( cstring format, T args ) 271 { 272 debug ( TaskScheduler ) 273 { 274 Stdout.formatln( "[ocean.task.util.Timer] " ~ format, args ).flush(); 275 } 276 }