1 /*******************************************************************************
2 
3     Task based epoll select dispatcher client.
4 
5     Manages the epoll registration of a file descriptor, suspends a task to wait
6     for I/O events to occur for that file descriptor, and resumes the task to
7     handle the occurred events.
8 
9     Copyright:
10         Copyright (c) 2017 dunnhumby Germany GmbH.
11         All rights reserved.
12 
13     License:
14         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
15         Alternatively, this file may be distributed under the terms of the Tango
16         3-Clause BSD License (see LICENSE_BSD.txt for details).
17 
18 *******************************************************************************/
19 
20 module ocean.io.select.protocol.task.TaskSelectClient;
21 
22 import ocean.core.Verify;
23 import ocean.io.select.client.model.ISelectClient;
24 
25 class TaskSelectClient: ISelectClient
26 {
27     import ocean.task.Task: Task;
28     import ocean.task.IScheduler: theScheduler;
29     import ocean.io.select.protocol.task.TimeoutException;
30     import ocean.util.log.Logger;
31     import ocean.meta.types.Qualifiers;
32 
33     debug (SelectFiber) import ocean.io.Stdout: Stdout;
34 
35     /***************************************************************************
36 
37         The I/O device.
38 
39     ***************************************************************************/
40 
41     private ISelectable iodev;
42 
43     /***************************************************************************
44 
45         The I/O events passed to `epoll.register()/modify()`.
46         Also used in logic to avoid unnecessary unregister/register sequences:
47         A non-zero value means the I/O device is, zero that it is not registered
48         with epoll.
49 
50     ***************************************************************************/
51 
52     private Event events_expected;
53 
54     /***************************************************************************
55 
56         The task that is suspended while waiting for events.
57 
58     ***************************************************************************/
59 
60     private Task task;
61 
62     /***************************************************************************
63 
64         The I/O events reported by epoll.
65 
66     ***************************************************************************/
67 
68     private Event events_reported;
69 
70     /***************************************************************************
71 
72         `true` if the I/O device timed out before an event occurred.
73 
74     ***************************************************************************/
75 
76     private bool timeout_reported;
77 
78     /***************************************************************************
79 
80         Reusable timeout exception.
81 
82     ***************************************************************************/
83 
84     private TimeoutException e_timeout;
85 
86     /***************************************************************************
87 
88         Obtains the current error status of the I/O device, such as socket
89         error.
90 
91     ***************************************************************************/
92 
93     private int delegate ( ) error_code_dg;
94 
95     /***************************************************************************
96 
97         Constructor.
98 
99         Params:
100             iodev         = the I/O device
101             error_code_dg = delegate to query the I/O device error status
102 
103     ***************************************************************************/
104 
105     public this ( ISelectable iodev, scope int delegate ( ) error_code_dg )
106     {
107         this.iodev = iodev;
108         this.error_code_dg = error_code_dg;
109     }
110 
111     /**************************************************************************
112 
113         Suspends the current task until epoll reports any event in
114         `events_expected` for the I/O device or the I/O device times out.
115 
116         Params:
117             events_expected = the events to wait for (`EPOLLHUP` and `EPOLLERR`
118                               are always implicitly added)
119 
120         Returns:
121             the events reported by epoll.
122 
123         Throws:
124             `EpollException` if registering with epoll failed,
125             `TimeoutException` on timeout waiting for I/O events.
126 
127      **************************************************************************/
128 
129     public Event ioWait ( Event events_expected )
130     out
131     {
132         assert(!this.task);
133     }
134     do
135     {
136         verify(events_expected != Event.init);
137         verify(this.task is null);
138 
139         // Based on the current value of this.events_expected do the following:
140         // - If this.events_expected is 0 then the I/O device is not registered
141         //   with epoll so register it.
142         // - If this.events_expected is non-zero then the I/O device is still
143         //   registered from a previous call of this method so
144         //   - if this.events_expected differs from events_expected then change
145         //     the events of the existing registration,
146         //   - otherwise don't touch the existing registration.
147 
148         if (this.events_expected != events_expected)
149         {
150             bool already_registered = !!this.events_expected;
151             this.events_expected = events_expected;
152             auto epoll = theScheduler.epoll;
153             if (already_registered)
154                 epoll.modify(this);
155             else
156                 epoll.register(this);
157         }
158 
159         try
160         {
161             this.task = task.getThis();
162             verify(this.task !is null);
163             this.task.suspend();
164             verify(this.task is null);
165 
166             // At this point handle() has resumed the task and is blocked until
167             // the task is suspended again (or terminates). In order to avoid
168             // unecessary epoll unregister/register sequences we stay registered
169             // with epoll. Now there are two possible scenarios:
170 
171             // 1. This method is called again without suspending in the mean
172             // time the task so handle() is still waiting. We will find
173             // events_expected as we left it i.e. not 0, meaning the I/O device
174             // is still registered. Then we set this.task to the current task
175             // and suspend again. This will resume handle(), which will see
176             // this.task is not null and therefore return true to stay
177             // registered with epoll and wait for events.
178 
179             // 2. The task is suspended or terminates outside the reach of this
180             // class. handle() will be resumed and see this.task is null so it
181             // will set this.events_expected = 0 and return false to request
182             // being unregistered. If this method is called again, it will see
183             // events_expected == 0, meaning the I/O device is not registered,
184             // and register it.
185 
186             if (this.events_reported & events_reported.EPOLLERR)
187                 // Reset events_expected because with EPOLLERR the I/O device is
188                 // automatically unregistered.
189                 this.events_expected = events_expected.init;
190 
191             if (this.timeout_reported)
192             {
193                 this.timeout_reported = false;
194                 if (this.e_timeout is null)
195                     this.e_timeout = new TimeoutException;
196                 throw this.e_timeout;
197             }
198 
199             return this.events_reported;
200         }
201         finally
202             this.events_reported = this.events_reported.init;
203     }
204 
205     /**************************************************************************
206 
207         Unregisters the I/O device.
208 
209         You need to call this method if you close and then reopen or otherwise
210         reassign the I/O device's file descriptor *without* suspending and
211         resuming or terminating and restarting the task in between. You may call
212         this method at any time, even if the I/O device is not (yet or any more)
213         usable, in which case it will not throw.
214 
215         Calling `unregister` is needed if the file descriptor is closed and
216         reopened because when closing the OS unregisters it from epoll behind
217         this instance's back. However,
218           - suspending or terminating the task has the same effect as calling
219             `unregister` and
220           - if the file descriptor is not going to be closed then unregistering
221             is harmless, as `ioWait` this will re-register it on the next call.
222 
223         Returns:
224             0 if everything worked as expected or
225              - `ENOENT` if the client was already unregistered or
226              - `EBADF` if the I/O device file descriptor does not refer to a
227                useable device (i.e. it was closed already or not opened yet).
228             In any case the I/O device is not registered with epoll.
229 
230         Throws:
231             `EpollException` on error.
232 
233      **************************************************************************/
234 
235     public int unregister ( )
236     {
237         this.events_expected = events_expected.init;
238         return theScheduler.epoll.unregister(this);
239     }
240 
241     /**************************************************************************
242 
243         Resumes the task to handle `events`.
244 
245         Params:
246             events = the events reported by epoll
247 
248         Returns:
249             true to stay registered with epoll or false to be unregistered.
250 
251      **************************************************************************/
252 
253     override public bool handle ( Event events )
254     {
255         // This handler should only be called while ioWait() is blocked in the
256         // task.suspend() call so this.task should always refer to the task to
257         // be resumed.
258         if (Task task = this.task)
259         {
260             this.task = null;
261 
262             debug ( SelectFiber ) Stdout.formatln(typeof(this).stringof ~
263                 ".handle: fd {} task resumed", this.fd);
264 
265             this.events_reported = events;
266             task.resume();
267 
268             debug ( SelectFiber ) Stdout.formatln(typeof(this).stringof ~
269                 ".handle: fd {} task yielded", this.fd);
270 
271             if (this.task)
272             {
273                 // After the task has been resumed, ioWait() has returned, then
274                 // it was called again and suspended the task again so stay
275                 // registered with epoll.
276                 return true;
277             }
278             else
279             {
280                 // The task was suspended outside ioWait() or has terminated so
281                 // it is not waiting for events for this.iodev.
282                 this.events_expected = this.events_expected.init;
283                 return false;
284             }
285         }
286         else
287         {
288             // Should not happen, unregister to make the event we cannot handle
289             // firing.
290             Log.lookup(typeof(this).stringof).error(
291                 "handle {} fd {} events {:X}: no task to resume, unregistering",
292                 this.iodev, this.fd, events
293             );
294             debug ( SelectFiber )
295                 Stdout.formatln(typeof(this).stringof ~
296                 ".handle: fd {} no task to resume, unregistering", this.fd);
297             return false;
298         }
299     }
300 
301     /**************************************************************************
302 
303         Returns:
304             the I/O device file handle.
305 
306      **************************************************************************/
307 
308     override public Handle fileHandle ( ) { return this.iodev.fileHandle; }
309 
310     /**************************************************************************
311 
312         Returns:
313             the events to register the I/O device for.
314 
315      **************************************************************************/
316 
317     override public Event events ( ) { return this.events_expected; }
318 
319     /**************************************************************************
320 
321         Returns:
322             current I/O (e.g. socket) error code, if available, or 0 otherwise.
323 
324      **************************************************************************/
325 
326     override public int error_code ( ) { return this.error_code_dg(); }
327 
328     /***************************************************************************
329 
330         Called if `EPOLLERR` was reported for the I/O device.
331 
332         Future direction: This class will handle `EPOLLERR` through `handle()`
333         when error reporting and finalizing are removed from `ISelectClient`.
334 
335         Params:
336             exception = not the issue here
337             event     = the reported events including `EPOLLERR`
338 
339     ***************************************************************************/
340 
341     override protected void error_ ( Exception exception, Event events )
342     {
343         this.events_reported = events;
344     }
345 
346     /**************************************************************************
347 
348         Called after this instance was unregistered from epoll because either
349          - `handle()` returned `false` (`status.Success`) or
350          - the I/O device timed out (`status.Timeout`) or
351          - `EPOLLERR` has been reported for the I/O device (`status.Error`).
352 
353         Note that for `status` other than `Success` `handle()` is not called.
354 
355         Future direction: When error reporting and finalizing are removed from
356         `ISelectClient`,
357          - timeouts can be handled by overriding `void timeout()`,
358          - this class will handle `EPOLLERR` through `handle()`,
359          - finalization with `status.Success` doesn't need to be handled.
360 
361         Params:
362             event     = the events reported including `EPOLLERR`
363 
364      **************************************************************************/
365 
366     override public void finalize ( FinalizeStatus status )
367     {
368         if (Task task = this.task)
369         {
370             this.task = null;
371             this.events_expected = events_expected.init;
372 
373             /* D2: final */ switch (status)
374             {
375                 case status.Success:
376                     break;
377 
378                 case status.Timeout:
379                     this.timeout_reported = true;
380                     task.resume();
381                     break;
382 
383                 case status.Error:
384                     // error_() has been called before so ioWait() will simply
385                     // return the reported events which include EPOLLERR.
386                     task.resume();
387                     break;
388 
389                 default: assert(false);
390             }
391         }
392     }
393 
394     /***************************************************************************
395 
396         Returns the file descriptor of the I/O device as `int`, for logging.
397 
398         Returns:
399             the file descriptor of the I/O device.
400 
401     ***************************************************************************/
402 
403     private int fd ( )
404     {
405         auto handle = this.iodev.fileHandle;
406 
407         static if (!is(typeof(handle) == int))
408         {
409             static assert(is(handle.IsTypedef));
410             static assert(is(typeof(handle.value) == int));
411         }
412 
413         return cast(int)handle;
414     }
415 }