1 /*******************************************************************************
2 
3     Base class for a non-blocking socket connection select client using a
4     fiber/coroutine to suspend operation while waiting for the connection to be
5     established and resume on that event (a Write event signifies that the
6     connection has been established).
7 
8     Copyright:
9         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
10         All rights reserved.
11 
12     License:
13         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
14         Alternatively, this file may be distributed under the terms of the Tango
15         3-Clause BSD License (see LICENSE_BSD.txt for details).
16 
17 *******************************************************************************/
18 
19 module ocean.io.select.protocol.fiber.FiberSocketConnection;
20 
21 
22 
23 
24 import ocean.meta.types.Qualifiers;
25 
26 import ocean.core.Verify;
27 
28 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
29 
30 import ocean.io.select.EpollSelectDispatcher;
31 
32 import ocean.core.Array : copy;
33 
34 import ocean.sys.socket.AddressIPSocket,
35        ocean.sys.socket.InetAddress,
36        ocean.sys.socket.IPSocket: IIPSocket;
37 
38 
39 import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
40 
41 import core.stdc.errno: errno, EINPROGRESS, EINTR, EALREADY, EISCONN;
42 
43 debug ( EpollTiming ) import ocean.time.StopWatch;
44 debug ( ISelectClient )
45 {
46     import ocean.io.Stdout : Stderr;
47     import ocean.text.util.StringC;
48 }
49 
50 
51 
public class FiberSocketConnection ( bool IPv6 = false ) : IFiberSocketConnection
{
    /**************************************************************************

        Address helper struct type for the selected IP version (IPv4 if IPv6
        is false, IPv6 if it is true).

     **************************************************************************/

    alias InetAddress = .InetAddress!(IPv6);

    /**************************************************************************

        Binary socket address type wrapped by InetAddress: sockaddr_in for
        IPv4 (IPv6 = false) or sockaddr_in6 for IPv6 (IPv6 = true).

     **************************************************************************/

    alias InAddr = InetAddress.Addr;

    /**************************************************************************

        Concrete socket type for the selected IP version, and the socket
        instance used by this connection.

     **************************************************************************/

    alias IPSocket = AddressIPSocket!(IPv6);

    protected IPSocket socket;

    /**************************************************************************

        Constructor. Stores the socket and forwards it, together with the
        fiber, to the base class constructor.

        Params:
            socket = IPSocket instance to use internally
            fiber  = fiber to be suspended when socket connection does not
                     immediately succeed or fail

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber )
    {
        super(this.socket = socket, fiber);
    }

    /**************************************************************************

        Constructor. Stores the socket and forwards all arguments to the base
        class constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    public this ( IPSocket socket, SelectFiber fiber,
                  IOWarning warning_e, SocketError socket_error )
    {
        this.socket = socket;

        super(socket, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Connects to the remote host given as IP address string and port.

        If the requested address/port differs from the one currently held by
        the socket, disconnect() is called first. The actual connection
        procedure (including suspending the fiber while the connection is in
        progress) is then delegated to connect_() of the base class.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            the status returned by connect_().

        Throws:
            - SocketError (IOException) on fatal I/O error or if address is
              not a valid IP address,
            - IOWarning if the remote hung up.

     **************************************************************************/

    override public ConnectionStatus connect ( cstring address, ushort port, bool force = false )
    {
        bool same_target = this.sameAddress(address, port);

        if (!same_target)
        {
            this.disconnect();
        }

        // The connect() call is passed as a lazy argument, so it is only
        // evaluated if connect_() decides to evaluate it.
        return this.connect_(!this.socket.connect(address, port), force);
    }

    /***************************************************************************

        Connects to the remote host given as binary socket address.

        If address differs from the value of in_addr(), disconnect() is called
        first; then the connection procedure is delegated to connect_() of the
        base class.

        Params:
            address = remote address
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            the status returned by connect_().

    ***************************************************************************/

    public ConnectionStatus connect ( InAddr address, bool force = false )
    {
        InAddr current = this.in_addr();

        if (current != address)
        {
            this.disconnect();
        }

        // Lazy argument: evaluated by connect_() only when needed.
        return this.connect_(!this.socket.connect(address), force);
    }

    /**************************************************************************

        Returns:
            the address reported by the socket if this instance is flagged as
            connected, or InetAddress.addr_init otherwise.

     **************************************************************************/

    public InAddr in_addr ( )
    {
        if (!this.connected_)
        {
            return InetAddress.addr_init;
        }

        return this.socket.in_addr;
    }

    /***************************************************************************

        Returns:
            the IP address string currently reported by the socket.

    ***************************************************************************/

    override protected cstring address_ ( )
    {
        return this.socket.address;
    }

    /***************************************************************************

        Returns:
            the TCP port currently reported by the socket.

    ***************************************************************************/

    override protected ushort port_ ( )
    {
        return this.socket.port;
    }

    /***************************************************************************

        Compares a binary socket address with the address currently held by
        the socket (regardless of whether this instance is flagged as
        connected).

        Params:
            addr = binary socket address to compare with the current one

        Returns:
            true if addr equals the address of the socket or false if not.

     ***************************************************************************/

    public bool sameAddress ( InAddr addr )
    {
        return this.socket.in_addr == addr;
    }

    /***************************************************************************

        Compares an IP address string and a port with the address and port
        currently held by the socket.

        Params:
            ip_address_str = string with the IP address to compare with the
                             current address
            port           = port to compare with the current

        Returns:
            true if ip_address_str and port are the same as the current or false
            if not.

        Throws:
            SocketError if ip_address_str does not contain a valid IP address.

    ***************************************************************************/

    public bool sameAddress ( cstring ip_address_str, ushort port )
    {
        InetAddress parsed;

        // inet_pton() reports success with a return value of 1.
        bool valid = parsed.inet_pton(ip_address_str) == 1;

        this.socket_error.enforce(valid, "invalid IP address");

        parsed.port = port;

        return this.sameAddress(parsed.addr);
    }
}
271 
272 ///
unittest
{
    // Refer to both instantiations of the class template so that it is
    // instantiated for IPv4 as well as for IPv6.
    alias IPV4 = FiberSocketConnection!(false);
    alias IPV6 = FiberSocketConnection!(true);
}
278 
279 
public class IFiberSocketConnection : IFiberSelectProtocol
{
    /**************************************************************************

        This alias for chainable methods

     **************************************************************************/

    public alias typeof (this) This;

    /**************************************************************************

        Connection status as returned by connect() and disconnect(). The
        values are bit flags and may be combined.

     **************************************************************************/

    public enum ConnectionStatus : uint
    {
        Disconnected = 0,
        Connected    = 1 << 0,
        Already      = 1 << 1
    }

    /**************************************************************************

        Socket

     **************************************************************************/

    protected IIPSocket socket;

    /**************************************************************************

        Socket error exception

     **************************************************************************/

    private SocketError socket_error;

    /**************************************************************************

        Current connection status

     **************************************************************************/

    protected bool connected_ = false;

    /**************************************************************************

        Count of the number of times transmit() has been invoked since the call
        to super.transmitLoop.

     **************************************************************************/

    private uint transmit_calls;

    /**************************************************************************

        Delegate which is called (in EpollTiming debug mode) after a socket
        connection is established.

        FIXME: the logging of connection times was intended to be done directly
        in this module, not via a delegate, but dmd bugs with varargs made this
        impossible. The delegate solution is ok though.

     **************************************************************************/

    debug ( EpollTiming )
    {
        private alias void delegate ( ulong microsec ) ConnectionTimeDg;
        public ConnectionTimeDg connection_time_dg;
    }

    /**************************************************************************

        Constructor.

        warning_e and socket_error may be the same object.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail
            warning_e    = exception to be thrown when the remote hung up
            socket_error = exception to be thrown on socket error

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber,
                     IOWarning warning_e, SocketError socket_error )
    {
        this.socket = socket;

        this.socket_error = socket_error;

        // The connection is established when the socket becomes writable, so
        // EPOLLOUT is the event this select client is interested in.
        super(socket, Event.EPOLLOUT, fiber, warning_e, socket_error);
    }

    /**************************************************************************

        Constructor. Creates new IOWarning and SocketError exception instances
        for the socket.

        Params:
            socket       = IPSocket instance to use internally
            fiber        = fiber to be suspended when socket connection does not
                           immediately succeed or fail

     **************************************************************************/

    protected this ( IIPSocket socket, SelectFiber fiber )
    {
        this(socket, fiber, new IOWarning(socket), new SocketError(socket));
    }

    /**************************************************************************

        Returns:
            true if the socket is currently connected or false if not.

     **************************************************************************/

    public bool connected ( )
    {
        return this.connected_;
    }

    /***************************************************************************

        Returns:
            the IP address as reported by address_() if currently connected,
            or "" if not connected

     ***************************************************************************/

    public cstring address ( )
    {
        return this.connected_? this.address_ : "";
    }

    /***************************************************************************

        Returns:
            the TCP port as reported by port_() if currently connected, or
            ushort.init if not connected

     ***************************************************************************/

    public ushort port ( )
    {
        return this.connected_? this.port_ : ushort.init;
    }

    /**************************************************************************

        Attempts to connect to the remote host, suspending the fiber if
        establishing the connection does not immediately succeed or fail. If a
        connection to the same address and port is already established, the
        Already flag is set in the return value. If a connection to a different
        address and port is already established, this connection is closed and a
        new connection is opened.

        Params:
            address = remote IP address
            port    = remote TCP port
            force   = false: don't call connect() if currently connected to the
                      same address and port; true: always call connect()

        Returns:
            ConnectionStatus.Connected if the connection was newly established
            or a status with the ConnectionStatus.Already flag set if the
            connection was already established.

        Throws:
            - SocketError (IOException) on fatal I/O error,
            - IOWarning if the remote hung up.

     **************************************************************************/

    abstract public ConnectionStatus connect ( cstring address, ushort port, bool force = false );

    /**************************************************************************

        Disconnects from provided address (if connected).

        If connected, or if force is true, onDisconnect() is called and the
        socket is shut down and closed.

        Params:
            force = false: disconnect only if connected; true: force disconnect

        Returns:
            the connection status before disconnecting:
            ConnectionStatus.Connected if the socket was connected, or
            ConnectionStatus.Already | ConnectionStatus.Disconnected if it was
            already disconnected.

        Out:
            The connection is flagged as not connected.

     **************************************************************************/

    public ConnectionStatus disconnect ( bool force = false )
    out
    {
        assert (!this.connected_);
    }
    do
    {
        if (this.connected_ || force)
        {
            this.onDisconnect();
            this.socket.shutdown();
            this.socket.close();
        }

        // Reset the flag only after the return value, which reports the
        // status *before* disconnecting, has been evaluated.
        scope (exit) this.connected_ = false;

        with (ConnectionStatus) return this.connected_?
                                    Connected :
                                    Already | Disconnected;
    }

    /**************************************************************************

        Establishes a non-blocking socket connection according to the POSIX
        specification for connect():

            "If the connection cannot be established immediately and O_NONBLOCK
            is set for the file descriptor for the socket, connect() shall fail
            and set errno to [EINPROGRESS], but the connection request shall not
            be aborted, and the connection shall be established asynchronously.
            [...]
            When the connection has been established asynchronously, select()
            and poll() shall indicate that the file descriptor for the socket is
            ready for writing."

        If the socket is currently closed (negative file handle), a new TCP
        socket is created and initSocket() is called first.

        Calls connect_syscall, which should forward to connect() to establish a
        connection. If connect_syscall returns false, errno is evaluated to
        obtain the connection status.
        If it is EINPROGRESS or EALREADY (or EINTR, see below) the fiber is
        suspended so that this method returns when the socket is ready for
        writing or throws when a connection error was detected.

        Params:
            connect_syscall = should call connect() and return true if connect()
                              returns 0 or false otherwise

            force           = false: don't call connect_syscall if currently
                              connected to the same address and port; true:
                              always call connect_syscall

        Returns:
            - ConnectionStatus.Connected if the connection was newly
              established, either because connect_syscall returned true or after
              the socket became ready for writing,
            - ConnectionStatus.Already if connect_syscall was not called
              because the connection is already established and force is
              false, or if connect() failed with EISCONN.

        Throws:
            - SocketError (IOException) if creating the socket fails, if
              connect_syscall fails with an error other than
              EINPROGRESS/EALREADY/EINTR or EISCONN or if a socket error was
              detected,
            - IOWarning if the remote hung up.

        Out:
            The socket is connected, the returned status is never Disconnected.

        Note: The POSIX specification says about connect() failing with EINTR:

            "If connect() is interrupted by a signal that is caught while
            blocked waiting to establish a connection, connect() shall fail and
            set errno to EINTR, but the connection request shall not be aborted,
            and the connection shall be established asynchronously."

        It remains unclear whether a nonblocking connect() can also fail with
        EINTR or not. Assuming that, if it is possible, it has the same meaning
        as for blocking connect(), we handle EINTR in the same way as
        EINPROGRESS. TODO: Remove handling of EINTR or this note when this is
        clarified.

     **************************************************************************/

    protected ConnectionStatus connect_ ( lazy bool connect_syscall, bool force )
    out (status)
    {
        assert (this.connected_);
        assert (status);
    }
    do
    {
        // Create a socket if it is currently closed.
        if (this.socket.fileHandle < 0)
        {
            this.connected_ = false;

            this.socket_error.assertExSock(this.socket.tcpSocket(true) >= 0,
                "error creating socket", __FILE__, __LINE__);

            this.initSocket();
        }

        if (!this.connected_ || force)
        {
            debug ( EpollTiming )
            {
                StopWatch sw;
                sw.start;

                scope ( success )
                {
                    if ( this.connection_time_dg )
                    {
                        this.connection_time_dg(sw.microsec);
                    }
                }
            }

            // connect_syscall is lazy: evaluating it here performs the
            // actual connect() call.
            if ( connect_syscall )
            {
                debug ( ISelectClient ) Stderr.formatln("[{}:{}]: Connected to socket",
                    this.address, this.port).flush();
                this.connected_ = true;
                return ConnectionStatus.Connected;
            }
            else
            {
                // Save errno immediately, before any other call can change it.
                int errnum = .errno;

                debug ( ISelectClient )
                {
                    import core.stdc.string : strerror;
                    Stderr.formatln("[{}:{}]: {}",
                        this.address_, this.port_, StringC.toDString(
                            strerror(errnum))).flush();
                }

                switch (errnum)
                {
                    case EISCONN:
                        this.connected_ = true;
                        return ConnectionStatus.Already;

                    case EINTR, // TODO: Might never be reported, see note above.
                         EINPROGRESS,
                         EALREADY:
                        debug ( ISelectClient )
                        {
                            Stderr.formatln("[{}:{}]: waiting for the socket to become writable",
                                this.address_, this.port_).flush();

                            scope (failure) Stderr.formatln("[{}:{}]: error while waiting for the socket to become writable",
                                                           this.address_, this.port_).flush();

                            scope (success) Stderr.formatln("[{}:{}]: socket has become writable",
                                                           this.address_, this.port_).flush();
                        }
                        // Reset the counter evaluated by transmit(), then
                        // wait for the socket to become writable.
                        this.transmit_calls = 0;
                        this.transmitLoop();
                        this.connected_ = true;
                        return ConnectionStatus.Connected;

                    default:
                        throw this.socket_error.setSock(errnum,
                            "error establishing connection", __FILE__, __LINE__);
                }
            }
        }
        else
        {
            return ConnectionStatus.Already;
        }
    }

    /***************************************************************************

        Returns:
            the IP address of the connected socket, or the last attempted
            connection, or "" if no connection has been attempted

     ***************************************************************************/

    abstract protected cstring address_ ( );

    /***************************************************************************

        Returns:
            the TCP port of the connected socket, or the last attempted
            connection, or ushort.init if no connection has been attempted

     ***************************************************************************/

    abstract protected ushort port_ ( );

    /***************************************************************************

        Called by connect_() right after a new socket has been created, before
        the connection attempt. The base class implementation does nothing, but
        derived classes may override to add any desired initialisation logic.

    ***************************************************************************/

    protected void initSocket ( )
    {
    }

    /**************************************************************************

        Disconnection cleanup handler for a subclass. Called by disconnect()
        before the socket is shut down and closed. The base class
        implementation does nothing.

     **************************************************************************/

    protected void onDisconnect ( )
    {
    }

    /***************************************************************************

        Called from super.transmitLoop() in two circumstances:
            1. Upon the initial call to transmitLoop() in connect(), above.
            2. After an epoll wait, upon receipt of one or more registered
               events.

        Params:
            events = events reported for socket

        Returns:
            Upon first invocation (which occurs automatically when
            super.transmitLoop() is called, in connect(), above):
                * true, to go into epoll wait.

            Upon second invocation:
                * false to not return to epoll wait.

        Throws:
            IOWarning upon second invocation if events contains EPOLLHUP.

    ***************************************************************************/

    protected override bool transmit ( Event events )
    {
        verify(this.transmit_calls <= 1);

        scope ( exit ) this.transmit_calls++;

        if ( this.transmit_calls > 0 )
        {
            this.warning_e.enforce(!(events & Event.EPOLLHUP),
                                   "Hangup on connect");
            return false;
        }
        else
        {
            return true;
        }
    }
}