/*******************************************************************************

    Base class for a non-blocking socket connection select client using a
    fiber/coroutine to suspend operation while waiting for the connection to be
    established and resume on that event (a Write event signifies that the
    connection has been established).

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.select.protocol.fiber.FiberSocketConnection;

import ocean.core.Array : copy;
import ocean.core.Verify;
import ocean.io.select.EpollSelectDispatcher;
import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
import ocean.meta.types.Qualifiers;
import ocean.sys.socket.AddressIPSocket;
import ocean.sys.socket.InetAddress;
import ocean.sys.socket.IPSocket: IIPSocket;
debug (EpollTiming) import ocean.time.StopWatch;
debug (ISelectClient)
{
    import ocean.io.Stdout : Stderr;
    import ocean.text.util.StringC;
}
import core.stdc.errno: errno, EINPROGRESS, EINTR, EALREADY, EISCONN;


/// Ditto
public class FiberSocketConnection ( bool IPv6 = false ) : IFiberSocketConnection
{
    /**************************************************************************

        Alias of the address struct type.

     **************************************************************************/

    alias .InetAddress!(IPv6) InetAddress;

    /**************************************************************************

        Alias of the binary address type, sockaddr_in for IPv4 (IPv6 = false)
        or sockaddr_in6 for IPv6 (IPv6 = true).

     **************************************************************************/

    alias InetAddress.Addr InAddr;

    /**************************************************************************

        Socket

        Note that this field has the concrete socket type and is distinct from
        the `socket` field of the base class, which holds the same object
        through the IIPSocket interface.

     **************************************************************************/

    alias AddressIPSocket!(IPv6) IPSocket;

    protected IPSocket socket;

    /**************************************************************************

        Constructor.

        Params:
            socket = IPSocket instance to use internally
            fiber  = fiber to be suspended when socket connection does not
                     immediately succeed or fail

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber )
    {
        this.socket = socket;

        super(this.socket, fiber);
    }

    /**************************************************************************

        Constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber,
                  IOWarning warning_e, SocketError socket_error )
    {
        super(this.socket = socket, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail. If a
        connection to the same address and port is already established, the
        Already flag is set in the return value. If a connection to a different
        address and port is already established, this connection is closed and a
        new connection is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            ConnectionStatus.Connected if the connection was newly established
            or ConnectionStatus.Already if the connection was already
            established (the Connected flag is not set in that case, see
            connect_()).

        Throws:
            - SocketError (IOException) on fatal I/O error or if address does
              not contain a valid IP address,
            - IOWarning if the remote hung up.

     **************************************************************************/

    override public ConnectionStatus connect ( cstring address, ushort port, bool force = false )
    {
        if (!this.sameAddress(address, port))
        {
            this.disconnect();
        }

        // The connect() call is passed as a lazy argument so that connect_()
        // decides whether it is actually evaluated.
        return this.connect_(!this.socket.connect(address, port), force);
    }

    /***************************************************************************

        Ditto.

        Params:
            address = remote address
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            see connect() above.

     ***************************************************************************/

    public ConnectionStatus connect ( InAddr address, bool force = false )
    {
        if (this.in_addr != address)
        {
            this.disconnect();
        }

        return this.connect_(!this.socket.connect(address), force);
    }

    /**************************************************************************

        Returns:
            the remote address of the socket if currently connected, or an
            empty address (InetAddress.addr_init) if not connected.

     **************************************************************************/

    public InAddr in_addr ( )
    {
        return this.connected_? this.socket.in_addr : InetAddress.addr_init;
    }

    /***************************************************************************

        Returns:
            the remote IP address of the connected socket, or the last attempted
            connection, or "" if no connection has been attempted

     ***************************************************************************/

    override protected cstring address_ ( )
    {
        return this.socket.address;
    }

    /***************************************************************************

        Returns:
            the remote TCP port of the connected socket, or the last attempted
            connection, or ushort.init if no connection has been attempted

     ***************************************************************************/

    override protected ushort port_ ( )
    {
        return this.socket.port;
    }

    /***************************************************************************

        Compares addr with the current address of the socket.

        Params:
            addr = binary address (including port) to compare with the current
                   socket address

        Returns:
            true if addr equals the current socket address or false if not.

     ***************************************************************************/

    public bool sameAddress ( InAddr addr )
    {
        return addr == this.socket.in_addr;
    }

    /***************************************************************************

        Compares ip_address_str and port with the current address and port.

        Params:
            ip_address_str = string with the IP address to compare with the
                             current address
            port           = port to compare with the current

        Returns:
            true if ip_address_str and port are the same as the current or false
            if not.

        Throws:
            SocketError if ip_address_str does not contain a valid IP address.

     ***************************************************************************/

    public bool sameAddress ( cstring ip_address_str, ushort port )
    {
        InetAddress address;

        this.socket_error.enforce(address.inet_pton(ip_address_str) == 1,
                                  "invalid IP address");

        address.port = port;

        return this.sameAddress(address.addr);
    }
}

///
unittest
{
    alias FiberSocketConnection!(true) IPV6;
    alias FiberSocketConnection!(false) IPV4;
}


/*******************************************************************************

    Address-family independent base class of FiberSocketConnection. Holds the
    socket through the IIPSocket interface, tracks the connection status and
    implements the non-blocking connect/disconnect logic; subclasses supply
    the actual connect() call and the address/port accessors.

*******************************************************************************/

public class IFiberSocketConnection : IFiberSelectProtocol
{
    /**************************************************************************

        This alias for chainable methods

     **************************************************************************/

    public alias typeof (this) This;

    /**************************************************************************

        Connection status as returned by connect() and disconnect().

        The values are bit flags; Disconnected is 0, so
        `Already | Disconnected` is numerically equal to `Already`.

     **************************************************************************/

    public enum ConnectionStatus : uint
    {
        Disconnected = 0,
        Connected    = 1 << 0,
        Already      = 1 << 1
    }

    /**************************************************************************

        Socket

     **************************************************************************/

    protected IIPSocket socket;

    /**************************************************************************

        Socket error exception

     **************************************************************************/

    private SocketError socket_error;

    /**************************************************************************

        Current connection status

     **************************************************************************/

    protected bool connected_ = false;

    /**************************************************************************

        Count of the number of times transmit() has been invoked since the call
        to super.transmitLoop.

     **************************************************************************/

    private uint transmit_calls;

    /**************************************************************************

        Delegate which is called (in EpollTiming debug mode) after a socket
        connection is established.

        FIXME: the logging of connection times was intended to be done directly
        in this module, not via a delegate, but dmd bugs with varargs made this
        impossible. The delegate solution is ok though.

     **************************************************************************/

    debug ( EpollTiming )
    {
        private alias void delegate ( ulong microsec ) ConnectionTimeDg;
        public ConnectionTimeDg connection_time_dg;
    }

    /**************************************************************************

        Constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber,
                     IOWarning warning_e, SocketError socket_error )
    {
        this.socket = socket;

        this.socket_error = socket_error;

        // Register for EPOLLOUT: a write event signifies that the connection
        // has been established.
        super(socket, Event.EPOLLOUT, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Constructor.

        Creates a new IOWarning and a new SocketError instance for this
        connection.

        Params:
            socket = IPSocket instance to use internally
            fiber  = fiber to be suspended when socket connection does not
                     immediately succeed or fail

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber )
    {
        this(socket, fiber, new IOWarning(socket), new SocketError(socket));
    }

    /**************************************************************************

        Returns:
            true if the socket is currently connected or false if not.

     **************************************************************************/

    public bool connected ( )
    {
        return this.connected_;
    }

    /***************************************************************************

        Returns:
            the IP address of the connected socket, or "" if the socket is not
            currently connected

     ***************************************************************************/

    public cstring address ( )
    {
        return this.connected_? this.address_ : "";
    }

    /***************************************************************************

        Returns:
            the TCP port of the connected socket, or ushort.init if the socket
            is not currently connected

     ***************************************************************************/

    public ushort port ( )
    {
        return this.connected_? this.port_ : ushort.init;
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail. If a
        connection to the same address and port is already established, the
        Already flag is set in the return value. If a connection to a different
        address and port is already established, this connection is closed and a
        new connection is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            ConnectionStatus.Connected if the connection was newly established
            or ConnectionStatus.Already if the connection was already
            established (the Connected flag is not set in that case, see
            connect_()).

        Throws:
            - SocketError (IOException) on fatal I/O error,
            - IOWarning if the remote hung up.

     **************************************************************************/

    abstract public ConnectionStatus connect ( cstring address, ushort port, bool force = false );

    /**************************************************************************

        Disconnects from provided address (if connected).

        If connected or force is true, onDisconnect() is called and the socket
        is shut down and closed. The connection status is reset to "not
        connected" in any case.

        Params:
            force = false: disconnect only if connected; true: force disconnect

        Returns:
            ConnectionStatus.Disconnected if a connection was established when
            this method was called, or
            ConnectionStatus.Already | ConnectionStatus.Disconnected if there
            was no connection.

     **************************************************************************/

    public ConnectionStatus disconnect ( bool force = false )
    out
    {
        assert (!this.connected_);
    }
    do
    {
        if (this.connected_ || force)
        {
            this.onDisconnect();
            this.socket.shutdown();
            this.socket.close();
        }

        // Reset the flag on scope exit so that the return expression below
        // still sees the status from before this call.
        scope (exit) this.connected_ = false;

        with (ConnectionStatus) return this.connected_?
                                        Disconnected :
                                        Already | Disconnected;
    }

    /**************************************************************************

        Establishes a non-blocking socket connection according to the POSIX
        specification for connect():

            "If the connection cannot be established immediately and O_NONBLOCK
            is set for the file descriptor for the socket, connect() shall fail
            and set errno to [EINPROGRESS], but the connection request shall not
            be aborted, and the connection shall be established asynchronously.
            [...]
            When the connection has been established asynchronously, select()
            and poll() shall indicate that the file descriptor for the socket is
            ready for writing."

        Calls connect_syscall, which should forward to connect() to establish a
        connection. If connect_syscall returns false, errno is evaluated to
        obtain the connection status.
        If it is EINPROGRESS or EALREADY (or EINTR, see below) the fiber is
        suspended so that this method returns when the socket is ready for
        writing or throws when a connection error was detected.

        If the socket is currently closed (negative file handle), a new TCP
        socket is created and initSocket() is called before connecting.

        Params:
            connect_syscall = should call connect() and return true if connect()
                              returns 0 or false otherwise

            force           = false: don't call connect_syscall if currently
                              connected to the same address and port; true:
                              always call connect_syscall

        Returns:
            - ConnectionStatus.Connected if the connection was newly
              established, either because connect_syscall returned true or after
              the socket became ready for writing,
            - ConnectionStatus.Already if connect() failed with EISCONN or if
              the socket was already connected and force is false.
              NOTE(review): only the Already flag is returned in this case, the
              Connected flag is not set; callers must not test for Connected
              to detect an established connection.

        Throws:
            - SocketError (IOException) if creating the socket fails, if
              connect_syscall fails with an error other than
              EINPROGRESS/EALREADY/EINTR or EISCONN or if a socket error was
              detected,
            - IOWarning if the remote hung up.

        Out:
            The socket is connected, the returned status is never Disconnected.

        Note: The POSIX specification says about connect() failing with EINTR:

            "If connect() is interrupted by a signal that is caught while
            blocked waiting to establish a connection, connect() shall fail and
            set errno to EINTR, but the connection request shall not be aborted,
            and the connection shall be established asynchronously."

        It remains unclear whether a nonblocking connect() can also fail with
        EINTR or not. Assuming that, if it is possible, it has the same meaning
        as for blocking connect(), we handle EINTR in the same way as
        EINPROGRESS. TODO: Remove handling of EINTR or this note when this is
        clarified.

     **************************************************************************/

    protected ConnectionStatus connect_ ( lazy bool connect_syscall, bool force )
    out (status)
    {
        assert (this.connected_);
        assert (status);
    }
    do
    {
        // Create a socket if it is currently closed.
        if (this.socket.fileHandle < 0)
        {
            this.connected_ = false;
            this.socket_error.assertExSock(this.socket.tcpSocket(true) >= 0,
                "error creating socket", __FILE__, __LINE__);
            this.initSocket();
        }

        if (!this.connected_ || force)
        {
            debug ( EpollTiming )
            {
                StopWatch sw;
                sw.start;

                scope ( success )
                {
                    if ( this.connection_time_dg )
                    {
                        this.connection_time_dg(sw.microsec);
                    }
                }
            }

            // connect_syscall is lazy: the connect() call happens here.
            if ( connect_syscall )
            {
                // Use address_/port_ for logging: the public address()/port()
                // accessors return ""/ushort.init as long as connected_ is
                // false, which it still is at this point.
                debug ( ISelectClient ) Stderr.formatln("[{}:{}]: Connected to socket",
                    this.address_, this.port_).flush();
                this.connected_ = true;
                return ConnectionStatus.Connected;
            }
            else
            {
                // Capture errno immediately, before any other call can
                // overwrite it.
                int errnum = .errno;

                debug ( ISelectClient )
                {
                    import core.stdc.string : strerror;
                    Stderr.formatln("[{}:{}]: {}",
                        this.address_, this.port_, StringC.toDString(
                            strerror(errnum))).flush();
                }

                switch (errnum)
                {
                    case EISCONN:
                        this.connected_ = true;
                        return ConnectionStatus.Already;

                    case EINTR, // TODO: Might never be reported, see note above.
                         EINPROGRESS,
                         EALREADY:
                        debug ( ISelectClient )
                        {
                            Stderr.formatln("[{}:{}]: waiting for the socket to become writable",
                                this.address_, this.port_).flush();

                            scope (failure) Stderr.formatln("[{}:{}]: error while waiting for the socket to become writable",
                                this.address_, this.port_).flush();

                            scope (success) Stderr.formatln("[{}:{}]: socket has become writable",
                                this.address_, this.port_).flush();
                        }
                        // Reset the counter evaluated by transmit() before
                        // starting the wait loop.
                        this.transmit_calls = 0;
                        this.transmitLoop();
                        this.connected_ = true;
                        return ConnectionStatus.Connected;

                    default:
                        throw this.socket_error.setSock(errnum,
                            "error establishing connection", __FILE__, __LINE__);
                }
            }
        }
        else
        {
            return ConnectionStatus.Already;
        }
    }

    /***************************************************************************

        Returns:
            the IP address of the connected socket, or the last attempted
            connection, or "" if no connection has been attempted

     ***************************************************************************/

    abstract protected cstring address_ ( );

    /***************************************************************************

        Returns:
            the TCP port of the connected socket, or the last attempted
            connection, or ushort.init if no connection has been attempted

     ***************************************************************************/

    abstract protected ushort port_ ( );

    /***************************************************************************

        Called by connect_() right after a new socket has been created, before
        the socket is connected. The base class implementation does nothing,
        but derived classes may override to add any desired initialisation
        logic.

     ***************************************************************************/

    protected void initSocket ( )
    {
    }

    /**************************************************************************

        Disconnection cleanup handler for a subclass. Called by disconnect()
        before the socket is shut down and closed.

     **************************************************************************/

    protected void onDisconnect ( )
    {
    }

    /***************************************************************************

        Called from super.transmitLoop() in two circumstances:
            1. Upon the initial call to transmitLoop() in connect(), above.
            2. After an epoll wait, upon receipt of one or more registered
               events.

        Params:
            events = events reported for socket

        Returns:
            Upon first invocation (which occurs automatically when
            super.transmitLoop() is called, in connect(), above):
                * true, to go into epoll wait.

            Upon second invocation:
                * false to not return to epoll wait.

        Throws:
            IOWarning on second invocation if events contains EPOLLHUP.

     ***************************************************************************/

    protected override bool transmit ( Event events )
    {
        verify(this.transmit_calls <= 1);

        scope ( exit ) this.transmit_calls++;

        if ( this.transmit_calls > 0 )
        {
            this.warning_e.enforce(!(events & Event.EPOLLHUP),
                                   "Hangup on connect");
            return false;
        }
        else
        {
            return true;
        }
    }
}