1 /*******************************************************************************
2 
3     Performs non-blocking I/O, suspending the current task to wait for the I/O
4     device to be ready if it would have blocked.
5     Because the most common case is using a TCP socket, one TCP-specific
6     facility (TCP Cork) is built into `TaskSelectTransceiver`. The simplicity,
7     convenience (it avoids a custom implementation for output buffering) and
8     frequency of use justifies having it in `TaskSelectTransceiver` rather than
9     a separate class or module.
10 
11     Copyright:
12         Copyright (c) 2017 dunnhumby Germany GmbH.
13         All rights reserved.
14 
15     License:
16         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
17         Alternatively, this file may be distributed under the terms of the Tango
18         3-Clause BSD License (see LICENSE_BSD.txt for details).
19 
20 *******************************************************************************/
21 
22 module ocean.io.select.protocol.task.TaskSelectTransceiver;
23 
24 import ocean.core.Verify;
25 import ocean.io.device.IODevice;
26 import ocean.io.select.client.model.ISelectClient;
27 
28 /// ditto
29 
30 class TaskSelectTransceiver
31 {
32     import ocean.io.select.protocol.task.TaskSelectClient;
33     import ocean.io.select.protocol.task.internal.BufferedReader;
34 
35     import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
36     import ocean.stdc.posix.sys.uio: iovec, readv;
37     import ocean.stdc.posix.sys.socket: setsockopt;
38     import ocean.stdc.posix.netinet.in_: IPPROTO_TCP;
39 
40     static if (__VERSION__ >= 2000 && __VERSION__ < 2073)
41         enum { TCP_CORK = 3 }
42     else static if (__VERSION__ >= 2077)
43         import core.sys.linux.netinet.tcp: TCP_CORK;
44     else
45         import core.sys.linux.sys.netinet.tcp: TCP_CORK;
46 
47     import ocean.sys.Epoll: epoll_event_t;
48     alias epoll_event_t.Event Event;
49 
50     import ocean.io.select.protocol.generic.ErrnoIOException: IOWarning, IOError;
51 
52     debug (Raw) import ocean.io.Stdout: Stdout;
53 
54     import ocean.core.Enforce: enforce;
55     import ocean.transition;
56 
57     /***************************************************************************
58 
59         I/O device
60 
61     ***************************************************************************/
62 
63     package IODevice iodev;
64 
65     /***************************************************************************
66 
67         Task select client to wait for I/O events
68 
69     ***************************************************************************/
70 
71     package TaskSelectClient select_client;
72 
73     /***************************************************************************
74 
75         Read buffer manager
76 
77     ***************************************************************************/
78 
79     private BufferedReader buffered_reader;
80 
81     /***************************************************************************
82 
83         Possible values for the TCP Cork status of the I/O device. `Unknown`
84         means TCP Cork support for the I/O device has not been queried yet,
85         `Disabled` that the I/O device does not support TCP Cork.
86 
87     ***************************************************************************/
88 
89     private enum TcpCorkStatus: uint
90     {
91         Unknown = 0,
92         Disabled,
93         Enabled
94     }
95 
96     /***************************************************************************
97 
98         The TCP Cork status of the I/O device. TCP Cork is a Linux
99         feature to buffer outgoing data for TCP sockets to minimise the number
100         of TCP frames sent.
101 
102     ***************************************************************************/
103 
104     private TcpCorkStatus tcp_cork_status;
105 
106     /***************************************************************************
107 
108         Thrown on EOF and remote hangup event
109 
110     ***************************************************************************/
111 
112     private IOWarning warning_e;
113 
114     /***************************************************************************
115 
116         Thrown on socket I/O error
117 
118     ***************************************************************************/
119 
120     private IOError error_e;
121 
122     /***************************************************************************
123 
124         Constructor.
125 
126         error_e and warning_e may be the same object if distinguishing between
127         error and warning is not required.
128 
129         Params:
130             iodev            = I/O device
131             warning_e        = exception to throw on end-of-flow condition or if
132                                the remote hung up
133             error_e          = exception to throw on I/O error
134             read_buffer_size = read buffer size
135 
136 
137     ***************************************************************************/
138 
139     public this ( IODevice iodev, IOWarning warning_e, IOError error_e,
140                   size_t read_buffer_size = BufferedReader.default_read_buffer_size )
141     {
142         this.iodev = iodev;
143         this.warning_e = warning_e;
144         this.error_e = error_e;
145         this.select_client = new TaskSelectClient(iodev, &error_e.error_code);
146         this.buffered_reader = new BufferedReader(read_buffer_size);
147     }
148 
149     /***************************************************************************
150 
151         Populates `data` with data read from the I/O device.
152 
153         Params:
154             data = destination data buffer
155 
156         Throws:
157             IOException if no data were received nor will any arrive later:
158                 - IOWarning on end-of-flow condition or if the remote hung up,
159                 - IOError (IOWarning subclass) on I/O error.
160 
161     ***************************************************************************/
162 
163     public void read ( void[] data )
164     {
165         if (data.length)
166             this.buffered_reader.readRaw(data, &this.deviceRead);
167     }
168 
169     /***************************************************************************
170 
171         Populates `value` with `value.sizeof` bytes read from the I/O device.
172 
173         Params:
174             value = reference to a variable to be populated
175 
176         Throws:
177             IOException if no data were received nor will any arrive later:
178                 - IOWarning on end-of-flow condition or if the remote hung up,
179                 - IOError (IOWarning subclass) on I/O error.
180 
181     ***************************************************************************/
182 
183     public void readValue ( T ) ( out T value )
184     {
185         static if (T.sizeof)
186             this.buffered_reader.readRaw((cast(void*)&value)[0 .. value.sizeof],
187                 &this.deviceRead);
188     }
189 
190     /***************************************************************************
191 
192         Calls `consume` with data read from the I/O device.
193 
194         If `consume` feels that the amount of `data` passed to it is sufficient
195         it should return the number of bytes it consumed, which is a value
196         between 0 and `data.length` (inclusive). Otherwise, if `consume`
197         consumed all `data` and still needs more data from the I/O device, it
198         should return a value greater than `data.`length`; it will then called
199         again after more data have been received.
200 
201         Params:
202             consume = consumer callback delegate
203 
204         Throws:
205             IOException if no data were received nor will any arrive later:
206                 - IOWarning on end-of-flow condition or if the remote hung up,
207                 - IOError (IOWarning subclass) on I/O error.
208 
209     ***************************************************************************/
210 
211     public void readConsume ( scope size_t delegate ( void[] data ) consume )
212     {
213         this.buffered_reader.readConsume(consume, &this.deviceRead);
214     }
215 
216     /***************************************************************************
217 
218         Writes the byte data of `value` to the I/O device.
219         If the I/O device is a TCP socket then the data may be buffered for at
220         most 200ms using the TCP Cork feature of Linux. In this case call
221         `flush()` to write all pending data immediately.
222 
223         Params:
224             value = the value of which the byte data to write
225 
226         Throws:
227             IOException if no data were sent nor will it be possible later:
228                 - IOWarning if the remote hung up,
229                 - IOError (IOWarning subclass) on I/O error.
230 
231     ***************************************************************************/
232 
233     public void writeValue ( T ) ( in T value )
234     {
235         static if (T.sizeof)
236             this.write((cast(Const!(void*))&value)[0 .. value.sizeof]);
237     }
238 
239     /***************************************************************************
240 
241         Writes `data` to the I/O device.
242         If the I/O device is a TCP socket then `data` may be buffered for at
243         most 200ms using the TCP Cork feature of Linux. In this case call
244         `flush()` to write all pending data immediately.
245 
246         Params:
247             data = data to write
248 
249         Throws:
250             IOException if no data were sent nor will it be possible later:
251                 - IOWarning if the remote hung up,
252                 - IOError (IOWarning subclass) on I/O error.
253 
254     ***************************************************************************/
255 
256     public void write ( Const!(void)[] data )
257     {
258         while (data.length)
259             data = data[this.deviceWrite(data) .. $];
260     }
261 
262     /***************************************************************************
263 
264         Sends all pending output data immediately. Calling this method has an
265         effect only if the I/O device is a TCP socket.
266 
267         Throws:
268             IOError on I/O error.
269 
270     ***************************************************************************/
271 
272     public void flush ( )
273     {
274         if (this.tcp_cork_status == tcp_cork_status.Enabled)
275         {
276             this.setTcpCork(false);
277             this.setTcpCork(true);
278         }
279     }
280 
281     /***************************************************************************
282 
283         Removes any remaining data from I/O buffers, sends any pending output
284         data immediately if possible, and removes the epoll registration of the
285         I/O device, if any.
286 
287         You need to call this method if you close and then reopen or otherwise
288         reassign the I/O device's file descriptor *without* suspending and
289         resuming or terminating and restarting the task in between. You may call
290         this method at any time between the last time you read from the old and
291         the first time you read from the new device.
292 
293         This method does not throw. It is safe to call it if the I/O device is
294         not (yet or any more) usable.
295 
296     ***************************************************************************/
297 
298     public void reset ( )
299     {
300         this.buffered_reader.reset();
301         this.select_client.unregister();
302 
303         if (this.tcp_cork_status == tcp_cork_status.Enabled)
304             this.setTcpCork(false, false);
305 
306         this.tcp_cork_status = tcp_cork_status.Unknown;
307     }
308 
309     /***************************************************************************
310 
311         Calls `io_op` until it returns a positive value. Waits for `wait_event`
312         if `io_op` fails with `EAGAIN` or `EWOULDBLOCK`.
313 
314         `io_op` should behave like POSIX `read/write` and return
315           - the non-zero number of bytes read or written on success or
316           - 0 on end-of-flow condition or
317           - a negative value on error and set `errno` accordingly.
318 
319         Params:
320             io_op      = I/O operation
321             wait_event = the event to wait for if `io_op` fails with
322                          `EAGAIN/EWOULDBLOCK`
323             opname     = the name of the I/O operation for error messages
324 
325         Returns:
326             the number of bytes read or written by `io_op`.
327 
328         Throws:
329             IOException if no data were transmitted and won't be later:
330                 - IOWarning on end-of-flow condition or if a hung-up event was
331                   reported for the I/O device,
332                 - IOError (IOWarning subclass) if `io_op` failed with an error
333                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
334                   event was reported for the I/O device.
335 
336         Note: POSIX says the following about the return value of `read`:
337 
338             When attempting to read from an empty pipe or FIFO [remark: includes
339             sockets]:
340 
341             - If no process has the pipe open for writing, read() shall return 0
342               to indicate end-of-file.
343 
344             - If some process has the pipe open for writing and O_NONBLOCK is
345               set, read() shall return -1 and set errno to [EAGAIN].
346 
347             - If some process has the pipe open for writing and O_NONBLOCK is
348               clear, read() shall block the calling thread until some data is
349               written or the pipe is closed by all processes that had the pipe
350               open for writing.
351 
352         @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html
353 
354     ***************************************************************************/
355 
356     private size_t transfer ( lazy iodev.ssize_t io_op, Event wait_event, istring opname )
357     out (n)
358     {
359         assert(n > 0);
360     }
361     body
362     {
363         // Prevent misinformation if an error happens that is not detected by
364         // io_op, such as a socket error reported by getsockopt(SO_ERROR) or an
365         // epoll event like EPOLLHUP or EPOLLERR.
366         errno = 0;
367         iodev.ssize_t n;
368 
369         // First call io_op. If io_op fails with EAGAIN/EWOULDBLOCK, enter a
370         // loop, waiting for EPOLLIN and calliing io_op again, until
371         //   - io_op succeeds (i.e. returns a positive value) or
372         //   - io_op reports EOF (i.e. returns 0; only read() does that) or
373         //   - io_op fails (i.e. returns a negative value) with errno other
374         //     than EAGAIN/EWOULDBLOCK (or EINTR, see below) or
375         //   - epoll reports EPOLLHUP or EPOLLERR.
376         for (n = io_op; n <= 0; n = io_op)
377         {
378             enforce(this.warning_e, n, "end of flow whilst reading");
379             switch (errno)
380             {
381                 case EAGAIN:
382                     static if (EAGAIN != EWOULDBLOCK)
383                     {
384                         case EWOULDBLOCK:
385                     }
386                     this.ioWait(wait_event);
387                     break;
388 
389                 default:
390                     this.error_e.checkDeviceError("I/O error");
391                     throw this.error_e.useGlobalErrno(opname);
392 
393                 case EINTR:
394                     // io_op was interrupted by a signal before data were read
395                     // or written. May not be possible with non-blocking I/O,
396                     // but neither POSIX nor Linux documentation clarifies that,
397                     // so handle it by calling io_op again to be safe.
398             }
399         }
400 
401         return n;
402     }
403 
404     /***************************************************************************
405 
406         Suspends the current task until epoll reports any of the events in
407         `wait_event` for the I/O device or the I/O device times out.
408 
409         Params:
410             events_expected = the events to wait for (`EPOLLHUP` and `EPOLLERR`
411                               are always implicitly added)
412 
413         Returns:
414             the events reported by epoll.
415 
416         Throws:
417             - `IOWarning` on `EPOLLHUP`,
418             - `IOError` on `EPOLLERR`,
419             - `EpollException` if registering with epoll failed,
420             - `TimeoutException` on timeout waiting for I/O events.
421 
422     ***************************************************************************/
423 
424     private Event ioWait ( Event wait_event )
425     {
426         Event events = this.select_client.ioWait(wait_event);
427         enforce(this.warning_e, !(events & events.EPOLLHUP), "connection hung up");
428 
429         if (events & events.EPOLLERR)
430         {
431             this.error_e.checkDeviceError("epoll reported I/O device error");
432             enforce(this.error_e, false, "epoll reported I/O device error");
433             assert(false);
434         }
435         else
436             return events;
437     }
438 
439     /***************************************************************************
440 
441         Reads as much data from the I/O device as can be read with one
442         successful `read` call but at most `dst.length` bytes, and stores the
443         data in `dst`.
444 
445         Params:
446             dst = destination buffer
447 
448         Returns:
449             the number `n` of bytes read, which are stored in `dst[0 .. n]`.
450 
451         Throws:
452             IOException if no data were received and won't arrive later:
453                 - IOWarning on end-of-flow condition or if a hung-up event was
454                   reported for the I/O device,
455                 - IOError (IOWarning subclass) if `read` failed with an error
456                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
457                   event was reported for the I/O device.
458 
459     ***************************************************************************/
460 
461     private size_t deviceRead ( void[] dst )
462     out (n)
463     {
464         debug (Raw) Stdout.formatln(
465             "[{}] Read  {:X2} ({} bytes)", this.iodev.fileHandle, dst[0 .. n], n
466         );
467     }
468     body
469     {
470         return this.transfer(this.iodev.read(dst), Event.EPOLLIN, "read");
471     }
472 
473     /***************************************************************************
474 
475         Reads as much data from the I/O device as can be read with one
476         successful `readv` call but at most `dst_a.length + dst_b.length` bytes,
477         and stores the data in `dst_a` and `dst_b`.
478 
479         Params:
480             dst_a = first destination buffer
481             dst_b = second destination buffer
482 
483         Returns:
484             the number `n` of bytes read, which are stored in
485             - `dst_a[0 .. n]` if `n <= dst_a.length` or
486             - `dst_a` and `dst_b[0 .. n - dst_a.length]` if `n > dst_a.length`.
487 
488         Throws:
489             IOException if no data were received and won't arrive later:
490                 - IOWarning on end-of-flow condition or if a hung-up event was
491                   reported for the I/O device,
492                 - IOError (IOWarning subclass) if `readv` failed with an error
493                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
494                   event was reported for the I/O device.
495 
496     ***************************************************************************/
497 
498     private size_t deviceRead ( void[] dst_a, void[] dst_b )
499     out (n)
500     {
501         debug (Raw)
502         {
503             if (n > dst_a.length)
504                 Stdout.formatln("[{}] Read  {:X2}{:X2} ({} bytes)",
505                     this.iodev.fileHandle, dst_a, dst_b[0 .. n - dst_a.length], n);
506             else
507                 Stdout.formatln("[{}] Read  {:X2} ({} bytes)",
508                     this.iodev.fileHandle, dst_a[0 .. n], n);
509         }
510     }
511     body
512     {
513         // Work around a linker error caused by a druntime packagin bug: The
514         // druntime is by mistake currently not linked with the
515         // core.sys.posix.sys.uio module.
516         static dst_init = iovec.init;
517         iovec[2] dst = dst_init;
518 
519         dst[0] = iovec(dst_a.ptr, dst_a.length);
520         dst[1] = iovec(dst_b.ptr, dst_b.length);
521         int fd = this.iodev.fileHandle;
522         return this.transfer(
523             readv(fd, dst.ptr, cast(int)dst.length), Event.EPOLLIN, "readv"
524         );
525     }
526 
527     /***************************************************************************
528 
529         Writes as much data from the I/O device as can be read with one
530         successful `write` call but at most `src.length` bytes, taking the data
531         from `src`.
532 
533         Params:
534             src = source buffer
535 
536         Returns:
537             the number `n` of bytes written, which were taken from
538             `src[0 .. n]`.
539 
540         Throws:
541             IOException if no data were written and won't be later:
542                 - IOWarning if a hung-up event was reported for the I/O device,
543                 - IOError (IOWarning subclass) if `write` failed with an error
544                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
545                   event was reported for the I/O device.
546 
547     ***************************************************************************/
548 
549     private size_t deviceWrite ( Const!(void)[] src )
550     {
551         debug (Raw) Stdout.formatln(
552             "[{}] Write  {:X2} ({} bytes)", this.iodev.fileHandle,
553             src, src.length
554         );
555 
556         if (!this.tcp_cork_status)
557         {
558             // Try enabling TCP Cork. If it fails then TCP Cork is not supported
559             // for the I/O device.
560             this.tcp_cork_status =
561                 this.setTcpCork(true, false)
562                 ? tcp_cork_status.Enabled
563                 : tcp_cork_status.Disabled;
564         }
565 
566         return this.transfer(this.iodev.write(src), Event.EPOLLOUT, "write");
567     }
568 
569     /***************************************************************************
570 
571         Sets the TCP_CORK option. Disabling (`enable` = 0) sends all pending
572         data.
573 
574         Params:
575             enable = 0 disables TCP_CORK and flushes if previously enabled, a
576                 different value enables TCP_CORK.
577             throw_on_error = throw on error rather than returning `false`
578 
579         Returns:
580             `true` on success or `false` on error if `throw_on_error` is
581             `false`.
582 
583         Throws:
584             `IOError` if the `TCP_CORK` option cannot be set for `socket` and
585             `throw_on_error` is `true`. In practice this can fail only for one
586             of the following reasons:
587              - `socket.fileHandle` does not contain a valid file descriptor
588                (`errno == EBADF`). This is the case if the socket was not
589                created by `socket()` or `accept()` yet.
590              - `socket.fileHandle` does not refer to a socket
591                (`errno == ENOTSOCK`).
592 
593     ***************************************************************************/
594 
595     private bool setTcpCork ( int enable, bool throw_on_error = true )
596     {
597         if (!setsockopt(this.iodev.fileHandle, IPPROTO_TCP, TCP_CORK,
598             &enable, enable.sizeof))
599         {
600             return true;
601         }
602         else if (throw_on_error)
603         {
604             this.error_e.checkDeviceError("setsockopt(TCP_CORK)");
605             throw this.error_e.useGlobalErrno("setsockopt(TCP_CORK)");
606         }
607         else
608         {
609             return false;
610         }
611     }
612 }
613 
614 import core.stdc.errno;
615 import ocean.core.Enforce;
616 import ocean.text.util.ClassName;
617 
618 /*******************************************************************************
619 
620     Utility function to `connect` a socket that is managed by a
621     `TaskSelectTransceiver`.
622 
623     Calls `socket_connect` once. If it returns `false`, evaluates `errno` and
624     waits for establishing the connection to complete if needed, suspending the
625     task.
626 
627     When calling `socket_connect` the I/O device which was passed to the
628     constructor of `tst` is passed to it via the `socket` parameter.
629     `socket_connect` should call the POSIX `connect` function, passing
630     `socket.fileHandle`, and return `true` on success or `false` on failure,
631     corresponding to `connect` returning 0 or -1, respectively.
632 
633     If `socket_connect` returns `true` then this method does nothing but
634     returning 0. Otherwise, if `socket_connect` returns `false` then it does one
635     of the following actions depending on `errno`:
636      - `EINPROGRESS`, `EALREADY`, `EINTR`: Wait for establishing the connection
637        to complete, then return `errno`.
638      - `EISCONN`, 0: Return `errno` (and do nothing else).
639      - All other codes: Throw `IOError`.
640 
641     The `Socket` type must to be chosen so that the I/O device passed to the
642     constructor can be cast to it.
643 
644     Params:
645         tst            = the `TaskSelectTransceiver` instance that manages the
646                          socket to connect
647         socket_connect = calls POSIX `connect`
648 
649     Returns:
650         - 0 if `socket_connect` returned `true`,
651         - or the initial `errno` code otherwise, if the socket is now
652           connected.
653 
654     Throws:
655         `IOError` if `socket_connect` returned `false` and `errno` indicated
656          that the socket connection cannot be established.
657 
658 *******************************************************************************/
659 
660 public int connect ( Socket: IODevice ) ( TaskSelectTransceiver tst,
661     scope bool delegate ( Socket socket ) socket_connect )
662 {
663     auto socket = cast(Socket)tst.iodev;
664     verify(socket !is null, "connect: Unable to cast the I/O " ~
665         "device from " ~ classname(tst.iodev) ~ " to " ~ Socket.stringof);
666     return connect_(tst, socket_connect(socket));
667 }
668 
669 /*******************************************************************************
670 
671     Implements the logic described for `connect`.
672 
673     Params:
674         tst            = the `TaskSelectTransceiver` instance that manages the
675                          socket to connect
676         socket_connect = calls POSIX `connect`
677 
678     Returns:
679         See `connect`.
680 
681     Throws:
682         See `connect`.
683 
684 ******************************************************************************/
685 
686 private int connect_ ( TaskSelectTransceiver tst, lazy bool socket_connect )
687 {
688     errno = 0;
689     if (socket_connect)
690         return 0;
691 
692     auto errnum = errno;
693 
694     switch (errnum)
695     {
696         case EINPROGRESS,
697              EALREADY,
698              EINTR: // TODO: Might never be reported, see note below.
699             tst.ioWait(tst.Event.EPOLLOUT);
700             goto case;
701 
702         case EISCONN, 0:
703             return errnum;
704 
705         default:
706             tst.error_e.checkDeviceError("error establishing connection");
707             enforce(tst.error_e, false, "error establishing connection");
708             assert(false);
709     }
710 
711     /* The POSIX specification says about connect() failing with EINTR:
712 
713         "If connect() is interrupted by a signal that is caught while blocked
714         waiting to establish a connection, connect() shall fail and set errno to
715         EINTR, but the connection request shall not be aborted, and the
716         connection shall be established asynchronously."
717 
718     It remains unclear whether a nonblocking connect() can also fail with EINTR
719     or not. Assuming that, if it is possible, it has the same meaning as for
720     blocking connect(), we handle EINTR in the same way as EINPROGRESS.
721     TODO: Remove handling of EINTR or this note when this is clarified. */
722 }
723 
724 version (UnitTest)
725 {
726     import ocean.io.select.protocol.generic.ErrnoIOException;
727     import ocean.task.Task;
728     import ocean.transition;
729 }
730 
731 /// Example of sending and receiving data with the `TaskSelectTransceiver`.
732 unittest
733 {
734     static class IOTaskDemo: Task
735     {
736         TaskSelectTransceiver tst;
737 
738         this ( )
739         {
740             // I/O device to read/write via. In the real world, this would
741             // typically be your socket.
742             IODevice iodev;
743             this.tst = new TaskSelectTransceiver(
744                 iodev, new IOWarning(iodev), new IOError(iodev)
745             );
746         }
747 
748         override void run ( )
749         {
750             // Send a newline-terminated string.
751             this.tst.write("Hello World!\n");
752             // Send an integer value.
753             this.tst.writeValue(3);
754             this.tst.flush();
755 
756             // Receive a newline-terminated string.
757             {
758                 char[] response;
759                 this.tst.readConsume(
760                     (void[] data)
761                     {
762                         auto str = cast(char[])data;
763                         foreach (i, c; str)
764                         {
765                             if (c == '\n')
766                             {
767                                 response ~= str[0 .. i];
768                                 return i;
769                             }
770                         }
771 
772                         response ~= str;
773                         return str.length + 1;
774                     }
775                 );
776             }
777 
778             // Receive an integer value.
779             {
780                 int x;
781                 this.tst.readValue(x);
782             }
783         }
784     }
785 }
786 
787 version (UnitTest)
788 {
789     import ocean.sys.socket.IPSocket;
790 }
791 
792 /// Example of connecting a TCP/IP socket.
793 unittest
794 {
795     static class ConnectIOTaskDemo: Task
796     {
797         TaskSelectTransceiver tst;
798 
799         this ( )
800         {
801             // Create a TCP/IP socket, throwing SocketError on failure.
802             auto socket = new IPSocket!();
803             auto e = new SocketError(socket);
804             // The `true` parameter makes the socket non-blocking.
805             e.enforce(socket.tcpSocket(true) >= 0, "", "socket");
806 
807             this.tst = new TaskSelectTransceiver(
808                 socket, new IOWarning(socket), e
809             );
810         }
811 
812         override void run ( )
813         {
814             connect(this.tst,
815                 (IPSocket!() socket)
816                 {
817                     // Call `connect` to initiate establishing the connection.
818                     // Return `true` if `connect` returns 0 (i.e. succeeded) or
819                     // false otherwise.
820                     // `connect` will likely return -1 (i.e. fail) and set
821                     // `errno = EINPROGRESS`. `this.tst.connect` will then block
822                     // this task until establishing the socket connection has
823                     // completed.
824                     IPSocket!().InetAddress address;
825                     return !socket.connect(address("127.0.0.1", 4711));
826                 }
827             );
828             // Now the socket is connected, and `this.tst` is ready for reading
829             // and writing, as shown in the documented unit test for sending/
830             // receiving above.
831         }
832     }
833 }