/*******************************************************************************

    Base class for a non-blocking socket connection select client using a
    fiber/coroutine to suspend operation while waiting for the connection to be
    established and resume on that event (a Write event signifies that the
    connection has been established).

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.select.protocol.fiber.FiberSocketConnection;




import ocean.meta.types.Qualifiers;

import ocean.core.Verify;

import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;

import ocean.io.select.EpollSelectDispatcher;

import ocean.core.Array : copy;

import ocean.sys.socket.AddressIPSocket,
       ocean.sys.socket.InetAddress,
       ocean.sys.socket.IPSocket: IIPSocket;


import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;

import core.stdc.errno: errno, EINPROGRESS, EINTR, EALREADY, EISCONN;

debug ( EpollTiming ) import ocean.time.StopWatch;
debug ( ISelectClient )
{
    import ocean.io.Stdout : Stderr;
    import ocean.text.util.StringC;
}



public class FiberSocketConnection ( bool IPv6 = false ) : IFiberSocketConnection
{
    /**************************************************************************

        Alias of the address struct type.

     **************************************************************************/

    alias .InetAddress!(IPv6) InetAddress;

    /**************************************************************************

        Alias of the binary address type, sockaddr_in for IPv4 (IPv6 = false)
        or sockaddr_in6 for IPv6 (IPv6 = true).

     **************************************************************************/

    alias InetAddress.Addr InAddr;

    /**************************************************************************

        Socket

     **************************************************************************/

    alias AddressIPSocket!(IPv6) IPSocket;

    protected IPSocket socket;

    /**************************************************************************

        Constructor.

        Params:
            socket = IPSocket instance to use internally
            fiber  = fiber to be suspended when socket connection does not
                     immediately succeed or fail

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber )
    {
        this.socket = socket;

        super(this.socket, fiber);
    }

    /**************************************************************************

        Constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber,
                  IOWarning warning_e, SocketError socket_error )
    {
        super(this.socket = socket, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail. If a
        connection to the same address and port is already established, the
        Already flag is set in the return value. If a connection to a different
        address and port is already established, this connection is closed and a
        new connection is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            ConnectionStatus.Connected if the connection was newly established
            or ConnectionStatus.Already if the connection was already
            established (the Connected flag is not set in that case).

        Throws:
            - SocketError (IOException) on fatal I/O error or if address does
              not contain a valid IP address,
            - IOWarning if the remote hung up.

     **************************************************************************/

    override public ConnectionStatus connect ( cstring address, ushort port, bool force = false )
    {
        if (!this.sameAddress(address, port))
        {
            this.disconnect();
        }

        // The connect() call is passed as a lazy argument: it is only
        // evaluated if connect_() decides a connection attempt is needed.
        return this.connect_(!this.socket.connect(address, port), force);
    }

    /***************************************************************************

        Ditto.

        Params:
            address = remote address
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            see connect() above.

     ***************************************************************************/

    public ConnectionStatus connect ( InAddr address, bool force = false )
    {
        if (this.in_addr != address)
        {
            this.disconnect();
        }

        return this.connect_(!this.socket.connect(address), force);
    }

    /**************************************************************************

        Returns:
            the remote address of the socket if it is currently connected, or
            InetAddress.addr_init otherwise

     **************************************************************************/

    public InAddr in_addr ( )
    {
        return this.connected_? this.socket.in_addr : InetAddress.addr_init;
    }

    /***************************************************************************

        Returns:
            the remote IP address of the connected socket, or the last attempted
            connection, or "" if no connection has been attempted

     ***************************************************************************/

    override protected cstring address_ ( )
    {
        return this.socket.address;
    }

    /***************************************************************************

        Returns:
            the remote TCP port of the connected socket, or the last attempted
            connection, or ushort.init if no connection has been attempted

     ***************************************************************************/

    override protected ushort port_ ( )
    {
        return this.socket.port;
    }

    /***************************************************************************

        Compares addr with the address currently stored in the socket.

        Params:
            addr = binary address to compare with the current address

        Returns:
            true if addr is equal to the current socket address or false if not.

     ***************************************************************************/

    public bool sameAddress ( InAddr addr )
    {
        return addr == this.socket.in_addr;
    }

    /***************************************************************************

        Compares ip_address_str and port with the current address and port.

        Params:
            ip_address_str = string with the IP address to compare with the
                             current address
            port           = port to compare with the current

        Returns:
            true if ip_address_str and port are the same as the current or false
            if not.

        Throws:
            SocketError if ip_address_str does not contain a valid IP address.

     ***************************************************************************/

    public bool sameAddress ( cstring ip_address_str, ushort port )
    {
        InetAddress address;

        this.socket_error.enforce(address.inet_pton(ip_address_str) == 1,
                                  "invalid IP address");

        address.port = port;

        return this.sameAddress(address.addr);
    }
}

///
unittest
{
    alias FiberSocketConnection!(true) IPV6;
    alias FiberSocketConnection!(false) IPV4;
}


public class IFiberSocketConnection : IFiberSelectProtocol
{
    /**************************************************************************

        This alias for chainable methods

     **************************************************************************/

    public alias typeof (this) This;

    /**************************************************************************

        Connection status as returned by connect() and disconnect().

     **************************************************************************/

    public enum ConnectionStatus : uint
    {
        Disconnected = 0,
        Connected    = 1 << 0,
        Already      = 1 << 1
    }

    /**************************************************************************

        Socket

     **************************************************************************/

    protected IIPSocket socket;

    /**************************************************************************

        Socket error exception

     **************************************************************************/

    private SocketError socket_error;

    /**************************************************************************

        Current connection status

     **************************************************************************/

    protected bool connected_ = false;

    /**************************************************************************

        Count of the number of times transmit() has been invoked since the call
        to super.transmitLoop.

     **************************************************************************/

    private uint transmit_calls;

    /**************************************************************************

        Delegate which is called (in EpollTiming debug mode) after a socket
        connection is established.

        FIXME: the logging of connection times was intended to be done directly
        in this module, not via a delegate, but dmd bugs with varargs made this
        impossible. The delegate solution is ok though.

     **************************************************************************/

    debug ( EpollTiming )
    {
        private alias void delegate ( ulong microsec ) ConnectionTimeDg;
        public ConnectionTimeDg connection_time_dg;
    }

    /**************************************************************************

        Constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber,
                     IOWarning warning_e, SocketError socket_error )
    {
        this.socket = socket;

        this.socket_error = socket_error;

        super(socket, Event.EPOLLOUT, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Constructor.

        Creates a new IOWarning and a new SocketError instance for this
        connection.

        Params:
            socket = IPSocket instance to use internally
            fiber  = fiber to be suspended when socket connection does not
                     immediately succeed or fail

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber )
    {
        this(socket, fiber, new IOWarning(socket), new SocketError(socket));
    }

    /**************************************************************************

        Returns:
            true if the socket is currently connected or false if not.

     **************************************************************************/

    public bool connected ( )
    {
        return this.connected_;
    }

    /***************************************************************************

        Returns:
            the IP address reported by address_() if the socket is currently
            connected, or "" otherwise

     ***************************************************************************/

    public cstring address ( )
    {
        return this.connected_? this.address_ : "";
    }

    /***************************************************************************

        Returns:
            the TCP port reported by port_() if the socket is currently
            connected, or ushort.init otherwise

     ***************************************************************************/

    public ushort port ( )
    {
        return this.connected_? this.port_ : ushort.init;
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail. If a
        connection to the same address and port is already established, the
        Already flag is set in the return value. If a connection to a different
        address and port is already established, this connection is closed and a
        new connection is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            ConnectionStatus.Connected if the connection was newly established
            or ConnectionStatus.Already if the connection was already
            established (this is what connect_() returns; the Connected flag
            is not set in that case).

        Throws:
            - SocketError (IOException) on fatal I/O error,
            - IOWarning if the remote hung up.

     **************************************************************************/

    abstract public ConnectionStatus connect ( cstring address, ushort port, bool force = false );

    /**************************************************************************

        Disconnects from provided address (if connected).

        Params:
            force = false: disconnect only if connected; true: force disconnect

        Returns:
            ConnectionStatus.Disconnected if the socket was connected when this
            method was called, or
            ConnectionStatus.Already | ConnectionStatus.Disconnected if it was
            already disconnected.

     **************************************************************************/

    public ConnectionStatus disconnect ( bool force = false )
    out
    {
        assert (!this.connected_);
    }
    do
    {
        if (this.connected_ || force)
        {
            this.onDisconnect();
            this.socket.shutdown();
            this.socket.close();
        }

        // Reset the flag only after the return expression below has been
        // evaluated, so that the result reflects the state on entry.
        scope (exit) this.connected_ = false;

        with (ConnectionStatus) return this.connected_?
                                    Disconnected :
                                    Already | Disconnected;
    }

    /**************************************************************************

        Establishes a non-blocking socket connection according to the POSIX
        specification for connect():

            "If the connection cannot be established immediately and O_NONBLOCK
            is set for the file descriptor for the socket, connect() shall fail
            and set errno to [EINPROGRESS], but the connection request shall not
            be aborted, and the connection shall be established asynchronously.
            [...]
            When the connection has been established asynchronously, select()
            and poll() shall indicate that the file descriptor for the socket is
            ready for writing."

        Calls connect_syscall, which should forward to connect() to establish a
        connection. If connect_syscall returns false, errno is evaluated to
        obtain the connection status.
        If it is EINPROGRESS or EALREADY (or EINTR, see below) the fiber is
        suspended so that this method returns when the socket is ready for
        writing or throws when a connection error was detected.

        Params:
            connect_syscall = should call connect() and return true if connect()
                              returns 0 or false otherwise

            force           = false: don't call connect_syscall if currently
                              connected to the same address and port; true:
                              always call connect_syscall

        Returns:
            - ConnectionStatus.Connected if the connection was newly
              established, either because connect_syscall returned true or after
              the socket became ready for writing,
            - ConnectionStatus.Already if connect() failed with EISCONN or if
              the socket was already connected and force is false.

        Throws:
            - SocketError (IOException) if creating the socket fails, if
              connect_syscall fails with an error other than
              EINPROGRESS/EALREADY/EINTR or EISCONN or if a socket error was
              detected,
            - IOWarning if the remote hung up.

        Out:
            The socket is connected, the returned status is never Disconnected.

        Note: The POSIX specification says about connect() failing with EINTR:

            "If connect() is interrupted by a signal that is caught while
            blocked waiting to establish a connection, connect() shall fail and
            set errno to EINTR, but the connection request shall not be aborted,
            and the connection shall be established asynchronously."

        It remains unclear whether a nonblocking connect() can also fail with
        EINTR or not. Assuming that, if it is possible, it has the same meaning
        as for blocking connect(), we handle EINTR in the same way as
        EINPROGRESS. TODO: Remove handling of EINTR or this note when this is
        clarified.

     **************************************************************************/

    protected ConnectionStatus connect_ ( lazy bool connect_syscall, bool force )
    out (status)
    {
        assert (this.connected_);
        assert (status);
    }
    do
    {
        // Create a socket if it is currently closed.
        if (this.socket.fileHandle < 0)
        {
            this.connected_ = false;

            this.socket_error.assertExSock(this.socket.tcpSocket(true) >= 0,
                "error creating socket", __FILE__, __LINE__);

            this.initSocket();
        }

        if (!this.connected_ || force)
        {
            debug ( EpollTiming )
            {
                StopWatch sw;
                sw.start;

                scope ( success )
                {
                    if ( this.connection_time_dg )
                    {
                        this.connection_time_dg(sw.microsec);
                    }
                }
            }

            // Evaluating the lazy parameter performs the connect() call.
            if ( connect_syscall )
            {
                // Use address_/port_ here: the public address/port accessors
                // depend on connected_, which has not been set yet.
                debug ( ISelectClient ) Stderr.formatln("[{}:{}]: Connected to socket",
                    this.address_, this.port_).flush();
                this.connected_ = true;
                return ConnectionStatus.Connected;
            }
            else
            {
                // Save errno right away, before any other call can change it.
                int errnum = .errno;

                debug ( ISelectClient )
                {
                    import core.stdc.string : strerror;
                    Stderr.formatln("[{}:{}]: {}",
                        this.address_, this.port_, StringC.toDString(
                            strerror(errnum))).flush();
                }

                switch (errnum)
                {
                    case EISCONN:
                        this.connected_ = true;
                        return ConnectionStatus.Already;

                    case EINTR, // TODO: Might never be reported, see note above.
                         EINPROGRESS,
                         EALREADY:
                        debug ( ISelectClient )
                        {
                            Stderr.formatln("[{}:{}]: waiting for the socket to become writable",
                                this.address_, this.port_).flush();

                            scope (failure) Stderr.formatln("[{}:{}]: error while waiting for the socket to become writable",
                                this.address_, this.port_).flush();

                            scope (success) Stderr.formatln("[{}:{}]: socket has become writable",
                                this.address_, this.port_).flush();
                        }
                        // transmit() uses this counter to tell its first
                        // invocation from the one after the epoll wait.
                        this.transmit_calls = 0;
                        this.transmitLoop();
                        this.connected_ = true;
                        return ConnectionStatus.Connected;

                    default:
                        throw this.socket_error.setSock(errnum,
                            "error establishing connection", __FILE__, __LINE__);
                }
            }
        }
        else
        {
            return ConnectionStatus.Already;
        }
    }

    /***************************************************************************

        Returns:
            the IP address of the connected socket, or the last attempted
            connection, or "" if no connection has been attempted

     ***************************************************************************/

    abstract protected cstring address_ ( );

    /***************************************************************************

        Returns:
            the TCP port of the connected socket, or the last attempted
            connection, or ushort.init if no connection has been attempted

     ***************************************************************************/

    abstract protected ushort port_ ( );

    /***************************************************************************

        Called by connect_() right after a new socket has been created, before
        the connection attempt. The base class implementation does nothing, but
        derived classes may override to add any desired initialisation logic.

     ***************************************************************************/

    protected void initSocket ( )
    {
    }

    /**************************************************************************

        Disconnection cleanup handler for a subclass. Called by disconnect()
        before the socket is shut down and closed.

     **************************************************************************/

    protected void onDisconnect ( )
    {
    }

    /***************************************************************************

        Called from super.transmitLoop() in two circumstances:
            1. Upon the initial call to transmitLoop() in connect_(), above.
            2. After an epoll wait, upon receipt of one or more registered
               events.

        Params:
            events = events reported for socket

        Returns:
            Upon first invocation (which occurs automatically when
            super.transmitLoop() is called, in connect_(), above):
                * true, to go into epoll wait.

            Upon second invocation:
                * false to not return to epoll wait.

        Throws:
            IOWarning on the second invocation if events contains EPOLLHUP.

     ***************************************************************************/

    protected override bool transmit ( Event events )
    {
        verify(this.transmit_calls <= 1);

        scope ( exit ) this.transmit_calls++;

        if ( this.transmit_calls > 0 )
        {
            this.warning_e.enforce(!(events & Event.EPOLLHUP),
                "Hangup on connect");
            return false;
        }
        else
        {
            return true;
        }
    }
}