1 /*******************************************************************************
2 
3     Performs non-blocking I/O, suspending the current task to wait for the I/O
4     device to be ready if it would have blocked.
5     Because the most common case is using a TCP socket, one TCP-specific
6     facility (TCP Cork) is built into `TaskSelectTransceiver`. The simplicity,
7     convenience (it avoids a custom implementation for output buffering) and
8     frequency of use justifies having it in `TaskSelectTransceiver` rather than
9     a separate class or module.
10 
11     Copyright:
12         Copyright (c) 2017 dunnhumby Germany GmbH.
13         All rights reserved.
14 
15     License:
16         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
17         Alternatively, this file may be distributed under the terms of the Tango
18         3-Clause BSD License (see LICENSE_BSD.txt for details).
19 
20 *******************************************************************************/
21 
22 module ocean.io.select.protocol.task.TaskSelectTransceiver;
23 
24 import ocean.core.Verify;
25 import ocean.io.device.IODevice;
26 import ocean.io.select.client.model.ISelectClient;
27 
28 /// ditto
29 
30 class TaskSelectTransceiver
31 {
32     import ocean.io.select.protocol.task.TaskSelectClient;
33     import ocean.io.select.protocol.task.internal.BufferedReader;
34 
35     import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
36     import core.sys.posix.sys.uio: iovec, readv;
37     import core.sys.posix.sys.socket: setsockopt;
38     import core.sys.posix.netinet.in_: IPPROTO_TCP;
39     import core.sys.linux.netinet.tcp: TCP_CORK;
40 
41     import ocean.sys.Epoll: epoll_event_t;
42     alias epoll_event_t.Event Event;
43 
44     import ocean.io.select.protocol.generic.ErrnoIOException: IOWarning, IOError;
45 
46     debug (Raw) import ocean.io.Stdout: Stdout;
47 
48     import ocean.core.Enforce: enforce;
49     import ocean.meta.types.Qualifiers;
50 
51     /***************************************************************************
52 
53         I/O device
54 
55     ***************************************************************************/
56 
57     package IODevice iodev;
58 
59     /***************************************************************************
60 
61         Task select client to wait for I/O events
62 
63     ***************************************************************************/
64 
65     package TaskSelectClient select_client;
66 
67     /***************************************************************************
68 
69         Read buffer manager
70 
71     ***************************************************************************/
72 
73     private BufferedReader buffered_reader;
74 
75     /***************************************************************************
76 
77         Possible values for the TCP Cork status of the I/O device. `Unknown`
78         means TCP Cork support for the I/O device has not been queried yet,
79         `Disabled` that the I/O device does not support TCP Cork.
80 
81     ***************************************************************************/
82 
83     private enum TcpCorkStatus: uint
84     {
85         Unknown = 0,
86         Disabled,
87         Enabled
88     }
89 
90     /***************************************************************************
91 
92         The TCP Cork status of the I/O device. TCP Cork is a Linux
93         feature to buffer outgoing data for TCP sockets to minimise the number
94         of TCP frames sent.
95 
96     ***************************************************************************/
97 
98     private TcpCorkStatus tcp_cork_status;
99 
100     /***************************************************************************
101 
102         Thrown on EOF and remote hangup event
103 
104     ***************************************************************************/
105 
106     private IOWarning warning_e;
107 
108     /***************************************************************************
109 
110         Thrown on socket I/O error
111 
112     ***************************************************************************/
113 
114     private IOError error_e;
115 
116     /***************************************************************************
117 
118         Constructor.
119 
120         error_e and warning_e may be the same object if distinguishing between
121         error and warning is not required.
122 
123         Params:
124             iodev            = I/O device
125             warning_e        = exception to throw on end-of-flow condition or if
126                                the remote hung up
127             error_e          = exception to throw on I/O error
128             read_buffer_size = read buffer size
129 
130 
131     ***************************************************************************/
132 
133     public this ( IODevice iodev, IOWarning warning_e, IOError error_e,
134                   size_t read_buffer_size = BufferedReader.default_read_buffer_size )
135     {
136         this.iodev = iodev;
137         this.warning_e = warning_e;
138         this.error_e = error_e;
139         this.select_client = new TaskSelectClient(iodev, &error_e.error_code);
140         this.buffered_reader = new BufferedReader(read_buffer_size);
141     }
142 
143     /***************************************************************************
144 
145         Populates `data` with data read from the I/O device.
146 
147         Params:
148             data = destination data buffer
149 
150         Throws:
151             IOException if no data were received nor will any arrive later:
152                 - IOWarning on end-of-flow condition or if the remote hung up,
153                 - IOError (IOWarning subclass) on I/O error.
154 
155     ***************************************************************************/
156 
157     public void read ( void[] data )
158     {
159         if (data.length)
160             this.buffered_reader.readRaw(data, &this.deviceRead);
161     }
162 
163     /***************************************************************************
164 
165         Populates `value` with `value.sizeof` bytes read from the I/O device.
166 
167         Params:
168             value = reference to a variable to be populated
169 
170         Throws:
171             IOException if no data were received nor will any arrive later:
172                 - IOWarning on end-of-flow condition or if the remote hung up,
173                 - IOError (IOWarning subclass) on I/O error.
174 
175     ***************************************************************************/
176 
177     public void readValue ( T ) ( out T value )
178     {
179         static if (T.sizeof)
180             this.buffered_reader.readRaw((cast(void*)&value)[0 .. value.sizeof],
181                 &this.deviceRead);
182     }
183 
184     /***************************************************************************
185 
186         Calls `consume` with data read from the I/O device.
187 
188         If `consume` feels that the amount of `data` passed to it is sufficient
189         it should return the number of bytes it consumed, which is a value
190         between 0 and `data.length` (inclusive). Otherwise, if `consume`
191         consumed all `data` and still needs more data from the I/O device, it
192         should return a value greater than `data.`length`; it will then called
193         again after more data have been received.
194 
195         Params:
196             consume = consumer callback delegate
197 
198         Throws:
199             IOException if no data were received nor will any arrive later:
200                 - IOWarning on end-of-flow condition or if the remote hung up,
201                 - IOError (IOWarning subclass) on I/O error.
202 
203     ***************************************************************************/
204 
205     public void readConsume ( scope size_t delegate ( void[] data ) consume )
206     {
207         this.buffered_reader.readConsume(consume, &this.deviceRead);
208     }
209 
210     /***************************************************************************
211 
212         Writes the byte data of `value` to the I/O device.
213         If the I/O device is a TCP socket then the data may be buffered for at
214         most 200ms using the TCP Cork feature of Linux. In this case call
215         `flush()` to write all pending data immediately.
216 
217         Params:
218             value = the value of which the byte data to write
219 
220         Throws:
221             IOException if no data were sent nor will it be possible later:
222                 - IOWarning if the remote hung up,
223                 - IOError (IOWarning subclass) on I/O error.
224 
225     ***************************************************************************/
226 
227     public void writeValue ( T ) ( in T value )
228     {
229         static if (T.sizeof)
230             this.write((cast(const(void*))&value)[0 .. value.sizeof]);
231     }
232 
233     /***************************************************************************
234 
235         Writes `data` to the I/O device.
236         If the I/O device is a TCP socket then `data` may be buffered for at
237         most 200ms using the TCP Cork feature of Linux. In this case call
238         `flush()` to write all pending data immediately.
239 
240         Params:
241             data = data to write
242 
243         Throws:
244             IOException if no data were sent nor will it be possible later:
245                 - IOWarning if the remote hung up,
246                 - IOError (IOWarning subclass) on I/O error.
247 
248     ***************************************************************************/
249 
250     public void write ( const(void)[] data )
251     {
252         while (data.length)
253             data = data[this.deviceWrite(data) .. $];
254     }
255 
256     /***************************************************************************
257 
258         Sends all pending output data immediately. Calling this method has an
259         effect only if the I/O device is a TCP socket.
260 
261         Throws:
262             IOError on I/O error.
263 
264     ***************************************************************************/
265 
266     public void flush ( )
267     {
268         if (this.tcp_cork_status == tcp_cork_status.Enabled)
269         {
270             this.setTcpCork(false);
271             this.setTcpCork(true);
272         }
273     }
274 
275     /***************************************************************************
276 
277         Removes any remaining data from I/O buffers, sends any pending output
278         data immediately if possible, and removes the epoll registration of the
279         I/O device, if any.
280 
281         You need to call this method if you close and then reopen or otherwise
282         reassign the I/O device's file descriptor *without* suspending and
283         resuming or terminating and restarting the task in between. You may call
284         this method at any time between the last time you read from the old and
285         the first time you read from the new device.
286 
287         This method does not throw. It is safe to call it if the I/O device is
288         not (yet or any more) usable.
289 
290     ***************************************************************************/
291 
292     public void reset ( )
293     {
294         this.buffered_reader.reset();
295         this.select_client.unregister();
296 
297         if (this.tcp_cork_status == tcp_cork_status.Enabled)
298             this.setTcpCork(false, false);
299 
300         this.tcp_cork_status = tcp_cork_status.Unknown;
301     }
302 
303     /***************************************************************************
304 
305         Calls `io_op` until it returns a positive value. Waits for `wait_event`
306         if `io_op` fails with `EAGAIN` or `EWOULDBLOCK`.
307 
308         `io_op` should behave like POSIX `read/write` and return
309           - the non-zero number of bytes read or written on success or
310           - 0 on end-of-flow condition or
311           - a negative value on error and set `errno` accordingly.
312 
313         Params:
314             io_op      = I/O operation
315             wait_event = the event to wait for if `io_op` fails with
316                          `EAGAIN/EWOULDBLOCK`
317             opname     = the name of the I/O operation for error messages
318 
319         Returns:
320             the number of bytes read or written by `io_op`.
321 
322         Throws:
323             IOException if no data were transmitted and won't be later:
324                 - IOWarning on end-of-flow condition or if a hung-up event was
325                   reported for the I/O device,
326                 - IOError (IOWarning subclass) if `io_op` failed with an error
327                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
328                   event was reported for the I/O device.
329 
330         Note: POSIX says the following about the return value of `read`:
331 
332             When attempting to read from an empty pipe or FIFO [remark: includes
333             sockets]:
334 
335             - If no process has the pipe open for writing, read() shall return 0
336               to indicate end-of-file.
337 
338             - If some process has the pipe open for writing and O_NONBLOCK is
339               set, read() shall return -1 and set errno to [EAGAIN].
340 
341             - If some process has the pipe open for writing and O_NONBLOCK is
342               clear, read() shall block the calling thread until some data is
343               written or the pipe is closed by all processes that had the pipe
344               open for writing.
345 
346         @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html
347 
348     ***************************************************************************/
349 
350     private size_t transfer ( lazy iodev.ssize_t io_op, Event wait_event, istring opname )
351     out (n)
352     {
353         assert(n > 0);
354     }
355     do
356     {
357         // Prevent misinformation if an error happens that is not detected by
358         // io_op, such as a socket error reported by getsockopt(SO_ERROR) or an
359         // epoll event like EPOLLHUP or EPOLLERR.
360         errno = 0;
361         iodev.ssize_t n;
362 
363         // First call io_op. If io_op fails with EAGAIN/EWOULDBLOCK, enter a
364         // loop, waiting for EPOLLIN and calliing io_op again, until
365         //   - io_op succeeds (i.e. returns a positive value) or
366         //   - io_op reports EOF (i.e. returns 0; only read() does that) or
367         //   - io_op fails (i.e. returns a negative value) with errno other
368         //     than EAGAIN/EWOULDBLOCK (or EINTR, see below) or
369         //   - epoll reports EPOLLHUP or EPOLLERR.
370         for (n = io_op; n <= 0; n = io_op)
371         {
372             enforce(this.warning_e, n, "end of flow whilst reading");
373             switch (errno)
374             {
375                 case EAGAIN:
376                     static if (EAGAIN != EWOULDBLOCK)
377                     {
378                         case EWOULDBLOCK:
379                     }
380                     this.ioWait(wait_event);
381                     break;
382 
383                 default:
384                     this.error_e.checkDeviceError("I/O error");
385                     throw this.error_e.useGlobalErrno(opname);
386 
387                 case EINTR:
388                     // io_op was interrupted by a signal before data were read
389                     // or written. May not be possible with non-blocking I/O,
390                     // but neither POSIX nor Linux documentation clarifies that,
391                     // so handle it by calling io_op again to be safe.
392             }
393         }
394 
395         return n;
396     }
397 
398     /***************************************************************************
399 
400         Suspends the current task until epoll reports any of the events in
401         `wait_event` for the I/O device or the I/O device times out.
402 
403         Params:
404             events_expected = the events to wait for (`EPOLLHUP` and `EPOLLERR`
405                               are always implicitly added)
406 
407         Returns:
408             the events reported by epoll.
409 
410         Throws:
411             - `IOWarning` on `EPOLLHUP`,
412             - `IOError` on `EPOLLERR`,
413             - `EpollException` if registering with epoll failed,
414             - `TimeoutException` on timeout waiting for I/O events.
415 
416     ***************************************************************************/
417 
418     private Event ioWait ( Event wait_event )
419     {
420         Event events = this.select_client.ioWait(wait_event);
421         enforce(this.warning_e, !(events & events.EPOLLHUP), "connection hung up");
422 
423         if (events & events.EPOLLERR)
424         {
425             this.error_e.checkDeviceError("epoll reported I/O device error");
426             enforce(this.error_e, false, "epoll reported I/O device error");
427             assert(false);
428         }
429         else
430             return events;
431     }
432 
433     /***************************************************************************
434 
435         Reads as much data from the I/O device as can be read with one
436         successful `read` call but at most `dst.length` bytes, and stores the
437         data in `dst`.
438 
439         Params:
440             dst = destination buffer
441 
442         Returns:
443             the number `n` of bytes read, which are stored in `dst[0 .. n]`.
444 
445         Throws:
446             IOException if no data were received and won't arrive later:
447                 - IOWarning on end-of-flow condition or if a hung-up event was
448                   reported for the I/O device,
449                 - IOError (IOWarning subclass) if `read` failed with an error
450                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
451                   event was reported for the I/O device.
452 
453     ***************************************************************************/
454 
455     private size_t deviceRead ( void[] dst )
456     out (n)
457     {
458         debug (Raw) Stdout.formatln(
459             "[{}] Read  {:X2} ({} bytes)", this.iodev.fileHandle, dst[0 .. n], n
460         );
461     }
462     do
463     {
464         return this.transfer(this.iodev.read(dst), Event.EPOLLIN, "read");
465     }
466 
467     /***************************************************************************
468 
469         Reads as much data from the I/O device as can be read with one
470         successful `readv` call but at most `dst_a.length + dst_b.length` bytes,
471         and stores the data in `dst_a` and `dst_b`.
472 
473         Params:
474             dst_a = first destination buffer
475             dst_b = second destination buffer
476 
477         Returns:
478             the number `n` of bytes read, which are stored in
479             - `dst_a[0 .. n]` if `n <= dst_a.length` or
480             - `dst_a` and `dst_b[0 .. n - dst_a.length]` if `n > dst_a.length`.
481 
482         Throws:
483             IOException if no data were received and won't arrive later:
484                 - IOWarning on end-of-flow condition or if a hung-up event was
485                   reported for the I/O device,
486                 - IOError (IOWarning subclass) if `readv` failed with an error
487                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
488                   event was reported for the I/O device.
489 
490     ***************************************************************************/
491 
492     private size_t deviceRead ( void[] dst_a, void[] dst_b )
493     out (n)
494     {
495         debug (Raw)
496         {
497             if (n > dst_a.length)
498                 Stdout.formatln("[{}] Read  {:X2}{:X2} ({} bytes)",
499                     this.iodev.fileHandle, dst_a, dst_b[0 .. n - dst_a.length], n);
500             else
501                 Stdout.formatln("[{}] Read  {:X2} ({} bytes)",
502                     this.iodev.fileHandle, dst_a[0 .. n], n);
503         }
504     }
505     do
506     {
507         // Work around a linker error caused by a druntime packagin bug: The
508         // druntime is by mistake currently not linked with the
509         // core.sys.posix.sys.uio module.
510         static dst_init = iovec.init;
511         iovec[2] dst = dst_init;
512 
513         dst[0] = iovec(dst_a.ptr, dst_a.length);
514         dst[1] = iovec(dst_b.ptr, dst_b.length);
515         int fd = this.iodev.fileHandle;
516         return this.transfer(
517             readv(fd, dst.ptr, cast(int)dst.length), Event.EPOLLIN, "readv"
518         );
519     }
520 
521     /***************************************************************************
522 
523         Writes as much data from the I/O device as can be read with one
524         successful `write` call but at most `src.length` bytes, taking the data
525         from `src`.
526 
527         Params:
528             src = source buffer
529 
530         Returns:
531             the number `n` of bytes written, which were taken from
532             `src[0 .. n]`.
533 
534         Throws:
535             IOException if no data were written and won't be later:
536                 - IOWarning if a hung-up event was reported for the I/O device,
537                 - IOError (IOWarning subclass) if `write` failed with an error
538                   other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
539                   event was reported for the I/O device.
540 
541     ***************************************************************************/
542 
543     private size_t deviceWrite ( const(void)[] src )
544     {
545         debug (Raw) Stdout.formatln(
546             "[{}] Write  {:X2} ({} bytes)", this.iodev.fileHandle,
547             src, src.length
548         );
549 
550         if (!this.tcp_cork_status)
551         {
552             // Try enabling TCP Cork. If it fails then TCP Cork is not supported
553             // for the I/O device.
554             this.tcp_cork_status =
555                 this.setTcpCork(true, false)
556                 ? tcp_cork_status.Enabled
557                 : tcp_cork_status.Disabled;
558         }
559 
560         return this.transfer(this.iodev.write(src), Event.EPOLLOUT, "write");
561     }
562 
563     /***************************************************************************
564 
565         Sets the TCP_CORK option. Disabling (`enable` = 0) sends all pending
566         data.
567 
568         Params:
569             enable = 0 disables TCP_CORK and flushes if previously enabled, a
570                 different value enables TCP_CORK.
571             throw_on_error = throw on error rather than returning `false`
572 
573         Returns:
574             `true` on success or `false` on error if `throw_on_error` is
575             `false`.
576 
577         Throws:
578             `IOError` if the `TCP_CORK` option cannot be set for `socket` and
579             `throw_on_error` is `true`. In practice this can fail only for one
580             of the following reasons:
581              - `socket.fileHandle` does not contain a valid file descriptor
582                (`errno == EBADF`). This is the case if the socket was not
583                created by `socket()` or `accept()` yet.
584              - `socket.fileHandle` does not refer to a socket
585                (`errno == ENOTSOCK`).
586 
587     ***************************************************************************/
588 
589     private bool setTcpCork ( int enable, bool throw_on_error = true )
590     {
591         if (!setsockopt(this.iodev.fileHandle, IPPROTO_TCP, TCP_CORK,
592             &enable, enable.sizeof))
593         {
594             return true;
595         }
596         else if (throw_on_error)
597         {
598             this.error_e.checkDeviceError("setsockopt(TCP_CORK)");
599             throw this.error_e.useGlobalErrno("setsockopt(TCP_CORK)");
600         }
601         else
602         {
603             return false;
604         }
605     }
606 }
607 
608 import core.stdc.errno;
609 import ocean.core.Enforce;
610 import ocean.text.util.ClassName;
611 
612 /*******************************************************************************
613 
614     Utility function to `connect` a socket that is managed by a
615     `TaskSelectTransceiver`.
616 
617     Calls `socket_connect` once. If it returns `false`, evaluates `errno` and
618     waits for establishing the connection to complete if needed, suspending the
619     task.
620 
621     When calling `socket_connect` the I/O device which was passed to the
622     constructor of `tst` is passed to it via the `socket` parameter.
623     `socket_connect` should call the POSIX `connect` function, passing
624     `socket.fileHandle`, and return `true` on success or `false` on failure,
625     corresponding to `connect` returning 0 or -1, respectively.
626 
627     If `socket_connect` returns `true` then this method does nothing but
628     returning 0. Otherwise, if `socket_connect` returns `false` then it does one
629     of the following actions depending on `errno`:
630      - `EINPROGRESS`, `EALREADY`, `EINTR`: Wait for establishing the connection
631        to complete, then return `errno`.
632      - `EISCONN`, 0: Return `errno` (and do nothing else).
633      - All other codes: Throw `IOError`.
634 
635     The `Socket` type must to be chosen so that the I/O device passed to the
636     constructor can be cast to it.
637 
638     Params:
639         tst            = the `TaskSelectTransceiver` instance that manages the
640                          socket to connect
641         socket_connect = calls POSIX `connect`
642 
643     Returns:
644         - 0 if `socket_connect` returned `true`,
645         - or the initial `errno` code otherwise, if the socket is now
646           connected.
647 
648     Throws:
649         `IOError` if `socket_connect` returned `false` and `errno` indicated
650          that the socket connection cannot be established.
651 
652 *******************************************************************************/
653 
654 public int connect ( Socket: IODevice ) ( TaskSelectTransceiver tst,
655     scope bool delegate ( Socket socket ) socket_connect )
656 {
657     auto socket = cast(Socket)tst.iodev;
658     verify(socket !is null, "connect: Unable to cast the I/O " ~
659         "device from " ~ classname(tst.iodev) ~ " to " ~ Socket.stringof);
660     return connect_(tst, socket_connect(socket));
661 }
662 
663 /*******************************************************************************
664 
665     Implements the logic described for `connect`.
666 
667     Params:
668         tst            = the `TaskSelectTransceiver` instance that manages the
669                          socket to connect
670         socket_connect = calls POSIX `connect`
671 
672     Returns:
673         See `connect`.
674 
675     Throws:
676         See `connect`.
677 
678 ******************************************************************************/
679 
680 private int connect_ ( TaskSelectTransceiver tst, lazy bool socket_connect )
681 {
682     errno = 0;
683     if (socket_connect)
684         return 0;
685 
686     auto errnum = errno;
687 
688     switch (errnum)
689     {
690         case EINPROGRESS,
691              EALREADY,
692              EINTR: // TODO: Might never be reported, see note below.
693             tst.ioWait(tst.Event.EPOLLOUT);
694             goto case;
695 
696         case EISCONN, 0:
697             return errnum;
698 
699         default:
700             tst.error_e.checkDeviceError("error establishing connection");
701             enforce(tst.error_e, false, "error establishing connection");
702             assert(false);
703     }
704 
705     /* The POSIX specification says about connect() failing with EINTR:
706 
707         "If connect() is interrupted by a signal that is caught while blocked
708         waiting to establish a connection, connect() shall fail and set errno to
709         EINTR, but the connection request shall not be aborted, and the
710         connection shall be established asynchronously."
711 
712     It remains unclear whether a nonblocking connect() can also fail with EINTR
713     or not. Assuming that, if it is possible, it has the same meaning as for
714     blocking connect(), we handle EINTR in the same way as EINPROGRESS.
715     TODO: Remove handling of EINTR or this note when this is clarified. */
716 }
717 
718 version (unittest)
719 {
720     import ocean.io.select.protocol.generic.ErrnoIOException;
721     import ocean.task.Task;
722     import ocean.meta.types.Qualifiers;
723 }
724 
725 /// Example of sending and receiving data with the `TaskSelectTransceiver`.
726 unittest
727 {
728     static class IOTaskDemo: Task
729     {
730         TaskSelectTransceiver tst;
731 
732         this ( )
733         {
734             // I/O device to read/write via. In the real world, this would
735             // typically be your socket.
736             IODevice iodev;
737             this.tst = new TaskSelectTransceiver(
738                 iodev, new IOWarning(iodev), new IOError(iodev)
739             );
740         }
741 
742         override void run ( )
743         {
744             // Send a newline-terminated string.
745             this.tst.write("Hello World!\n");
746             // Send an integer value.
747             this.tst.writeValue(3);
748             this.tst.flush();
749 
750             // Receive a newline-terminated string.
751             {
752                 char[] response;
753                 this.tst.readConsume(
754                     (void[] data)
755                     {
756                         auto str = cast(char[])data;
757                         foreach (i, c; str)
758                         {
759                             if (c == '\n')
760                             {
761                                 response ~= str[0 .. i];
762                                 return i;
763                             }
764                         }
765 
766                         response ~= str;
767                         return str.length + 1;
768                     }
769                 );
770             }
771 
772             // Receive an integer value.
773             {
774                 int x;
775                 this.tst.readValue(x);
776             }
777         }
778     }
779 }
780 
781 version (unittest)
782 {
783     import ocean.sys.socket.IPSocket;
784 }
785 
786 /// Example of connecting a TCP/IP socket.
787 unittest
788 {
789     static class ConnectIOTaskDemo: Task
790     {
791         TaskSelectTransceiver tst;
792 
793         this ( )
794         {
795             // Create a TCP/IP socket, throwing SocketError on failure.
796             auto socket = new IPSocket!();
797             auto e = new SocketError(socket);
798             // The `true` parameter makes the socket non-blocking.
799             e.enforce(socket.tcpSocket(true) >= 0, "", "socket");
800 
801             this.tst = new TaskSelectTransceiver(
802                 socket, new IOWarning(socket), e
803             );
804         }
805 
806         override void run ( )
807         {
808             connect(this.tst,
809                 (IPSocket!() socket)
810                 {
811                     // Call `connect` to initiate establishing the connection.
812                     // Return `true` if `connect` returns 0 (i.e. succeeded) or
813                     // false otherwise.
814                     // `connect` will likely return -1 (i.e. fail) and set
815                     // `errno = EINPROGRESS`. `this.tst.connect` will then block
816                     // this task until establishing the socket connection has
817                     // completed.
818                     IPSocket!().InetAddress address;
819                     return !socket.connect(address("127.0.0.1", 4711));
820                 }
821             );
822             // Now the socket is connected, and `this.tst` is ready for reading
823             // and writing, as shown in the documented unit test for sending/
824             // receiving above.
825         }
826     }
827 }