1 /******************************************************************************
2 
3     Server socket listener using multiplexed non-blocking socket I/O
4 
5     Creates a server socket and a pool of connection handlers and registers
6     the server socket for incoming connection in a provided SelectDispatcher
7     instance. When a connection comes in, takes an IConnectionHandler instance
8     from the pool and assigns the incoming connection to the handler's socket.
9 
10     Usage example:
11         see documented unittest of SelectListener
12 
13     Copyright:
14         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
15         All rights reserved.
16 
17     License:
18         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
19         Alternatively, this file may be distributed under the terms of the Tango
20         3-Clause BSD License (see LICENSE_BSD.txt for details).
21 
22  ******************************************************************************/
23 
24 module ocean.net.server.SelectListener;
25 
26 
27 import ocean.transition;
28 import ocean.core.Verify;
29 
30 import ocean.io.select.client.model.ISelectClient;
31 import ocean.net.server.connection.IConnectionHandler;
32 import ocean.net.server.connpool.SelectListenerPool;
33 import ocean.net.server.connpool.ISelectListenerPoolInfo;
34 
35 import ocean.util.container.pool.model.IPoolInfo;
36 
37 import ocean.text.convert.Formatter;
38 
39 import core.stdc.errno:            errno;
40 
41 import ocean.sys.socket.model.ISocket;
42 
43 import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
44 
45 
46 import ocean.util.log.Logger;
47 
48 /*******************************************************************************
49 
50     Static module logger
51 
52 *******************************************************************************/
53 
54 static private Logger log;
55 static this ( )
56 {
57     log = Log.lookup("ocean.net.server.SelectListener");
58 }
59 
60 /******************************************************************************
61 
62     SelectListener base class
63 
64     Contains all base functionality which is not related to the particular
65     IConnectionHandler subclass used in SelectListener.
66 
67  ******************************************************************************/
68 
69 abstract class ISelectListener : ISelectClient
70 {
71     import ocean.stdc.posix.sys.socket: accept, SOL_SOCKET, SO_ERROR,
72                                         SO_REUSEADDR, sockaddr;
73     import ocean.stdc.posix.netinet.in_: SOCK_STREAM;
74     import core.sys.posix.unistd:     close;
75 
76     /**************************************************************************
77 
78         Socket, memorises the address most recently passed to bind() or
79         connect() or obtained by accept().
80 
81      **************************************************************************/
82 
83     private ISocket socket;
84 
85     /**************************************************************************
86 
87         Termination flag; true prevents accepting new connections
88 
89      **************************************************************************/
90 
91     private bool terminated = false;
92 
93     /**************************************************************************
94 
95         Exception instance thrown in case of socket errors.
96 
97      **************************************************************************/
98 
99     private SocketError e;
100 
101     /**************************************************************************
102 
103         Constructor
104 
105         Creates the server socket -- a streaming socket of the family (aka
106         "domain") specified in `address` and the protocol according to
107         `protocol` -- and registers it for incoming connections.
108 
109         `address.sa_family` and `protocol` are passed to `socket(2)` together
110         with the `SOCK_STREAM` type, so the socket family defined by
111         `address.sa_family` is required to support streaming, and `protocol`
112         needs to be a streaming protocol. Socket families supporting streaming
113         include IPv4/IPv6 and UNIX domain sockets (`AF_LOCAL`).
114         `protocol == 0` makes `socket(2)` pick the default streaming protocol
115         for `address.sa_family`. For IPv4/IPv6 the default protocol is TCP.
116         UNIX domain sockets and some other families support only one streaming
117         protocol so for those the default is unambiguous.
118 
119         Standards:
120         Posix = http://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html
121         Linux = http://linux.die.net/man/2/socket
122 
123         Params:
124             address    = the socket address and family, must support streaming
125             socket     = the server socket
126             backlog    = the maximum length to which the queue of pending
127                 connections for sockfd may grow. If a connection request arrives
128                 when the queue is full, the client may receive an error with an
129                 indication of ECONNREFUSED or, if the underlying protocol
130                 supports retransmission, the request may be ignored so that a
131                 later reattempt at connection succeeds.
132                 (from http://linux.die.net/man/2/listen)
133             protocol   = the socket protocol, for a streaming socket of the
134                 family specified in address, or 0 to use the default protocol
135                 for a streaming socket of the specified family
136 
137      **************************************************************************/
138 
139     protected this ( sockaddr* address, ISocket socket, int backlog = 32,
140         int protocol= 0)
141     {
142         this.socket = socket;
143 
144         this.e = new SocketError(this.socket);
145 
146         // SOCK_NONBLOCK is a Linux extension, which can be combined with the
147         // actual second argument (SOCK_STREAM) in order to mark the
148         // I/O operations to the socket as non-blocking.
149         this.e.enforce(
150             this.socket.socket(address.sa_family,
151                 SOCK_STREAM | SocketFlags.SOCK_NONBLOCK,
152                 protocol) >= 0,
153             "error creating socket"
154         );
155 
156         this.e.enforce(
157             !this.socket.setsockoptVal(SOL_SOCKET, SO_REUSEADDR, true),
158             "error enabling reuse of address"
159         );
160 
161         this.e.assertExSock(!this.socket.bind(address),
162                             "error binding socket", __FILE__, __LINE__);
163 
164         this.e.assertExSock(!this.socket.listen(backlog),
165                             "error listening on socket", __FILE__, __LINE__);
166     }
167 
168     /**************************************************************************
169 
170         Implements ISelectClient abstract method.
171 
172         Returns:
173             events to register the conduit for.
174 
175      **************************************************************************/
176 
177     public override Event events ( )
178     {
179         return Event.EPOLLIN;
180     }
181 
182     /**************************************************************************
183 
184         Implements ISelectClient abstract method.
185 
186         Returns:
187             conduit's OS file handle (fd)
188 
189      **************************************************************************/
190 
191     public override Handle fileHandle ( )
192     {
193         return this.socket.fileHandle;
194     }
195 
196     /**************************************************************************
197 
198         I/O event handler
199 
200         Called from SelectDispatcher during event loop.
201 
202         Params:
203              event = identifier of I/O event that just occurred on the device
204 
205         Returns:
206             true if the handler should be called again on next event occurrence
207             or false if this instance should be unregistered from the
208             SelectDispatcher (this is effectively a server shutdown).
209 
210         TODO: accept() could be called in a loop in this method, in order to
211         accept as many connections as possible each time the EPOLLIN event fires
212         for the listening socket
213 
214      **************************************************************************/
215 
216     final override bool handle ( Event event )
217     {
218         if (!this.terminated)
219         {
220             try
221             {
222                 IConnectionHandler handler = this.getConnectionHandler();
223                 this.acceptConnection(handler);
224             }
225             catch (Exception)
226             {
227                 /* Catch an exception (or object) thrown by
228                    getConnectionHandler() to prevent it from falling through
229                    to the dispatcher which would unregister the server socket. */
230                 this.declineConnection();
231             }
232         }
233 
234         return !this.terminated;
235     }
236 
237     /**************************************************************************
238 
239         Closes the server socket and sets this instance to terminated mode.
240 
241         TODO: Make it possible to reopen the server socket and resume operation?
242 
243         Returns:
244             true if this instance was already in to terminated mode or false
245             otherwise
246 
247      **************************************************************************/
248 
249     final bool terminate ( )
250     {
251         if (!this.terminated)
252         {
253             this.terminated = true;
254 
255             try
256             {
257                 this.e.enforce(
258                     !this.socket.shutdown(),
259                     "error on socket shutdown"
260                 );
261             }
262             finally
263             {
264                 this.socket.close();
265             }
266             return false;
267         }
268 
269         return true;
270     }
271 
272     /**************************************************************************
273 
274         Returns:
275             information interface to the connections pool
276 
277      **************************************************************************/
278 
279     abstract IPoolInfo poolInfo ( );
280 
281     /**************************************************************************
282 
283         Sets the limit of the number of connections. 0 disables the limitation.
284 
285         Notes:
286             - If limit is set to something other than 0, limit connection
287               handler objects will be created (so set it to a realistic value).
288             - If not 0, limit must be at least the number of currently busy
289               connections.
290 
291         Returns:
292             connection limit
293 
294      **************************************************************************/
295 
296     abstract size_t connection_limit ( size_t limit ) ;
297 
298     /**************************************************************************
299 
300         Returns:
301             the limit of the number of connections or 0 if limitation is
302             disabled.
303 
304      **************************************************************************/
305 
306     public size_t connection_limit ( )
307     {
308         auto n = this.poolInfo.limit;
309 
310         return (n == n.max)? 0 : n;
311     }
312 
313     /**************************************************************************
314 
315         Closes all connections and terminates the listener.
316 
317      **************************************************************************/
318 
319     abstract public void shutdown ( );
320 
321     /**************************************************************************
322 
323         Obtains a connection handler instance from the pool.
324 
325         Returns:
326             connection handler
327 
328      **************************************************************************/
329 
330     abstract protected IConnectionHandler getConnectionHandler ( );
331 
332     /**************************************************************************
333 
334         Accepts the next pending incoming client connection and assigns it to
335         a connection handler.
336 
337         Params:
338             handler = handler to assign connection to
339 
340      **************************************************************************/
341 
342     private void acceptConnection ( IConnectionHandler handler )
343     {
344         try
345         {
346             handler.assign(this.socket);
347 
348             handler.handleConnection();
349         }
350         catch (Exception e)
351         {
352             /* Catch an exception thrown by accept() or handleConnection()
353                (or noDelay()/blocking()) to prevent it from falling through
354                to the select dispatcher which would unregister the server
355                socket.
356 
357                'Too many open files' will be caught here.
358 
359                FIXME: If noDelay() or blocking() fails, the handler will
360                incorrectly assume that the connection is not open and will
361                not close it. Is this a relevant case? */
362             handler.error(e);   // will never throw exceptions
363 
364             handler.finalize();
365         }
366     }
367 
368     /**************************************************************************
369 
370         Accepts the next pending incoming client connection and closes it.
371 
372      **************************************************************************/
373 
374     private void declineConnection ( )
375     {
376         // This is using the C binding
377         if (close(accept(this.socket.fileHandle, null, null))) // returns non-zero on failure
378         {
379             .errno = 0;
380         }
381     }
382 }
383 
384 /******************************************************************************
385 
386     SelectListener class template
387 
388     The additional T constructor argument parameters must appear after those for
389     the mandatory IConnectionHandler constructor.
390 
391     Params:
392         T    = connection handler class
393         Args = additional constructor arguments for T
394 
395     TODO: try using the non-auto ctor pool, for template simplicity!
396 
397  ******************************************************************************/
398 
399 public class SelectListener ( T : IConnectionHandler, Args ... ) : ISelectListener
400 {
401     /**************************************************************************
402 
403         ObjectPool of connection handlers
404 
405      **************************************************************************/
406 
407     private alias SelectListenerPool!(T, Args) ConnPool;
408 
409     private ConnPool receiver_pool;
410 
411     /**************************************************************************
412 
413         String buffer used for connection logging.
414 
415      **************************************************************************/
416 
417     private mstring connection_log_buf;
418 
419     /**************************************************************************
420 
421         Constructor
422 
423         Creates the server socket and registers it for incoming connections.
424 
425         Params:
426             address    = the address of the socket
427             socket     = the server socket
428             dispatcher = SelectDispatcher instance to use
429             args       = additional T constructor arguments, might be empty
430             backlog    = (see ISelectListener ctor)
431 
432      **************************************************************************/
433 
434     public this ( sockaddr* address, ISocket socket, Args args, int backlog = 32 )
435     {
436         super(address, socket, backlog);
437 
438         this.receiver_pool = new ConnPool(&this.returnToPool, args);
439     }
440 
441     /**************************************************************************
442 
443         Obtains a connection handler instance from the pool.
444 
445         Returns:
446             connection handler
447 
448      **************************************************************************/
449 
450     protected override IConnectionHandler getConnectionHandler ( )
451     {
452         return this.receiver_pool.get();
453     }
454 
455     /**************************************************************************
456 
457         Sets the limit of the number of connections. 0 disables the limitation.
458 
459         Notes:
460             - If limit is set to something other than 0, limit connection
461               handler objects will be created (so set it to a realistic value).
462             - If not 0, limit must be at least the number of currently busy
463               connections.
464 
465         Returns:
466             limit
467 
468      **************************************************************************/
469 
470     public override size_t connection_limit ( size_t limit )
471     {
472         verify(!(limit && limit < this.poolInfo.num_busy),
473                 typeof(this).stringof ~ ".connection_limit: limit already exceeded");
474 
475         if (limit)
476         {
477             this.receiver_pool.setLimit(limit);
478         }
479         else
480         {
481             this.receiver_pool.limited = false;
482         }
483 
484         return limit;
485     }
486 
487     /**************************************************************************
488 
489         (Overriding wrapper to fix method matching.)
490 
491         Returns:
492             new limit of number of connections or 0 if unlimited.
493 
494      **************************************************************************/
495 
496     public override size_t connection_limit ( )
497     {
498         return super.connection_limit;
499     }
500 
501     /**************************************************************************
502 
503         Minimizes the connection pool to n connections by deleting idle
504         connection objects. If more than n connections are currently busy,
505         all idle connections are deleted.
506 
507         Params:
508             n = minimum number of connection objects to keep in the pool.
509 
510         Returns:
511             the number of connection object in the pool after minimizing, which
512             is the greater of n and the number of currently busy connections.
513 
514      **************************************************************************/
515 
516     public size_t minimize ( uint n = 0 )
517     out (still_existent)
518     {
519         assert (still_existent >= n);
520     }
521     body
522     {
523         size_t limit = this.receiver_pool.limit,
524                busy = this.receiver_pool.num_busy;
525 
526         scope (exit) this.receiver_pool.setLimit(limit);
527 
528         return this.receiver_pool.setLimit((n > busy)? n : busy);
529     }
530 
531     /**************************************************************************
532 
533         Returns:
534             information interface to the connections pool
535 
536      **************************************************************************/
537 
538     public override ISelectListenerPoolInfo poolInfo ( )
539     {
540         return this.receiver_pool;
541     }
542 
543     /***************************************************************************
544 
545         Writes connection information to log file.
546 
547     ***************************************************************************/
548 
549     public void connectionLog ( )
550     {
551         auto conns = this.poolInfo;
552 
553         log.info("Connection pool: {} busy, {} idle", conns.num_busy,
554             conns.num_idle);
555 
556         foreach ( i, conn; conns )
557         {
558             this.connection_log_buf.length = 0;
559             sformat(this.connection_log_buf, "{}: ", i);
560 
561             conn.formatInfo(this.connection_log_buf);
562 
563             log.info(this.connection_log_buf);
564         }
565     }
566 
567     /**************************************************************************
568 
569         Closes all connections and terminates the listener.
570 
571      **************************************************************************/
572 
573     public override void shutdown ( )
574     {
575         scope busy_connections = this.receiver_pool..new BusyItemsIterator;
576         foreach ( busy_connection; busy_connections )
577         {
578             /* FIXME: calling finalize here will cause errors in any connection
579              * handlers which are currently selected in epoll, as they will
580              * subsequently attempt to finalize themselves again.
581              *
582              * In practice this is of little import however, as the whole server
583              * is being shut down. It may be nice to find a clean way to avoid
584              * this though.
585              */
586             busy_connection.finalize();
587         }
588 
589         super.terminate();
590     }
591 
592     /**************************************************************************
593 
594         Called as the finalizer of class T. Returns connection into the object
595         pool.
596 
597         Params:
598             connection = connection hander instance to return into pool
599 
600      **************************************************************************/
601 
602     private void returnToPool ( IConnectionHandler connection )
603     {
604         verify(cast (T) connection !is null,
605                 typeof(this).stringof ~ ".returnToPool: connection is null");
606 
607         debug ( ConnectionHandler )
608             log.trace("[{}]: Returning to pool", connection.connection_id);
609 
610         this.receiver_pool.recycle(cast (T) connection);
611     }
612 }
613 
614 ///
615 unittest
616 {
617     // A shared resource, owned by the server. A reference is passed to each
618     // connection handler.
619     class SomeSharedResource
620     {
621     }
622 
623     // The connection handler class. The server owns a select listener instance.
624     // The select listener owns a pool of connection handlers. When a new
625     // connection is accepted by the select listener, a connection handler is
626     // taken from the pool (or allocated, if there are no free items in the
627     // pool) and set to handle the incoming connection.
628     // (Connection handler classes must not be nested, in order for the pool to
629     // be able to automatically allocate instances. In this case, it must be
630     // declared static, as the class definition is "nested" inside a unittest.)
631     static class MyConnectionHandler : IConnectionHandler
632     {
633         import ocean.net.server.connection.IConnectionHandlerInfo;
634         import ocean.sys.socket.AddressIPSocket;
635 
636         /// Reference to the global shared resource (passed to the ctor).
637         private SomeSharedResource shared_resource;
638 
639         /// Flag set when an IO error occurs (see error()).
640         private bool error_occurred;
641 
642         /***********************************************************************
643 
644             Constructs a connection handler.
645 
646             Params:
647                 finalize_dg = delegate required by the super class.
648                     Automatically set by SelectListenerPool to be
649                     SelectListener.returnToPool. Called by the super class when
650                     finalize() is called
651                 shared_resource = reference to a global shared resource
652 
653         ***********************************************************************/
654 
655         public this ( scope FinalizeDg finalize_dg, SomeSharedResource shared_resource )
656         {
657             this.shared_resource = shared_resource;
658 
659             super(new AddressIPSocket!(), finalize_dg, &this.error);
660         }
661 
662         /// The actual logic for handling a connection.
663         override public void handleConnection ( )
664         {
665             // Do something in here.
666 
667             // When you're finished, call finalize(), which recycles this
668             // connection into the pool owned by the select listener.
669             this.finalize();
670         }
671 
672         /// Tells the super class whether an I/O error occurred.
673         override protected bool io_error ( )
674         {
675             return this.error_occurred;
676         }
677 
678         /// Called by epoll when an I/O error occurs.
679         private void error ( Exception exception, Event event,
680             IConnectionHandlerInfo info )
681         {
682             this.error_occurred = true;
683         }
684 
685         /// Unregisters the socket from the epoll before closing it. Called by
686         /// finalize.
687         override protected void unregisterSocket ()
688         {
689         }
690     }
691 
692     // Top-level server class which owns the select listener and all global
693     // resources.
694     class MyServer
695     {
696         import ocean.sys.socket.AddressIPSocket;
697         import ocean.sys.socket.InetAddress;
698         import ocean.io.select.EpollSelectDispatcher;
699 
700         /// Select listener alias.
701         private alias SelectListener!(MyConnectionHandler, SomeSharedResource)
702             MySelectListener;
703 
704         /// Select listener instance.
705         private MySelectListener listener;
706 
707         /// Epoll instance.
708         private EpollSelectDispatcher epoll;
709 
710         /// Constructs a server.
711         public this ( )
712         {
713             InetAddress!(false) addr;
714             auto socket = new AddressIPSocket!();
715             auto shared_resource = new SomeSharedResource;
716 
717             this.listener = new MySelectListener(addr("127.0.0.1", 2009),
718                 socket, shared_resource);
719 
720             this.epoll = new EpollSelectDispatcher;
721         }
722 
723         /// Starts the server.
724         public void start ( )
725         {
726             // The listener is an ISelectClient, so must be registered with
727             // epoll in order to do anything.
728             this.epoll.register(this.listener);
729 
730             // The event loop must also be running.
731             this.epoll.eventLoop();
732         }
733 
734         /// Stops the server.
735         public void stop ( )
736         {
737             this.listener.shutdown();
738         }
739     }
740 }