1 /******************************************************************************
2 
3     Server socket listener using multiplexed non-blocking socket I/O
4 
5     Creates a server socket and a pool of connection handlers and registers
6     the server socket for incoming connection in a provided SelectDispatcher
7     instance. When a connection comes in, takes an IConnectionHandler instance
8     from the pool and assigns the incoming connection to the handler's socket.
9 
10     Usage example:
11         see documented unittest of SelectListener
12 
13     Copyright:
14         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
15         All rights reserved.
16 
17     License:
18         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
19         Alternatively, this file may be distributed under the terms of the Tango
20         3-Clause BSD License (see LICENSE_BSD.txt for details).
21 
22  ******************************************************************************/
23 
24 module ocean.net.server.SelectListener;
25 
26 
27 import ocean.meta.types.Qualifiers;
28 import ocean.core.Verify;
29 import ocean.io.select.client.model.ISelectClient;
30 import ocean.net.server.connection.IConnectionHandler;
31 import ocean.net.server.connpool.SelectListenerPool;
32 import ocean.net.server.connpool.ISelectListenerPoolInfo;
33 import ocean.util.container.pool.model.IPoolInfo;
34 import ocean.text.convert.Formatter;
35 import ocean.sys.socket.model.ISocket;
36 import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
37 import ocean.util.log.Logger;
38 import core.stdc.errno: errno;
39 
40 /*******************************************************************************
41 
42     Static module logger
43 
44 *******************************************************************************/
45 
46 private Logger log;
47 static this ( )
48 {
49     log = Log.lookup("ocean.net.server.SelectListener");
50 }
51 
52 /******************************************************************************
53 
54     SelectListener base class
55 
56     Contains all base functionality which is not related to the particular
57     IConnectionHandler subclass used in SelectListener.
58 
59  ******************************************************************************/
60 
61 abstract class ISelectListener : ISelectClient
62 {
63     import core.sys.posix.netinet.in_: SOCK_STREAM;
64     import core.sys.posix.sys.socket: accept, SOL_SOCKET, SO_REUSEADDR, sockaddr;
65     import core.sys.posix.unistd:     close;
66 
67     /**************************************************************************
68 
69         Socket, memorises the address most recently passed to bind() or
70         connect() or obtained by accept().
71 
72      **************************************************************************/
73 
74     private ISocket socket;
75 
76     /**************************************************************************
77 
78         Termination flag; true prevents accepting new connections
79 
80      **************************************************************************/
81 
82     private bool terminated = false;
83 
84     /**************************************************************************
85 
86         Exception instance thrown in case of socket errors.
87 
88      **************************************************************************/
89 
90     private SocketError e;
91 
92     /**************************************************************************
93 
94         Constructor
95 
96         Creates the server socket -- a streaming socket of the family (aka
97         "domain") specified in `address` and the protocol according to
98         `protocol` -- and registers it for incoming connections.
99 
100         `address.sa_family` and `protocol` are passed to `socket(2)` together
101         with the `SOCK_STREAM` type, so the socket family defined by
102         `address.sa_family` is required to support streaming, and `protocol`
103         needs to be a streaming protocol. Socket families supporting streaming
104         include IPv4/IPv6 and UNIX domain sockets (`AF_LOCAL`).
105         `protocol == 0` makes `socket(2)` pick the default streaming protocol
106         for `address.sa_family`. For IPv4/IPv6 the default protocol is TCP.
107         UNIX domain sockets and some other families support only one streaming
108         protocol so for those the default is unambiguous.
109 
110         Standards:
111         Posix = http://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html
112         Linux = http://linux.die.net/man/2/socket
113 
114         Params:
115             address    = the socket address and family, must support streaming
116             socket     = the server socket
117             backlog    = the maximum length to which the queue of pending
118                 connections for sockfd may grow. If a connection request arrives
119                 when the queue is full, the client may receive an error with an
120                 indication of ECONNREFUSED or, if the underlying protocol
121                 supports retransmission, the request may be ignored so that a
122                 later reattempt at connection succeeds.
123                 (from http://linux.die.net/man/2/listen)
124             protocol   = the socket protocol, for a streaming socket of the
125                 family specified in address, or 0 to use the default protocol
126                 for a streaming socket of the specified family
127 
128      **************************************************************************/
129 
130     protected this ( sockaddr* address, ISocket socket, int backlog = 32,
131         int protocol= 0)
132     {
133         this.socket = socket;
134 
135         this.e = new SocketError(this.socket);
136 
137         // SOCK_NONBLOCK is a Linux extension, which can be combined with the
138         // actual second argument (SOCK_STREAM) in order to mark the
139         // I/O operations to the socket as non-blocking.
140         this.e.enforce(
141             this.socket.socket(address.sa_family,
142                 SOCK_STREAM | SocketFlags.SOCK_NONBLOCK,
143                 protocol) >= 0,
144             "error creating socket"
145         );
146 
147         this.e.enforce(
148             !this.socket.setsockoptVal(SOL_SOCKET, SO_REUSEADDR, true),
149             "error enabling reuse of address"
150         );
151 
152         this.e.assertExSock(!this.socket.bind(address),
153                             "error binding socket", __FILE__, __LINE__);
154 
155         this.e.assertExSock(!this.socket.listen(backlog),
156                             "error listening on socket", __FILE__, __LINE__);
157     }
158 
159     /**************************************************************************
160 
161         Implements ISelectClient abstract method.
162 
163         Returns:
164             events to register the conduit for.
165 
166      **************************************************************************/
167 
168     public override Event events ( )
169     {
170         return Event.EPOLLIN;
171     }
172 
173     /**************************************************************************
174 
175         Implements ISelectClient abstract method.
176 
177         Returns:
178             conduit's OS file handle (fd)
179 
180      **************************************************************************/
181 
182     public override Handle fileHandle ( )
183     {
184         return this.socket.fileHandle;
185     }
186 
187     /**************************************************************************
188 
189         I/O event handler
190 
191         Called from SelectDispatcher during event loop.
192 
193         Params:
194              event = identifier of I/O event that just occurred on the device
195 
196         Returns:
197             true if the handler should be called again on next event occurrence
198             or false if this instance should be unregistered from the
199             SelectDispatcher (this is effectively a server shutdown).
200 
201         TODO: accept() could be called in a loop in this method, in order to
202         accept as many connections as possible each time the EPOLLIN event fires
203         for the listening socket
204 
205      **************************************************************************/
206 
207     final override bool handle ( Event event )
208     {
209         if (!this.terminated)
210         {
211             try
212             {
213                 IConnectionHandler handler = this.getConnectionHandler();
214                 this.acceptConnection(handler);
215             }
216             catch (Exception)
217             {
218                 /* Catch an exception (or object) thrown by
219                    getConnectionHandler() to prevent it from falling through
220                    to the dispatcher which would unregister the server socket. */
221                 this.declineConnection();
222             }
223         }
224 
225         return !this.terminated;
226     }
227 
228     /**************************************************************************
229 
230         Closes the server socket and sets this instance to terminated mode.
231 
232         TODO: Make it possible to reopen the server socket and resume operation?
233 
234         Returns:
235             true if this instance was already in to terminated mode or false
236             otherwise
237 
238      **************************************************************************/
239 
240     final bool terminate ( )
241     {
242         if (!this.terminated)
243         {
244             this.terminated = true;
245 
246             try
247             {
248                 this.e.enforce(
249                     !this.socket.shutdown(),
250                     "error on socket shutdown"
251                 );
252             }
253             finally
254             {
255                 this.socket.close();
256             }
257             return false;
258         }
259 
260         return true;
261     }
262 
263     /**************************************************************************
264 
265         Returns:
266             information interface to the connections pool
267 
268      **************************************************************************/
269 
270     abstract IPoolInfo poolInfo ( );
271 
272     /**************************************************************************
273 
274         Sets the limit of the number of connections. 0 disables the limitation.
275 
276         Notes:
277             - If limit is set to something other than 0, limit connection
278               handler objects will be created (so set it to a realistic value).
279             - If not 0, limit must be at least the number of currently busy
280               connections.
281 
282         Returns:
283             connection limit
284 
285      **************************************************************************/
286 
287     abstract size_t connection_limit ( size_t limit ) ;
288 
289     /**************************************************************************
290 
291         Returns:
292             the limit of the number of connections or 0 if limitation is
293             disabled.
294 
295      **************************************************************************/
296 
297     public size_t connection_limit ( )
298     {
299         auto n = this.poolInfo.limit;
300 
301         return (n == n.max)? 0 : n;
302     }
303 
304     /**************************************************************************
305 
306         Closes all connections and terminates the listener.
307 
308      **************************************************************************/
309 
310     abstract public void shutdown ( );
311 
312     /**************************************************************************
313 
314         Obtains a connection handler instance from the pool.
315 
316         Returns:
317             connection handler
318 
319      **************************************************************************/
320 
321     abstract protected IConnectionHandler getConnectionHandler ( );
322 
323     /**************************************************************************
324 
325         Accepts the next pending incoming client connection and assigns it to
326         a connection handler.
327 
328         Params:
329             handler = handler to assign connection to
330 
331      **************************************************************************/
332 
333     private void acceptConnection ( IConnectionHandler handler )
334     {
335         try
336         {
337             handler.assign(this.socket);
338 
339             handler.handleConnection();
340         }
341         catch (Exception e)
342         {
343             /* Catch an exception thrown by accept() or handleConnection()
344                (or noDelay()/blocking()) to prevent it from falling through
345                to the select dispatcher which would unregister the server
346                socket.
347 
348                'Too many open files' will be caught here.
349 
350                FIXME: If noDelay() or blocking() fails, the handler will
351                incorrectly assume that the connection is not open and will
352                not close it. Is this a relevant case? */
353             handler.error(e);   // will never throw exceptions
354 
355             handler.finalize();
356         }
357     }
358 
359     /**************************************************************************
360 
361         Accepts the next pending incoming client connection and closes it.
362 
363      **************************************************************************/
364 
365     private void declineConnection ( )
366     {
367         // This is using the C binding
368         if (close(accept(this.socket.fileHandle, null, null))) // returns non-zero on failure
369         {
370             .errno = 0;
371         }
372     }
373 }
374 
375 /******************************************************************************
376 
377     SelectListener class template
378 
379     The additional T constructor argument parameters must appear after those for
380     the mandatory IConnectionHandler constructor.
381 
382     Params:
383         T    = connection handler class
384         Args = additional constructor arguments for T
385 
386     TODO: try using the non-auto ctor pool, for template simplicity!
387 
388  ******************************************************************************/
389 
390 public class SelectListener ( T : IConnectionHandler, Args ... ) : ISelectListener
391 {
392     /**************************************************************************
393 
394         ObjectPool of connection handlers
395 
396      **************************************************************************/
397 
398     private alias SelectListenerPool!(T, Args) ConnPool;
399 
400     private ConnPool receiver_pool;
401 
402     /**************************************************************************
403 
404         String buffer used for connection logging.
405 
406      **************************************************************************/
407 
408     private mstring connection_log_buf;
409 
410     /**************************************************************************
411 
412         Constructor
413 
414         Creates the server socket and registers it for incoming connections.
415 
416         Params:
417             address    = the address of the socket
418             socket     = the server socket
419             dispatcher = SelectDispatcher instance to use
420             args       = additional T constructor arguments, might be empty
421             backlog    = (see ISelectListener ctor)
422 
423      **************************************************************************/
424 
425     public this ( sockaddr* address, ISocket socket, Args args, int backlog = 32 )
426     {
427         super(address, socket, backlog);
428 
429         this.receiver_pool = new ConnPool(&this.returnToPool, args);
430     }
431 
432     /**************************************************************************
433 
434         Obtains a connection handler instance from the pool.
435 
436         Returns:
437             connection handler
438 
439      **************************************************************************/
440 
441     protected override IConnectionHandler getConnectionHandler ( )
442     {
443         return this.receiver_pool.get();
444     }
445 
446     /**************************************************************************
447 
448         Sets the limit of the number of connections. 0 disables the limitation.
449 
450         Notes:
451             - If limit is set to something other than 0, limit connection
452               handler objects will be created (so set it to a realistic value).
453             - If not 0, limit must be at least the number of currently busy
454               connections.
455 
456         Returns:
457             limit
458 
459      **************************************************************************/
460 
461     public override size_t connection_limit ( size_t limit )
462     {
463         verify(!(limit && limit < this.poolInfo.num_busy),
464                 typeof(this).stringof ~ ".connection_limit: limit already exceeded");
465 
466         if (limit)
467         {
468             this.receiver_pool.setLimit(limit);
469         }
470         else
471         {
472             this.receiver_pool.limited = false;
473         }
474 
475         return limit;
476     }
477 
478     /**************************************************************************
479 
480         (Overriding wrapper to fix method matching.)
481 
482         Returns:
483             new limit of number of connections or 0 if unlimited.
484 
485      **************************************************************************/
486 
487     public override size_t connection_limit ( )
488     {
489         return super.connection_limit;
490     }
491 
492     /**************************************************************************
493 
494         Minimizes the connection pool to n connections by deleting idle
495         connection objects. If more than n connections are currently busy,
496         all idle connections are deleted.
497 
498         Params:
499             n = minimum number of connection objects to keep in the pool.
500 
501         Returns:
502             the number of connection object in the pool after minimizing, which
503             is the greater of n and the number of currently busy connections.
504 
505      **************************************************************************/
506 
507     public size_t minimize ( uint n = 0 )
508     out (still_existent)
509     {
510         assert (still_existent >= n);
511     }
512     do
513     {
514         size_t limit = this.receiver_pool.limit,
515                busy = this.receiver_pool.num_busy;
516 
517         scope (exit) this.receiver_pool.setLimit(limit);
518 
519         return this.receiver_pool.setLimit((n > busy)? n : busy);
520     }
521 
522     /**************************************************************************
523 
524         Returns:
525             information interface to the connections pool
526 
527      **************************************************************************/
528 
529     public override ISelectListenerPoolInfo poolInfo ( )
530     {
531         return this.receiver_pool;
532     }
533 
534     /***************************************************************************
535 
536         Writes connection information to log file.
537 
538     ***************************************************************************/
539 
540     public void connectionLog ( )
541     {
542         auto conns = this.poolInfo;
543 
544         log.info("Connection pool: {} busy, {} idle", conns.num_busy,
545             conns.num_idle);
546 
547         foreach ( i, conn; conns )
548         {
549             this.connection_log_buf.length = 0;
550             sformat(this.connection_log_buf, "{}: ", i);
551 
552             conn.formatInfo(this.connection_log_buf);
553 
554             log.info(this.connection_log_buf);
555         }
556     }
557 
558     /**************************************************************************
559 
560         Closes all connections and terminates the listener.
561 
562      **************************************************************************/
563 
564     public override void shutdown ( )
565     {
566         scope busy_connections = this.receiver_pool..new BusyItemsIterator;
567         foreach ( busy_connection; busy_connections )
568         {
569             /* FIXME: calling finalize here will cause errors in any connection
570              * handlers which are currently selected in epoll, as they will
571              * subsequently attempt to finalize themselves again.
572              *
573              * In practice this is of little import however, as the whole server
574              * is being shut down. It may be nice to find a clean way to avoid
575              * this though.
576              */
577             busy_connection.finalize();
578         }
579 
580         super.terminate();
581     }
582 
583     /**************************************************************************
584 
585         Called as the finalizer of class T. Returns connection into the object
586         pool.
587 
588         Params:
589             connection = connection hander instance to return into pool
590 
591      **************************************************************************/
592 
593     private void returnToPool ( IConnectionHandler connection )
594     {
595         verify(cast (T) connection !is null,
596                 typeof(this).stringof ~ ".returnToPool: connection is null");
597 
598         debug ( ConnectionHandler )
599             log.trace("[{}]: Returning to pool", connection.connection_id);
600 
601         this.receiver_pool.recycle(cast (T) connection);
602     }
603 }
604 
605 ///
606 unittest
607 {
608     // A shared resource, owned by the server. A reference is passed to each
609     // connection handler.
610     class SomeSharedResource
611     {
612     }
613 
614     // The connection handler class. The server owns a select listener instance.
615     // The select listener owns a pool of connection handlers. When a new
616     // connection is accepted by the select listener, a connection handler is
617     // taken from the pool (or allocated, if there are no free items in the
618     // pool) and set to handle the incoming connection.
619     // (Connection handler classes must not be nested, in order for the pool to
620     // be able to automatically allocate instances. In this case, it must be
621     // declared static, as the class definition is "nested" inside a unittest.)
622     static class MyConnectionHandler : IConnectionHandler
623     {
624         import ocean.net.server.connection.IConnectionHandlerInfo;
625         import ocean.sys.socket.AddressIPSocket;
626 
627         /// Reference to the global shared resource (passed to the ctor).
628         private SomeSharedResource shared_resource;
629 
630         /// Flag set when an IO error occurs (see error()).
631         private bool error_occurred;
632 
633         /***********************************************************************
634 
635             Constructs a connection handler.
636 
637             Params:
638                 finalize_dg = delegate required by the super class.
639                     Automatically set by SelectListenerPool to be
640                     SelectListener.returnToPool. Called by the super class when
641                     finalize() is called
642                 shared_resource = reference to a global shared resource
643 
644         ***********************************************************************/
645 
646         public this ( scope FinalizeDg finalize_dg, SomeSharedResource shared_resource )
647         {
648             this.shared_resource = shared_resource;
649 
650             super(new AddressIPSocket!(), finalize_dg, &this.error);
651         }
652 
653         /// The actual logic for handling a connection.
654         override public void handleConnection ( )
655         {
656             // Do something in here.
657 
658             // When you're finished, call finalize(), which recycles this
659             // connection into the pool owned by the select listener.
660             this.finalize();
661         }
662 
663         /// Tells the super class whether an I/O error occurred.
664         override protected bool io_error ( )
665         {
666             return this.error_occurred;
667         }
668 
669         /// Called by epoll when an I/O error occurs.
670         private void error ( Exception exception, Event event,
671             IConnectionHandlerInfo info )
672         {
673             this.error_occurred = true;
674         }
675 
676         /// Unregisters the socket from the epoll before closing it. Called by
677         /// finalize.
678         override protected void unregisterSocket ()
679         {
680         }
681     }
682 
683     // Top-level server class which owns the select listener and all global
684     // resources.
685     class MyServer
686     {
687         import ocean.sys.socket.AddressIPSocket;
688         import ocean.sys.socket.InetAddress;
689         import ocean.io.select.EpollSelectDispatcher;
690 
691         /// Select listener alias.
692         private alias SelectListener!(MyConnectionHandler, SomeSharedResource)
693             MySelectListener;
694 
695         /// Select listener instance.
696         private MySelectListener listener;
697 
698         /// Epoll instance.
699         private EpollSelectDispatcher epoll;
700 
701         /// Constructs a server.
702         public this ( )
703         {
704             InetAddress!(false) addr;
705             auto socket = new AddressIPSocket!();
706             auto shared_resource = new SomeSharedResource;
707 
708             this.listener = new MySelectListener(addr("127.0.0.1", 2009),
709                 socket, shared_resource);
710 
711             this.epoll = new EpollSelectDispatcher;
712         }
713 
714         /// Starts the server.
715         public void start ( )
716         {
717             // The listener is an ISelectClient, so must be registered with
718             // epoll in order to do anything.
719             this.epoll.register(this.listener);
720 
721             // The event loop must also be running.
722             this.epoll.eventLoop();
723         }
724 
725         /// Stops the server.
726         public void stop ( )
727         {
728             this.listener.shutdown();
729         }
730     }
731 }