1 /******************************************************************************
2 
3     Fiber/coroutine based non-blocking I/O select client base class
4 
5     Base class for a non-blocking I/O select client using a fiber/coroutine to
6     suspend operation while waiting for the I/O event and resume on that event.
7 
8     Copyright:
9         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
10         All rights reserved.
11 
12     License:
13         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
14         Alternatively, this file may be distributed under the terms of the Tango
15         3-Clause BSD License (see LICENSE_BSD.txt for details).
16 
17  ******************************************************************************/
18 
19 module ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
20 
21 
22 import ocean.core.MessageFiber;
23 
24 import ocean.core.Verify;
25 
26 import ocean.io.select.client.model.IFiberSelectClient;
27 
28 import ocean.io.select.fiber.SelectFiber;
29 
30 import ocean.io.select.protocol.generic.ErrnoIOException: IOError, IOWarning;
31 
32 import ocean.meta.types.Qualifiers;
33 
34 debug ( SelectFiber ) import ocean.io.Stdout : Stderr;
35 
36 
37 /******************************************************************************/
38 
abstract class IFiberSelectProtocol : IFiberSelectClient
{
    /***************************************************************************

        Fiber message token passed to every suspend() / resume() call made by
        this class.

    ***************************************************************************/

    static private MessageFiber.Token IOReady;

    /***************************************************************************

        Static constructor: creates the "io_ready" fiber token.

    ***************************************************************************/

    static this ( )
    {
        IOReady = MessageFiber.Token("io_ready");
    }

    /**************************************************************************

        Convenience aliases, making the module-level types reachable through
        this class and its subclasses.

     **************************************************************************/

    protected alias SelectFiber = .SelectFiber;

    public alias IOWarning = .IOWarning;
    public alias IOError   = .IOError;

    /**************************************************************************

        The I/O device this protocol operates on.

     **************************************************************************/

    protected ISelectable conduit;

    /**************************************************************************

        The events the I/O device is registered for; returned by events().

     **************************************************************************/

    protected Event events_;

    /**************************************************************************

        Reusable exception instance for warnings.

     **************************************************************************/

    protected IOWarning warning_e;

    /**************************************************************************

        Reusable exception instance for errors; also used to obtain the error
        code in error_code() and to report EPOLLERR in transmitLoop().

     **************************************************************************/

    protected IOError error_e;

    /**************************************************************************

        The events most recently passed to handle(); reset at the start of
        each transmitLoop() run.

     **************************************************************************/

    private Event events_reported;

    /**************************************************************************

        Constructor, creating new IOWarning and IOError instances for the
        conduit.

        Params:
            conduit   = I/O device
            events    = the epoll events to register the device for
            fiber     = fiber to use to suspend and resume operation

     **************************************************************************/

    protected this ( ISelectable conduit, Event events, SelectFiber fiber )
    {
        auto warning = new IOWarning(conduit);
        auto error   = new IOError(conduit);

        this(conduit, events, fiber, warning, error);
    }

    /**************************************************************************

        Constructor, using caller-supplied exception instances.

        Note: The same object may be passed for both warning_e and error_e if
              warnings and errors do not need to be told apart.

        Params:
            conduit   = I/O device, must not be null
            events    = the epoll events to register the device for
            fiber     = fiber to use to suspend and resume operation
            warning_e = exception instance to throw for warnings, must not be
                        null
            error_e   = exception instance to throw on errors and to query
                        device specific error codes if possible, must not be
                        null

     **************************************************************************/

    protected this ( ISelectable conduit, Event events, SelectFiber fiber,
                     IOWarning warning_e, IOError error_e )
    {
        // Sanity checks come first, before the base class is set up.
        verify(conduit !is null);
        verify(warning_e !is null);
        verify(error_e !is null);

        super(fiber);

        this.conduit = conduit;
        this.events_ = events;

        this.warning_e = warning_e;
        this.error_e   = error_e;
    }

    /**************************************************************************

        Constructor, sharing the conduit, fiber and exception instances of
        another instance. Intended for the case where instances of several
        subclasses work on the same conduit and fiber.

        Params:
            other  = instance to take conduit, fiber and exceptions from
            events = the epoll events to register the device for

     **************************************************************************/

    protected this ( typeof (this) other, Event events )
    {
        this(other.conduit, events, other.fiber, other.warning_e, other.error_e);
    }

    /**************************************************************************

        Returns:
            the file handle of the I/O device.

     **************************************************************************/

    public override Handle fileHandle ( )
    {
        return this.conduit.fileHandle();
    }

    /**************************************************************************

        Returns:
            the events the I/O device should be registered for.

     **************************************************************************/

    public override Event events ( )
    {
        return this.events_;
    }

    /**************************************************************************

        Returns:
            the error code reported by the error exception instance (the
            current socket error code, if available, or 0 otherwise).

     **************************************************************************/

    public override int error_code ( )
    {
        return this.error_e.error_code;
    }

    /**************************************************************************

        Stores the reported events and resumes the fiber coroutine so that it
        can process them. The fiber must be waiting (suspended) when this
        method is called.

        Note that the coroutine may keep running after this method returns if
        another instance of this class shares the fiber and is invoked in the
        coroutine once this instance has done its job.

        Params:
            events = events reported for the conduit

        Returns:
            true if the fiber yielded a non-zero numeric message, i.e. it keeps
            going, or false otherwise (fiber finished).

        Throws:
            IOException on I/O error

     **************************************************************************/

    final override protected bool handle ( Event events )
    {
        verify(this.fiber.waiting);

        // Make the events available to transmitLoop(), which continues as
        // soon as the fiber is resumed below.
        this.events_reported = events;

        debug ( SelectFiber ) Stderr.formatln("{}.handle: fd {} fiber resumed",
                typeof(this).stringof, this.conduit.fileHandle);

        SelectFiber.Message reply = this.fiber.resume(IOReady, this); // SmartUnion

        debug ( SelectFiber ) Stderr.formatln("{}.handle: fd {} fiber yielded, message type = {}",
                typeof(this).stringof, this.conduit.fileHandle, reply.active);

        // The numeric field is only read if it is the active union member.
        return reply.active == reply.active.num && reply.num != 0;
    }

    /**************************************************************************

        Calls transmit() repeatedly until it reports that the transmission is
        finished. Between two calls this instance is registered with the fiber
        and the fiber is suspended until handle() resumes it.

        Throws:
            IOException on I/O error, KillableFiber.KilledException if the
            fiber was killed.

        In:
            The fiber must be running.

     **************************************************************************/

    protected void transmitLoop ( )
    {
        verify(this.fiber.running);

        try
        {
            // Discard whatever events a previous run of this method left
            // behind, so that the first transmit() call starts clean.
            this.events_reported = this.events_reported.init;

            bool again = this.transmit(this.events_reported);

            while (again)
            {
                super.fiber.register(this);

                // Suspending hands control back until an event fires for this
                // client; handle() then stores the reported events in
                // this.events_reported and resumes the fiber here.
                super.fiber.suspend(IOReady, this, fiber.Message(true));

                this.error_e.enforce(!(this.events_reported & Event.EPOLLERR), "I/O error");

                again = this.transmit(this.events_reported);
            }
        }
        catch (SelectFiber.KilledException killed)
        {
            // A killed fiber is passed straight on, without suspending again.
            throw killed;
        }
        catch (Exception ex)
        {
            if (super.fiber.isRegistered(this))
            {
                debug ( SelectFiber) Stderr.formatln("{}.transmitLoop: suspending fd {} fiber ({} @ {}:{})",
                    typeof(this).stringof, this.conduit.fileHandle, ex.message(), ex.file, ex.line);

                // An exception raised by transmit() or by the EPOLLERR check
                // is handed to the fiber while suspending, so that it is
                // rethrown by resume() in handle().
                super.fiber.suspend(IOReady, ex);

                debug ( SelectFiber) Stderr.formatln("{}.transmitLoop: resumed fd {} fiber, rethrowing ({} @ {}:{})",
                    typeof(this).stringof, this.conduit.fileHandle, ex.message(), ex.file, ex.line);
            }

            throw ex;
        }
    }

    /**************************************************************************

        Reads/writes data from/to this.conduit, for which events have been
        reported. Implemented by the subclass.

        Params:
            events = events reported for this.conduit (the initial value of
                     Event on the first call of a transmitLoop() run)

        Returns:
            true to be invoked again (after an epoll wait) or false if finished

     **************************************************************************/

    abstract protected bool transmit ( Event events );
}