1 /******************************************************************************* 2 3 Base class for a non-blocking socket connection select client using a 4 fiber/coroutine to suspend operation while waiting for the connection to be 5 established and resume on that event (a Write event signifies that the 6 connection has been established). 7 8 Copyright: 9 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 10 All rights reserved. 11 12 License: 13 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 14 Alternatively, this file may be distributed under the terms of the Tango 15 3-Clause BSD License (see LICENSE_BSD.txt for details). 16 17 *******************************************************************************/ 18 19 module ocean.io.select.protocol.fiber.FiberSocketConnection; 20 21 22 23 24 import ocean.transition; 25 26 import ocean.core.Verify; 27 28 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol; 29 30 import ocean.io.select.EpollSelectDispatcher; 31 32 import ocean.core.Array : copy; 33 34 import ocean.sys.socket.AddressIPSocket, 35 ocean.sys.socket.InetAddress, 36 ocean.sys.socket.IPSocket: IIPSocket; 37 38 39 import ocean.io.select.protocol.generic.ErrnoIOException: SocketError; 40 41 import ocean.stdc.posix.sys.socket: SOL_SOCKET, IPPROTO_TCP, SO_KEEPALIVE; 42 43 import core.stdc.errno: errno, EINPROGRESS, EINTR, EALREADY, EISCONN; 44 45 debug ( EpollTiming ) import ocean.time.StopWatch; 46 debug ( ISelectClient ) 47 { 48 import ocean.io.Stdout : Stderr; 49 import ocean.text.util.StringC; 50 } 51 52 53 54 public class FiberSocketConnection ( bool IPv6 = false ) : IFiberSocketConnection 55 { 56 /************************************************************************** 57 58 Alias of the address struct type. 59 60 **************************************************************************/ 61 62 alias .InetAddress!(IPv6) InetAddress; 63 64 /************************************************************************** 65 66 Alias of the binary address type, sockaddr_in for IPv4 (IPv6 = false) 67 or sockaddr_in6 for IPv6 (IPv6 = true). 68 69 **************************************************************************/ 70 71 alias InetAddress.Addr InAddr; 72 73 /************************************************************************** 74 75 Socket 76 77 **************************************************************************/ 78 79 alias AddressIPSocket!(IPv6) IPSocket; 80 81 protected IPSocket socket; 82 83 /************************************************************************** 84 85 Constructor. 86 87 Params: 88 socket = IPSocket instance to use internally 89 fiber = fiber to be suspended when socket connection does not 90 immediately succeed or fail 91 92 **************************************************************************/ 93 94 public this ( IPSocket socket, SelectFiber fiber ) 95 { 96 this.socket = socket; 97 98 super(this.socket, fiber); 99 } 100 101 /************************************************************************** 102 103 Constructor. 104 105 warning_e and socket_error may be the same object. 106 107 Params: 108 socket = IPSocket instance to use internally 109 fiber = fiber to be suspended when socket connection does not 110 immediately succeed or fail 111 warning_e = exception to be thrown when the remote hung up 112 socket_error = exception to be thrown on socket error 113 114 **************************************************************************/ 115 116 public this ( IPSocket socket, SelectFiber fiber, 117 IOWarning warning_e, SocketError socket_error ) 118 { 119 super(this.socket = socket, fiber, warning_e, socket_error); 120 } 121 122 /************************************************************************** 123 124 Attempts to connect to the remote host, suspending the fiber if 125 establishing the connection does not immediately succeed or fail. If a 126 connection to the same address and port is already established, the 127 Already flag is set in the return value. If a connection to a different 128 address and port is already established, this connection is closed and a 129 new connection is opened. 130 131 Params: 132 address = remote IP address 133 port = remote TCP port 134 force = false: don't call connect() if currently connected to the 135 same address and port; true: always call connect() 136 Returns: 137 ConnectionStatus.Connected if the connection was newly established 138 or ConnectionStatus.Connected | ConnectionStatus.Already if the 139 connection was already established. 140 141 Throws: 142 - SocketError (IOException) on fatal I/O error, 143 - IOWarning if the remote hung up. 144 145 **************************************************************************/ 146 147 override public ConnectionStatus connect ( cstring address, ushort port, bool force = false ) 148 { 149 if (!this.sameAddress(address, port)) 150 { 151 this.disconnect(); 152 } 153 154 return this.connect_(!this.socket.connect(address, port), force); 155 } 156 157 /*************************************************************************** 158 159 Ditto. 160 161 Params: 162 address = remote address 163 force = false: don't call connect() if currently connected to the 164 same address and port; true: always call connect() 165 166 Returns: 167 see connect() above. 168 169 ***************************************************************************/ 170 171 public ConnectionStatus connect ( InAddr address, bool force = false ) 172 { 173 if (this.in_addr != address) 174 { 175 this.disconnect(); 176 } 177 178 return this.connect_(!this.socket.connect(address), force); 179 } 180 181 /************************************************************************** 182 183 Returns: 184 the remote address of the connected socket, or the last attempted 185 connection, or an empty address if no connection has been attempted 186 187 **************************************************************************/ 188 189 public InAddr in_addr ( ) 190 { 191 return this.connected_? this.socket.in_addr : InetAddress.addr_init; 192 } 193 194 /*************************************************************************** 195 196 Returns: 197 the remote IP address of the connected socket, or the last attempted 198 connection, or "" if no connection has been attempted 199 200 ***************************************************************************/ 201 202 override protected cstring address_ ( ) 203 { 204 return this.socket.address; 205 } 206 207 /*************************************************************************** 208 209 Returns: 210 the remote TCP port of the connected socket, or the last attempted 211 connection, or ushort.init if no connection has been attempted 212 213 ***************************************************************************/ 214 215 override protected ushort port_ ( ) 216 { 217 return this.socket.port; 218 } 219 220 /*************************************************************************** 221 222 Compares ip_address_str and port with the current address and port. 223 224 Params: 225 ip_address_str = string with the IP address to compare with the 226 current address 227 port = port to compare with the current 228 229 Returns: 230 true if ip_address_str and port are the same as the current or false 231 if not. 232 233 Throws: 234 SocketError if ip_address_str does not contain a valid IP address. 235 236 ***************************************************************************/ 237 238 public bool sameAddress ( InAddr addr ) 239 { 240 return addr == this.socket.in_addr; 241 } 242 243 /*************************************************************************** 244 245 Compares ip_address_str and port with the current address and port. 246 247 Params: 248 ip_address_str = string with the IP address to compare with the 249 current address 250 port = port to compare with the current 251 252 Returns: 253 true if ip_address_str and port are the same as the current or false 254 if not. 255 256 Throws: 257 SocketError if ip_address_str does not contain a valid IP address. 258 259 ***************************************************************************/ 260 261 public bool sameAddress ( cstring ip_address_str, ushort port ) 262 { 263 InetAddress address; 264 265 this.socket_error.enforce(address.inet_pton(ip_address_str) == 1, 266 "invalid IP address"); 267 268 address.port = port; 269 270 return this.sameAddress(address.addr); 271 } 272 } 273 274 /// 275 unittest 276 { 277 alias FiberSocketConnection!(true) IPV6; 278 alias FiberSocketConnection!(false) IPV4; 279 } 280 281 282 public class IFiberSocketConnection : IFiberSelectProtocol 283 { 284 /************************************************************************** 285 286 This alias for chainable methods 287 288 **************************************************************************/ 289 290 public alias typeof (this) This; 291 292 /************************************************************************** 293 294 Connection status as returned by connect() and disconnect(). 295 296 **************************************************************************/ 297 298 public enum ConnectionStatus : uint 299 { 300 Disconnected = 0, 301 Connected = 1 << 0, 302 Already = 1 << 1 303 } 304 305 /************************************************************************** 306 307 Socket 308 309 **************************************************************************/ 310 311 protected IIPSocket socket; 312 313 /************************************************************************** 314 315 Socket error exception 316 317 **************************************************************************/ 318 319 private SocketError socket_error; 320 321 /************************************************************************** 322 323 Current connection status 324 325 **************************************************************************/ 326 327 protected bool connected_ = false; 328 329 /************************************************************************** 330 331 Count of the number of times transmit() has been invoked since the call 332 to super.transmitLoop. 333 334 **************************************************************************/ 335 336 private uint transmit_calls; 337 338 /************************************************************************** 339 340 Delegate which is called (in EpollTiming debug mode) after a socket 341 connection is established. 342 343 FIXME: the logging of connection times was intended to be done directly 344 in this module, not via a delegate, but dmd bugs with varargs made this 345 impossible. The delegate solution is ok though. 346 347 **************************************************************************/ 348 349 debug ( EpollTiming ) 350 { 351 private alias void delegate ( ulong microsec ) ConnectionTimeDg; 352 public ConnectionTimeDg connection_time_dg; 353 } 354 355 /************************************************************************** 356 357 Constructor. 358 359 warning_e and socket_error may be the same object. 360 361 Params: 362 socket = IPSocket instance to use internally 363 fiber = fiber to be suspended when socket connection does not 364 immediately succeed or fail 365 warning_e = exception to be thrown when the remote hung up 366 socket_error = exception to be thrown on socket error 367 368 **************************************************************************/ 369 370 protected this ( IIPSocket socket, SelectFiber fiber, 371 IOWarning warning_e, SocketError socket_error ) 372 { 373 this.socket = socket; 374 375 this.socket_error = socket_error; 376 377 super(socket, Event.EPOLLOUT, fiber, warning_e, socket_error); 378 } 379 380 /************************************************************************** 381 382 Constructor. 383 384 Params: 385 socket = IPSocket instance to use internally 386 fiber = fiber to be suspended when socket connection does not 387 immediately succeed or fail 388 389 **************************************************************************/ 390 391 protected this ( IIPSocket socket, SelectFiber fiber ) 392 { 393 this(socket, fiber, new IOWarning(socket), new SocketError(socket)); 394 } 395 396 /************************************************************************** 397 398 Returns: 399 true if the socket is currently connected or false if not. 400 401 **************************************************************************/ 402 403 public bool connected ( ) 404 { 405 return this.connected_; 406 } 407 408 /*************************************************************************** 409 410 Returns: 411 the IP address of the connected socket, or the last attempted 412 connection, or "" if no connection has been attempted 413 414 ***************************************************************************/ 415 416 public cstring address ( ) 417 { 418 return this.connected_? this.address_ : ""; 419 } 420 421 /*************************************************************************** 422 423 Returns: 424 the TCP port of the connected socket, or the last attempted 425 connection, or ushort.init if no connection has been attempted 426 427 ***************************************************************************/ 428 429 public ushort port ( ) 430 { 431 return this.connected_? this.port_ : ushort.init; 432 } 433 434 /************************************************************************** 435 436 Attempts to connect to the remote host, suspending the fiber if 437 establishing the connection does not immediately succeed or fail. If a 438 connection to the same address and port is already established, the 439 Already flag is set in the return value. If a connection to a different 440 address and port is already established, this connection is closed and a 441 new connection is opened. 442 443 Params: 444 address = remote IP address 445 port = remote TCP port 446 force = false: don't call connect() if currently connected to the 447 same address and port; true: always call connect() 448 449 Returns: 450 ConnectionStatus.Connected if the connection was newly established 451 or ConnectionStatus.Connected | ConnectionStatus.Already if the 452 connection was already established. 453 454 Throws: 455 - SocketError (IOException) on fatal I/O error, 456 - IOWarning if the remote hung up. 457 458 **************************************************************************/ 459 460 abstract public ConnectionStatus connect ( cstring address, ushort port, bool force = false ); 461 462 /************************************************************************** 463 464 Disconnects from provided address (if connected). 465 466 Params: 467 force = false: disconnect only if connected; true: force disconnect 468 469 Returns: 470 the connection status before disconnecting. 471 472 **************************************************************************/ 473 474 public ConnectionStatus disconnect ( bool force = false ) 475 out 476 { 477 assert (!this.connected_); 478 } 479 body 480 { 481 if (this.connected_ || force) 482 { 483 this.onDisconnect(); 484 this.socket.shutdown(); 485 this.socket.close(); 486 } 487 488 scope (exit) this.connected_ = false; 489 490 with (ConnectionStatus) return this.connected_? 491 Disconnected : 492 Already | Disconnected; 493 } 494 495 /************************************************************************** 496 497 Establishes a non-blocking socket connection according to the POSIX 498 specification for connect(): 499 500 "If the connection cannot be established immediately and O_NONBLOCK 501 is set for the file descriptor for the socket, connect() shall fail 502 and set errno to [EINPROGRESS], but the connection request shall not 503 be aborted, and the connection shall be established asynchronously. 504 [...] 505 When the connection has been established asynchronously, select() 506 and poll() shall indicate that the file descriptor for the socket is 507 ready for writing." 508 509 Calls connect_syscall, which should forward to connect() to establish a 510 connection. If connect_syscall returns false, errno is evaluated to 511 obtain the connection status. 512 If it is EINPROGRESS (or EINTR, see below) the fiber is suspended so 513 that this method returns when the socket is ready for writing or throws 514 when a connection error was detected. 515 516 Params: 517 connect_syscall = should call connect() and return true if connect() 518 returns 0 or false otherwise 519 520 force = false: don't call connect_syscall if currently 521 connected to the same address and port; true: 522 always call connect_syscall 523 524 Returns: 525 - ConnectionStatus.Connected if the connection was newly 526 established, either because connect_syscall returned true or after 527 the socket became ready for writing, 528 - ConnectionStatus.Connected | ConnectionStatus.Already if connect() 529 failed with EISCONN. 530 531 Throws: 532 - SocketError (IOException) if connect_syscall fails with an error 533 other than EINPROGRESS/EINTR or EISCONN or if a socket error was 534 detected, 535 - IOWarning if the remote hung up. 536 537 Out: 538 The socket is connected, the returned status is never Disconnected. 539 540 Note: The POSIX specification says about connect() failing with EINTR: 541 542 "If connect() is interrupted by a signal that is caught while 543 blocked waiting to establish a connection, connect() shall fail and 544 set errno to EINTR, but the connection request shall not be aborted, 545 and the connection shall be established asynchronously." 546 547 It remains unclear whether a nonblocking connect() can also fail with 548 EINTR or not. Assuming that, if it is possible, it has the same meaning 549 as for blocking connect(), we handle EINTR in the same way as 550 EINPROGRESS. TODO: Remove handling of EINTR or this note when this is 551 clarified. 552 553 **************************************************************************/ 554 555 protected ConnectionStatus connect_ ( lazy bool connect_syscall, bool force ) 556 out (status) 557 { 558 assert (this.connected_); 559 assert (status); 560 } 561 body 562 { 563 // Create a socket if it is currently closed. 564 if (this.socket.fileHandle < 0) 565 { 566 this.connected_ = false; 567 568 this.socket_error.assertExSock(this.socket.tcpSocket(true) >= 0, 569 "error creating socket", __FILE__, __LINE__); 570 571 this.initSocket(); 572 } 573 574 if (!this.connected_ || force) 575 { 576 debug ( EpollTiming ) 577 { 578 StopWatch sw; 579 sw.start; 580 581 scope ( success ) 582 { 583 if ( this.connection_time_dg ) 584 { 585 this.connection_time_dg(sw.microsec); 586 } 587 } 588 } 589 590 if ( connect_syscall ) 591 { 592 debug ( ISelectClient ) Stderr.formatln("[{}:{}]: Connected to socket", 593 this.address, this.port).flush(); 594 this.connected_ = true; 595 return ConnectionStatus.Connected; 596 } 597 else 598 { 599 int errnum = .errno; 600 601 debug ( ISelectClient ) 602 { 603 import core.stdc.string : strerror; 604 Stderr.formatln("[{}:{}]: {}", 605 this.address_, this.port_, StringC.toDString( 606 strerror(errnum))).flush(); 607 } 608 609 switch (errnum) 610 { 611 case EISCONN: 612 this.connected_ = true; 613 return ConnectionStatus.Already; 614 615 case EINTR, // TODO: Might never be reported, see note above. 616 EINPROGRESS, 617 EALREADY: 618 debug ( ISelectClient ) 619 { 620 Stderr.formatln("[{}:{}]: waiting for the socket to become writable", 621 this.address_, this.port_).flush(); 622 623 scope (failure) Stderr.formatln("[{}:{}]: error while waiting for the socket to become writable", 624 this.address_, this.port_).flush(); 625 626 scope (success) Stderr.formatln("[{}:{}]: socket has become writable", 627 this.address_, this.port_).flush(); 628 } 629 this.transmit_calls = 0; 630 this.transmitLoop(); 631 this.connected_ = true; 632 return ConnectionStatus.Connected; 633 634 default: 635 throw this.socket_error.setSock(errnum, 636 "error establishing connection", __FILE__, __LINE__); 637 } 638 } 639 } 640 else 641 { 642 return ConnectionStatus.Already; 643 } 644 } 645 646 /*************************************************************************** 647 648 Returns: 649 the IP address of the connected socket, or the last attempted 650 connection, or "" if no connection has been attempted 651 652 ***************************************************************************/ 653 654 abstract protected cstring address_ ( ); 655 656 /*************************************************************************** 657 658 Returns: 659 the TCP port of the connected socket, or the last attempted 660 connection, or ushort.init if no connection has been attempted 661 662 ***************************************************************************/ 663 664 abstract protected ushort port_ ( ); 665 666 /*************************************************************************** 667 668 Called just before the socket is connected. The base class 669 implementation does nothing, but derived classes may override to add any 670 desired initialisation logic. 671 672 ***************************************************************************/ 673 674 protected void initSocket ( ) 675 { 676 } 677 678 /************************************************************************** 679 680 Disconnection cleanup handler for a subclass 681 682 **************************************************************************/ 683 684 protected void onDisconnect ( ) 685 { 686 } 687 688 /*************************************************************************** 689 690 Called from super.transmitLoop() in two circumstances: 691 1. Upon the initial call to transmitLoop() in connect(), above. 692 2. After an epoll wait, upon receipt of one or more registered 693 events. 694 695 Params: 696 events = events reported for socket 697 698 Returns: 699 Upon first invocation (which occurs automatically when 700 super.transmitLoop() is called, in connect(), above): 701 * false if connect() returned a code denoting either an error or 702 a successful connection (meaning there's no need to go into 703 epoll wait). 704 * true otherwise, to go into epoll wait. 705 706 Upon second invocation: 707 * false to not return to epoll wait. 708 709 ***************************************************************************/ 710 711 protected override bool transmit ( Event events ) 712 { 713 verify(this.transmit_calls <= 1); 714 715 scope ( exit ) this.transmit_calls++; 716 717 if ( this.transmit_calls > 0 ) 718 { 719 this.warning_e.enforce(!(events & Event.EPOLLHUP), 720 "Hangup on connect"); 721 return false; 722 } 723 else 724 { 725 return true; 726 } 727 } 728 }