/******************************************************************************

    Server socket listener using multiplexed non-blocking socket I/O

    Creates a server socket and a pool of connection handlers. The listener is
    an ISelectClient: once it has been registered with a select dispatcher
    (see the usage example), each incoming connection on the server socket
    makes it take an IConnectionHandler instance from the pool and assign the
    incoming connection to the handler's socket.

    Usage example:
        see documented unittest of SelectListener

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

******************************************************************************/

module ocean.net.server.SelectListener;


import ocean.meta.types.Qualifiers;
import ocean.core.Verify;
import ocean.io.select.client.model.ISelectClient;
import ocean.net.server.connection.IConnectionHandler;
import ocean.net.server.connpool.SelectListenerPool;
import ocean.net.server.connpool.ISelectListenerPoolInfo;
import ocean.util.container.pool.model.IPoolInfo;
import ocean.text.convert.Formatter;
import ocean.sys.socket.model.ISocket;
import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
import ocean.util.log.Logger;
import core.stdc.errno: errno;

/*******************************************************************************

    Static module logger

*******************************************************************************/

private Logger log;
static this ( )
{
    log = Log.lookup("ocean.net.server.SelectListener");
}

/******************************************************************************

    SelectListener base class

    Contains all base functionality which is not related to the particular
    IConnectionHandler subclass used in SelectListener.

******************************************************************************/

abstract class ISelectListener : ISelectClient
{
    import core.sys.posix.netinet.in_: SOCK_STREAM;
    import core.sys.posix.sys.socket: accept, SOL_SOCKET, SO_REUSEADDR, sockaddr;
    import core.sys.posix.unistd: close;

    /**************************************************************************

        Socket, memorises the address most recently passed to bind() or
        connect() or obtained by accept().

     **************************************************************************/

    private ISocket socket;

    /**************************************************************************

        Termination flag; true prevents accepting new connections

     **************************************************************************/

    private bool terminated = false;

    /**************************************************************************

        Exception instance thrown in case of socket errors.

     **************************************************************************/

    private SocketError e;

    /**************************************************************************

        Constructor

        Creates the server socket -- a streaming socket of the family (aka
        "domain") specified in `address` and the protocol according to
        `protocol` --, enables address reuse on it, binds it to `address` and
        starts listening for incoming connections.

        `address.sa_family` and `protocol` are passed to `socket(2)` together
        with the `SOCK_STREAM` type, so the socket family defined by
        `address.sa_family` is required to support streaming, and `protocol`
        needs to be a streaming protocol. Socket families supporting streaming
        include IPv4/IPv6 and UNIX domain sockets (`AF_LOCAL`).
        `protocol == 0` makes `socket(2)` pick the default streaming protocol
        for `address.sa_family`. For IPv4/IPv6 the default protocol is TCP.
        UNIX domain sockets and some other families support only one streaming
        protocol so for those the default is unambiguous.

        Standards:
        Posix = http://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html
        Linux = http://linux.die.net/man/2/socket

        Params:
            address  = the socket address and family, must support streaming
            socket   = the server socket
            backlog  = the maximum length to which the queue of pending
                connections for sockfd may grow. If a connection request arrives
                when the queue is full, the client may receive an error with an
                indication of ECONNREFUSED or, if the underlying protocol
                supports retransmission, the request may be ignored so that a
                later reattempt at connection succeeds.
                (from http://linux.die.net/man/2/listen)
            protocol = the socket protocol, for a streaming socket of the
                family specified in address, or 0 to use the default protocol
                for a streaming socket of the specified family

        Throws:
            SocketError if creating, configuring, binding or listening on the
            socket fails.

     **************************************************************************/

    protected this ( sockaddr* address, ISocket socket, int backlog = 32,
        int protocol = 0 )
    {
        this.socket = socket;

        this.e = new SocketError(this.socket);

        // SOCK_NONBLOCK is a Linux extension, which can be combined with the
        // actual second argument (SOCK_STREAM) in order to mark the
        // I/O operations to the socket as non-blocking.
        this.e.enforce(
            this.socket.socket(address.sa_family,
                SOCK_STREAM | SocketFlags.SOCK_NONBLOCK,
                protocol) >= 0,
            "error creating socket"
        );

        this.e.enforce(
            !this.socket.setsockoptVal(SOL_SOCKET, SO_REUSEADDR, true),
            "error enabling reuse of address"
        );

        this.e.assertExSock(!this.socket.bind(address),
            "error binding socket", __FILE__, __LINE__);

        this.e.assertExSock(!this.socket.listen(backlog),
            "error listening on socket", __FILE__, __LINE__);
    }

    /**************************************************************************

        Implements ISelectClient abstract method.

        Returns:
            events to register the conduit for.

     **************************************************************************/

    public override Event events ( )
    {
        return Event.EPOLLIN;
    }

    /**************************************************************************

        Implements ISelectClient abstract method.

        Returns:
            conduit's OS file handle (fd)

     **************************************************************************/

    public override Handle fileHandle ( )
    {
        return this.socket.fileHandle;
    }

    /**************************************************************************

        I/O event handler

        Called from SelectDispatcher during event loop.

        Params:
             event = identifier of I/O event that just occurred on the device

        Returns:
            true if the handler should be called again on next event occurrence
            or false if this instance should be unregistered from the
            SelectDispatcher (this is effectively a server shutdown).

        TODO: accept() could be called in a loop in this method, in order to
        accept as many connections as possible each time the EPOLLIN event fires
        for the listening socket

     **************************************************************************/

    final override bool handle ( Event event )
    {
        if (!this.terminated)
        {
            try
            {
                IConnectionHandler handler = this.getConnectionHandler();
                this.acceptConnection(handler);
            }
            catch (Exception)
            {
                /* Catch an exception (or object) thrown by
                   getConnectionHandler() to prevent it from falling through
                   to the dispatcher which would unregister the server socket. */
                this.declineConnection();
            }
        }

        return !this.terminated;
    }

    /**************************************************************************

        Closes the server socket and sets this instance to terminated mode.

        The socket is closed even if shutting it down fails; in that case the
        shutdown error is still thrown after closing.

        TODO: Make it possible to reopen the server socket and resume operation?

        Returns:
            true if this instance was already in terminated mode or false
            otherwise

     **************************************************************************/

    final bool terminate ( )
    {
        if (!this.terminated)
        {
            this.terminated = true;

            try
            {
                this.e.enforce(
                    !this.socket.shutdown(),
                    "error on socket shutdown"
                );
            }
            finally
            {
                this.socket.close();
            }
            return false;
        }

        return true;
    }

    /**************************************************************************

        Returns:
            information interface to the connections pool

     **************************************************************************/

    abstract IPoolInfo poolInfo ( );

    /**************************************************************************

        Sets the limit of the number of connections. 0 disables the limitation.

        Notes:
            - If limit is set to something other than 0, limit connection
              handler objects will be created (so set it to a realistic value).
            - If not 0, limit must be at least the number of currently busy
              connections.

        Returns:
            connection limit

     **************************************************************************/

    abstract size_t connection_limit ( size_t limit ) ;

    /**************************************************************************

        Returns:
            the limit of the number of connections or 0 if limitation is
            disabled.

     **************************************************************************/

    public size_t connection_limit ( )
    {
        auto n = this.poolInfo.limit;

        // The maximum value of the limit's type is reported as 0 (unlimited).
        return (n == n.max)? 0 : n;
    }

    /**************************************************************************

        Closes all connections and terminates the listener.

     **************************************************************************/

    abstract public void shutdown ( );

    /**************************************************************************

        Obtains a connection handler instance from the pool.

        Returns:
            connection handler

     **************************************************************************/

    abstract protected IConnectionHandler getConnectionHandler ( );

    /**************************************************************************

        Accepts the next pending incoming client connection and assigns it to
        a connection handler.

        Params:
            handler = handler to assign connection to

     **************************************************************************/

    private void acceptConnection ( IConnectionHandler handler )
    {
        try
        {
            handler.assign(this.socket);

            handler.handleConnection();
        }
        // Named `exception` rather than `e` so that it does not shadow the
        // `this.e` SocketError member.
        catch (Exception exception)
        {
            /* Catch an exception thrown by accept() or handleConnection()
               (or noDelay()/blocking()) to prevent it from falling through
               to the select dispatcher which would unregister the server
               socket.

               'Too many open files' will be caught here.

               FIXME: If noDelay() or blocking() fails, the handler will
               incorrectly assume that the connection is not open and will
               not close it. Is this a relevant case? */
            handler.error(exception); // will never throw exceptions

            handler.finalize();
        }
    }

    /**************************************************************************

        Accepts the next pending incoming client connection and closes it.

     **************************************************************************/

    private void declineConnection ( )
    {
        // This is using the C binding
        if (close(accept(this.socket.fileHandle, null, null))) // returns non-zero on failure
        {
            .errno = 0;
        }
    }
}

/******************************************************************************

    SelectListener class template

    The additional T constructor argument parameters must appear after those for
    the mandatory IConnectionHandler constructor.

    Params:
        T    = connection handler class
        Args = additional constructor arguments for T

    TODO: try using the non-auto ctor pool, for template simplicity!

******************************************************************************/

public class SelectListener ( T : IConnectionHandler, Args ... ) : ISelectListener
{
    /**************************************************************************

        ObjectPool of connection handlers

     **************************************************************************/

    private alias SelectListenerPool!(T, Args) ConnPool;

    private ConnPool receiver_pool;

    /**************************************************************************

        String buffer used for connection logging.

     **************************************************************************/

    private mstring connection_log_buf;

    /**************************************************************************

        Constructor

        Creates the server socket, binds it to `address` and starts listening
        (see ISelectListener ctor), then creates the connection handler pool.

        Params:
            address    = the address of the socket
            socket     = the server socket
            args       = additional T constructor arguments, might be empty
            backlog    = (see ISelectListener ctor)

     **************************************************************************/

    public this ( sockaddr* address, ISocket socket, Args args, int backlog = 32 )
    {
        super(address, socket, backlog);

        this.receiver_pool = new ConnPool(&this.returnToPool, args);
    }

    /**************************************************************************

        Obtains a connection handler instance from the pool.

        Returns:
            connection handler

     **************************************************************************/

    protected override IConnectionHandler getConnectionHandler ( )
    {
        return this.receiver_pool.get();
    }

    /**************************************************************************

        Sets the limit of the number of connections. 0 disables the limitation.

        Notes:
            - If limit is set to something other than 0, limit connection
              handler objects will be created (so set it to a realistic value).
            - If not 0, limit must be at least the number of currently busy
              connections.

        Returns:
            limit

     **************************************************************************/

    public override size_t connection_limit ( size_t limit )
    {
        verify(!(limit && limit < this.poolInfo.num_busy),
            typeof(this).stringof ~ ".connection_limit: limit already exceeded");

        if (limit)
        {
            this.receiver_pool.setLimit(limit);
        }
        else
        {
            this.receiver_pool.limited = false;
        }

        return limit;
    }

    /**************************************************************************

        (Overriding wrapper to fix method matching.)

        Returns:
            new limit of number of connections or 0 if unlimited.

     **************************************************************************/

    public override size_t connection_limit ( )
    {
        return super.connection_limit;
    }

    /**************************************************************************

        Minimizes the connection pool to n connections by deleting idle
        connection objects. If more than n connections are currently busy,
        all idle connections are deleted.

        Params:
            n = minimum number of connection objects to keep in the pool.

        Returns:
            the number of connection objects in the pool after minimizing,
            which is the greater of n and the number of currently busy
            connections.

     **************************************************************************/

    public size_t minimize ( uint n = 0 )
    out (still_existent)
    {
        assert (still_existent >= n);
    }
    do
    {
        size_t limit = this.receiver_pool.limit,
               busy = this.receiver_pool.num_busy;

        // Restore the previous limit once the pool has been shrunk.
        scope (exit) this.receiver_pool.setLimit(limit);

        return this.receiver_pool.setLimit((n > busy)? n : busy);
    }

    /**************************************************************************

        Returns:
            information interface to the connections pool

     **************************************************************************/

    public override ISelectListenerPoolInfo poolInfo ( )
    {
        return this.receiver_pool;
    }

    /***************************************************************************

        Writes connection information to log file.

    ***************************************************************************/

    public void connectionLog ( )
    {
        auto conns = this.poolInfo;

        log.info("Connection pool: {} busy, {} idle", conns.num_busy,
            conns.num_idle);

        foreach ( i, conn; conns )
        {
            this.connection_log_buf.length = 0;
            sformat(this.connection_log_buf, "{}: ", i);

            conn.formatInfo(this.connection_log_buf);

            log.info(this.connection_log_buf);
        }
    }

    /**************************************************************************

        Closes all connections and terminates the listener.

     **************************************************************************/

    public override void shutdown ( )
    {
        // BusyItemsIterator is instantiated through the pool instance
        // (`outer.new Inner` syntax).
        scope busy_connections = this.receiver_pool.new BusyItemsIterator;
        foreach ( busy_connection; busy_connections )
        {
            /* FIXME: calling finalize here will cause errors in any connection
             * handlers which are currently selected in epoll, as they will
             * subsequently attempt to finalize themselves again.
             *
             * In practice this is of little import however, as the whole server
             * is being shut down. It may be nice to find a clean way to avoid
             * this though.
             */
            busy_connection.finalize();
        }

        super.terminate();
    }

    /**************************************************************************

        Called as the finalizer of class T. Returns connection into the object
        pool.

        Params:
            connection = connection handler instance to return into pool; must
                be a non-null instance of T

     **************************************************************************/

    private void returnToPool ( IConnectionHandler connection )
    {
        // Cast once and reuse the result; it is null if `connection` is null
        // or is not an instance of T.
        auto handler = cast (T) connection;

        verify(handler !is null,
            typeof(this).stringof ~ ".returnToPool: connection is null");

        debug ( ConnectionHandler )
            log.trace("[{}]: Returning to pool", connection.connection_id);

        this.receiver_pool.recycle(handler);
    }
}

///
unittest
{
    // A shared resource, owned by the server. A reference is passed to each
    // connection handler.
    class SomeSharedResource
    {
    }

    // The connection handler class. The server owns a select listener instance.
    // The select listener owns a pool of connection handlers. When a new
    // connection is accepted by the select listener, a connection handler is
    // taken from the pool (or allocated, if there are no free items in the
    // pool) and set to handle the incoming connection.
    // (Connection handler classes must not be nested, in order for the pool to
    // be able to automatically allocate instances. In this case, it must be
    // declared static, as the class definition is "nested" inside a unittest.)
    static class MyConnectionHandler : IConnectionHandler
    {
        import ocean.net.server.connection.IConnectionHandlerInfo;
        import ocean.sys.socket.AddressIPSocket;

        /// Reference to the global shared resource (passed to the ctor).
        private SomeSharedResource shared_resource;

        /// Flag set when an IO error occurs (see error()).
        private bool error_occurred;

        /***********************************************************************

            Constructs a connection handler.

            Params:
                finalize_dg = delegate required by the super class.
                    Automatically set by SelectListenerPool to be
                    SelectListener.returnToPool. Called by the super class when
                    finalize() is called
                shared_resource = reference to a global shared resource

        ***********************************************************************/

        public this ( scope FinalizeDg finalize_dg, SomeSharedResource shared_resource )
        {
            this.shared_resource = shared_resource;

            super(new AddressIPSocket!(), finalize_dg, &this.error);
        }

        /// The actual logic for handling a connection.
        override public void handleConnection ( )
        {
            // Do something in here.

            // When you're finished, call finalize(), which recycles this
            // connection into the pool owned by the select listener.
            this.finalize();
        }

        /// Tells the super class whether an I/O error occurred.
        override protected bool io_error ( )
        {
            return this.error_occurred;
        }

        /// Called by epoll when an I/O error occurs.
        private void error ( Exception exception, Event event,
            IConnectionHandlerInfo info )
        {
            this.error_occurred = true;
        }

        /// Unregisters the socket from the epoll before closing it. Called by
        /// finalize.
        override protected void unregisterSocket ()
        {
        }
    }

    // Top-level server class which owns the select listener and all global
    // resources.
    class MyServer
    {
        import ocean.sys.socket.AddressIPSocket;
        import ocean.sys.socket.InetAddress;
        import ocean.io.select.EpollSelectDispatcher;

        /// Select listener alias.
        private alias SelectListener!(MyConnectionHandler, SomeSharedResource)
            MySelectListener;

        /// Select listener instance.
        private MySelectListener listener;

        /// Epoll instance.
        private EpollSelectDispatcher epoll;

        /// Constructs a server.
        public this ( )
        {
            InetAddress!(false) addr;
            auto socket = new AddressIPSocket!();
            auto shared_resource = new SomeSharedResource;

            this.listener = new MySelectListener(addr("127.0.0.1", 2009),
                socket, shared_resource);

            this.epoll = new EpollSelectDispatcher;
        }

        /// Starts the server.
        public void start ( )
        {
            // The listener is an ISelectClient, so must be registered with
            // epoll in order to do anything.
            this.epoll.register(this.listener);

            // The event loop must also be running.
            this.epoll.eventLoop();
        }

        /// Stops the server.
        public void stop ( )
        {
            this.listener.shutdown();
        }
    }
}