1 /*******************************************************************************
2 
3     Base class for a non-blocking socket connection select client using a
4     fiber/coroutine to suspend operation while waiting for the connection to be
5     established and resume on that event (a Write event signifies that the
6     connection has been established).
7 
8     Copyright:
9         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
10         All rights reserved.
11 
12     License:
13         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
14         Alternatively, this file may be distributed under the terms of the Tango
15         3-Clause BSD License (see LICENSE_BSD.txt for details).
16 
17 *******************************************************************************/
18 
19 module ocean.io.select.protocol.fiber.FiberSocketConnection;
20 
21 import ocean.core.Array : copy;
22 import ocean.core.Verify;
23 import ocean.io.select.EpollSelectDispatcher;
24 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
25 import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
26 import ocean.meta.types.Qualifiers;
27 import ocean.sys.socket.AddressIPSocket;
28 import ocean.sys.socket.InetAddress;
29 import ocean.sys.socket.IPSocket: IIPSocket;
30 debug (EpollTiming) import ocean.time.StopWatch;
31 debug (ISelectClient)
32 {
33     import ocean.io.Stdout : Stderr;
34     import ocean.text.util.StringC;
35 }
36 import core.stdc.errno: errno, EINPROGRESS, EINTR, EALREADY, EISCONN;
37 
38 
39 /// Ditto
40 public class FiberSocketConnection (bool IPv6 = false) : IFiberSocketConnection
41 {
42     /**************************************************************************
43 
44         Alias of the address struct type.
45 
46      **************************************************************************/
47 
48     alias .InetAddress!(IPv6) InetAddress;
49 
50     /**************************************************************************
51 
52         Alias of the binary address type, sockaddr_in for IPv4 (IPv6 = false)
53         or sockaddr_in6 for IPv6 (IPv6 = true).
54 
55      **************************************************************************/
56 
57     alias InetAddress.Addr InAddr;
58 
59     /**************************************************************************
60 
61         Socket
62 
63      **************************************************************************/
64 
65     alias AddressIPSocket!(IPv6) IPSocket;
66 
67     protected IPSocket socket;
68 
69     /**************************************************************************
70 
71         Constructor.
72 
73         Params:
74             socket = IPSocket instance to use internally
75             fiber = fiber to be suspended when socket connection does not
76                 immediately succeed or fail
77 
78      **************************************************************************/
79 
80     public this ( IPSocket socket, SelectFiber fiber )
81     {
82         this.socket = socket;
83 
84         super(this.socket, fiber);
85     }
86 
87     /**************************************************************************
88 
89         Constructor.
90 
91         warning_e and socket_error may be the same object.
92 
93         Params:
94             socket       = IPSocket instance to use internally
95             fiber        = fiber to be suspended when socket connection does not
96                            immediately succeed or fail
97             warning_e    = exception to be thrown when the remote hung up
98             socket_error = exception to be thrown on socket error
99 
100      **************************************************************************/
101 
102     public this ( IPSocket socket, SelectFiber fiber,
103                   IOWarning warning_e, SocketError socket_error )
104     {
105         super(this.socket = socket, fiber, warning_e, socket_error);
106     }
107 
108     /**************************************************************************
109 
110         Attempts to connect to the remote host, suspending the fiber if
111         establishing the connection does not immediately succeed or fail. If a
112         connection to the same address and port is already established, the
113         Already flag is set in the return value. If a connection to a different
114         address and port is already established, this connection is closed and a
115         new connection is opened.
116 
117         Params:
118             address = remote IP address
119             port    = remote TCP port
120             force   = false: don't call connect() if currently connected to the
121                       same address and port; true: always call connect()
122         Returns:
123             ConnectionStatus.Connected if the connection was newly established
124             or ConnectionStatus.Connected | ConnectionStatus.Already if the
125             connection was already established.
126 
127         Throws:
128             - SocketError (IOException) on fatal I/O error,
129             - IOWarning if the remote hung up.
130 
131      **************************************************************************/
132 
133     override public ConnectionStatus connect ( cstring address, ushort port, bool force = false )
134     {
135         if (!this.sameAddress(address, port))
136         {
137             this.disconnect();
138         }
139 
140         return this.connect_(!this.socket.connect(address, port), force);
141     }
142 
143     /***************************************************************************
144 
145         Ditto.
146 
147         Params:
148             address = remote address
149             force   = false: don't call connect() if currently connected to the
150                       same address and port; true: always call connect()
151 
152         Returns:
153             see connect() above.
154 
155     ***************************************************************************/
156 
157     public ConnectionStatus connect ( InAddr address, bool force = false )
158     {
159         if (this.in_addr != address)
160         {
161             this.disconnect();
162         }
163 
164         return this.connect_(!this.socket.connect(address), force);
165     }
166 
167     /**************************************************************************
168 
169         Returns:
170             the remote address of the connected socket, or the last attempted
171             connection, or an empty address if no connection has been attempted
172 
173      **************************************************************************/
174 
175     public InAddr in_addr ( )
176     {
177         return this.connected_? this.socket.in_addr : InetAddress.addr_init;
178     }
179 
180     /***************************************************************************
181 
182         Returns:
183             the remote IP address of the connected socket, or the last attempted
184             connection, or "" if no connection has been attempted
185 
186     ***************************************************************************/
187 
188     override protected cstring address_ ( )
189     {
190         return this.socket.address;
191     }
192 
193     /***************************************************************************
194 
195         Returns:
196             the remote TCP port of the connected socket, or the last attempted
197             connection, or ushort.init if no connection has been attempted
198 
199     ***************************************************************************/
200 
201     override protected ushort port_ ( )
202     {
203         return this.socket.port;
204     }
205 
206     /***************************************************************************
207 
208         Compares ip_address_str and port with the current address and port.
209 
210         Params:
211             ip_address_str = string with the IP address to compare with the
212                              current address
213             port           = port to compare with the current
214 
215         Returns:
216             true if ip_address_str and port are the same as the current or false
217             if not.
218 
219         Throws:
220             SocketError if ip_address_str does not contain a valid IP address.
221 
222      ***************************************************************************/
223 
224     public bool sameAddress ( InAddr addr )
225     {
226         return addr == this.socket.in_addr;
227     }
228 
229     /***************************************************************************
230 
231         Compares ip_address_str and port with the current address and port.
232 
233         Params:
234             ip_address_str = string with the IP address to compare with the
235                              current address
236             port           = port to compare with the current
237 
238         Returns:
239             true if ip_address_str and port are the same as the current or false
240             if not.
241 
242         Throws:
243             SocketError if ip_address_str does not contain a valid IP address.
244 
245     ***************************************************************************/
246 
247     public bool sameAddress ( cstring ip_address_str, ushort port )
248     {
249         InetAddress address;
250 
251         this.socket_error.enforce(address.inet_pton(ip_address_str) == 1,
252             "invalid IP address");
253 
254         address.port = port;
255 
256         return this.sameAddress(address.addr);
257     }
258 }
259 
260 ///
261 unittest
262 {
263     alias FiberSocketConnection!(true) IPV6;
264     alias FiberSocketConnection!(false) IPV4;
265 }
266 
267 
268 public class IFiberSocketConnection : IFiberSelectProtocol
269 {
270     /**************************************************************************
271 
272         This alias for chainable methods
273 
274      **************************************************************************/
275 
276     public alias typeof (this) This;
277 
278     /**************************************************************************
279 
280         Connection status as returned by connect() and disconnect().
281 
282      **************************************************************************/
283 
284     public enum ConnectionStatus : uint
285     {
286         Disconnected = 0,
287         Connected    = 1 << 0,
288         Already      = 1 << 1
289     }
290 
291     /**************************************************************************
292 
293         Socket
294 
295      **************************************************************************/
296 
297     protected IIPSocket socket;
298 
299     /**************************************************************************
300 
301         Socket error exception
302 
303      **************************************************************************/
304 
305     private SocketError socket_error;
306 
307     /**************************************************************************
308 
309         Current connection status
310 
311      **************************************************************************/
312 
313     protected bool connected_ = false;
314 
315     /**************************************************************************
316 
317         Count of the number of times transmit() has been invoked since the call
318         to super.transmitLoop.
319 
320      **************************************************************************/
321 
322     private uint transmit_calls;
323 
324     /**************************************************************************
325 
326         Delegate which is called (in EpollTiming debug mode) after a socket
327         connection is established.
328 
329         FIXME: the logging of connection times was intended to be done directly
330         in this module, not via a delegate, but dmd bugs with varargs made this
331         impossible. The delegate solution is ok though.
332 
333      **************************************************************************/
334 
335     debug ( EpollTiming )
336     {
337         private alias void delegate ( ulong microsec ) ConnectionTimeDg;
338         public ConnectionTimeDg connection_time_dg;
339     }
340 
341     /**************************************************************************
342 
343         Constructor.
344 
345         warning_e and socket_error may be the same object.
346 
347         Params:
348             socket       = IPSocket instance to use internally
349             fiber        = fiber to be suspended when socket connection does not
350                            immediately succeed or fail
351             warning_e    = exception to be thrown when the remote hung up
352             socket_error = exception to be thrown on socket error
353 
354      **************************************************************************/
355 
356     protected this ( IIPSocket socket, SelectFiber fiber,
357                      IOWarning warning_e, SocketError socket_error )
358     {
359         this.socket = socket;
360 
361         this.socket_error = socket_error;
362 
363         super(socket, Event.EPOLLOUT, fiber, warning_e, socket_error);
364     }
365 
366     /**************************************************************************
367 
368         Constructor.
369 
370         Params:
371             socket       = IPSocket instance to use internally
372             fiber        = fiber to be suspended when socket connection does not
373                            immediately succeed or fail
374 
375      **************************************************************************/
376 
377     protected this ( IIPSocket socket, SelectFiber fiber )
378     {
379         this(socket, fiber, new IOWarning(socket), new SocketError(socket));
380     }
381 
382     /**************************************************************************
383 
384         Returns:
385             true if the socket is currently connected or false if not.
386 
387      **************************************************************************/
388 
389     public bool connected ( )
390     {
391         return this.connected_;
392     }
393 
394     /***************************************************************************
395 
396         Returns:
397             the IP address of the connected socket, or the last attempted
398             connection, or "" if no connection has been attempted
399 
400      ***************************************************************************/
401 
402     public cstring address ( )
403     {
404         return this.connected_? this.address_ : "";
405     }
406 
407     /***************************************************************************
408 
409         Returns:
410             the TCP port of the connected socket, or the last attempted
411             connection, or ushort.init if no connection has been attempted
412 
413      ***************************************************************************/
414 
415     public ushort port ( )
416     {
417         return this.connected_? this.port_ : ushort.init;
418     }
419 
420     /**************************************************************************
421 
422         Attempts to connect to the remote host, suspending the fiber if
423         establishing the connection does not immediately succeed or fail. If a
424         connection to the same address and port is already established, the
425         Already flag is set in the return value. If a connection to a different
426         address and port is already established, this connection is closed and a
427         new connection is opened.
428 
429         Params:
430             address = remote IP address
431             port    = remote TCP port
432             force   = false: don't call connect() if currently connected to the
433                       same address and port; true: always call connect()
434 
435         Returns:
436             ConnectionStatus.Connected if the connection was newly established
437             or ConnectionStatus.Connected | ConnectionStatus.Already if the
438             connection was already established.
439 
440         Throws:
441             - SocketError (IOException) on fatal I/O error,
442             - IOWarning if the remote hung up.
443 
444      **************************************************************************/
445 
446     abstract public ConnectionStatus connect ( cstring address, ushort port, bool force = false );
447 
448     /**************************************************************************
449 
450         Disconnects from provided address (if connected).
451 
452         Params:
453             force = false: disconnect only if connected; true: force disconnect
454 
455         Returns:
456             the connection status before disconnecting.
457 
458      **************************************************************************/
459 
460     public ConnectionStatus disconnect ( bool force = false )
461     out
462     {
463         assert (!this.connected_);
464     }
465     do
466     {
467         if (this.connected_ || force)
468         {
469             this.onDisconnect();
470             this.socket.shutdown();
471             this.socket.close();
472         }
473 
474         scope (exit) this.connected_ = false;
475 
476         with (ConnectionStatus) return this.connected_?
477                                     Disconnected :
478                                     Already | Disconnected;
479     }
480 
481     /**************************************************************************
482 
483         Establishes a non-blocking socket connection according to the POSIX
484         specification for connect():
485 
486             "If the connection cannot be established immediately and O_NONBLOCK
487             is set for the file descriptor for the socket, connect() shall fail
488             and set errno to [EINPROGRESS], but the connection request shall not
489             be aborted, and the connection shall be established asynchronously.
490             [...]
491             When the connection has been established asynchronously, select()
492             and poll() shall indicate that the file descriptor for the socket is
493             ready for writing."
494 
495         Calls connect_syscall, which should forward to connect() to establish a
496         connection. If connect_syscall returns false, errno is evaluated to
497         obtain the connection status.
498         If it is EINPROGRESS (or EINTR, see below) the fiber is suspended so
499         that this method returns when the socket is ready for writing or throws
500         when a connection error was detected.
501 
502         Params:
503             connect_syscall = should call connect() and return true if connect()
504                               returns 0 or false otherwise
505 
506             force           = false: don't call connect_syscall if currently
507                               connected to the same address and port; true:
508                               always call connect_syscall
509 
510         Returns:
511             - ConnectionStatus.Connected if the connection was newly
512               established, either because connect_syscall returned true or after
513               the socket became ready for writing,
514             - ConnectionStatus.Connected | ConnectionStatus.Already if connect()
515               failed with EISCONN.
516 
517         Throws:
518             - SocketError (IOException) if connect_syscall fails with an error
519               other than EINPROGRESS/EINTR or EISCONN or if a socket error was
520               detected,
521             - IOWarning if the remote hung up.
522 
523         Out:
524             The socket is connected, the returned status is never Disconnected.
525 
526         Note: The POSIX specification says about connect() failing with EINTR:
527 
528             "If connect() is interrupted by a signal that is caught while
529             blocked waiting to establish a connection, connect() shall fail and
530             set errno to EINTR, but the connection request shall not be aborted,
531             and the connection shall be established asynchronously."
532 
533         It remains unclear whether a nonblocking connect() can also fail with
534         EINTR or not. Assuming that, if it is possible, it has the same meaning
535         as for blocking connect(), we handle EINTR in the same way as
536         EINPROGRESS. TODO: Remove handling of EINTR or this note when this is
537         clarified.
538 
539      **************************************************************************/
540 
541     protected ConnectionStatus connect_ ( lazy bool connect_syscall, bool force )
542     out (status)
543     {
544         assert (this.connected_);
545         assert (status);
546     }
547     do
548     {
549         // Create a socket if it is currently closed.
550         if (this.socket.fileHandle < 0)
551         {
552             this.connected_ = false;
553             this.socket_error.assertExSock(this.socket.tcpSocket(true) >= 0,
554                 "error creating socket", __FILE__, __LINE__);
555             this.initSocket();
556         }
557 
558         if (!this.connected_ || force)
559         {
560             debug ( EpollTiming )
561             {
562                 StopWatch sw;
563                 sw.start;
564 
565                 scope ( success )
566                 {
567                     if ( this.connection_time_dg )
568                     {
569                         this.connection_time_dg(sw.microsec);
570                     }
571                 }
572             }
573 
574             if ( connect_syscall )
575             {
576                 debug ( ISelectClient ) Stderr.formatln("[{}:{}]: Connected to socket",
577                     this.address, this.port).flush();
578                 this.connected_ = true;
579                 return ConnectionStatus.Connected;
580             }
581             else
582             {
583                 int errnum = .errno;
584 
585                 debug ( ISelectClient )
586                 {
587                     import core.stdc.string : strerror;
588                     Stderr.formatln("[{}:{}]: {}",
589                         this.address_, this.port_, StringC.toDString(
590                             strerror(errnum))).flush();
591                 }
592 
593                 switch (errnum)
594                 {
595                     case EISCONN:
596                         this.connected_ = true;
597                         return ConnectionStatus.Already;
598 
599                     case EINTR, // TODO: Might never be reported, see note above.
600                          EINPROGRESS,
601                          EALREADY:
602                          debug ( ISelectClient )
603                          {
604                              Stderr.formatln("[{}:{}]: waiting for the socket to become writable",
605                                  this.address_, this.port_).flush();
606 
607                              scope (failure) Stderr.formatln("[{}:{}]: error while waiting for the socket to become writable",
608                                                             this.address_, this.port_).flush();
609 
610                              scope (success) Stderr.formatln("[{}:{}]: socket has become writable",
611                                                             this.address_, this.port_).flush();
612                          }
613                         this.transmit_calls = 0;
614                         this.transmitLoop();
615                         this.connected_ = true;
616                         return ConnectionStatus.Connected;
617 
618                     default:
619                         throw this.socket_error.setSock(errnum,
620                             "error establishing connection", __FILE__, __LINE__);
621                 }
622             }
623         }
624         else
625         {
626             return ConnectionStatus.Already;
627         }
628     }
629 
630     /***************************************************************************
631 
632         Returns:
633             the IP address of the connected socket, or the last attempted
634             connection, or "" if no connection has been attempted
635 
636      ***************************************************************************/
637 
638     abstract protected cstring address_ ( );
639 
640     /***************************************************************************
641 
642         Returns:
643             the TCP port of the connected socket, or the last attempted
644             connection, or ushort.init if no connection has been attempted
645 
646      ***************************************************************************/
647 
648     abstract protected ushort port_ ( );
649 
650     /***************************************************************************
651 
652         Called just before the socket is connected. The base class
653         implementation does nothing, but derived classes may override to add any
654         desired initialisation logic.
655 
656     ***************************************************************************/
657 
658     protected void initSocket ( )
659     {
660     }
661 
662     /**************************************************************************
663 
664         Disconnection cleanup handler for a subclass
665 
666      **************************************************************************/
667 
668     protected void onDisconnect ( )
669     {
670     }
671 
672     /***************************************************************************
673 
674         Called from super.transmitLoop() in two circumstances:
675             1. Upon the initial call to transmitLoop() in connect(), above.
676             2. After an epoll wait, upon receipt of one or more registered
677                events.
678 
679         Params:
680             events = events reported for socket
681 
682         Returns:
683             Upon first invocation (which occurs automatically when
684             super.transmitLoop() is called, in connect(), above):
685                 * false if connect() returned a code denoting either an error or
686                   a successful connection (meaning there's no need to go into
687                   epoll wait).
688                 * true otherwise, to go into epoll wait.
689 
690             Upon second invocation:
691                 * false to not return to epoll wait.
692 
693     ***************************************************************************/
694 
695     protected override bool transmit ( Event events )
696     {
697         verify(this.transmit_calls <= 1);
698 
699         scope ( exit ) this.transmit_calls++;
700 
701         if ( this.transmit_calls > 0 )
702         {
703             this.warning_e.enforce(!(events & Event.EPOLLHUP),
704                                    "Hangup on connect");
705             return false;
706         }
707         else
708         {
709             return true;
710         }
711     }
712 }