1 /*******************************************************************************
2 3 Performs non-blocking I/O, suspending the current task to wait for the I/O
4 device to be ready if it would have blocked.
5 Because the most common case is using a TCP socket, one TCP-specific
6 facility (TCP Cork) is built into `TaskSelectTransceiver`. The simplicity,
7 convenience (it avoids a custom implementation for output buffering) and
8 frequency of use justifies having it in `TaskSelectTransceiver` rather than
9 a separate class or module.
10 11 Copyright:
12 Copyright (c) 2017 dunnhumby Germany GmbH.
13 All rights reserved.
14 15 License:
16 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
17 Alternatively, this file may be distributed under the terms of the Tango
18 3-Clause BSD License (see LICENSE_BSD.txt for details).
19 20 *******************************************************************************/21 22 moduleocean.io.select.protocol.task.TaskSelectTransceiver;
23 24 importocean.core.Verify;
25 importocean.io.device.IODevice;
26 importocean.io.select.client.model.ISelectClient;
27 28 /// ditto29 30 classTaskSelectTransceiver31 {
32 importocean.io.select.protocol.task.TaskSelectClient;
33 importocean.io.select.protocol.task.internal.BufferedReader;
34 35 importcore.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
36 importocean.stdc.posix.sys.uio: iovec, readv;
37 importocean.stdc.posix.sys.socket: setsockopt;
38 importocean.stdc.posix.netinet.in_: IPPROTO_TCP;
39 40 staticif (__VERSION__ >= 2000 && __VERSION__ < 2073)
41 enum { TCP_CORK = 3 }
42 elsestaticif (__VERSION__ >= 2077)
43 importcore.sys.linux.netinet.tcp: TCP_CORK;
44 else45 importcore.sys.linux.sys.netinet.tcp: TCP_CORK;
46 47 importocean.sys.Epoll: epoll_event_t;
48 aliasepoll_event_t.EventEvent;
49 50 importocean.io.select.protocol.generic.ErrnoIOException: IOWarning, IOError;
51 52 debug (Raw) importocean.io.Stdout: Stdout;
53 54 importocean.core.Enforce: enforce;
55 importocean.transition;
56 57 /***************************************************************************
58 59 I/O device
60 61 ***************************************************************************/62 63 packageIODeviceiodev;
64 65 /***************************************************************************
66 67 Task select client to wait for I/O events
68 69 ***************************************************************************/70 71 packageTaskSelectClientselect_client;
72 73 /***************************************************************************
74 75 Read buffer manager
76 77 ***************************************************************************/78 79 privateBufferedReaderbuffered_reader;
80 81 /***************************************************************************
82 83 Possible values for the TCP Cork status of the I/O device. `Unknown`
84 means TCP Cork support for the I/O device has not been queried yet,
85 `Disabled` that the I/O device does not support TCP Cork.
86 87 ***************************************************************************/88 89 privateenumTcpCorkStatus: uint90 {
91 Unknown = 0,
92 Disabled,
93 Enabled94 }
95 96 /***************************************************************************
97 98 The TCP Cork status of the I/O device. TCP Cork is a Linux
99 feature to buffer outgoing data for TCP sockets to minimise the number
100 of TCP frames sent.
101 102 ***************************************************************************/103 104 privateTcpCorkStatustcp_cork_status;
105 106 /***************************************************************************
107 108 Thrown on EOF and remote hangup event
109 110 ***************************************************************************/111 112 privateIOWarningwarning_e;
113 114 /***************************************************************************
115 116 Thrown on socket I/O error
117 118 ***************************************************************************/119 120 privateIOErrorerror_e;
121 122 /***************************************************************************
123 124 Constructor.
125 126 error_e and warning_e may be the same object if distinguishing between
127 error and warning is not required.
128 129 Params:
130 iodev = I/O device
131 warning_e = exception to throw on end-of-flow condition or if
132 the remote hung up
133 error_e = exception to throw on I/O error
134 read_buffer_size = read buffer size
135 136 137 ***************************************************************************/138 139 publicthis ( IODeviceiodev, IOWarningwarning_e, IOErrorerror_e,
140 size_tread_buffer_size = BufferedReader.default_read_buffer_size )
141 {
142 this.iodev = iodev;
143 this.warning_e = warning_e;
144 this.error_e = error_e;
145 this.select_client = newTaskSelectClient(iodev, &error_e.error_code);
146 this.buffered_reader = newBufferedReader(read_buffer_size);
147 }
148 149 /***************************************************************************
150 151 Populates `data` with data read from the I/O device.
152 153 Params:
154 data = destination data buffer
155 156 Throws:
157 IOException if no data were received nor will any arrive later:
158 - IOWarning on end-of-flow condition or if the remote hung up,
159 - IOError (IOWarning subclass) on I/O error.
160 161 ***************************************************************************/162 163 publicvoidread ( void[] data )
164 {
165 if (data.length)
166 this.buffered_reader.readRaw(data, &this.deviceRead);
167 }
168 169 /***************************************************************************
170 171 Populates `value` with `value.sizeof` bytes read from the I/O device.
172 173 Params:
174 value = reference to a variable to be populated
175 176 Throws:
177 IOException if no data were received nor will any arrive later:
178 - IOWarning on end-of-flow condition or if the remote hung up,
179 - IOError (IOWarning subclass) on I/O error.
180 181 ***************************************************************************/182 183 publicvoidreadValue ( T ) ( outTvalue )
184 {
185 staticif (T.sizeof)
186 this.buffered_reader.readRaw((cast(void*)&value)[0 .. value.sizeof],
187 &this.deviceRead);
188 }
189 190 /***************************************************************************
191 192 Calls `consume` with data read from the I/O device.
193 194 If `consume` feels that the amount of `data` passed to it is sufficient
195 it should return the number of bytes it consumed, which is a value
196 between 0 and `data.length` (inclusive). Otherwise, if `consume`
197 consumed all `data` and still needs more data from the I/O device, it
198 should return a value greater than `data.`length`; it will then called
199 again after more data have been received.
200 201 Params:
202 consume = consumer callback delegate
203 204 Throws:
205 IOException if no data were received nor will any arrive later:
206 - IOWarning on end-of-flow condition or if the remote hung up,
207 - IOError (IOWarning subclass) on I/O error.
208 209 ***************************************************************************/210 211 publicvoidreadConsume ( scopesize_tdelegate ( void[] data ) consume )
212 {
213 this.buffered_reader.readConsume(consume, &this.deviceRead);
214 }
215 216 /***************************************************************************
217 218 Writes the byte data of `value` to the I/O device.
219 If the I/O device is a TCP socket then the data may be buffered for at
220 most 200ms using the TCP Cork feature of Linux. In this case call
221 `flush()` to write all pending data immediately.
222 223 Params:
224 value = the value of which the byte data to write
225 226 Throws:
227 IOException if no data were sent nor will it be possible later:
228 - IOWarning if the remote hung up,
229 - IOError (IOWarning subclass) on I/O error.
230 231 ***************************************************************************/232 233 publicvoidwriteValue ( T ) ( inTvalue )
234 {
235 staticif (T.sizeof)
236 this.write((cast(Const!(void*))&value)[0 .. value.sizeof]);
237 }
238 239 /***************************************************************************
240 241 Writes `data` to the I/O device.
242 If the I/O device is a TCP socket then `data` may be buffered for at
243 most 200ms using the TCP Cork feature of Linux. In this case call
244 `flush()` to write all pending data immediately.
245 246 Params:
247 data = data to write
248 249 Throws:
250 IOException if no data were sent nor will it be possible later:
251 - IOWarning if the remote hung up,
252 - IOError (IOWarning subclass) on I/O error.
253 254 ***************************************************************************/255 256 publicvoidwrite ( Const!(void)[] data )
257 {
258 while (data.length)
259 data = data[this.deviceWrite(data) .. $];
260 }
261 262 /***************************************************************************
263 264 Sends all pending output data immediately. Calling this method has an
265 effect only if the I/O device is a TCP socket.
266 267 Throws:
268 IOError on I/O error.
269 270 ***************************************************************************/271 272 publicvoidflush ( )
273 {
274 if (this.tcp_cork_status == tcp_cork_status.Enabled)
275 {
276 this.setTcpCork(false);
277 this.setTcpCork(true);
278 }
279 }
280 281 /***************************************************************************
282 283 Removes any remaining data from I/O buffers, sends any pending output
284 data immediately if possible, and removes the epoll registration of the
285 I/O device, if any.
286 287 You need to call this method if you close and then reopen or otherwise
288 reassign the I/O device's file descriptor *without* suspending and
289 resuming or terminating and restarting the task in between. You may call
290 this method at any time between the last time you read from the old and
291 the first time you read from the new device.
292 293 This method does not throw. It is safe to call it if the I/O device is
294 not (yet or any more) usable.
295 296 ***************************************************************************/297 298 publicvoidreset ( )
299 {
300 this.buffered_reader.reset();
301 this.select_client.unregister();
302 303 if (this.tcp_cork_status == tcp_cork_status.Enabled)
304 this.setTcpCork(false, false);
305 306 this.tcp_cork_status = tcp_cork_status.Unknown;
307 }
308 309 /***************************************************************************
310 311 Calls `io_op` until it returns a positive value. Waits for `wait_event`
312 if `io_op` fails with `EAGAIN` or `EWOULDBLOCK`.
313 314 `io_op` should behave like POSIX `read/write` and return
315 - the non-zero number of bytes read or written on success or
316 - 0 on end-of-flow condition or
317 - a negative value on error and set `errno` accordingly.
318 319 Params:
320 io_op = I/O operation
321 wait_event = the event to wait for if `io_op` fails with
322 `EAGAIN/EWOULDBLOCK`
323 opname = the name of the I/O operation for error messages
324 325 Returns:
326 the number of bytes read or written by `io_op`.
327 328 Throws:
329 IOException if no data were transmitted and won't be later:
330 - IOWarning on end-of-flow condition or if a hung-up event was
331 reported for the I/O device,
332 - IOError (IOWarning subclass) if `io_op` failed with an error
333 other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
334 event was reported for the I/O device.
335 336 Note: POSIX says the following about the return value of `read`:
337 338 When attempting to read from an empty pipe or FIFO [remark: includes
339 sockets]:
340 341 - If no process has the pipe open for writing, read() shall return 0
342 to indicate end-of-file.
343 344 - If some process has the pipe open for writing and O_NONBLOCK is
345 set, read() shall return -1 and set errno to [EAGAIN].
346 347 - If some process has the pipe open for writing and O_NONBLOCK is
348 clear, read() shall block the calling thread until some data is
349 written or the pipe is closed by all processes that had the pipe
350 open for writing.
351 352 @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html
353 354 ***************************************************************************/355 356 privatesize_ttransfer ( lazyiodev.ssize_tio_op, Eventwait_event, istringopname )
357 out (n)
358 {
359 assert(n > 0);
360 }
361 body362 {
363 // Prevent misinformation if an error happens that is not detected by364 // io_op, such as a socket error reported by getsockopt(SO_ERROR) or an365 // epoll event like EPOLLHUP or EPOLLERR.366 errno = 0;
367 iodev.ssize_tn;
368 369 // First call io_op. If io_op fails with EAGAIN/EWOULDBLOCK, enter a370 // loop, waiting for EPOLLIN and calliing io_op again, until371 // - io_op succeeds (i.e. returns a positive value) or372 // - io_op reports EOF (i.e. returns 0; only read() does that) or373 // - io_op fails (i.e. returns a negative value) with errno other374 // than EAGAIN/EWOULDBLOCK (or EINTR, see below) or375 // - epoll reports EPOLLHUP or EPOLLERR.376 for (n = io_op; n <= 0; n = io_op)
377 {
378 enforce(this.warning_e, n, "end of flow whilst reading");
379 switch (errno)
380 {
381 caseEAGAIN:
382 staticif (EAGAIN != EWOULDBLOCK)
383 {
384 caseEWOULDBLOCK:
385 }
386 this.ioWait(wait_event);
387 break;
388 389 default:
390 this.error_e.checkDeviceError("I/O error");
391 throwthis.error_e.useGlobalErrno(opname);
392 393 caseEINTR:
394 // io_op was interrupted by a signal before data were read395 // or written. May not be possible with non-blocking I/O,396 // but neither POSIX nor Linux documentation clarifies that,397 // so handle it by calling io_op again to be safe.398 }
399 }
400 401 returnn;
402 }
403 404 /***************************************************************************
405 406 Suspends the current task until epoll reports any of the events in
407 `wait_event` for the I/O device or the I/O device times out.
408 409 Params:
410 events_expected = the events to wait for (`EPOLLHUP` and `EPOLLERR`
411 are always implicitly added)
412 413 Returns:
414 the events reported by epoll.
415 416 Throws:
417 - `IOWarning` on `EPOLLHUP`,
418 - `IOError` on `EPOLLERR`,
419 - `EpollException` if registering with epoll failed,
420 - `TimeoutException` on timeout waiting for I/O events.
421 422 ***************************************************************************/423 424 privateEventioWait ( Eventwait_event )
425 {
426 Eventevents = this.select_client.ioWait(wait_event);
427 enforce(this.warning_e, !(events & events.EPOLLHUP), "connection hung up");
428 429 if (events & events.EPOLLERR)
430 {
431 this.error_e.checkDeviceError("epoll reported I/O device error");
432 enforce(this.error_e, false, "epoll reported I/O device error");
433 assert(false);
434 }
435 else436 returnevents;
437 }
438 439 /***************************************************************************
440 441 Reads as much data from the I/O device as can be read with one
442 successful `read` call but at most `dst.length` bytes, and stores the
443 data in `dst`.
444 445 Params:
446 dst = destination buffer
447 448 Returns:
449 the number `n` of bytes read, which are stored in `dst[0 .. n]`.
450 451 Throws:
452 IOException if no data were received and won't arrive later:
453 - IOWarning on end-of-flow condition or if a hung-up event was
454 reported for the I/O device,
455 - IOError (IOWarning subclass) if `read` failed with an error
456 other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
457 event was reported for the I/O device.
458 459 ***************************************************************************/460 461 privatesize_tdeviceRead ( void[] dst )
462 out (n)
463 {
464 debug (Raw) Stdout.formatln(
465 "[{}] Read {:X2} ({} bytes)", this.iodev.fileHandle, dst[0 .. n], n466 );
467 }
468 body469 {
470 returnthis.transfer(this.iodev.read(dst), Event.EPOLLIN, "read");
471 }
472 473 /***************************************************************************
474 475 Reads as much data from the I/O device as can be read with one
476 successful `readv` call but at most `dst_a.length + dst_b.length` bytes,
477 and stores the data in `dst_a` and `dst_b`.
478 479 Params:
480 dst_a = first destination buffer
481 dst_b = second destination buffer
482 483 Returns:
484 the number `n` of bytes read, which are stored in
485 - `dst_a[0 .. n]` if `n <= dst_a.length` or
486 - `dst_a` and `dst_b[0 .. n - dst_a.length]` if `n > dst_a.length`.
487 488 Throws:
489 IOException if no data were received and won't arrive later:
490 - IOWarning on end-of-flow condition or if a hung-up event was
491 reported for the I/O device,
492 - IOError (IOWarning subclass) if `readv` failed with an error
493 other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
494 event was reported for the I/O device.
495 496 ***************************************************************************/497 498 privatesize_tdeviceRead ( void[] dst_a, void[] dst_b )
499 out (n)
500 {
501 debug (Raw)
502 {
503 if (n > dst_a.length)
504 Stdout.formatln("[{}] Read {:X2}{:X2} ({} bytes)",
505 this.iodev.fileHandle, dst_a, dst_b[0 .. n - dst_a.length], n);
506 else507 Stdout.formatln("[{}] Read {:X2} ({} bytes)",
508 this.iodev.fileHandle, dst_a[0 .. n], n);
509 }
510 }
511 body512 {
513 // Work around a linker error caused by a druntime packagin bug: The514 // druntime is by mistake currently not linked with the515 // core.sys.posix.sys.uio module.516 staticdst_init = iovec.init;
517 iovec[2] dst = dst_init;
518 519 dst[0] = iovec(dst_a.ptr, dst_a.length);
520 dst[1] = iovec(dst_b.ptr, dst_b.length);
521 intfd = this.iodev.fileHandle;
522 returnthis.transfer(
523 readv(fd, dst.ptr, cast(int)dst.length), Event.EPOLLIN, "readv"524 );
525 }
526 527 /***************************************************************************
528 529 Writes as much data from the I/O device as can be read with one
530 successful `write` call but at most `src.length` bytes, taking the data
531 from `src`.
532 533 Params:
534 src = source buffer
535 536 Returns:
537 the number `n` of bytes written, which were taken from
538 `src[0 .. n]`.
539 540 Throws:
541 IOException if no data were written and won't be later:
542 - IOWarning if a hung-up event was reported for the I/O device,
543 - IOError (IOWarning subclass) if `write` failed with an error
544 other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
545 event was reported for the I/O device.
546 547 ***************************************************************************/548 549 privatesize_tdeviceWrite ( Const!(void)[] src )
550 {
551 debug (Raw) Stdout.formatln(
552 "[{}] Write {:X2} ({} bytes)", this.iodev.fileHandle,
553 src, src.length554 );
555 556 if (!this.tcp_cork_status)
557 {
558 // Try enabling TCP Cork. If it fails then TCP Cork is not supported559 // for the I/O device.560 this.tcp_cork_status =
561 this.setTcpCork(true, false)
562 ? tcp_cork_status.Enabled563 : tcp_cork_status.Disabled;
564 }
565 566 returnthis.transfer(this.iodev.write(src), Event.EPOLLOUT, "write");
567 }
568 569 /***************************************************************************
570 571 Sets the TCP_CORK option. Disabling (`enable` = 0) sends all pending
572 data.
573 574 Params:
575 enable = 0 disables TCP_CORK and flushes if previously enabled, a
576 different value enables TCP_CORK.
577 throw_on_error = throw on error rather than returning `false`
578 579 Returns:
580 `true` on success or `false` on error if `throw_on_error` is
581 `false`.
582 583 Throws:
584 `IOError` if the `TCP_CORK` option cannot be set for `socket` and
585 `throw_on_error` is `true`. In practice this can fail only for one
586 of the following reasons:
587 - `socket.fileHandle` does not contain a valid file descriptor
588 (`errno == EBADF`). This is the case if the socket was not
589 created by `socket()` or `accept()` yet.
590 - `socket.fileHandle` does not refer to a socket
591 (`errno == ENOTSOCK`).
592 593 ***************************************************************************/594 595 privateboolsetTcpCork ( intenable, boolthrow_on_error = true )
596 {
597 if (!setsockopt(this.iodev.fileHandle, IPPROTO_TCP, TCP_CORK,
598 &enable, enable.sizeof))
599 {
600 returntrue;
601 }
602 elseif (throw_on_error)
603 {
604 this.error_e.checkDeviceError("setsockopt(TCP_CORK)");
605 throwthis.error_e.useGlobalErrno("setsockopt(TCP_CORK)");
606 }
607 else608 {
609 returnfalse;
610 }
611 }
612 }
613 614 importcore.stdc.errno;
615 importocean.core.Enforce;
616 importocean.text.util.ClassName;
617 618 /*******************************************************************************
619 620 Utility function to `connect` a socket that is managed by a
621 `TaskSelectTransceiver`.
622 623 Calls `socket_connect` once. If it returns `false`, evaluates `errno` and
624 waits for establishing the connection to complete if needed, suspending the
625 task.
626 627 When calling `socket_connect` the I/O device which was passed to the
628 constructor of `tst` is passed to it via the `socket` parameter.
629 `socket_connect` should call the POSIX `connect` function, passing
630 `socket.fileHandle`, and return `true` on success or `false` on failure,
631 corresponding to `connect` returning 0 or -1, respectively.
632 633 If `socket_connect` returns `true` then this method does nothing but
634 returning 0. Otherwise, if `socket_connect` returns `false` then it does one
635 of the following actions depending on `errno`:
636 - `EINPROGRESS`, `EALREADY`, `EINTR`: Wait for establishing the connection
637 to complete, then return `errno`.
638 - `EISCONN`, 0: Return `errno` (and do nothing else).
639 - All other codes: Throw `IOError`.
640 641 The `Socket` type must to be chosen so that the I/O device passed to the
642 constructor can be cast to it.
643 644 Params:
645 tst = the `TaskSelectTransceiver` instance that manages the
646 socket to connect
647 socket_connect = calls POSIX `connect`
648 649 Returns:
650 - 0 if `socket_connect` returned `true`,
651 - or the initial `errno` code otherwise, if the socket is now
652 connected.
653 654 Throws:
655 `IOError` if `socket_connect` returned `false` and `errno` indicated
656 that the socket connection cannot be established.
657 658 *******************************************************************************/659 660 publicintconnect ( Socket: IODevice ) ( TaskSelectTransceivertst,
661 scopebooldelegate ( Socketsocket ) socket_connect )
662 {
663 autosocket = cast(Socket)tst.iodev;
664 verify(socket !isnull, "connect: Unable to cast the I/O " ~
665 "device from " ~ classname(tst.iodev) ~ " to " ~ Socket.stringof);
666 returnconnect_(tst, socket_connect(socket));
667 }
668 669 /*******************************************************************************
670 671 Implements the logic described for `connect`.
672 673 Params:
674 tst = the `TaskSelectTransceiver` instance that manages the
675 socket to connect
676 socket_connect = calls POSIX `connect`
677 678 Returns:
679 See `connect`.
680 681 Throws:
682 See `connect`.
683 684 ******************************************************************************/685 686 privateintconnect_ ( TaskSelectTransceivertst, lazyboolsocket_connect )
687 {
688 errno = 0;
689 if (socket_connect)
690 return0;
691 692 autoerrnum = errno;
693 694 switch (errnum)
695 {
696 caseEINPROGRESS,
697 EALREADY,
698 EINTR: // TODO: Might never be reported, see note below.699 tst.ioWait(tst.Event.EPOLLOUT);
700 gotocase;
701 702 caseEISCONN, 0:
703 returnerrnum;
704 705 default:
706 tst.error_e.checkDeviceError("error establishing connection");
707 enforce(tst.error_e, false, "error establishing connection");
708 assert(false);
709 }
710 711 /* The POSIX specification says about connect() failing with EINTR:
712 713 "If connect() is interrupted by a signal that is caught while blocked
714 waiting to establish a connection, connect() shall fail and set errno to
715 EINTR, but the connection request shall not be aborted, and the
716 connection shall be established asynchronously."
717 718 It remains unclear whether a nonblocking connect() can also fail with EINTR
719 or not. Assuming that, if it is possible, it has the same meaning as for
720 blocking connect(), we handle EINTR in the same way as EINPROGRESS.
721 TODO: Remove handling of EINTR or this note when this is clarified. */722 }
723 724 version (UnitTest)
725 {
726 importocean.io.select.protocol.generic.ErrnoIOException;
727 importocean.task.Task;
728 importocean.transition;
729 }
730 731 /// Example of sending and receiving data with the `TaskSelectTransceiver`.732 unittest733 {
734 staticclassIOTaskDemo: Task735 {
736 TaskSelectTransceivertst;
737 738 this ( )
739 {
740 // I/O device to read/write via. In the real world, this would741 // typically be your socket.742 IODeviceiodev;
743 this.tst = newTaskSelectTransceiver(
744 iodev, newIOWarning(iodev), newIOError(iodev)
745 );
746 }
747 748 overridevoidrun ( )
749 {
750 // Send a newline-terminated string.751 this.tst.write("Hello World!\n");
752 // Send an integer value.753 this.tst.writeValue(3);
754 this.tst.flush();
755 756 // Receive a newline-terminated string.757 {
758 char[] response;
759 this.tst.readConsume(
760 (void[] data)
761 {
762 autostr = cast(char[])data;
763 foreach (i, c; str)
764 {
765 if (c == '\n')
766 {
767 response ~= str[0 .. i];
768 returni;
769 }
770 }
771 772 response ~= str;
773 returnstr.length + 1;
774 }
775 );
776 }
777 778 // Receive an integer value.779 {
780 intx;
781 this.tst.readValue(x);
782 }
783 }
784 }
785 }
786 787 version (UnitTest)
788 {
789 importocean.sys.socket.IPSocket;
790 }
791 792 /// Example of connecting a TCP/IP socket.793 unittest794 {
795 staticclassConnectIOTaskDemo: Task796 {
797 TaskSelectTransceivertst;
798 799 this ( )
800 {
801 // Create a TCP/IP socket, throwing SocketError on failure.802 autosocket = newIPSocket!();
803 autoe = newSocketError(socket);
804 // The `true` parameter makes the socket non-blocking.805 e.enforce(socket.tcpSocket(true) >= 0, "", "socket");
806 807 this.tst = newTaskSelectTransceiver(
808 socket, newIOWarning(socket), e809 );
810 }
811 812 overridevoidrun ( )
813 {
814 connect(this.tst,
815 (IPSocket!() socket)
816 {
817 // Call `connect` to initiate establishing the connection.818 // Return `true` if `connect` returns 0 (i.e. succeeded) or819 // false otherwise.820 // `connect` will likely return -1 (i.e. fail) and set821 // `errno = EINPROGRESS`. `this.tst.connect` will then block822 // this task until establishing the socket connection has823 // completed.824 IPSocket!().InetAddressaddress;
825 return !socket.connect(address("127.0.0.1", 4711));
826 }
827 );
828 // Now the socket is connected, and `this.tst` is ready for reading829 // and writing, as shown in the documented unit test for sending/830 // receiving above.831 }
832 }
833 }