1 /******************************************************************************* 2 3 Queue that provides a notification mechanism for when new items were added 4 5 Generic interfaces and logic for RequestQueues and related classes. 6 7 Genericly speaking, a request handler delegate is being registered at 8 the queue (ready()). The notifying queue will then call notify() to inform 9 it about a new item added, notify() is expected to call pop() to receive 10 that item. It should keep calling pop() until no items are left 11 and then re-register at the queue and wait for another call to notify(). 12 In other words: 13 14 1. NotifyingQueue.ready(¬ify) 15 16 2. NotifyingQueue.ready calls notify() 17 18 3. notify() calls NotifyingQueue.pop(); 19 20 * pop() returned a request: notify() processes data, back to 3. 21 22 * pop() returned null: continue to 4. 23 24 4. notify() calls NotifyingQueue.ready(¬ify) 25 26 A more simple solution like this was considered: 27 28 1. NotifyingQueue.ready(¬ify) 29 30 2. NotifyingQueue calls notify(Request) 31 32 3. notify() processes, back to 1. 33 34 But was decided against because it would cause a stack overflow for fibers, 35 as a RequestHandler needs to call RequestQueue.ready() and if fibers are 36 involved that call will be issued from within the fiber. 37 If ready() calls notify again another processing of a request in the fiber 38 will happen, causing another call to ready() leading to a recursion. 39 40 Now we require that the fiber calls pop in a loop. 41 42 Usage example for a hypothetical client who writes numbers to a socket 43 --- 44 module NotifyingQueueExample; 45 46 import ocean.util.container.queue.NotifyingQueue; 47 48 void main ( ) 49 { 50 auto notifying_queue = new NotifyingByteQueue(1024 * 40); 51 52 void notee ( ) 53 { 54 while (true) 55 { 56 auto popped = cast(char[]) notifying_queue.pop() 57 58 if ( popped !is null ) 59 { 60 Stdout.formatln("Popped from the queue: {}", popped); 61 } 62 else break; 63 } 64 65 notifying_queue.ready(¬ee); 66 } 67 68 notifying_queue.ready(¬ee); 69 70 numbers.sendNumber(23); 71 numbers.sendNumber(85); 72 numbers.sendNumber(42); 73 } 74 --- 75 76 Copyright: 77 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 78 All rights reserved. 79 80 License: 81 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 82 Alternatively, this file may be distributed under the terms of the Tango 83 3-Clause BSD License (see LICENSE_BSD.txt for details). 84 85 *******************************************************************************/ 86 87 module ocean.util.container.queue.NotifyingQueue; 88 89 90 import ocean.util.container.queue.FlexibleRingQueue; 91 92 import ocean.util.container.queue.model.IByteQueue; 93 94 import ocean.util.container.queue.model.IQueueInfo; 95 96 import ocean.io.model.ISuspendable; 97 98 import ocean.util.serialize.contiguous.Contiguous; 99 100 import ocean.util.serialize.contiguous.Serializer; 101 102 import ocean.util.serialize.contiguous.Deserializer; 103 104 import ocean.core.Array; 105 106 import ocean.transition; 107 108 import ocean.util.container.AppendBuffer; 109 110 import ocean.io.serialize.StructSerializer; 111 112 version ( UnitTest ) 113 { 114 import ocean.core.Test; 115 } 116 117 /******************************************************************************* 118 119 Request Queue implementation and logic. 120 121 A concrete client will probably prefer to use the templated version 122 123 *******************************************************************************/ 124 125 class NotifyingByteQueue : ISuspendable, IQueueInfo 126 { 127 import ocean.core.Verify; 128 129 /*************************************************************************** 130 131 Type of the delegate used for notifications 132 133 ***************************************************************************/ 134 135 public alias void delegate ( ) NotificationDg; 136 137 /*************************************************************************** 138 139 Queue being used 140 141 ***************************************************************************/ 142 143 private IByteQueue queue; 144 145 /*************************************************************************** 146 147 Whether the queue is enabled or not. Set/unset by the suspend() / 148 resume() methods. When enabled is false, the queue behaves as if it is 149 empty (no waiting notification delegates will be called). 150 151 ***************************************************************************/ 152 153 private bool enabled = true; 154 155 /*************************************************************************** 156 157 Array of delegates waiting for notification of data in queue 158 159 ***************************************************************************/ 160 161 private AppendBuffer!(NotificationDg) notifiers; 162 163 /*************************************************************************** 164 165 Constructor 166 167 Params: 168 max_bytes = size of the queue in bytes 169 170 ***************************************************************************/ 171 172 public this ( size_t max_bytes ) 173 { 174 this(new FlexibleByteRingQueue(max_bytes)); 175 } 176 177 178 /*************************************************************************** 179 180 Constructor 181 182 Params: 183 queue = instance of the queue implementation that will be used 184 185 ***************************************************************************/ 186 187 public this ( IByteQueue queue ) 188 { 189 this.queue = queue; 190 191 this.notifiers = new AppendBuffer!(NotificationDg); 192 } 193 194 195 /*************************************************************************** 196 197 Finds out whether the provided number of bytes will fit in the queue. 198 Also considers the need of wrapping. 199 200 Note that this method internally adds on the extra bytes required for 201 the item header, so it is *not* necessary for the end-user to first 202 calculate the item's push size. 203 204 Params: 205 bytes = size of item to check 206 207 Returns: 208 true if the bytes fits, else false 209 210 ***************************************************************************/ 211 212 public bool willFit ( size_t bytes ) 213 { 214 return this.queue.willFit(bytes); 215 } 216 217 218 219 /*************************************************************************** 220 221 Returns: 222 total number of bytes used by queue (used space + free space) 223 224 ***************************************************************************/ 225 226 public ulong total_space ( ) 227 { 228 return this.queue.total_space(); 229 } 230 231 232 /*************************************************************************** 233 234 Returns: 235 number of bytes stored in queue 236 237 ***************************************************************************/ 238 239 public ulong used_space ( ) 240 { 241 return this.queue.used_space(); 242 } 243 244 245 /*************************************************************************** 246 247 Returns: 248 number of bytes free in queue 249 250 ***************************************************************************/ 251 252 public ulong free_space ( ) 253 { 254 return this.queue.free_space(); 255 } 256 257 258 /*************************************************************************** 259 260 Returns: 261 the number of items in the queue 262 263 ***************************************************************************/ 264 265 public size_t length ( ) 266 { 267 return this.queue.length(); 268 } 269 270 271 /*************************************************************************** 272 273 Tells whether the queue is empty. 274 275 Returns: 276 true if the queue is empty 277 278 ***************************************************************************/ 279 280 public bool is_empty ( ) 281 { 282 return this.queue.is_empty(); 283 } 284 285 286 /*************************************************************************** 287 288 register an handler as available 289 290 Params: 291 notifier = handler that is now available 292 293 Returns: 294 false if the handler was called right away without 295 even registering 296 true if the handler was just added to the queue 297 298 ***************************************************************************/ 299 300 public bool ready ( scope NotificationDg notifier ) 301 { 302 debug foreach ( waiting_notifier; this.notifiers[] ) 303 { 304 verify (waiting_notifier !is notifier, 305 "RequestQueue.ready: notifier already registered"); 306 } 307 308 if (!this.is_empty() && this.enabled) 309 { 310 notifier(); 311 return false; 312 } 313 else 314 { 315 this.notifiers ~= notifier; 316 return true; 317 } 318 } 319 320 /*************************************************************************** 321 322 Check whether the provided notifier is already registered. 323 This allows the code to avoid calling ready() with the same notifier, 324 which may throw or add duplicate notifiers. 325 326 Note: This is an O(n) search, however it should not have a 327 performance impact in most cases since the number of registered 328 notifiers is typically very low. 329 330 Params: 331 notifier = the callback to check for 332 333 Returns: 334 true if the notifier is registered 335 336 ***************************************************************************/ 337 338 final public bool isRegistered ( scope NotificationDg notifier ) 339 { 340 foreach (wait_notifier; this.notifiers[]) 341 { 342 if (notifier is wait_notifier) 343 return true; 344 } 345 346 return false; 347 } 348 349 350 /*************************************************************************** 351 352 Returns: 353 how many notification delegates are waiting for data 354 355 ***************************************************************************/ 356 357 final public size_t waiting ( ) 358 { 359 return this.notifiers.length; 360 } 361 362 363 /*************************************************************************** 364 365 Push an item into the queue and notify the next waiting notification 366 delegate about it. 367 368 Params: 369 data = array of data that the item consists of 370 371 Returns: 372 true if push was successful 373 false if not 374 375 **************************************************************************/ 376 377 public bool push ( in void[] data ) 378 { 379 if ( !this.queue.push(data) ) return false; 380 381 this.notify(); 382 383 return true; 384 } 385 386 387 /*************************************************************************** 388 389 Push an item into the queue and notify the next waiting handler about 390 it. 391 392 Params: 393 size = size of the item to push 394 filler = delegate that will be called with that item to fill in the 395 actual data 396 397 Returns: 398 true if push was successful 399 false if not 400 401 ***************************************************************************/ 402 403 public bool push ( size_t size, scope void delegate ( void[] ) filler ) 404 { 405 auto target = this.queue.push(size); 406 407 if (target is null) return false; 408 409 filler(target); 410 411 this.notify(); 412 413 return true; 414 } 415 416 417 /*************************************************************************** 418 419 suspend consuming of the queue 420 421 ***************************************************************************/ 422 423 public void suspend ( ) 424 { 425 if (this.enabled == false) 426 { 427 return; 428 } 429 430 this.enabled = false; 431 } 432 433 434 /*************************************************************************** 435 436 Returns true if the queue is suspended, else false 437 438 ***************************************************************************/ 439 440 public bool suspended ( ) 441 { 442 return this.enabled == false; 443 } 444 445 446 /*************************************************************************** 447 448 resume consuming of the queue 449 450 ***************************************************************************/ 451 452 public void resume ( ) 453 { 454 if (this.enabled == true) 455 { 456 return; 457 } 458 459 this.enabled = true; 460 461 foreach (notifier; this.notifiers[]) 462 { 463 this.notify(); 464 } 465 } 466 467 468 /*************************************************************************** 469 470 pops an element if the queue is enabled 471 472 ***************************************************************************/ 473 474 public void[] pop ( ) 475 { 476 if ( !this.enabled ) 477 { 478 return null; 479 } 480 481 return this.queue.pop(); 482 } 483 484 485 /*************************************************************************** 486 487 Calls the next waiting notification delegate, if queue is enabled. 488 489 ***************************************************************************/ 490 491 private void notify ( ) 492 { 493 if ( this.notifiers.length > 0 && this.enabled ) 494 { 495 auto dg = notifiers.cut(); 496 497 dg(); 498 } 499 } 500 } 501 502 503 /******************************************************************************* 504 505 Templated Notifying Queue implementation 506 507 A concrete client should have an instance of this class and use it 508 to manage the connections and requests 509 510 Note: the stored type T is automatically de/serialized using the 511 StructSerializer. This performs a deep serialization of sub-structs and 512 array members. Union members are shallowly serialized. Delegate and class 513 members cannot be serialized. 514 515 Params: 516 T = type that the queue should store. If it's a struct it is stored 517 using the struct serializer, else it is storing it directly. Note 518 that by default the memory is not gc-aware so you reference 519 something from only the stored object, the gc could collect it. If 520 you desire different behavior pass your own queue instance to the 521 constructor 522 523 *******************************************************************************/ 524 525 class NotifyingQueue ( T ) : NotifyingByteQueue 526 { 527 // add to overload set 528 alias NotifyingByteQueue.push push; 529 530 /*************************************************************************** 531 532 Constructor 533 534 Params: 535 max_bytes = size of the queue in bytes 536 537 ***************************************************************************/ 538 539 public this ( size_t max_bytes ) 540 { 541 super(max_bytes); 542 } 543 544 545 /*************************************************************************** 546 547 Constructor 548 549 Params: 550 queue = instance of the queue implementation that will be used 551 552 ***************************************************************************/ 553 554 public this ( IByteQueue queue ) 555 { 556 super(queue); 557 } 558 559 560 /*************************************************************************** 561 562 Push a new request on the queue 563 564 Params: 565 request = The request to push 566 567 Returns: 568 true if push was successful 569 false if not 570 571 ***************************************************************************/ 572 573 bool push ( ref T request ) 574 { 575 static if ( is(T == struct) ) 576 auto length = Serializer.countRequiredSize(request); 577 else 578 auto length = request.sizeof; 579 580 void filler ( void[] target ) 581 { 582 static if ( is(T == struct) ) 583 Serializer.serialize(request, target); 584 else 585 target[] = (&request)[0..1]; 586 } 587 588 return super.push(length, &filler); 589 } 590 591 static if ( is(T == struct) ) 592 { 593 /*********************************************************************** 594 595 Pops a Request instance from the queue 596 597 Params: 598 cont_buffer = contiguous buffer to deserialize to 599 600 Returns: 601 pointer to the deserialized struct, completely allocated in the 602 given buffer 603 604 ***********************************************************************/ 605 606 T* pop ( ref Contiguous!(T) cont_buffer ) 607 { 608 if ( !this.enabled ) return null; 609 610 T* instance; 611 612 auto data = super.pop(); 613 614 if (data is null) 615 { 616 return null; 617 } 618 619 Const!(void[]) void_buffer = data; 620 621 Deserializer.deserialize(void_buffer, cont_buffer); 622 623 return cont_buffer.ptr; 624 } 625 } 626 else 627 { 628 /*********************************************************************** 629 630 Pops a Request instance from the queue 631 632 Params: 633 buffer = deserialisation buffer to use 634 635 Returns: 636 pointer to the deserialized item, completely allocated in the 637 given buffer 638 639 ***********************************************************************/ 640 641 T* pop ( ref void[] buffer ) 642 { 643 if ( !this.enabled ) return null; 644 645 T* instance; 646 647 auto data = super.pop(); 648 649 if (data is null) 650 { 651 return null; 652 } 653 654 buffer.copy(data); 655 656 return cast(T*)buffer.ptr; 657 } 658 } 659 } 660 661 unittest 662 { 663 void dg ( ) { } 664 665 auto queue = new NotifyingByteQueue(1024); 666 test(!queue.isRegistered(&dg)); 667 668 queue.ready(&dg); 669 test(queue.isRegistered(&dg)); 670 } 671 672 /// NotifyingQueue with a non-struct type 673 unittest 674 { 675 auto queue = new NotifyingQueue!(char[])(1024); 676 677 char[][] arr = ["foo".dup, "bar".dup]; 678 679 queue.push(arr[0]); 680 queue.push(arr[1]); 681 682 void[] buffer_1; 683 684 auto str_0 = queue.pop(buffer_1); 685 686 test!("==")(*str_0, "foo"); 687 688 void[] buffer_2; 689 690 auto str_1 = queue.pop(buffer_2); 691 692 test!("==")(*str_0, "foo"); // ensure there was no overwrite 693 test!("==")(*str_1, "bar"); 694 } 695 696 /// NotifyingQueue with a struct 697 unittest 698 { 699 struct S { char[] value; } 700 701 S[2] arr = [S("foo".dup), S("bar".dup)]; 702 703 auto queue = new NotifyingQueue!(S)(1024); 704 705 queue.push(arr[0]); 706 queue.push(arr[1]); 707 708 Contiguous!(S) ctg_1; 709 710 auto s0 = queue.pop(ctg_1); 711 712 test!("==")(s0.value, "foo"); 713 714 Contiguous!(S) ctg_2; 715 716 auto s1 = queue.pop(ctg_2); 717 718 test!("==")(s0.value, "foo"); // ensure there was no overwrite 719 test!("==")(s1.value, "bar"); 720 } 721 722 // Make sure NotifyingQueue template is instantinated & compiled 723 unittest 724 { 725 struct Dummy 726 { 727 int a; 728 int b; 729 char[] c; 730 } 731 732 void dg ( ) { } 733 734 auto queue = new NotifyingQueue!(Dummy)(1024); 735 test(!queue.isRegistered(&dg)); 736 737 queue.ready(&dg); 738 test(queue.isRegistered(&dg)); 739 } 740 741 742 unittest 743 { 744 auto q = new NotifyingQueue!(char)(256); 745 } 746