1 /*******************************************************************************
2 
3     Base class for a non-blocking socket connection select client using a
4     fiber/coroutine to suspend operation while waiting for the connection to be
5     established and resume on that event (a Write event signifies that the
6     connection has been established).
7 
8     Copyright:
9         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
10         All rights reserved.
11 
12     License:
13         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
14         Alternatively, this file may be distributed under the terms of the Tango
15         3-Clause BSD License (see LICENSE_BSD.txt for details).
16 
17 *******************************************************************************/
18 
19 module ocean.io.select.protocol.fiber.FiberSocketConnection;
20 
21 
22 
23 
24 import ocean.transition;
25 
26 import ocean.core.Verify;
27 
28 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
29 
30 import ocean.io.select.EpollSelectDispatcher;
31 
32 import ocean.core.Array : copy;
33 
34 import ocean.sys.socket.AddressIPSocket,
35        ocean.sys.socket.InetAddress,
36        ocean.sys.socket.IPSocket: IIPSocket;
37 
38 
39 import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
40 
41 import ocean.stdc.posix.sys.socket: SOL_SOCKET, IPPROTO_TCP, SO_KEEPALIVE;
42 
43 import core.stdc.errno: errno, EINPROGRESS, EINTR, EALREADY, EISCONN;
44 
45 debug ( EpollTiming ) import ocean.time.StopWatch;
46 debug ( ISelectClient )
47 {
48     import ocean.io.Stdout : Stderr;
49     import ocean.text.util.StringC;
50 }
51 
52 
53 
/*******************************************************************************

    Fiber based socket connection for a concrete IP version.

    Wraps an AddressIPSocket and adds the address bookkeeping (comparison of
    the requested address with the address held by the socket) on top of the
    connection state machine implemented in IFiberSocketConnection.

    Params:
        IPv6 = true: use IPv6 addresses; false: use IPv4 addresses

*******************************************************************************/

public class FiberSocketConnection ( bool IPv6 = false ) : IFiberSocketConnection
{
    /**************************************************************************

        Address helper struct type for the selected IP version.

     **************************************************************************/

    alias .InetAddress!(IPv6) InetAddress;

    /**************************************************************************

        Binary address type: sockaddr_in for IPv4 (IPv6 = false) or
        sockaddr_in6 for IPv6 (IPv6 = true).

     **************************************************************************/

    alias InetAddress.Addr InAddr;

    /**************************************************************************

        Socket type for the selected IP version, and the socket instance used
        by this connection. This field intentionally has the same name as the
        IIPSocket reference in the base class but carries the concrete type.

     **************************************************************************/

    alias AddressIPSocket!(IPv6) IPSocket;

    protected IPSocket socket;

    /**************************************************************************

        Constructor. The base class creates the exception objects.

        Params:
            socket = IPSocket instance to use internally
            fiber  = fiber to be suspended when socket connection does not
                     immediately succeed or fail

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber )
    {
        super(this.socket = socket, fiber);
    }

    /**************************************************************************

        Constructor using caller supplied exception objects.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber,
                  IOWarning warning_e, SocketError socket_error )
    {
        this.socket = socket;

        super(this.socket, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail.

        If the requested address and port differ from the ones currently held
        by the socket, disconnect() is called first so that a new connection
        is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            the status reported by connect_(), see there.

        Throws:
            - SocketError (IOException) on fatal I/O error or if address is not
              a valid IP address,
            - IOWarning if the remote hung up.

     **************************************************************************/

    override public ConnectionStatus connect ( cstring address, ushort port, bool force = false )
    {
        bool same_target = this.sameAddress(address, port);

        if (!same_target)
        {
            this.disconnect();
        }

        // The connect() call is passed as a lazy argument: connect_() decides
        // whether it is evaluated at all.
        return this.connect_(!this.socket.connect(address, port), force);
    }

    /***************************************************************************

        Same as the overload above, taking the remote address (including the
        port) in binary form.

        Params:
            address = remote address
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            the status reported by connect_(), see there.

    ***************************************************************************/

    public ConnectionStatus connect ( InAddr address, bool force = false )
    {
        bool different_target = this.in_addr != address;

        if (different_target)
        {
            this.disconnect();
        }

        // Lazy argument, see the overload above.
        return this.connect_(!this.socket.connect(address), force);
    }

    /**************************************************************************

        Returns:
            the address reported by the socket if currently connected, or
            InetAddress.addr_init if not connected.

     **************************************************************************/

    public InAddr in_addr ( )
    {
        if (!this.connected_)
        {
            return InetAddress.addr_init;
        }

        return this.socket.in_addr;
    }

    /***************************************************************************

        Returns:
            the IP address string reported by the socket, regardless of the
            current connection status.

    ***************************************************************************/

    override protected cstring address_ ( )
    {
        return this.socket.address;
    }

    /***************************************************************************

        Returns:
            the TCP port reported by the socket, regardless of the current
            connection status.

    ***************************************************************************/

    override protected ushort port_ ( )
    {
        return this.socket.port;
    }

    /***************************************************************************

        Compares a binary address with the address currently held by the
        socket. The connection status is not taken into account.

        Params:
            addr = binary address (including port) to compare with the address
                   of the socket

        Returns:
            true if addr equals the address of the socket or false if not.

     ***************************************************************************/

    public bool sameAddress ( InAddr addr )
    {
        return this.socket.in_addr == addr;
    }

    /***************************************************************************

        Compares ip_address_str and port with the address and port currently
        held by the socket.

        Params:
            ip_address_str = string with the IP address to compare with the
                             current address
            port           = port to compare with the current

        Returns:
            true if ip_address_str and port are the same as the current or false
            if not.

        Throws:
            SocketError if ip_address_str does not contain a valid IP address.

    ***************************************************************************/

    public bool sameAddress ( cstring ip_address_str, ushort port )
    {
        InetAddress parsed;

        // inet_pton() reports success with a return value of 1.
        bool valid = parsed.inet_pton(ip_address_str) == 1;

        this.socket_error.enforce(valid, "invalid IP address");

        parsed.port = port;

        return this.sameAddress(parsed.addr);
    }
}
273 
///
unittest
{
    // Refer to both template instantiations (IPv4 and IPv6) so that each
    // variant of the class is instantiated when unit tests are built.
    alias FiberSocketConnection!(false) IPV4;
    alias FiberSocketConnection!(true) IPV6;
}
280 
281 
/*******************************************************************************

    IP version independent base class of the fiber socket connection.

    Keeps track of the connection status and implements the non-blocking
    connect/disconnect logic. Subclasses provide the actual connect() call as
    well as the address and port of the socket.

*******************************************************************************/

public class IFiberSocketConnection : IFiberSelectProtocol
{
    /**************************************************************************

        This alias for chainable methods

     **************************************************************************/

    public alias typeof (this) This;

    /**************************************************************************

        Connection status as returned by connect() and disconnect().

        Connected and Already are bit flags. Already modifies the base status:
        Connected | Already means "was already connected", while
        Already | Disconnected (which equals Already because Disconnected is 0)
        means "was already disconnected".

     **************************************************************************/

    public enum ConnectionStatus : uint
    {
        Disconnected = 0,
        Connected    = 1 << 0,
        Already      = 1 << 1
    }

    /**************************************************************************

        Socket

     **************************************************************************/

    protected IIPSocket socket;

    /**************************************************************************

        Socket error exception

     **************************************************************************/

    private SocketError socket_error;

    /**************************************************************************

        Current connection status

     **************************************************************************/

    protected bool connected_ = false;

    /**************************************************************************

        Count of the number of times transmit() has been invoked since the call
        to super.transmitLoop.

     **************************************************************************/

    private uint transmit_calls;

    /**************************************************************************

        Delegate which is called (in EpollTiming debug mode) after a socket
        connection is established.

        FIXME: the logging of connection times was intended to be done directly
        in this module, not via a delegate, but dmd bugs with varargs made this
        impossible. The delegate solution is ok though.

     **************************************************************************/

    debug ( EpollTiming )
    {
        private alias void delegate ( ulong microsec ) ConnectionTimeDg;
        public ConnectionTimeDg connection_time_dg;
    }

    /**************************************************************************

        Constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber,
                     IOWarning warning_e, SocketError socket_error )
    {
        this.socket = socket;

        this.socket_error = socket_error;

        // The connection is established when the socket becomes writable,
        // hence EPOLLOUT is the event to register for.
        super(socket, Event.EPOLLOUT, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Constructor. Creates a new IOWarning and a new SocketError instance for
        the socket.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber )
    {
        this(socket, fiber, new IOWarning(socket), new SocketError(socket));
    }

    /**************************************************************************

        Returns:
            true if the socket is currently connected or false if not.

     **************************************************************************/

    public bool connected ( )
    {
        return this.connected_;
    }

    /***************************************************************************

        Returns:
            the IP address of the socket as reported by address_() if currently
            connected, or "" if not connected.

     ***************************************************************************/

    public cstring address ( )
    {
        return this.connected_? this.address_ : "";
    }

    /***************************************************************************

        Returns:
            the TCP port of the socket as reported by port_() if currently
            connected, or ushort.init if not connected.

     ***************************************************************************/

    public ushort port ( )
    {
        return this.connected_? this.port_ : ushort.init;
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail. If a
        connection to the same address and port is already established, the
        Already flag is set in the return value. If a connection to a different
        address and port is already established, this connection is closed and a
        new connection is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            ConnectionStatus.Connected if the connection was newly established
            or ConnectionStatus.Connected | ConnectionStatus.Already if the
            connection was already established.

        Throws:
            - SocketError (IOException) on fatal I/O error,
            - IOWarning if the remote hung up.

     **************************************************************************/

    abstract public ConnectionStatus connect ( cstring address, ushort port, bool force = false );

    /**************************************************************************

        Disconnects (if connected): calls onDisconnect(), then shuts down and
        closes the socket.

        Params:
            force = false: disconnect only if connected; true: force disconnect

        Returns:
            - ConnectionStatus.Disconnected if the connection was established
              when this method was called,
            - ConnectionStatus.Already | ConnectionStatus.Disconnected if it
              was already disconnected.

        Out:
            The connection status is "not connected".

     **************************************************************************/

    public ConnectionStatus disconnect ( bool force = false )
    out
    {
        assert (!this.connected_);
    }
    body
    {
        if (this.connected_ || force)
        {
            this.onDisconnect();
            this.socket.shutdown();
            this.socket.close();
        }

        // Sample the status only after the clean-up above has completed
        // without throwing, then mark the connection as closed.
        bool was_connected = this.connected_;

        this.connected_ = false;

        with (ConnectionStatus) return was_connected?
                                    Disconnected :
                                    Already | Disconnected;
    }

    /**************************************************************************

        Establishes a non-blocking socket connection according to the POSIX
        specification for connect():

            "If the connection cannot be established immediately and O_NONBLOCK
            is set for the file descriptor for the socket, connect() shall fail
            and set errno to [EINPROGRESS], but the connection request shall not
            be aborted, and the connection shall be established asynchronously.
            [...]
            When the connection has been established asynchronously, select()
            and poll() shall indicate that the file descriptor for the socket is
            ready for writing."

        Calls connect_syscall, which should forward to connect() to establish a
        connection. If connect_syscall returns false, errno is evaluated to
        obtain the connection status.
        If it is EINPROGRESS, EALREADY (or EINTR, see below) the fiber is
        suspended so that this method returns when the socket is ready for
        writing or throws when a connection error was detected.

        If the socket is currently closed (negative file handle), a new TCP
        socket is created and initSocket() is called before connecting.

        Params:
            connect_syscall = should call connect() and return true if connect()
                              returns 0 or false otherwise

            force           = false: don't call connect_syscall if currently
                              connected to the same address and port; true:
                              always call connect_syscall

        Returns:
            - ConnectionStatus.Connected if the connection was newly
              established, either because connect_syscall returned true or after
              the socket became ready for writing,
            - ConnectionStatus.Connected | ConnectionStatus.Already if the
              connection was already established, either because this instance
              is marked as connected and force is false, or because connect()
              failed with EISCONN.

        Throws:
            - SocketError (IOException) if creating the socket fails, if
              connect_syscall fails with an error other than
              EINPROGRESS/EALREADY/EINTR or EISCONN or if a socket error was
              detected,
            - IOWarning if the remote hung up.

        Out:
            The socket is connected, the returned status is never Disconnected.

        Note: The POSIX specification says about connect() failing with EINTR:

            "If connect() is interrupted by a signal that is caught while
            blocked waiting to establish a connection, connect() shall fail and
            set errno to EINTR, but the connection request shall not be aborted,
            and the connection shall be established asynchronously."

        It remains unclear whether a nonblocking connect() can also fail with
        EINTR or not. Assuming that, if it is possible, it has the same meaning
        as for blocking connect(), we handle EINTR in the same way as
        EINPROGRESS. TODO: Remove handling of EINTR or this note when this is
        clarified.

     **************************************************************************/

    protected ConnectionStatus connect_ ( lazy bool connect_syscall, bool force )
    out (status)
    {
        assert (this.connected_);
        assert (status);
    }
    body
    {
        // Create a socket if it is currently closed.
        if (this.socket.fileHandle < 0)
        {
            this.connected_ = false;

            this.socket_error.assertExSock(this.socket.tcpSocket(true) >= 0,
                "error creating socket", __FILE__, __LINE__);

            this.initSocket();
        }

        if (!this.connected_ || force)
        {
            debug ( EpollTiming )
            {
                StopWatch sw;
                sw.start;

                scope ( success )
                {
                    if ( this.connection_time_dg )
                    {
                        this.connection_time_dg(sw.microsec);
                    }
                }
            }

            if ( connect_syscall )
            {
                // Use address_/port_ here: the public address/port accessors
                // report empty values until connected_ is set below.
                debug ( ISelectClient ) Stderr.formatln("[{}:{}]: Connected to socket",
                    this.address_, this.port_).flush();
                this.connected_ = true;
                return ConnectionStatus.Connected;
            }
            else
            {
                // Save errno immediately, before any other call can change it.
                int errnum = .errno;

                debug ( ISelectClient )
                {
                    import core.stdc.string : strerror;
                    Stderr.formatln("[{}:{}]: {}",
                        this.address_, this.port_, StringC.toDString(
                            strerror(errnum))).flush();
                }

                switch (errnum)
                {
                    case EISCONN:
                        // The socket is connected already: report both flags
                        // so that callers testing for Connected succeed.
                        this.connected_ = true;
                        with (ConnectionStatus) return Connected | Already;

                    case EINTR, // TODO: Might never be reported, see note above.
                         EINPROGRESS,
                         EALREADY:
                         debug ( ISelectClient )
                         {
                             Stderr.formatln("[{}:{}]: waiting for the socket to become writable",
                                 this.address_, this.port_).flush();

                             scope (failure) Stderr.formatln("[{}:{}]: error while waiting for the socket to become writable",
                                                            this.address_, this.port_).flush();

                             scope (success) Stderr.formatln("[{}:{}]: socket has become writable",
                                                            this.address_, this.port_).flush();
                         }
                        // Connection is being established asynchronously:
                        // wait until the socket is reported as writable.
                        this.transmit_calls = 0;
                        this.transmitLoop();
                        this.connected_ = true;
                        return ConnectionStatus.Connected;

                    default:
                        throw this.socket_error.setSock(errnum,
                            "error establishing connection", __FILE__, __LINE__);
                }
            }
        }
        else
        {
            // Connected and not forced: nothing to do.
            with (ConnectionStatus) return Connected | Already;
        }
    }

    /***************************************************************************

        Returns:
            the IP address of the connected socket, or the last attempted
            connection, or "" if no connection has been attempted

     ***************************************************************************/

    abstract protected cstring address_ ( );

    /***************************************************************************

        Returns:
            the TCP port of the connected socket, or the last attempted
            connection, or ushort.init if no connection has been attempted

     ***************************************************************************/

    abstract protected ushort port_ ( );

    /***************************************************************************

        Called by connect_() right after a new socket has been created, before
        the connection is attempted. The base class implementation does
        nothing, but derived classes may override to add any desired
        initialisation logic.

    ***************************************************************************/

    protected void initSocket ( )
    {
    }

    /**************************************************************************

        Disconnection cleanup handler for a subclass. Called by disconnect()
        before the socket is shut down and closed. The base class
        implementation does nothing.

     **************************************************************************/

    protected void onDisconnect ( )
    {
    }

    /***************************************************************************

        Called from super.transmitLoop() in two circumstances:
            1. Upon the initial call to transmitLoop() in connect_(), above.
            2. After an epoll wait, upon receipt of one or more registered
               events.

        Params:
            events = events reported for socket

        Returns:
            Upon first invocation (transmit_calls is 0):
                * true, to go into epoll wait.

            Upon second invocation:
                * false to not return to epoll wait.

        Throws:
            IOWarning on second invocation if events contains EPOLLHUP.

    ***************************************************************************/

    protected override bool transmit ( Event events )
    {
        verify(this.transmit_calls <= 1);

        scope ( exit ) this.transmit_calls++;

        if ( this.transmit_calls > 0 )
        {
            this.warning_e.enforce(!(events & Event.EPOLLHUP),
                                   "Hangup on connect");
            return false;
        }
        else
        {
            return true;
        }
    }
}