1 /*******************************************************************************
2 3 Task based epoll select dispatcher client.
4 5 Manages the epoll registration of a file descriptor, suspends a task to wait
6 for I/O events to occur for that file descriptor, and resumes the task to
7 handle the occurred events.
8 9 Copyright:
10 Copyright (c) 2017 dunnhumby Germany GmbH.
11 All rights reserved.
12 13 License:
14 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
15 Alternatively, this file may be distributed under the terms of the Tango
16 3-Clause BSD License (see LICENSE_BSD.txt for details).
17 18 *******************************************************************************/19 20 moduleocean.io.select.protocol.task.TaskSelectClient;
21 22 importocean.core.Verify;
23 importocean.io.select.client.model.ISelectClient;
24 25 classTaskSelectClient: ISelectClient26 {
27 importocean.task.Task: Task;
28 importocean.task.IScheduler: theScheduler;
29 importocean.io.select.protocol.task.TimeoutException;
30 importocean.util.log.Logger;
31 importocean.transition;
32 33 debug (SelectFiber) importocean.io.Stdout: Stdout;
34 35 /***************************************************************************
36 37 The I/O device.
38 39 ***************************************************************************/40 41 privateISelectableiodev;
42 43 /***************************************************************************
44 45 The I/O events passed to `epoll.register()/modify()`.
46 Also used in logic to avoid unnecessary unregister/register sequences:
47 A non-zero value means the I/O device is, zero that it is not registered
48 with epoll.
49 50 ***************************************************************************/51 52 privateEventevents_expected;
53 54 /***************************************************************************
55 56 The task that is suspended while waiting for events.
57 58 ***************************************************************************/59 60 privateTasktask;
61 62 /***************************************************************************
63 64 The I/O events reported by epoll.
65 66 ***************************************************************************/67 68 privateEventevents_reported;
69 70 /***************************************************************************
71 72 `true` if the I/O device timed out before an event occurred.
73 74 ***************************************************************************/75 76 privatebooltimeout_reported;
77 78 /***************************************************************************
79 80 Reusable timeout exception.
81 82 ***************************************************************************/83 84 privateTimeoutExceptione_timeout;
85 86 /***************************************************************************
87 88 Obtains the current error status of the I/O device, such as socket
89 error.
90 91 ***************************************************************************/92 93 privateintdelegate ( ) error_code_dg;
94 95 /***************************************************************************
96 97 Constructor.
98 99 Params:
100 iodev = the I/O device
101 error_code_dg = delegate to query the I/O device error status
102 103 ***************************************************************************/104 105 publicthis ( ISelectableiodev, scopeintdelegate ( ) error_code_dg )
106 {
107 this.iodev = iodev;
108 this.error_code_dg = error_code_dg;
109 }
110 111 /**************************************************************************
112 113 Suspends the current task until epoll reports any event in
114 `events_expected` for the I/O device or the I/O device times out.
115 116 Params:
117 events_expected = the events to wait for (`EPOLLHUP` and `EPOLLERR`
118 are always implicitly added)
119 120 Returns:
121 the events reported by epoll.
122 123 Throws:
124 `EpollException` if registering with epoll failed,
125 `TimeoutException` on timeout waiting for I/O events.
126 127 **************************************************************************/128 129 publicEventioWait ( Eventevents_expected )
130 out131 {
132 assert(!this.task);
133 }
134 body135 {
136 verify(events_expected != Event.init);
137 verify(this.taskisnull);
138 139 // Based on the current value of this.events_expected do the following:140 // - If this.events_expected is 0 then the I/O device is not registered141 // with epoll so register it.142 // - If this.events_expected is non-zero then the I/O device is still143 // registered from a previous call of this method so144 // - if this.events_expected differs from events_expected then change145 // the events of the existing registration,146 // - otherwise don't touch the existing registration.147 148 if (this.events_expected != events_expected)
149 {
150 boolalready_registered = !!this.events_expected;
151 this.events_expected = events_expected;
152 autoepoll = theScheduler.epoll;
153 if (already_registered)
154 epoll.modify(this);
155 else156 epoll.register(this);
157 }
158 159 try160 {
161 this.task = task.getThis();
162 verify(this.task !isnull);
163 this.task.suspend();
164 verify(this.taskisnull);
165 166 // At this point handle() has resumed the task and is blocked until167 // the task is suspended again (or terminates). In order to avoid168 // unecessary epoll unregister/register sequences we stay registered169 // with epoll. Now there are two possible scenarios:170 171 // 1. This method is called again without suspending in the mean172 // time the task so handle() is still waiting. We will find173 // events_expected as we left it i.e. not 0, meaning the I/O device174 // is still registered. Then we set this.task to the current task175 // and suspend again. This will resume handle(), which will see176 // this.task is not null and therefore return true to stay177 // registered with epoll and wait for events.178 179 // 2. The task is suspended or terminates outside the reach of this180 // class. handle() will be resumed and see this.task is null so it181 // will set this.events_expected = 0 and return false to request182 // being unregistered. If this method is called again, it will see183 // events_expected == 0, meaning the I/O device is not registered,184 // and register it.185 186 if (this.events_reported & events_reported.EPOLLERR)
187 // Reset events_expected because with EPOLLERR the I/O device is188 // automatically unregistered.189 this.events_expected = events_expected.init;
190 191 if (this.timeout_reported)
192 {
193 this.timeout_reported = false;
194 if (this.e_timeoutisnull)
195 this.e_timeout = newTimeoutException;
196 throwthis.e_timeout;
197 }
198 199 returnthis.events_reported;
200 }
201 finally202 this.events_reported = this.events_reported.init;
203 }
204 205 /**************************************************************************
206 207 Unregisters the I/O device.
208 209 You need to call this method if you close and then reopen or otherwise
210 reassign the I/O device's file descriptor *without* suspending and
211 resuming or terminating and restarting the task in between. You may call
212 this method at any time, even if the I/O device is not (yet or any more)
213 usable, in which case it will not throw.
214 215 Calling `unregister` is needed if the file descriptor is closed and
216 reopened because when closing the OS unregisters it from epoll behind
217 this instance's back. However,
218 - suspending or terminating the task has the same effect as calling
219 `unregister` and
220 - if the file descriptor is not going to be closed then unregistering
221 is harmless, as `ioWait` this will re-register it on the next call.
222 223 Returns:
224 0 if everything worked as expected or
225 - `ENOENT` if the client was already unregistered or
226 - `EBADF` if the I/O device file descriptor does not refer to a
227 useable device (i.e. it was closed already or not opened yet).
228 In any case the I/O device is not registered with epoll.
229 230 Throws:
231 `EpollException` on error.
232 233 **************************************************************************/234 235 publicintunregister ( )
236 {
237 this.events_expected = events_expected.init;
238 returntheScheduler.epoll.unregister(this);
239 }
240 241 /**************************************************************************
242 243 Resumes the task to handle `events`.
244 245 Params:
246 events = the events reported by epoll
247 248 Returns:
249 true to stay registered with epoll or false to be unregistered.
250 251 **************************************************************************/252 253 overridepublicboolhandle ( Eventevents )
254 {
255 // This handler should only be called while ioWait() is blocked in the256 // task.suspend() call so this.task should always refer to the task to257 // be resumed.258 if (Tasktask = this.task)
259 {
260 this.task = null;
261 262 debug ( SelectFiber ) Stdout.formatln(typeof(this).stringof ~
263 ".handle: fd {} task resumed", this.fd);
264 265 this.events_reported = events;
266 task.resume();
267 268 debug ( SelectFiber ) Stdout.formatln(typeof(this).stringof ~
269 ".handle: fd {} task yielded", this.fd);
270 271 if (this.task)
272 {
273 // After the task has been resumed, ioWait() has returned, then274 // it was called again and suspended the task again so stay275 // registered with epoll.276 returntrue;
277 }
278 else279 {
280 // The task was suspended outside ioWait() or has terminated so281 // it is not waiting for events for this.iodev.282 this.events_expected = this.events_expected.init;
283 returnfalse;
284 }
285 }
286 else287 {
288 // Should not happen, unregister to make the event we cannot handle289 // firing.290 Log.lookup(typeof(this).stringof).error(
291 "handle {} fd {} events {:X}: no task to resume, unregistering",
292 this.iodev, this.fd, events293 );
294 debug ( SelectFiber )
295 Stdout.formatln(typeof(this).stringof ~
296 ".handle: fd {} no task to resume, unregistering", this.fd);
297 returnfalse;
298 }
299 }
300 301 /**************************************************************************
302 303 Returns:
304 the I/O device file handle.
305 306 **************************************************************************/307 308 overridepublicHandlefileHandle ( ) { returnthis.iodev.fileHandle; }
309 310 /**************************************************************************
311 312 Returns:
313 the events to register the I/O device for.
314 315 **************************************************************************/316 317 overridepublicEventevents ( ) { returnthis.events_expected; }
318 319 /**************************************************************************
320 321 Returns:
322 current I/O (e.g. socket) error code, if available, or 0 otherwise.
323 324 **************************************************************************/325 326 overridepublicinterror_code ( ) { returnthis.error_code_dg(); }
327 328 /***************************************************************************
329 330 Called if `EPOLLERR` was reported for the I/O device.
331 332 Future direction: This class will handle `EPOLLERR` through `handle()`
333 when error reporting and finalizing are removed from `ISelectClient`.
334 335 Params:
336 exception = not the issue here
337 event = the reported events including `EPOLLERR`
338 339 ***************************************************************************/340 341 overrideprotectedvoiderror_ ( Exceptionexception, Eventevents )
342 {
343 this.events_reported = events;
344 }
345 346 /**************************************************************************
347 348 Called after this instance was unregistered from epoll because either
349 - `handle()` returned `false` (`status.Success`) or
350 - the I/O device timed out (`status.Timeout`) or
351 - `EPOLLERR` has been reported for the I/O device (`status.Error`).
352 353 Note that for `status` other than `Success` `handle()` is not called.
354 355 Future direction: When error reporting and finalizing are removed from
356 `ISelectClient`,
357 - timeouts can be handled by overriding `void timeout()`,
358 - this class will handle `EPOLLERR` through `handle()`,
359 - finalization with `status.Success` doesn't need to be handled.
360 361 Params:
362 event = the events reported including `EPOLLERR`
363 364 **************************************************************************/365 366 overridepublicvoidfinalize ( FinalizeStatusstatus )
367 {
368 if (Tasktask = this.task)
369 {
370 this.task = null;
371 this.events_expected = events_expected.init;
372 373 /* D2: final */switch (status)
374 {
375 casestatus.Success:
376 break;
377 378 casestatus.Timeout:
379 this.timeout_reported = true;
380 task.resume();
381 break;
382 383 casestatus.Error:
384 // error_() has been called before so ioWait() will simply385 // return the reported events which include EPOLLERR.386 task.resume();
387 break;
388 389 default: assert(false);
390 }
391 }
392 }
393 394 /***************************************************************************
395 396 Returns the file descriptor of the I/O device as `int`, for logging.
397 398 Returns:
399 the file descriptor of the I/O device.
400 401 ***************************************************************************/402 403 privateintfd ( )
404 {
405 autohandle = this.iodev.fileHandle;
406 407 staticif (!is(typeof(handle) == int))
408 {
409 staticassert(is(handle.IsTypedef));
410 staticassert(is(typeof(handle.value) == int));
411 }
412 413 returncast(int)handle;
414 }
415 }