1 /******************************************************************************
2 
3     Fiber/coroutine based non-blocking input select client base class
4 
5     Base class for a non-blocking input select client using a fiber/coroutine to
6     suspend operation while waiting for the read event and resume on that event.
7     Provides a stream-like interface with consumer delegate invocation to
8     receive and consume data from the input until the consumer indicates it has
9     finished.
10 
11     Copyright:
12         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
13         All rights reserved.
14 
15     License:
16         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
17         Alternatively, this file may be distributed under the terms of the Tango
18         3-Clause BSD License (see LICENSE_BSD.txt for details).
19 
20  ******************************************************************************/
21 
22 module ocean.io.select.protocol.fiber.FiberSelectReader;
23 
24 
25 import ocean.core.Verify;
26 
27 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
28 
29 import ocean.io.device.IODevice: IInputDevice;
30 
31 import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
32 
33 debug (Raw) import ocean.io.Stdout: Stderr;
34 
35 
36 /******************************************************************************/
37 
/******************************************************************************

    Non-blocking input reader which keeps received data in an internal buffer
    and hands it out either through a consumer delegate (consume(),
    readConsume()) or by copying into caller supplied memory (read(),
    readRaw()).

 ******************************************************************************/

class FiberSelectReader : IFiberSelectProtocol
{
    import ocean.core.Enforce: enforce;

    /**************************************************************************

        Default input buffer size (16 kB).

     **************************************************************************/

    public static immutable size_t default_buffer_size = 16 * 1024;

    /**************************************************************************

        Consumer callback delegate type

        Params:
            data = data to consume

        Returns:
            - if finished, a value of [0, data.length] reflecting the number of
              elements (bytes) consumed or
            - a value greater than data.length if more data is required.

     **************************************************************************/

    alias size_t delegate ( void[] data ) Consumer;

    /**************************************************************************

        Input device

     **************************************************************************/

    public alias .IInputDevice IInputDevice;

    private IInputDevice input;

    /**************************************************************************

        Data buffer

     **************************************************************************/

    private void[] data;

    /**************************************************************************

        End index of available and consumed data.

        Available data is data received by receive() or read() and not yet
        consumed by the consumer delegate passed to consume() or read().
        Consumed data is data received by receive() or read() and already
        consumed by the consumer delegate passed to consume() or read().

        this.data[0 .. consumed] is the consumed part of the buffer,
        this.data[consumed .. available] the part still to be consumed.

     **************************************************************************/

    private size_t available = 0,
                   consumed  = 0;

    /**************************************************************************

        Invariant to assure consumed/available are in correct order and range

     **************************************************************************/

    invariant()
    {
        assert (available <= this.data.length);
        assert (consumed  <= this.data.length);
        assert (consumed  <= available);
    }

    /**************************************************************************

        Constructor.

        error_e and warning_e may be the same object if distinguishing between
        error and warning is not required.

        Params:
            input       = input device
            fiber       = input reading fiber
            warning_e   = exception to throw on end-of-flow condition or if the
                          remote hung up
            error_e     = exception to throw on I/O error
            buffer_size = input buffer size

        In:
            buffer_size must not be 0.

     **************************************************************************/

    public this ( IInputDevice input, SelectFiber fiber,
                  IOWarning warning_e, IOError error_e,
                  size_t buffer_size = this.default_buffer_size )
    {
        verify(buffer_size > 0, "zero input buffer size specified");

        super(this.input = input, Event.EPOLLIN | Event.EPOLLRDHUP,
              fiber, warning_e, error_e);

        this.data = new ubyte[buffer_size];
    }

    /**************************************************************************

        Constructor

        Uses the conduit, fiber and exceptions from the other
        IFiberSelectProtocol instance. This is useful when this instance shares
        the conduit and fiber with another IFiberSelectProtocol instance, e.g.
        a FiberSelectWriter.

        The conduit owned by the other instance must have been downcast from
        IInputDevice.

        Params:
            other       = other instance of this class
            buffer_size = input buffer size

        In:
            buffer_size must not be 0.

     **************************************************************************/

    public this ( typeof (super) other,
                  size_t buffer_size = this.default_buffer_size )
    {
        verify(buffer_size > 0, "zero input buffer size specified");

        super(other, Event.EPOLLIN | Event.EPOLLRDHUP);

        this.data = new ubyte[buffer_size];

        // The cast yields null if the shared conduit is not an IInputDevice.
        this.input = cast (IInputDevice) this.conduit;

        verify(this.input !is null, typeof (this).stringof ~ ": the conduit of "
                ~ "the other " ~ typeof (super).stringof ~ " instance must be a "
                ~ IInputDevice.stringof);
    }

    /**************************************************************************

        Resets the amount of consumed/available data to 0.

        Returns:
            this instance

     **************************************************************************/

    public typeof (this) reset ( )
    {
        this.consumed  = 0;
        this.available = 0;

        return this;
    }

    /**************************************************************************

        Returns:
            data in buffer available and consumed so far

     **************************************************************************/

    public void[] consumed_data ( )
    {
        return this.data[0 .. this.consumed];
    }

    /**************************************************************************

        Returns:
            data in buffer available but not consumed so far

     **************************************************************************/

    public void[] remaining_data ( )
    {
        return this.data[this.consumed .. this.available];
    }

    /**************************************************************************

        Invokes consume with the data that are currently available and haven't
        yet been consumed.
        If the amount of data is sufficient, consume must return the number of
        bytes it consumed. Otherwise if consume consumed all data and needs more
        input data to be read from the I/O device, it must return a value
        greater than the number of data bytes passed to it.

        If consume requests more data, the buffer is reset (the amounts of
        consumed and available data are set to 0) before returning.

        Params:
            consume = consumer callback delegate

        Returns:
            - true if all available data in buffer has been consumed and consume
              indicated that it requires more or
            - false if consume indicated to be finished.

     **************************************************************************/

    public bool consume ( scope Consumer consume )
    {
        auto remaining = this.data[this.consumed .. this.available];

        size_t n = consume(remaining);

        // n is compared with the length of the data passed to the consumer
        // rather than adding it to this.consumed first: n can be very big
        // (size_t.max) so the sum may overflow, and a sum that wraps around
        // must still be treated as a request for more data.

        if (n <= remaining.length)
        {
            // Finished: remember how much of the buffer has been consumed.

            this.consumed += n;

            return false;
        }

        // More data required: everything in the buffer has been consumed so
        // start over with an empty buffer.

        this.reset();

        return true;
    }

    /**************************************************************************

        Reads T.sizeof bytes from the socket and writes it to 'value'
        Suspends if not enough data is available and resumes when
        the data became available

        Params:
            value = reference to a variable to be filled

        Returns:
            this instance

        Throws:
            IOException if no data were received and won't arrive later:
                - IOWarning on end-of-flow condition or if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    public typeof (this) read ( T ) ( ref T value )
    {
        // View the memory of value as a byte array and fill it.
        return this.readRaw((cast(ubyte*)&value)[0 .. value.sizeof]);
    }

    /**************************************************************************

        Reads data.length bytes from the socket and writes them to the array.

        Will only return once enough data is available and the array could
        be filled.

        Params:
            data_out = pre-allocated array which will be filled

        Returns:
            this instance

        Throws:
            IOException if no data were received and won't arrive later:
                - IOWarning on end-of-flow condition or if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    public typeof (this) readRaw ( ubyte[] data_out )
    {
        auto available_data = this.data[this.consumed .. this.available];

        if (available_data.length >= data_out.length)
        {
            // Enough data are already in the buffer: Copy them into data_out
            // and return.
            data_out[] = cast(ubyte[])available_data[0 .. data_out.length];
            this.consumed += data_out.length;
            if (this.consumed == this.available)
                this.reset();
        }
        else
        {
            // Not enough data in the buffer: First copy all buffered data to
            // data_out.
            data_out[0 .. available_data.length] = cast(ubyte[])available_data;
            data_out = data_out[available_data.length .. $];
            this.reset();
            if (data_out.length > this.data.length)
            {
                // Read straight into data_out, circumventing the internal
                // buffer, as long as the amount of data left to read is greater
                // than the buffer size.
                // For that purpose data_out temporarily takes the place of the
                // internal buffer; the finally block restores the internal
                // buffer even if receive() throws.
                auto this_data = this.data;
                this.data = data_out;
                try
                {
                    while ((this.data.length - this.available) > this_data.length)
                        this.receive();
                    // What is left of data_out now fits in the internal buffer.
                    data_out = cast(ubyte[])this.data[this.available .. $];
                }
                finally
                {
                    this.data = this_data;
                    this.reset();
                }
            }

            // Read the remaining data.
            while (this.available < data_out.length)
                this.receive();
            // Copy the requested amount out of the buffer and mark it as
            // consumed; data received beyond that stay available.
            data_out[] = cast(ubyte[])
                this.data[0 .. this.consumed = data_out.length];
        }

        return this;
    }

    /**************************************************************************

        Reads data from the input conduit and appends them to the data buffer.

        If no data is available from the input conduit, the input reading fiber
        is suspended and continues reading on resume.

        If the buffer is full when this method is called, it is reset first,
        which sets the amounts of consumed and available data to 0.

        Returns:
            number of bytes read

        Throws:
            IOException if no data were received and won't arrive later:
                - IOWarning on end-of-flow condition or if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    public size_t receive ( )
    {
        if (this.available >= this.data.length)
        {
            this.reset();
        }

        size_t available_before = this.available;

        this.transmitLoop();

        return this.available - available_before;
    }

    /**************************************************************************

        Reads data from the input conduit, appends it to the data buffer and
        invokes consume with the data that is currently available and hasn't
        yet been consumed.
        If consume feels that the amount of data passed to it is sufficient,
        it must return the number of bytes it consumed. Otherwise if consume
        consumed all the data and still needs more input data to be read from
        the I/O device, it must return a value greater than the length of the
        data passed to it. The fiber is then suspended to wait for more data
        to be available from the input device, and consume is invoked again
        with the newly available data.

        Params:
            consume = consumer callback delegate

        Returns:
            this instance

        Throws:
            IOException if no data was received and none will arrive later:
                - IOWarning on end-of-flow condition or if the remote hung up,
                - IOError (IOWarning subclass) on an I/O error.

     **************************************************************************/

    public typeof (this) readConsume ( scope Consumer consume )
    {
        bool more;

        do
        {
            if (this.consumed >= this.available) // only this.consumed == this.available possible
            {
                this.receive();
            }

            more = this.consume(consume);
        }
        while (more);

        return this;
    }

    /**************************************************************************

        Reads data from the input conduit and appends them to the data buffer.

        Params:
            events = events reported for the input device

        Returns:
            false if data were received or true if currently no data are
            available (EAGAIN/EWOULDBLOCK/EINTR) so that reading should be
            retried later.

        Throws:
            IOException if no data were received and won't arrive later:
                - IOWarning on end-of-flow condition or if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

        Note: POSIX says the following about the return value of read():

            When attempting to read from an empty pipe or FIFO [remark: includes
            sockets]:

            - If no process has the pipe open for writing, read() shall return 0
              to indicate end-of-file.

            - If some process has the pipe open for writing and O_NONBLOCK is
              set, read() shall return -1 and set errno to [EAGAIN].

            - If some process has the pipe open for writing and O_NONBLOCK is
              clear, read() shall block the calling thread until some data is
              written or the pipe is closed by all processes that had the pipe
              open for writing.

        @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html

     **************************************************************************/

    protected override bool transmit ( Event events )
    out (received)
    {
        assert (received <= data.length, "received length too high");
    }
    do
    {
        verify(this.available < this.data.length, "requested to receive nothing");

        // Clear errno so that the value examined below stems from this read.
        .errno = 0;

        input.ssize_t n = this.input.read(this.data[this.available .. $]);

        if (n <= 0)
        {
            // EOF or error: Check for socket error and hung-up event first.

            this.error_e.checkDeviceError(n? "read error" : "end of flow whilst reading", __FILE__, __LINE__);

            enforce(this.warning_e, !(events & events.EPOLLRDHUP), "connection hung up on read");
            enforce(this.warning_e, !(events & events.EPOLLHUP), "connection hung up");

            if (n)
            {
                // read() error and no socket error or hung-up event: Check
                // errno. Carry on if there are just currently no data available
                // (EAGAIN/EWOULDBLOCK/EINTR) or throw error otherwise.

                switch (.errno)
                {
                    default:
                        this.error_e.enforce(false, "read error");
                        assert (false);

                    case EINTR, EAGAIN:
                        static if ( EAGAIN != EWOULDBLOCK )
                        {
                            case EWOULDBLOCK:
                        }

                        // EAGAIN/EWOULDBLOCK: currently no data available.
                        // EINTR: read() was interrupted by a signal before data
                        //        became available.

                        n = 0;
                }
            }
            else
            {
                // EOF and no socket error or hung-up event: Throw EOF warning.
                enforce(this.warning_e, false, "end of flow whilst reading");
            }
        }
        else
        {
            debug (Raw) Stderr.formatln("[{}] Read  {:X2} ({} bytes)",
                super.conduit.fileHandle,
                this.data[this.available .. this.available + n], n);

            this.available += n;
        }

        verify(n >= 0);

        // n == 0 here means "nothing read, try again"; n > 0 means data were
        // appended to the buffer.
        return !n;
    }
}
539 
540 /******************************************************************************/
541 
542 version (unittest)
543 {
544     import ocean.io.select.client.model.ISelectClient;
545     import ocean.io.select.fiber.SelectFiber;
546     import core.sys.posix.stdlib;
547     import ocean.core.Test;
548     import ocean.meta.types.Qualifiers;
549 }
550 
unittest
{
    // Fiber stand-in whose overridden methods all fail an assertion: the test
    // reader below never suspends, so none of them is expected to be called.
    static class UnSelectFiber: SelectFiber
    {
        this ( ) { super(null, {assert(false);}); }
        override:
        Message start      (Message)                { assert(false); }
        Message suspend    (Token, Object, Message) { assert(false); }
        Message resume     (Token, Object, Message) { assert(false); }
        void    kill       (istring, long)          { assert(false); }
        bool    register   (ISelectClient)          { assert(false); }
        bool    unregister ()                       { assert(false); }
    }

    // All numeric constants related to buffer sizes and amounts of data below
    // are tuned to test the internals of readRaw().

    // Input device serving the bytes of `pending` in portions of at most 15.
    static class Input: IInputDevice
    {
        /// Input data for `read()` which have not been handed out yet.
        void[] pending;

        /// Copies `pending[0 .. n]` into `dst[0 .. n]` where `n` is the
        /// minimum of `pending.length`, `dst.length` and 15, and advances
        /// `pending = pending[n .. $]`. Returns `n`.
        ssize_t read ( void[] dst )
        {
            size_t n = dst.length;

            if (n > this.pending.length)
                n = this.pending.length;
            if (n > 15)
                n = 15;

            dst[0 .. n] = this.pending[0 .. n];
            this.pending = this.pending[n .. $];
            return n;
        }

        Handle fileHandle ( ) { return cast(Handle)4711; } /// Interface method
    }

    // Reader with a 50 bytes buffer which calls transmit() repeatedly until it
    // returns false, without involving the fiber.
    static class TestFiberSelectReader: FiberSelectReader
    {
        this ( IInputDevice input, SelectFiber fiber )
        {
            auto io_error = new IOError(input);
            super(input, fiber, io_error, io_error, 50);
        }

        override protected void transmitLoop ( )
        {
            bool again;

            do
                again = this.transmit(Event.init);
            while (again);
        }
    }

    // The test starts here.

    scope input  = new Input;
    scope fiber  = new UnSelectFiber;
    scope reader = new TestFiberSelectReader(input, fiber);

    ubyte[100] input_data;
    ubyte[100] output_data;

    // Fill the input with reproducible pseudo-random data.
    ushort[3] xsubi = [0x330E, 0, 0]; // see jrand48() documentation
    foreach (ref word; cast(uint[])input_data)
        word = cast(uint)jrand48(xsubi);

    input.pending = input_data;

    // Read the input in three consecutive portions, [0 .. 10], [10 .. $ - 3]
    // and [$ - 3 .. $], checking each portion right after it has been read.
    size_t[3] portion_ends =
        [10, input_data.length - 3, input_data.length];

    size_t begin = 0;

    foreach (end; portion_ends)
    {
        reader.readRaw(output_data[begin .. end]);
        test!("==")(input_data[begin .. end], output_data[begin .. end]);
        begin = end;
    }
}