/******************************************************************************

    Server socket listener using multiplexed non-blocking socket I/O

    Creates a server socket and a pool of connection handlers and registers
    the server socket for incoming connections in a provided SelectDispatcher
    instance. When a connection comes in, takes an IConnectionHandler instance
    from the pool and assigns the incoming connection to the handler's socket.

    Usage example:
        see documented unittest of SelectListener

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

 ******************************************************************************/

module ocean.net.server.SelectListener;


import ocean.transition;
import ocean.core.Verify;

import ocean.io.select.client.model.ISelectClient;
import ocean.net.server.connection.IConnectionHandler;
import ocean.net.server.connpool.SelectListenerPool;
import ocean.net.server.connpool.ISelectListenerPoolInfo;

import ocean.util.container.pool.model.IPoolInfo;

import ocean.text.convert.Formatter;

import core.stdc.errno: errno;

import ocean.sys.socket.model.ISocket;

import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;


import ocean.util.log.Logger;

/*******************************************************************************

    Static module logger

*******************************************************************************/

static private Logger log;
static this ( )
{
    log = Log.lookup("ocean.net.server.SelectListener");
}

/******************************************************************************

    SelectListener base class

    Contains all base functionality which is not related to the particular
    IConnectionHandler subclass used in SelectListener.

 ******************************************************************************/

abstract class ISelectListener : ISelectClient
{
    import ocean.stdc.posix.sys.socket: accept, SOL_SOCKET, SO_ERROR,
                                        SO_REUSEADDR, sockaddr;
    import ocean.stdc.posix.netinet.in_: SOCK_STREAM;
    import core.sys.posix.unistd: close;

    /**************************************************************************

        Socket, memorises the address most recently passed to bind() or
        connect() or obtained by accept().

     **************************************************************************/

    private ISocket socket;

    /**************************************************************************

        Termination flag; true prevents accepting new connections

     **************************************************************************/

    private bool terminated = false;

    /**************************************************************************

        Exception instance thrown in case of socket errors.

     **************************************************************************/

    private SocketError e;

    /**************************************************************************

        Constructor

        Creates the server socket -- a streaming socket of the family (aka
        "domain") specified in `address` and the protocol according to
        `protocol` -- and registers it for incoming connections.

        `address.sa_family` and `protocol` are passed to `socket(2)` together
        with the `SOCK_STREAM` type, so the socket family defined by
        `address.sa_family` is required to support streaming, and `protocol`
        needs to be a streaming protocol. Socket families supporting streaming
        include IPv4/IPv6 and UNIX domain sockets (`AF_LOCAL`).
        `protocol == 0` makes `socket(2)` pick the default streaming protocol
        for `address.sa_family`. For IPv4/IPv6 the default protocol is TCP.
        UNIX domain sockets and some other families support only one streaming
        protocol so for those the default is unambiguous.

        Standards:
        Posix = http://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html
        Linux = http://linux.die.net/man/2/socket

        Params:
            address  = the socket address and family, must support streaming
            socket   = the server socket
            backlog  = the maximum length to which the queue of pending
                connections for sockfd may grow. If a connection request arrives
                when the queue is full, the client may receive an error with an
                indication of ECONNREFUSED or, if the underlying protocol
                supports retransmission, the request may be ignored so that a
                later reattempt at connection succeeds.
                (from http://linux.die.net/man/2/listen)
            protocol = the socket protocol, for a streaming socket of the
                family specified in address, or 0 to use the default protocol
                for a streaming socket of the specified family

     **************************************************************************/

    protected this ( sockaddr* address, ISocket socket, int backlog = 32,
        int protocol= 0)
    {
        this.socket = socket;

        this.e = new SocketError(this.socket);

        // SOCK_NONBLOCK is a Linux extension, which can be combined with the
        // actual second argument (SOCK_STREAM) in order to mark the
        // I/O operations to the socket as non-blocking.
        this.e.enforce(
            this.socket.socket(address.sa_family,
                SOCK_STREAM | SocketFlags.SOCK_NONBLOCK,
                protocol) >= 0,
            "error creating socket"
        );

        this.e.enforce(
            !this.socket.setsockoptVal(SOL_SOCKET, SO_REUSEADDR, true),
            "error enabling reuse of address"
        );

        this.e.assertExSock(!this.socket.bind(address),
            "error binding socket", __FILE__, __LINE__);

        this.e.assertExSock(!this.socket.listen(backlog),
            "error listening on socket", __FILE__, __LINE__);
    }

    /**************************************************************************

        Implements ISelectClient abstract method.

        Returns:
            events to register the conduit for.

     **************************************************************************/

    public override Event events ( )
    {
        return Event.EPOLLIN;
    }

    /**************************************************************************

        Implements ISelectClient abstract method.

        Returns:
            conduit's OS file handle (fd)

     **************************************************************************/

    public override Handle fileHandle ( )
    {
        return this.socket.fileHandle;
    }

    /**************************************************************************

        I/O event handler

        Called from SelectDispatcher during event loop.

        Params:
             event = identifier of I/O event that just occurred on the device

        Returns:
            true if the handler should be called again on next event occurrence
            or false if this instance should be unregistered from the
            SelectDispatcher (this is effectively a server shutdown).

        TODO: accept() could be called in a loop in this method, in order to
        accept as many connections as possible each time the EPOLLIN event fires
        for the listening socket

     **************************************************************************/

    final override bool handle ( Event event )
    {
        if (!this.terminated)
        {
            try
            {
                IConnectionHandler handler = this.getConnectionHandler();
                this.acceptConnection(handler);
            }
            catch (Exception)
            {
                /* Catch an exception (or object) thrown by
                   getConnectionHandler() to prevent it from falling through
                   to the dispatcher which would unregister the server socket. */
                this.declineConnection();
            }
        }

        return !this.terminated;
    }

    /**************************************************************************

        Closes the server socket and sets this instance to terminated mode.

        The socket is closed even if shutting it down fails; in that case the
        socket error exception is still propagated to the caller.

        TODO: Make it possible to reopen the server socket and resume operation?

        Returns:
            true if this instance was already in terminated mode or false
            otherwise

     **************************************************************************/

    final bool terminate ( )
    {
        if (!this.terminated)
        {
            this.terminated = true;

            try
            {
                this.e.enforce(
                    !this.socket.shutdown(),
                    "error on socket shutdown"
                );
            }
            finally
            {
                this.socket.close();
            }
            return false;
        }

        return true;
    }

    /**************************************************************************

        Returns:
            information interface to the connections pool

     **************************************************************************/

    abstract IPoolInfo poolInfo ( );

    /**************************************************************************

        Sets the limit of the number of connections. 0 disables the limitation.

        Notes:
            - If limit is set to something other than 0, limit connection
              handler objects will be created (so set it to a realistic value).
            - If not 0, limit must be at least the number of currently busy
              connections.

        Params:
            limit = new connection limit or 0 to disable the limitation

        Returns:
            connection limit

     **************************************************************************/

    abstract size_t connection_limit ( size_t limit ) ;

    /**************************************************************************

        Returns:
            the limit of the number of connections or 0 if limitation is
            disabled.

     **************************************************************************/

    public size_t connection_limit ( )
    {
        auto n = this.poolInfo.limit;

        // The maximum value of the limit type is reported as "unlimited".
        return (n == n.max)? 0 : n;
    }

    /**************************************************************************

        Closes all connections and terminates the listener.

     **************************************************************************/

    abstract public void shutdown ( );

    /**************************************************************************

        Obtains a connection handler instance from the pool.

        Returns:
            connection handler

     **************************************************************************/

    abstract protected IConnectionHandler getConnectionHandler ( );

    /**************************************************************************

        Accepts the next pending incoming client connection and assigns it to
        a connection handler.

        Params:
            handler = handler to assign connection to

     **************************************************************************/

    private void acceptConnection ( IConnectionHandler handler )
    {
        try
        {
            handler.assign(this.socket);

            handler.handleConnection();
        }
        catch (Exception e)
        {
            /* Catch an exception thrown by accept() or handleConnection()
               (or noDelay()/blocking()) to prevent it from falling through
               to the select dispatcher which would unregister the server
               socket.

               'Too many open files' will be caught here.

               FIXME: If noDelay() or blocking() fails, the handler will
               incorrectly assume that the connection is not open and will
               not close it. Is this a relevant case? */
            handler.error(e);   // will never throw exceptions

            handler.finalize();
        }
    }

    /**************************************************************************

        Accepts the next pending incoming client connection and closes it.

     **************************************************************************/

    private void declineConnection ( )
    {
        // This is using the C binding
        if (close(accept(this.socket.fileHandle, null, null))) // returns non-zero on failure
        {
            .errno = 0;
        }
    }
}

/******************************************************************************

    SelectListener class template

    The additional T constructor argument parameters must appear after those for
    the mandatory IConnectionHandler constructor.

    Params:
        T    = connection handler class
        Args = additional constructor arguments for T

    TODO: try using the non-auto ctor pool, for template simplicity!

 ******************************************************************************/

public class SelectListener ( T : IConnectionHandler, Args ... ) : ISelectListener
{
    /**************************************************************************

        ObjectPool of connection handlers

     **************************************************************************/

    private alias SelectListenerPool!(T, Args) ConnPool;

    private ConnPool receiver_pool;

    /**************************************************************************

        String buffer used for connection logging.

     **************************************************************************/

    private mstring connection_log_buf;

    /**************************************************************************

        Constructor

        Creates the server socket and the pool of connection handlers. The
        default protocol of the base class constructor is used.

        Params:
            address = the address of the socket
            socket  = the server socket
            args    = additional T constructor arguments, might be empty
            backlog = (see ISelectListener ctor)

     **************************************************************************/

    public this ( sockaddr* address, ISocket socket, Args args, int backlog = 32 )
    {
        super(address, socket, backlog);

        this.receiver_pool = new ConnPool(&this.returnToPool, args);
    }

    /**************************************************************************

        Obtains a connection handler instance from the pool.

        Returns:
            connection handler

     **************************************************************************/

    protected override IConnectionHandler getConnectionHandler ( )
    {
        return this.receiver_pool.get();
    }

    /**************************************************************************

        Sets the limit of the number of connections. 0 disables the limitation.

        Notes:
            - If limit is set to something other than 0, limit connection
              handler objects will be created (so set it to a realistic value).
            - If not 0, limit must be at least the number of currently busy
              connections.

        Params:
            limit = new connection limit or 0 to disable the limitation

        Returns:
            limit

     **************************************************************************/

    public override size_t connection_limit ( size_t limit )
    {
        verify(!(limit && limit < this.poolInfo.num_busy),
            typeof(this).stringof ~ ".connection_limit: limit already exceeded");

        if (limit)
        {
            this.receiver_pool.setLimit(limit);
        }
        else
        {
            this.receiver_pool.limited = false;
        }

        return limit;
    }

    /**************************************************************************

        (Overriding wrapper to fix method matching.)

        Returns:
            limit of the number of connections or 0 if unlimited.

     **************************************************************************/

    public override size_t connection_limit ( )
    {
        return super.connection_limit;
    }

    /**************************************************************************

        Minimizes the connection pool to n connections by deleting idle
        connection objects. If more than n connections are currently busy,
        all idle connections are deleted.

        The previous pool limit is restored before returning.

        Params:
            n = minimum number of connection objects to keep in the pool.

        Returns:
            the number of connection objects in the pool after minimizing,
            which is the greater of n and the number of currently busy
            connections.

     **************************************************************************/

    public size_t minimize ( uint n = 0 )
    out (still_existent)
    {
        assert (still_existent >= n);
    }
    body
    {
        size_t limit = this.receiver_pool.limit,
               busy = this.receiver_pool.num_busy;

        // Temporarily lowering the limit drops the surplus idle objects; the
        // original limit is put back on scope exit.
        scope (exit) this.receiver_pool.setLimit(limit);

        return this.receiver_pool.setLimit((n > busy)? n : busy);
    }

    /**************************************************************************

        Returns:
            information interface to the connections pool

     **************************************************************************/

    public override ISelectListenerPoolInfo poolInfo ( )
    {
        return this.receiver_pool;
    }

    /***************************************************************************

        Writes connection information to log file.

    ***************************************************************************/

    public void connectionLog ( )
    {
        auto conns = this.poolInfo;

        log.info("Connection pool: {} busy, {} idle", conns.num_busy,
            conns.num_idle);

        foreach ( i, conn; conns )
        {
            // Reuse the same buffer for every connection.
            this.connection_log_buf.length = 0;
            sformat(this.connection_log_buf, "{}: ", i);

            conn.formatInfo(this.connection_log_buf);

            log.info(this.connection_log_buf);
        }
    }

    /**************************************************************************

        Closes all connections and terminates the listener.

     **************************************************************************/

    public override void shutdown ( )
    {
        // BusyItemsIterator is nested in the pool class, so it has to be
        // instantiated through the pool instance (`pool.new Nested`).
        scope busy_connections = this.receiver_pool.new BusyItemsIterator;
        foreach ( busy_connection; busy_connections )
        {
            /* FIXME: calling finalize here will cause errors in any connection
             * handlers which are currently selected in epoll, as they will
             * subsequently attempt to finalize themselves again.
             *
             * In practice this is of little import however, as the whole server
             * is being shut down. It may be nice to find a clean way to avoid
             * this though.
             */
            busy_connection.finalize();
        }

        super.terminate();
    }

    /**************************************************************************

        Called as the finalizer of class T. Returns connection into the object
        pool.

        Params:
            connection = connection handler instance to return into pool

     **************************************************************************/

    private void returnToPool ( IConnectionHandler connection )
    {
        verify(cast (T) connection !is null,
            typeof(this).stringof ~ ".returnToPool: connection is null");

        debug ( ConnectionHandler )
            log.trace("[{}]: Returning to pool", connection.connection_id);

        this.receiver_pool.recycle(cast (T) connection);
    }
}

///
unittest
{
    // A shared resource, owned by the server. A reference is passed to each
    // connection handler.
    class SomeSharedResource
    {
    }

    // The connection handler class. The server owns a select listener instance.
    // The select listener owns a pool of connection handlers. When a new
    // connection is accepted by the select listener, a connection handler is
    // taken from the pool (or allocated, if there are no free items in the
    // pool) and set to handle the incoming connection.
    // (Connection handler classes must not be nested, in order for the pool to
    // be able to automatically allocate instances. In this case, it must be
    // declared static, as the class definition is "nested" inside a unittest.)
    static class MyConnectionHandler : IConnectionHandler
    {
        import ocean.net.server.connection.IConnectionHandlerInfo;
        import ocean.sys.socket.AddressIPSocket;

        /// Reference to the global shared resource (passed to the ctor).
        private SomeSharedResource shared_resource;

        /// Flag set when an IO error occurs (see error()).
        private bool error_occurred;

        /***********************************************************************

            Constructs a connection handler.

            Params:
                finalize_dg = delegate required by the super class.
                    Automatically set by SelectListenerPool to be
                    SelectListener.returnToPool. Called by the super class when
                    finalize() is called
                shared_resource = reference to a global shared resource

        ***********************************************************************/

        public this ( scope FinalizeDg finalize_dg, SomeSharedResource shared_resource )
        {
            this.shared_resource = shared_resource;

            super(new AddressIPSocket!(), finalize_dg, &this.error);
        }

        /// The actual logic for handling a connection.
        override public void handleConnection ( )
        {
            // Do something in here.

            // When you're finished, call finalize(), which recycles this
            // connection into the pool owned by the select listener.
            this.finalize();
        }

        /// Tells the super class whether an I/O error occurred.
        override protected bool io_error ( )
        {
            return this.error_occurred;
        }

        /// Called by epoll when an I/O error occurs.
        private void error ( Exception exception, Event event,
            IConnectionHandlerInfo info )
        {
            this.error_occurred = true;
        }

        /// Unregisters the socket from the epoll before closing it. Called by
        /// finalize.
        override protected void unregisterSocket ()
        {
        }
    }

    // Top-level server class which owns the select listener and all global
    // resources.
    class MyServer
    {
        import ocean.sys.socket.AddressIPSocket;
        import ocean.sys.socket.InetAddress;
        import ocean.io.select.EpollSelectDispatcher;

        /// Select listener alias.
        private alias SelectListener!(MyConnectionHandler, SomeSharedResource)
            MySelectListener;

        /// Select listener instance.
        private MySelectListener listener;

        /// Epoll instance.
        private EpollSelectDispatcher epoll;

        /// Constructs a server.
        public this ( )
        {
            InetAddress!(false) addr;
            auto socket = new AddressIPSocket!();
            auto shared_resource = new SomeSharedResource;

            this.listener = new MySelectListener(addr("127.0.0.1", 2009),
                socket, shared_resource);

            this.epoll = new EpollSelectDispatcher;
        }

        /// Starts the server.
        public void start ( )
        {
            // The listener is an ISelectClient, so must be registered with
            // epoll in order to do anything.
            this.epoll.register(this.listener);

            // The event loop must also be running.
            this.epoll.eventLoop();
        }

        /// Stops the server.
        public void stop ( )
        {
            this.listener.shutdown();
        }
    }
}