/*******************************************************************************

    Performs non-blocking I/O, suspending the current task to wait for the I/O
    device to be ready if it would have blocked.
    Because the most common case is using a TCP socket, one TCP-specific
    facility (TCP Cork) is built into `TaskSelectTransceiver`. The simplicity,
    convenience (it avoids a custom implementation for output buffering) and
    frequency of use justify having it in `TaskSelectTransceiver` rather than
    a separate class or module.

    Copyright:
        Copyright (c) 2017 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.select.protocol.task.TaskSelectTransceiver;

import ocean.core.Verify;
import ocean.io.device.IODevice;
import ocean.io.select.client.model.ISelectClient;

/// ditto

class TaskSelectTransceiver
{
    import ocean.io.select.protocol.task.TaskSelectClient;
    import ocean.io.select.protocol.task.internal.BufferedReader;

    import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
    import ocean.stdc.posix.sys.uio: iovec, readv;
    import ocean.stdc.posix.sys.socket: setsockopt;
    import ocean.stdc.posix.netinet.in_: IPPROTO_TCP;

    // The module that declares `TCP_CORK` depends on the compiler/runtime
    // version; for the oldest supported range the constant is defined here.
    static if (__VERSION__ >= 2000 && __VERSION__ < 2073)
        enum { TCP_CORK = 3 }
    else static if (__VERSION__ >= 2077)
        import core.sys.linux.netinet.tcp: TCP_CORK;
    else
        import core.sys.linux.sys.netinet.tcp: TCP_CORK;

    import ocean.sys.Epoll: epoll_event_t;
    alias epoll_event_t.Event Event;

    import ocean.io.select.protocol.generic.ErrnoIOException: IOWarning, IOError;

    debug (Raw) import ocean.io.Stdout: Stdout;

    import ocean.core.Enforce: enforce;
    import ocean.transition;

    /***************************************************************************

        I/O device

    ***************************************************************************/

    package IODevice iodev;

    /***************************************************************************

        Task select client to wait for I/O events

    ***************************************************************************/

    package TaskSelectClient select_client;

    /***************************************************************************

        Read buffer manager

    ***************************************************************************/

    private BufferedReader buffered_reader;

    /***************************************************************************

        Possible values for the TCP Cork status of the I/O device. `Unknown`
        means TCP Cork support for the I/O device has not been queried yet,
        `Disabled` that the I/O device does not support TCP Cork, and
        `Enabled` that TCP Cork was successfully switched on for the I/O
        device.

        `Unknown` must stay the initial (zero) value so that a freshly
        constructed or reset transceiver queries TCP Cork support on the next
        write.

    ***************************************************************************/

    private enum TcpCorkStatus: uint
    {
        Unknown = 0,
        Disabled,
        Enabled
    }

    /***************************************************************************

        The TCP Cork status of the I/O device. TCP Cork is a Linux
        feature to buffer outgoing data for TCP sockets to minimise the number
        of TCP frames sent.

    ***************************************************************************/

    private TcpCorkStatus tcp_cork_status;

    /***************************************************************************

        Thrown on EOF and remote hangup event

    ***************************************************************************/

    private IOWarning warning_e;

    /***************************************************************************

        Thrown on socket I/O error

    ***************************************************************************/

    private IOError error_e;

    /***************************************************************************

        Constructor.

        error_e and warning_e may be the same object if distinguishing between
        error and warning is not required.

        Params:
            iodev            = I/O device
            warning_e        = exception to throw on end-of-flow condition or if
                               the remote hung up
            error_e          = exception to throw on I/O error
            read_buffer_size = read buffer size

    ***************************************************************************/

    public this ( IODevice iodev, IOWarning warning_e, IOError error_e,
                  size_t read_buffer_size = BufferedReader.default_read_buffer_size )
    {
        this.iodev = iodev;
        this.warning_e = warning_e;
        this.error_e = error_e;
        // The select client is handed the address of the error exception's
        // error code field.
        this.select_client = new TaskSelectClient(iodev, &error_e.error_code);
        this.buffered_reader = new BufferedReader(read_buffer_size);
    }

    /***************************************************************************

        Populates `data` with data read from the I/O device. Does nothing if
        `data` is empty.

        Params:
            data = destination data buffer

        Throws:
            IOException if no data were received nor will any arrive later:
            - IOWarning on end-of-flow condition or if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void read ( void[] data )
    {
        if (data.length)
            this.buffered_reader.readRaw(data, &this.deviceRead);
    }

    /***************************************************************************

        Populates `value` with `value.sizeof` bytes read from the I/O device.

        Params:
            value = reference to a variable to be populated

        Throws:
            IOException if no data were received nor will any arrive later:
            - IOWarning on end-of-flow condition or if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void readValue ( T ) ( out T value )
    {
        static if (T.sizeof)
            this.buffered_reader.readRaw((cast(void*)&value)[0 .. value.sizeof],
                &this.deviceRead);
    }

    /***************************************************************************

        Calls `consume` with data read from the I/O device.

        If `consume` feels that the amount of `data` passed to it is sufficient
        it should return the number of bytes it consumed, which is a value
        between 0 and `data.length` (inclusive). Otherwise, if `consume`
        consumed all `data` and still needs more data from the I/O device, it
        should return a value greater than `data.length`; it will then be
        called again after more data have been received.

        Params:
            consume = consumer callback delegate

        Throws:
            IOException if no data were received nor will any arrive later:
            - IOWarning on end-of-flow condition or if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void readConsume ( scope size_t delegate ( void[] data ) consume )
    {
        this.buffered_reader.readConsume(consume, &this.deviceRead);
    }

    /***************************************************************************

        Writes the byte data of `value` to the I/O device.
        If the I/O device is a TCP socket then the data may be buffered for at
        most 200ms using the TCP Cork feature of Linux. In this case call
        `flush()` to write all pending data immediately.

        Params:
            value = the value of which the byte data to write

        Throws:
            IOException if no data were sent nor will it be possible later:
            - IOWarning if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void writeValue ( T ) ( in T value )
    {
        static if (T.sizeof)
            this.write((cast(Const!(void*))&value)[0 .. value.sizeof]);
    }

    /***************************************************************************

        Writes `data` to the I/O device.
        If the I/O device is a TCP socket then `data` may be buffered for at
        most 200ms using the TCP Cork feature of Linux. In this case call
        `flush()` to write all pending data immediately.

        Params:
            data = data to write

        Throws:
            IOException if no data were sent nor will it be possible later:
            - IOWarning if the remote hung up,
            - IOError (IOWarning subclass) on I/O error.

    ***************************************************************************/

    public void write ( Const!(void)[] data )
    {
        // A single device write may be partial: drop what was written and
        // repeat until nothing is left.
        while (data.length)
            data = data[this.deviceWrite(data) .. $];
    }

    /***************************************************************************

        Sends all pending output data immediately. Calling this method has an
        effect only if the I/O device is a TCP socket.

        Throws:
            IOError on I/O error.

    ***************************************************************************/

    public void flush ( )
    {
        if (this.tcp_cork_status == TcpCorkStatus.Enabled)
        {
            // Switching TCP Cork off sends the pending data; switch it back
            // on straight away so that subsequent writes are buffered again.
            this.setTcpCork(false);
            this.setTcpCork(true);
        }
    }

    /***************************************************************************

        Removes any remaining data from I/O buffers, sends any pending output
        data immediately if possible, and removes the epoll registration of the
        I/O device, if any.

        You need to call this method if you close and then reopen or otherwise
        reassign the I/O device's file descriptor *without* suspending and
        resuming or terminating and restarting the task in between. You may call
        this method at any time between the last time you read from the old and
        the first time you read from the new device.

        This method does not throw. It is safe to call it if the I/O device is
        not (yet or any more) usable.

    ***************************************************************************/

    public void reset ( )
    {
        this.buffered_reader.reset();
        this.select_client.unregister();

        // Best effort flush: errors are deliberately ignored here (second
        // argument `false`) because the device may no longer be usable.
        if (this.tcp_cork_status == TcpCorkStatus.Enabled)
            this.setTcpCork(false, false);

        // Make the next write query TCP Cork support again.
        this.tcp_cork_status = TcpCorkStatus.Unknown;
    }

    /***************************************************************************

        Calls `io_op` until it returns a positive value. Waits for `wait_event`
        if `io_op` fails with `EAGAIN` or `EWOULDBLOCK`.

        `io_op` should behave like POSIX `read/write` and return
          - the non-zero number of bytes read or written on success or
          - 0 on end-of-flow condition or
          - a negative value on error and set `errno` accordingly.

        Params:
            io_op      = I/O operation
            wait_event = the event to wait for if `io_op` fails with
                         `EAGAIN/EWOULDBLOCK`
            opname     = the name of the I/O operation for error messages

        Returns:
            the number of bytes read or written by `io_op`.

        Throws:
            IOException if no data were transmitted and won't be later:
            - IOWarning on end-of-flow condition or if a hung-up event was
              reported for the I/O device,
            - IOError (IOWarning subclass) if `io_op` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

        Note: POSIX says the following about the return value of `read`:

            When attempting to read from an empty pipe or FIFO [remark: includes
            sockets]:

            - If no process has the pipe open for writing, read() shall return 0
              to indicate end-of-file.

            - If some process has the pipe open for writing and O_NONBLOCK is
              set, read() shall return -1 and set errno to [EAGAIN].

            - If some process has the pipe open for writing and O_NONBLOCK is
              clear, read() shall block the calling thread until some data is
              written or the pipe is closed by all processes that had the pipe
              open for writing.

        @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html

    ***************************************************************************/

    private size_t transfer ( lazy iodev.ssize_t io_op, Event wait_event, istring opname )
    out (n)
    {
        assert(n > 0);
    }
    body
    {
        // Prevent misinformation if an error happens that is not detected by
        // io_op, such as a socket error reported by getsockopt(SO_ERROR) or an
        // epoll event like EPOLLHUP or EPOLLERR.
        errno = 0;
        iodev.ssize_t n;

        // First call io_op. If io_op fails with EAGAIN/EWOULDBLOCK, enter a
        // loop, waiting for `wait_event` and calling io_op again, until
        //   - io_op succeeds (i.e. returns a positive value) or
        //   - io_op reports EOF (i.e. returns 0; only read() does that) or
        //   - io_op fails (i.e. returns a negative value) with errno other
        //     than EAGAIN/EWOULDBLOCK (or EINTR, see below) or
        //   - epoll reports EPOLLHUP or EPOLLERR.
        // `io_op` is lazy, so every mention of it performs the I/O operation.
        for (n = io_op; n <= 0; n = io_op)
        {
            // n == 0: end of flow. A negative n passes this check and is
            // classified by errno below.
            enforce(this.warning_e, n, "end of flow whilst reading");
            switch (errno)
            {
                case EAGAIN:
                    // EWOULDBLOCK gets its own label only where its value
                    // differs from EAGAIN, to avoid a duplicate case.
                    static if (EAGAIN != EWOULDBLOCK)
                    {
                        case EWOULDBLOCK:
                    }
                    this.ioWait(wait_event);
                    break;

                default:
                    // Gives the exception the chance to report a pending
                    // device error first; otherwise report errno.
                    this.error_e.checkDeviceError("I/O error");
                    throw this.error_e.useGlobalErrno(opname);

                case EINTR:
                    // io_op was interrupted by a signal before data were read
                    // or written. May not be possible with non-blocking I/O,
                    // but neither POSIX nor Linux documentation clarifies that,
                    // so handle it by calling io_op again to be safe.
            }
        }

        return n;
    }

    /***************************************************************************

        Suspends the current task until epoll reports any of the events in
        `wait_event` for the I/O device or the I/O device times out.

        Params:
            wait_event = the events to wait for (`EPOLLHUP` and `EPOLLERR`
                         are always implicitly added)

        Returns:
            the events reported by epoll.

        Throws:
            - `IOWarning` on `EPOLLHUP`,
            - `IOError` on `EPOLLERR`,
            - `EpollException` if registering with epoll failed,
            - `TimeoutException` on timeout waiting for I/O events.

    ***************************************************************************/

    private Event ioWait ( Event wait_event )
    {
        Event events = this.select_client.ioWait(wait_event);
        // A hang-up is checked first and reported as a warning.
        enforce(this.warning_e, !(events & Event.EPOLLHUP), "connection hung up");

        if (events & Event.EPOLLERR)
        {
            this.error_e.checkDeviceError("epoll reported I/O device error");
            enforce(this.error_e, false, "epoll reported I/O device error");
            // Not reached: the enforce above always throws.
            assert(false);
        }
        else
            return events;
    }

    /***************************************************************************

        Reads as much data from the I/O device as can be read with one
        successful `read` call but at most `dst.length` bytes, and stores the
        data in `dst`.

        Params:
            dst = destination buffer

        Returns:
            the number `n` of bytes read, which are stored in `dst[0 .. n]`.

        Throws:
            IOException if no data were received and won't arrive later:
            - IOWarning on end-of-flow condition or if a hung-up event was
              reported for the I/O device,
            - IOError (IOWarning subclass) if `read` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

    ***************************************************************************/

    private size_t deviceRead ( void[] dst )
    out (n)
    {
        debug (Raw) Stdout.formatln(
            "[{}] Read  {:X2} ({} bytes)", this.iodev.fileHandle, dst[0 .. n], n
        );
    }
    body
    {
        return this.transfer(this.iodev.read(dst), Event.EPOLLIN, "read");
    }

    /***************************************************************************

        Reads as much data from the I/O device as can be read with one
        successful `readv` call but at most `dst_a.length + dst_b.length` bytes,
        and stores the data in `dst_a` and `dst_b`.

        Params:
            dst_a = first destination buffer
            dst_b = second destination buffer

        Returns:
            the number `n` of bytes read, which are stored in
            - `dst_a[0 .. n]` if `n <= dst_a.length` or
            - `dst_a` and `dst_b[0 .. n - dst_a.length]` if `n > dst_a.length`.

        Throws:
            IOException if no data were received and won't arrive later:
            - IOWarning on end-of-flow condition or if a hung-up event was
              reported for the I/O device,
            - IOError (IOWarning subclass) if `readv` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

    ***************************************************************************/

    private size_t deviceRead ( void[] dst_a, void[] dst_b )
    out (n)
    {
        debug (Raw)
        {
            if (n > dst_a.length)
                Stdout.formatln("[{}] Read  {:X2}{:X2} ({} bytes)",
                    this.iodev.fileHandle, dst_a, dst_b[0 .. n - dst_a.length], n);
            else
                Stdout.formatln("[{}] Read  {:X2} ({} bytes)",
                    this.iodev.fileHandle, dst_a[0 .. n], n);
        }
    }
    body
    {
        // Work around a linker error caused by a druntime packaging bug: The
        // druntime is by mistake currently not linked with the
        // core.sys.posix.sys.uio module.
        static dst_init = iovec.init;
        iovec[2] dst = dst_init;

        dst[0] = iovec(dst_a.ptr, dst_a.length);
        dst[1] = iovec(dst_b.ptr, dst_b.length);
        int fd = this.iodev.fileHandle;
        return this.transfer(
            readv(fd, dst.ptr, cast(int)dst.length), Event.EPOLLIN, "readv"
        );
    }

    /***************************************************************************

        Writes as much data to the I/O device as can be written with one
        successful `write` call but at most `src.length` bytes, taking the data
        from `src`.

        If the TCP Cork status is still unknown, this method first tries to
        enable TCP Cork for the I/O device and records whether that succeeded.

        Params:
            src = source buffer

        Returns:
            the number `n` of bytes written, which were taken from
            `src[0 .. n]`.

        Throws:
            IOException if no data were written and won't be later:
            - IOWarning if a hung-up event was reported for the I/O device,
            - IOError (IOWarning subclass) if `write` failed with an error
              other than `EAGAIN`, `EWOULDBLOCK` or `EINTR` or if an error
              event was reported for the I/O device.

    ***************************************************************************/

    private size_t deviceWrite ( Const!(void)[] src )
    {
        debug (Raw) Stdout.formatln(
            "[{}] Write {:X2} ({} bytes)", this.iodev.fileHandle,
            src, src.length
        );

        if (this.tcp_cork_status == TcpCorkStatus.Unknown)
        {
            // Try enabling TCP Cork. If it fails then TCP Cork is not supported
            // for the I/O device.
            this.tcp_cork_status =
                this.setTcpCork(true, false)
                ? TcpCorkStatus.Enabled
                : TcpCorkStatus.Disabled;
        }

        return this.transfer(this.iodev.write(src), Event.EPOLLOUT, "write");
    }

    /***************************************************************************

        Sets the TCP_CORK option for the I/O device. Disabling (`enable` = 0)
        sends all pending data.

        Params:
            enable = 0 disables TCP_CORK and flushes if previously enabled, a
                     different value enables TCP_CORK.
            throw_on_error = throw on error rather than returning `false`

        Returns:
            `true` on success or `false` on error if `throw_on_error` is
            `false`.

        Throws:
            `IOError` if the `TCP_CORK` option cannot be set for the I/O device
            and `throw_on_error` is `true`. In practice this can fail only for
            one of the following reasons:
              - `this.iodev.fileHandle` does not contain a valid file descriptor
                (`errno == EBADF`). This is the case if the socket was not
                created by `socket()` or `accept()` yet.
              - `this.iodev.fileHandle` does not refer to a socket
                (`errno == ENOTSOCK`).

    ***************************************************************************/

    private bool setTcpCork ( int enable, bool throw_on_error = true )
    {
        // setsockopt() returns 0 on success.
        if (!setsockopt(this.iodev.fileHandle, IPPROTO_TCP, TCP_CORK,
            &enable, enable.sizeof))
        {
            return true;
        }
        else if (throw_on_error)
        {
            this.error_e.checkDeviceError("setsockopt(TCP_CORK)");
            throw this.error_e.useGlobalErrno("setsockopt(TCP_CORK)");
        }
        else
        {
            return false;
        }
    }
}

import core.stdc.errno;
import ocean.core.Enforce;
import ocean.text.util.ClassName;

/*******************************************************************************

    Utility function to `connect` a socket that is managed by a
    `TaskSelectTransceiver`.

    Calls `socket_connect` once. If it returns `false`, evaluates `errno` and
    waits for establishing the connection to complete if needed, suspending the
    task.

    When calling `socket_connect` the I/O device which was passed to the
    constructor of `tst` is passed to it via the `socket` parameter.
    `socket_connect` should call the POSIX `connect` function, passing
    `socket.fileHandle`, and return `true` on success or `false` on failure,
    corresponding to `connect` returning 0 or -1, respectively.

    If `socket_connect` returns `true` then this function does nothing but
    return 0. Otherwise, if `socket_connect` returns `false` then it does one
    of the following actions depending on `errno`:
     - `EINPROGRESS`, `EALREADY`, `EINTR`: Wait for establishing the connection
       to complete, then return `errno`.
     - `EISCONN`, 0: Return `errno` (and do nothing else).
     - All other codes: Throw `IOError`.

    The `Socket` type must be chosen so that the I/O device passed to the
    constructor can be cast to it.

    Params:
        tst            = the `TaskSelectTransceiver` instance that manages the
                         socket to connect
        socket_connect = calls POSIX `connect`

    Returns:
        - 0 if `socket_connect` returned `true`,
        - or the initial `errno` code otherwise, if the socket is now
          connected.

    Throws:
        `IOError` if `socket_connect` returned `false` and `errno` indicated
        that the socket connection cannot be established.

*******************************************************************************/

public int connect ( Socket: IODevice ) ( TaskSelectTransceiver tst,
    scope bool delegate ( Socket socket ) socket_connect )
{
    auto socket = cast(Socket)tst.iodev;
    verify(socket !is null, "connect: Unable to cast the I/O " ~
        "device from " ~ classname(tst.iodev) ~ " to " ~ Socket.stringof);
    // `connect_` takes a lazy argument, so `socket_connect(socket)` is only
    // evaluated inside `connect_`, after it has cleared `errno`.
    return connect_(tst, socket_connect(socket));
}

/*******************************************************************************

    Implements the logic described for `connect`.

    Params:
        tst            = the `TaskSelectTransceiver` instance that manages the
                         socket to connect
        socket_connect = calls POSIX `connect`

    Returns:
        See `connect`.

    Throws:
        See `connect`.

******************************************************************************/

private int connect_ ( TaskSelectTransceiver tst, lazy bool socket_connect )
{
    errno = 0;
    if (socket_connect)
        return 0;

    // Capture errno immediately; the calls below may change it.
    auto errnum = errno;

    switch (errnum)
    {
        case EINPROGRESS,
             EALREADY,
             EINTR: // TODO: Might never be reported, see note below.
            tst.ioWait(TaskSelectTransceiver.Event.EPOLLOUT);
            goto case;

        case EISCONN, 0:
            return errnum;

        default:
            tst.error_e.checkDeviceError("error establishing connection");
            enforce(tst.error_e, false, "error establishing connection");
            // Not reached: the enforce above always throws.
            assert(false);
    }

    /* The POSIX specification says about connect() failing with EINTR:

        "If connect() is interrupted by a signal that is caught while blocked
        waiting to establish a connection, connect() shall fail and set errno to
        EINTR, but the connection request shall not be aborted, and the
        connection shall be established asynchronously."

    It remains unclear whether a nonblocking connect() can also fail with EINTR
    or not. Assuming that, if it is possible, it has the same meaning as for
    blocking connect(), we handle EINTR in the same way as EINPROGRESS.
    TODO: Remove handling of EINTR or this note when this is clarified. */
}

version (UnitTest)
{
    import ocean.io.select.protocol.generic.ErrnoIOException;
    import ocean.task.Task;
    import ocean.transition;
}

/// Example of sending and receiving data with the `TaskSelectTransceiver`.
unittest
{
    static class IOTaskDemo: Task
    {
        TaskSelectTransceiver tst;

        this ( )
        {
            // I/O device to read/write via. In the real world, this would
            // typically be your socket.
            IODevice iodev;
            this.tst = new TaskSelectTransceiver(
                iodev, new IOWarning(iodev), new IOError(iodev)
            );
        }

        override void run ( )
        {
            // Send a newline-terminated string.
            this.tst.write("Hello World!\n");
            // Send an integer value.
            this.tst.writeValue(3);
            this.tst.flush();

            // Receive a newline-terminated string.
            {
                char[] response;
                this.tst.readConsume(
                    (void[] data)
                    {
                        auto str = cast(char[])data;
                        foreach (i, c; str)
                        {
                            if (c == '\n')
                            {
                                response ~= str[0 .. i];
                                // Consume the terminating newline as well
                                // (i + 1 <= data.length), otherwise it would
                                // be left pending and read as the first byte
                                // of the integer value below.
                                return i + 1;
                            }
                        }

                        // No newline yet: keep everything and request more
                        // data by returning a value greater than data.length.
                        response ~= str;
                        return str.length + 1;
                    }
                );
            }

            // Receive an integer value.
            {
                int x;
                this.tst.readValue(x);
            }
        }
    }
}

version (UnitTest)
{
    import ocean.sys.socket.IPSocket;
}

/// Example of connecting a TCP/IP socket.
unittest
{
    static class ConnectIOTaskDemo: Task
    {
        TaskSelectTransceiver tst;

        this ( )
        {
            // Create a TCP/IP socket, throwing SocketError on failure.
            auto socket = new IPSocket!();
            auto e = new SocketError(socket);
            // The `true` parameter makes the socket non-blocking.
            e.enforce(socket.tcpSocket(true) >= 0, "", "socket");

            this.tst = new TaskSelectTransceiver(
                socket, new IOWarning(socket), e
            );
        }

        override void run ( )
        {
            connect(this.tst,
                (IPSocket!() socket)
                {
                    // Call `connect` to initiate establishing the connection.
                    // Return `true` if `connect` returns 0 (i.e. succeeded) or
                    // false otherwise.
                    // `connect` will likely return -1 (i.e. fail) and set
                    // `errno = EINPROGRESS`. The enclosing `connect` call will
                    // then suspend this task until establishing the socket
                    // connection has completed.
                    IPSocket!().InetAddress address;
                    return !socket.connect(address("127.0.0.1", 4711));
                }
            );
            // Now the socket is connected, and `this.tst` is ready for reading
            // and writing, as shown in the documented unit test for sending/
            // receiving above.
        }
    }
}