/*******************************************************************************

    Performs non-blocking I/O, suspending the current task to wait for the I/O
    device to be ready if it would have blocked.
    Because the most common case is using a TCP socket, one TCP-specific
    facility (TCP Cork) is built into `TaskSelectTransceiver`. The simplicity,
    convenience (it avoids a custom implementation for output buffering) and
    frequency of use justify having it in `TaskSelectTransceiver` rather than
    a separate class or module.

    Copyright:
        Copyright (c) 2017 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.select.protocol.task.TaskSelectTransceiver;

import ocean.core.Verify;
import ocean.io.device.IODevice;
import ocean.io.select.client.model.ISelectClient;

/// ditto

class TaskSelectTransceiver
{
    import ocean.io.select.protocol.task.TaskSelectClient;
    import ocean.io.select.protocol.task.internal.BufferedReader;

    import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
    import core.sys.posix.sys.uio: iovec, readv;
    import core.sys.posix.sys.socket: setsockopt;
    import core.sys.posix.netinet.in_: IPPROTO_TCP;
    import core.sys.linux.netinet.tcp: TCP_CORK;

    import ocean.sys.Epoll: epoll_event_t;
    alias Event = epoll_event_t.Event;

    import ocean.io.select.protocol.generic.ErrnoIOException: IOWarning, IOError;

    debug (Raw) import ocean.io.Stdout: Stdout;

    import ocean.core.Enforce: enforce;
    import ocean.meta.types.Qualifiers;

    /***************************************************************************

        I/O device

    ***************************************************************************/

    package IODevice iodev;

    /***************************************************************************

        Task select client to wait for I/O events

    ***************************************************************************/

    package TaskSelectClient select_client;

    /***************************************************************************

        Read buffer manager

    ***************************************************************************/

    private BufferedReader buffered_reader;

    /***************************************************************************

        Possible values for the TCP Cork status of the I/O device. `Unknown`
        means TCP Cork support for the I/O device has not been queried yet,
        `Disabled` that the I/O device does not support TCP Cork.

    ***************************************************************************/

    private enum TcpCorkStatus: uint
    {
        Unknown = 0,
        Disabled,
        Enabled
    }

    /***************************************************************************

        The TCP Cork status of the I/O device. TCP Cork is a Linux
        feature to buffer outgoing data for TCP sockets to minimise the number
        of TCP frames sent.

    ***************************************************************************/

    private TcpCorkStatus tcp_cork_status;

    /***************************************************************************

        Thrown on EOF and remote hangup event

    ***************************************************************************/

    private IOWarning warning_e;

    /***************************************************************************

        Thrown on socket I/O error

    ***************************************************************************/

    private IOError error_e;

    /***************************************************************************

        Constructor.

        error_e and warning_e may be the same object if distinguishing between
        error and warning is not required.

        Params:
            iodev            = I/O device
            warning_e        = exception to throw on end-of-flow condition or if
                               the remote hung up
            error_e          = exception to throw on I/O error
            read_buffer_size = read buffer size

    ***************************************************************************/

    public this ( IODevice iodev, IOWarning warning_e, IOError error_e,
                  size_t read_buffer_size = BufferedReader.default_read_buffer_size )
    {
        this.iodev = iodev;
        this.warning_e = warning_e;
        this.error_e = error_e;
        this.select_client = new TaskSelectClient(iodev, &error_e.error_code);
        this.buffered_reader = new BufferedReader(read_buffer_size);
    }

    /***************************************************************************

        Populates `data` with data read from the I/O device.

        Params:
            data = destination data buffer

        Throws:
            IOException if no data were received nor will any arrive later:
            - IOWarning on end-of-flow condition or if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void read ( void[] data )
    {
        if (data.length)
            this.buffered_reader.readRaw(data, &this.deviceRead);
    }

    /***************************************************************************

        Populates `value` with `value.sizeof` bytes read from the I/O device.

        Params:
            value = reference to a variable to be populated

        Throws:
            IOException if no data were received nor will any arrive later:
            - IOWarning on end-of-flow condition or if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void readValue ( T ) ( out T value )
    {
        static if (T.sizeof)
            this.buffered_reader.readRaw((cast(void*)&value)[0 .. value.sizeof],
                &this.deviceRead);
    }

    /***************************************************************************

        Calls `consume` with data read from the I/O device.

        If `consume` feels that the amount of `data` passed to it is sufficient
        it should return the number of bytes it consumed, which is a value
        between 0 and `data.length` (inclusive). Otherwise, if `consume`
        consumed all `data` and still needs more data from the I/O device, it
        should return a value greater than `data.length`; it will then be
        called again after more data have been received.

        Params:
            consume = consumer callback delegate

        Throws:
            IOException if no data were received nor will any arrive later:
            - IOWarning on end-of-flow condition or if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void readConsume ( scope size_t delegate ( void[] data ) consume )
    {
        this.buffered_reader.readConsume(consume, &this.deviceRead);
    }

    /***************************************************************************

        Writes the byte data of `value` to the I/O device.
        If the I/O device is a TCP socket then the data may be buffered for at
        most 200ms using the TCP Cork feature of Linux. In this case call
        `flush()` to write all pending data immediately.

        Params:
            value = the value of which the byte data to write

        Throws:
            IOException if no data were sent nor will it be possible later:
            - IOWarning if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void writeValue ( T ) ( in T value )
    {
        static if (T.sizeof)
            this.write((cast(const(void*))&value)[0 .. value.sizeof]);
    }

    /***************************************************************************

        Writes `data` to the I/O device.
        If the I/O device is a TCP socket then `data` may be buffered for at
        most 200ms using the TCP Cork feature of Linux. In this case call
        `flush()` to write all pending data immediately.

        Params:
            data = data to write

        Throws:
            IOException if no data were sent nor will it be possible later:
            - IOWarning if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void write ( const(void)[] data )
    {
        // Each `deviceWrite` call may write only a part of `data`; keep
        // writing the remainder until everything has been handed over.
        while (data.length)
            data = data[this.deviceWrite(data) .. $];
    }

    /***************************************************************************

        Sends all pending output data immediately. Calling this method has an
        effect only if the I/O device is a TCP socket.

        Throws:
            IOError on I/O error.

    ***************************************************************************/

    public void flush ( )
    {
        if (this.tcp_cork_status == TcpCorkStatus.Enabled)
        {
            // Disabling TCP Cork sends the pending data; re-enable it
            // afterwards so that subsequent output is buffered again.
            this.setTcpCork(false);
            this.setTcpCork(true);
        }
    }

    /***************************************************************************

        Removes any remaining data from I/O buffers, sends any pending output
        data immediately if possible, and removes the epoll registration of the
        I/O device, if any.

        You need to call this method if you close and then reopen or otherwise
        reassign the I/O device's file descriptor *without* suspending and
        resuming or terminating and restarting the task in between. You may call
        this method at any time between the last time you read from the old and
        the first time you read from the new device.

        This method does not throw. It is safe to call it if the I/O device is
        not (yet or any more) usable.

    ***************************************************************************/

    public void reset ( )
    {
        this.buffered_reader.reset();
        this.select_client.unregister();

        // Best effort: a failure to disable TCP Cork is ignored here.
        if (this.tcp_cork_status == TcpCorkStatus.Enabled)
            this.setTcpCork(false, false);

        // Query TCP Cork support again on the next write.
        this.tcp_cork_status = TcpCorkStatus.Unknown;
    }

    /***************************************************************************

        Calls `io_op` until it returns a positive value. Waits for `wait_event`
        if `io_op` fails with `EAGAIN` or `EWOULDBLOCK`.

        `io_op` should behave like POSIX `read/write` and return
          - the non-zero number of bytes read or written on success or
          - 0 on end-of-flow condition or
          - a negative value on error and set `errno` accordingly.

        Params:
            io_op      = I/O operation
            wait_event = the event to wait for if `io_op` fails with
                         `EAGAIN/EWOULDBLOCK`
            opname     = the name of the I/O operation for error messages

        Returns:
            the number of bytes read or written by `io_op`.

        Throws:
            IOException if no data were transmitted and won't be later:
            - IOWarning on end-of-flow condition or if a hung-up event was
              reported for the I/O device,
            - IOError (IOWarning subclass) if `io_op` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

        Note: POSIX says the following about the return value of `read`:

            When attempting to read from an empty pipe or FIFO [remark: includes
            sockets]:

            - If no process has the pipe open for writing, read() shall return 0
              to indicate end-of-file.

            - If some process has the pipe open for writing and O_NONBLOCK is
              set, read() shall return -1 and set errno to [EAGAIN].

            - If some process has the pipe open for writing and O_NONBLOCK is
              clear, read() shall block the calling thread until some data is
              written or the pipe is closed by all processes that had the pipe
              open for writing.

        @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html

    ***************************************************************************/

    private size_t transfer ( lazy IODevice.ssize_t io_op, Event wait_event,
        istring opname )
    out (n)
    {
        assert(n > 0);
    }
    do
    {
        // Prevent misinformation if an error happens that is not detected by
        // io_op, such as a socket error reported by getsockopt(SO_ERROR) or an
        // epoll event like EPOLLHUP or EPOLLERR.
        errno = 0;
        IODevice.ssize_t n;

        // First call io_op. If io_op fails with EAGAIN/EWOULDBLOCK, enter a
        // loop, waiting for `wait_event` and calling io_op again, until
        //   - io_op succeeds (i.e. returns a positive value) or
        //   - io_op reports EOF (i.e. returns 0; only read() does that) or
        //   - io_op fails (i.e. returns a negative value) with errno other
        //     than EAGAIN/EWOULDBLOCK (or EINTR, see below) or
        //   - epoll reports EPOLLHUP or EPOLLERR.
        for (n = io_op; n <= 0; n = io_op)
        {
            // n == 0 is the end-of-flow condition: throws `warning_e`.
            enforce(this.warning_e, n, "end of flow whilst reading");
            switch (errno)
            {
                case EAGAIN:
                    static if (EAGAIN != EWOULDBLOCK)
                    {
                        case EWOULDBLOCK:
                    }
                    this.ioWait(wait_event);
                    break;

                default:
                    this.error_e.checkDeviceError("I/O error");
                    throw this.error_e.useGlobalErrno(opname);

                case EINTR:
                    // io_op was interrupted by a signal before data were read
                    // or written. May not be possible with non-blocking I/O,
                    // but neither POSIX nor Linux documentation clarifies that,
                    // so handle it by calling io_op again to be safe.
            }
        }

        return n;
    }

    /***************************************************************************

        Suspends the current task until epoll reports any of the events in
        `wait_event` for the I/O device or the I/O device times out.

        Params:
            wait_event = the events to wait for (`EPOLLHUP` and `EPOLLERR`
                         are always implicitly added)

        Returns:
            the events reported by epoll.

        Throws:
            - `IOWarning` on `EPOLLHUP`,
            - `IOError` on `EPOLLERR`,
            - `EpollException` if registering with epoll failed,
            - `TimeoutException` on timeout waiting for I/O events.

    ***************************************************************************/

    private Event ioWait ( Event wait_event )
    {
        Event events = this.select_client.ioWait(wait_event);
        enforce(this.warning_e, !(events & Event.EPOLLHUP), "connection hung up");

        if (events & Event.EPOLLERR)
        {
            this.error_e.checkDeviceError("epoll reported I/O device error");
            enforce(this.error_e, false, "epoll reported I/O device error");
            // Not reached: the `enforce` call above always throws.
            assert(false);
        }
        else
            return events;
    }

    /***************************************************************************

        Reads as much data from the I/O device as can be read with one
        successful `read` call but at most `dst.length` bytes, and stores the
        data in `dst`.

        Params:
            dst = destination buffer

        Returns:
            the number `n` of bytes read, which are stored in `dst[0 .. n]`.

        Throws:
            IOException if no data were received and won't arrive later:
            - IOWarning on end-of-flow condition or if a hung-up event was
              reported for the I/O device,
            - IOError (IOWarning subclass) if `read` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

    ***************************************************************************/

    private size_t deviceRead ( void[] dst )
    out (n)
    {
        debug (Raw) Stdout.formatln(
            "[{}] Read  {:X2} ({} bytes)", this.iodev.fileHandle, dst[0 .. n], n
        );
    }
    do
    {
        return this.transfer(this.iodev.read(dst), Event.EPOLLIN, "read");
    }

    /***************************************************************************

        Reads as much data from the I/O device as can be read with one
        successful `readv` call but at most `dst_a.length + dst_b.length` bytes,
        and stores the data in `dst_a` and `dst_b`.

        Params:
            dst_a = first destination buffer
            dst_b = second destination buffer

        Returns:
            the number `n` of bytes read, which are stored in
            - `dst_a[0 .. n]` if `n <= dst_a.length` or
            - `dst_a` and `dst_b[0 .. n - dst_a.length]` if `n > dst_a.length`.

        Throws:
            IOException if no data were received and won't arrive later:
            - IOWarning on end-of-flow condition or if a hung-up event was
              reported for the I/O device,
            - IOError (IOWarning subclass) if `readv` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

    ***************************************************************************/

    private size_t deviceRead ( void[] dst_a, void[] dst_b )
    out (n)
    {
        debug (Raw)
        {
            if (n > dst_a.length)
                Stdout.formatln("[{}] Read  {:X2}{:X2} ({} bytes)",
                    this.iodev.fileHandle, dst_a, dst_b[0 .. n - dst_a.length], n);
            else
                Stdout.formatln("[{}] Read  {:X2} ({} bytes)",
                    this.iodev.fileHandle, dst_a[0 .. n], n);
        }
    }
    do
    {
        // Work around a linker error caused by a druntime packaging bug: The
        // druntime is by mistake currently not linked with the
        // core.sys.posix.sys.uio module.
        static dst_init = iovec.init;
        iovec[2] dst = dst_init;

        dst[0] = iovec(dst_a.ptr, dst_a.length);
        dst[1] = iovec(dst_b.ptr, dst_b.length);
        int fd = this.iodev.fileHandle;
        return this.transfer(
            readv(fd, dst.ptr, cast(int)dst.length), Event.EPOLLIN, "readv"
        );
    }

    /***************************************************************************

        Writes as much data to the I/O device as can be written with one
        successful `write` call but at most `src.length` bytes, taking the data
        from `src`.

        On the first call after construction or `reset()` this method also
        tries to enable TCP Cork for the I/O device and remembers whether that
        succeeded.

        Params:
            src = source buffer

        Returns:
            the number `n` of bytes written, which were taken from
            `src[0 .. n]`.

        Throws:
            IOException if no data were written and won't be later:
            - IOWarning if a hung-up event was reported for the I/O device,
            - IOError (IOWarning subclass) if `write` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

    ***************************************************************************/

    private size_t deviceWrite ( const(void)[] src )
    {
        debug (Raw) Stdout.formatln(
            "[{}] Write {:X2} ({} bytes)", this.iodev.fileHandle,
            src, src.length
        );

        if (this.tcp_cork_status == TcpCorkStatus.Unknown)
        {
            // Try enabling TCP Cork. If it fails then TCP Cork is not supported
            // for the I/O device.
            this.tcp_cork_status =
                this.setTcpCork(true, false)
                ? TcpCorkStatus.Enabled
                : TcpCorkStatus.Disabled;
        }

        return this.transfer(this.iodev.write(src), Event.EPOLLOUT, "write");
    }

    /***************************************************************************

        Sets the TCP_CORK option. Disabling (`enable` = 0) sends all pending
        data.

        Params:
            enable = 0 disables TCP_CORK and flushes if previously enabled, a
                different value enables TCP_CORK.
            throw_on_error = throw on error rather than returning `false`

        Returns:
            `true` on success or `false` on error if `throw_on_error` is
            `false`.

        Throws:
            `IOError` if the `TCP_CORK` option cannot be set for the I/O device
            and `throw_on_error` is `true`. In practice this can fail only for
            one of the following reasons:
              - `this.iodev.fileHandle` does not contain a valid file
                descriptor (`errno == EBADF`). This is the case if the socket
                was not created by `socket()` or `accept()` yet.
              - `this.iodev.fileHandle` does not refer to a socket
                (`errno == ENOTSOCK`).

    ***************************************************************************/

    private bool setTcpCork ( int enable, bool throw_on_error = true )
    {
        // `setsockopt` returns 0 on success.
        if (!setsockopt(this.iodev.fileHandle, IPPROTO_TCP, TCP_CORK,
            &enable, enable.sizeof))
        {
            return true;
        }
        else if (throw_on_error)
        {
            this.error_e.checkDeviceError("setsockopt(TCP_CORK)");
            throw this.error_e.useGlobalErrno("setsockopt(TCP_CORK)");
        }
        else
        {
            return false;
        }
    }
}

import core.stdc.errno;
import ocean.core.Enforce;
import ocean.text.util.ClassName;

/*******************************************************************************

    Utility function to `connect` a socket that is managed by a
    `TaskSelectTransceiver`.

    Calls `socket_connect` once. If it returns `false`, evaluates `errno` and
    waits for establishing the connection to complete if needed, suspending the
    task.

    When calling `socket_connect` the I/O device which was passed to the
    constructor of `tst` is passed to it via the `socket` parameter.
    `socket_connect` should call the POSIX `connect` function, passing
    `socket.fileHandle`, and return `true` on success or `false` on failure,
    corresponding to `connect` returning 0 or -1, respectively.

    If `socket_connect` returns `true` then this function does nothing but
    return 0. Otherwise, if `socket_connect` returns `false` then it does one
    of the following actions depending on `errno`:
     - `EINPROGRESS`, `EALREADY`, `EINTR`: Wait for establishing the connection
       to complete, then return `errno`.
     - `EISCONN`, 0: Return `errno` (and do nothing else).
     - All other codes: Throw `IOError`.

    The `Socket` type must be chosen so that the I/O device passed to the
    constructor can be cast to it.

    Params:
        tst            = the `TaskSelectTransceiver` instance that manages the
                         socket to connect
        socket_connect = calls POSIX `connect`

    Returns:
        - 0 if `socket_connect` returned `true`,
        - or the initial `errno` code otherwise, if the socket is now
          connected.

    Throws:
        `IOError` if `socket_connect` returned `false` and `errno` indicated
        that the socket connection cannot be established.

*******************************************************************************/

public int connect ( Socket: IODevice ) ( TaskSelectTransceiver tst,
    scope bool delegate ( Socket socket ) socket_connect )
{
    auto socket = cast(Socket)tst.iodev;
    verify(socket !is null, "connect: Unable to cast the I/O " ~
        "device from " ~ classname(tst.iodev) ~ " to " ~ Socket.stringof);
    return connect_(tst, socket_connect(socket));
}

/*******************************************************************************

    Implements the logic described for `connect`.

    Params:
        tst            = the `TaskSelectTransceiver` instance that manages the
                         socket to connect
        socket_connect = calls POSIX `connect`

    Returns:
        See `connect`.

    Throws:
        See `connect`.

******************************************************************************/

private int connect_ ( TaskSelectTransceiver tst, lazy bool socket_connect )
{
    errno = 0;
    if (socket_connect)
        return 0;

    // Save `errno` right away, before any further call can modify it.
    auto errnum = errno;

    switch (errnum)
    {
        case EINPROGRESS,
             EALREADY,
             EINTR: // TODO: Might never be reported, see note below.
            tst.ioWait(tst.Event.EPOLLOUT);
            goto case;

        case EISCONN, 0:
            return errnum;

        default:
            tst.error_e.checkDeviceError("error establishing connection");
            enforce(tst.error_e, false, "error establishing connection");
            // Not reached: the `enforce` call above always throws.
            assert(false);
    }

    /* The POSIX specification says about connect() failing with EINTR:

        "If connect() is interrupted by a signal that is caught while blocked
        waiting to establish a connection, connect() shall fail and set errno to
        EINTR, but the connection request shall not be aborted, and the
        connection shall be established asynchronously."

    It remains unclear whether a nonblocking connect() can also fail with EINTR
    or not. Assuming that, if it is possible, it has the same meaning as for
    blocking connect(), we handle EINTR in the same way as EINPROGRESS.
    TODO: Remove handling of EINTR or this note when this is clarified. */
}

version (unittest)
{
    import ocean.io.select.protocol.generic.ErrnoIOException;
    import ocean.task.Task;
    import ocean.meta.types.Qualifiers;
}

/// Example of sending and receiving data with the `TaskSelectTransceiver`.
unittest
{
    static class IOTaskDemo: Task
    {
        TaskSelectTransceiver tst;

        this ( )
        {
            // I/O device to read/write via. In the real world, this would
            // typically be your socket.
            IODevice iodev;
            this.tst = new TaskSelectTransceiver(
                iodev, new IOWarning(iodev), new IOError(iodev)
            );
        }

        override void run ( )
        {
            // Send a newline-terminated string.
            this.tst.write("Hello World!\n");
            // Send an integer value.
            this.tst.writeValue(3);
            this.tst.flush();

            // Receive a newline-terminated string.
            {
                char[] response;
                this.tst.readConsume(
                    (void[] data)
                    {
                        auto str = cast(char[])data;
                        foreach (i, c; str)
                        {
                            if (c == '\n')
                            {
                                response ~= str[0 .. i];
                                return i;
                            }
                        }

                        // No newline yet: take everything and ask for more
                        // by returning a value greater than `data.length`.
                        response ~= str;
                        return str.length + 1;
                    }
                );
            }

            // Receive an integer value.
            {
                int x;
                this.tst.readValue(x);
            }
        }
    }
}

version (unittest)
{
    import ocean.sys.socket.IPSocket;
}

/// Example of connecting a TCP/IP socket.
unittest
{
    static class ConnectIOTaskDemo: Task
    {
        TaskSelectTransceiver tst;

        this ( )
        {
            // Create a TCP/IP socket, throwing SocketError on failure.
            auto socket = new IPSocket!();
            auto e = new SocketError(socket);
            // The `true` parameter makes the socket non-blocking.
            e.enforce(socket.tcpSocket(true) >= 0, "", "socket");

            this.tst = new TaskSelectTransceiver(
                socket, new IOWarning(socket), e
            );
        }

        override void run ( )
        {
            connect(this.tst,
                (IPSocket!() socket)
                {
                    // Call `connect` to initiate establishing the connection.
                    // Return `true` if `connect` returns 0 (i.e. succeeded) or
                    // false otherwise.
                    // `connect` will likely return -1 (i.e. fail) and set
                    // `errno = EINPROGRESS`. The `connect` utility function
                    // will then suspend this task until establishing the
                    // socket connection has completed.
                    IPSocket!().InetAddress address;
                    return !socket.connect(address("127.0.0.1", 4711));
                }
            );
            // Now the socket is connected, and `this.tst` is ready for reading
            // and writing, as shown in the documented unit test for sending/
            // receiving above.
        }
    }
}