1 /******************************************************************************* 2 3 Queue that provides a notification mechanism for when new items were added 4 5 Generic interfaces and logic for RequestQueues and related classes. 6 7 Genericly speaking, a request handler delegate is being registered at 8 the queue (ready()). The notifying queue will then call notify() to inform 9 it about a new item added, notify() is expected to call pop() to receive 10 that item. It should keep calling pop() until no items are left 11 and then re-register at the queue and wait for another call to notify(). 12 In other words: 13 14 1. NotifyingQueue.ready(¬ify) 15 16 2. NotifyingQueue.ready calls notify() 17 18 3. notify() calls NotifyingQueue.pop(); 19 20 * pop() returned a request: notify() processes data, back to 3. 21 22 * pop() returned null: continue to 4. 23 24 4. notify() calls NotifyingQueue.ready(¬ify) 25 26 A more simple solution like this was considered: 27 28 1. NotifyingQueue.ready(¬ify) 29 30 2. NotifyingQueue calls notify(Request) 31 32 3. notify() processes, back to 1. 33 34 But was decided against because it would cause a stack overflow for fibers, 35 as a RequestHandler needs to call RequestQueue.ready() and if fibers are 36 involved that call will be issued from within the fiber. 37 If ready() calls notify again another processing of a request in the fiber 38 will happen, causing another call to ready() leading to a recursion. 39 40 Now we require that the fiber calls pop in a loop. 41 42 Usage example for a hypothetical client who writes numbers to a socket 43 --- 44 // Workaround: https://github.com/dlang/dub/issues/1761 45 module_ NotifyingQueueExample; 46 47 import ocean.util.container.queue.NotifyingQueue; 48 49 void main ( ) 50 { 51 auto notifying_queue = new NotifyingByteQueue(1024 * 40); 52 53 void notee ( ) 54 { 55 while (true) 56 { 57 auto popped = cast(char[]) notifying_queue.pop() 58 59 if ( popped !is null ) 60 { 61 Stdout.formatln("Popped from the queue: {}", popped); 62 } 63 else break; 64 } 65 66 notifying_queue.ready(¬ee); 67 } 68 69 notifying_queue.ready(¬ee); 70 71 numbers.sendNumber(23); 72 numbers.sendNumber(85); 73 numbers.sendNumber(42); 74 } 75 --- 76 77 Copyright: 78 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 79 All rights reserved. 80 81 License: 82 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 83 Alternatively, this file may be distributed under the terms of the Tango 84 3-Clause BSD License (see LICENSE_BSD.txt for details). 85 86 *******************************************************************************/ 87 88 module ocean.util.container.queue.NotifyingQueue; 89 90 91 import ocean.util.container.queue.FlexibleRingQueue; 92 93 import ocean.util.container.queue.model.IByteQueue; 94 95 import ocean.util.container.queue.model.IQueueInfo; 96 97 import ocean.io.model.ISuspendable; 98 99 import ocean.util.serialize.contiguous.Contiguous; 100 101 import ocean.util.serialize.contiguous.Serializer; 102 103 import ocean.util.serialize.contiguous.Deserializer; 104 105 import ocean.core.Array; 106 107 import ocean.meta.types.Qualifiers; 108 109 import ocean.util.container.AppendBuffer; 110 111 import ocean.io.serialize.StructSerializer; 112 113 version (unittest) 114 { 115 import ocean.core.Test; 116 } 117 118 /******************************************************************************* 119 120 Request Queue implementation and logic. 121 122 A concrete client will probably prefer to use the templated version 123 124 *******************************************************************************/ 125 126 class NotifyingByteQueue : ISuspendable, IQueueInfo 127 { 128 import ocean.core.Verify; 129 130 /*************************************************************************** 131 132 Type of the delegate used for notifications 133 134 ***************************************************************************/ 135 136 public alias void delegate ( ) NotificationDg; 137 138 /*************************************************************************** 139 140 Queue being used 141 142 ***************************************************************************/ 143 144 private IByteQueue queue; 145 146 /*************************************************************************** 147 148 Whether the queue is enabled or not. Set/unset by the suspend() / 149 resume() methods. When enabled is false, the queue behaves as if it is 150 empty (no waiting notification delegates will be called). 151 152 ***************************************************************************/ 153 154 private bool enabled = true; 155 156 /*************************************************************************** 157 158 Array of delegates waiting for notification of data in queue 159 160 ***************************************************************************/ 161 162 private AppendBuffer!(NotificationDg) notifiers; 163 164 /*************************************************************************** 165 166 Constructor 167 168 Params: 169 max_bytes = size of the queue in bytes 170 171 ***************************************************************************/ 172 173 public this ( size_t max_bytes ) 174 { 175 this(new FlexibleByteRingQueue(max_bytes)); 176 } 177 178 179 /*************************************************************************** 180 181 Constructor 182 183 Params: 184 queue = instance of the queue implementation that will be used 185 186 ***************************************************************************/ 187 188 public this ( IByteQueue queue ) 189 { 190 this.queue = queue; 191 192 this.notifiers = new AppendBuffer!(NotificationDg); 193 } 194 195 196 /*************************************************************************** 197 198 Finds out whether the provided number of bytes will fit in the queue. 199 Also considers the need of wrapping. 200 201 Note that this method internally adds on the extra bytes required for 202 the item header, so it is *not* necessary for the end-user to first 203 calculate the item's push size. 204 205 Params: 206 bytes = size of item to check 207 208 Returns: 209 true if the bytes fits, else false 210 211 ***************************************************************************/ 212 213 public bool willFit ( size_t bytes ) 214 { 215 return this.queue.willFit(bytes); 216 } 217 218 219 220 /*************************************************************************** 221 222 Returns: 223 total number of bytes used by queue (used space + free space) 224 225 ***************************************************************************/ 226 227 public ulong total_space ( ) 228 { 229 return this.queue.total_space(); 230 } 231 232 233 /*************************************************************************** 234 235 Returns: 236 number of bytes stored in queue 237 238 ***************************************************************************/ 239 240 public ulong used_space ( ) 241 { 242 return this.queue.used_space(); 243 } 244 245 246 /*************************************************************************** 247 248 Returns: 249 number of bytes free in queue 250 251 ***************************************************************************/ 252 253 public ulong free_space ( ) 254 { 255 return this.queue.free_space(); 256 } 257 258 259 /*************************************************************************** 260 261 Returns: 262 the number of items in the queue 263 264 ***************************************************************************/ 265 266 public size_t length ( ) 267 { 268 return this.queue.length(); 269 } 270 271 272 /*************************************************************************** 273 274 Tells whether the queue is empty. 275 276 Returns: 277 true if the queue is empty 278 279 ***************************************************************************/ 280 281 public bool is_empty ( ) 282 { 283 return this.queue.is_empty(); 284 } 285 286 287 /*************************************************************************** 288 289 register an handler as available 290 291 Params: 292 notifier = handler that is now available 293 294 Returns: 295 false if the handler was called right away without 296 even registering 297 true if the handler was just added to the queue 298 299 ***************************************************************************/ 300 301 public bool ready ( scope NotificationDg notifier ) 302 { 303 debug foreach ( waiting_notifier; this.notifiers[] ) 304 { 305 verify (waiting_notifier !is notifier, 306 "RequestQueue.ready: notifier already registered"); 307 } 308 309 if (!this.is_empty() && this.enabled) 310 { 311 notifier(); 312 return false; 313 } 314 else 315 { 316 this.notifiers ~= notifier; 317 return true; 318 } 319 } 320 321 /*************************************************************************** 322 323 Check whether the provided notifier is already registered. 324 This allows the code to avoid calling ready() with the same notifier, 325 which may throw or add duplicate notifiers. 326 327 Note: This is an O(n) search, however it should not have a 328 performance impact in most cases since the number of registered 329 notifiers is typically very low. 330 331 Params: 332 notifier = the callback to check for 333 334 Returns: 335 true if the notifier is registered 336 337 ***************************************************************************/ 338 339 final public bool isRegistered ( scope NotificationDg notifier ) 340 { 341 foreach (wait_notifier; this.notifiers[]) 342 { 343 if (notifier is wait_notifier) 344 return true; 345 } 346 347 return false; 348 } 349 350 351 /*************************************************************************** 352 353 Returns: 354 how many notification delegates are waiting for data 355 356 ***************************************************************************/ 357 358 final public size_t waiting ( ) 359 { 360 return this.notifiers.length; 361 } 362 363 364 /*************************************************************************** 365 366 Push an item into the queue and notify the next waiting notification 367 delegate about it. 368 369 Params: 370 data = array of data that the item consists of 371 372 Returns: 373 true if push was successful 374 false if not 375 376 **************************************************************************/ 377 378 public bool push ( in void[] data ) 379 { 380 if ( !this.queue.push(data) ) return false; 381 382 this.notify(); 383 384 return true; 385 } 386 387 388 /*************************************************************************** 389 390 Push an item into the queue and notify the next waiting handler about 391 it. 392 393 Params: 394 size = size of the item to push 395 filler = delegate that will be called with that item to fill in the 396 actual data 397 398 Returns: 399 true if push was successful 400 false if not 401 402 ***************************************************************************/ 403 404 public bool push ( size_t size, scope void delegate ( void[] ) filler ) 405 { 406 auto target = this.queue.push(size); 407 408 if (target is null) return false; 409 410 filler(target); 411 412 this.notify(); 413 414 return true; 415 } 416 417 418 /*************************************************************************** 419 420 suspend consuming of the queue 421 422 ***************************************************************************/ 423 424 public void suspend ( ) 425 { 426 if (this.enabled == false) 427 { 428 return; 429 } 430 431 this.enabled = false; 432 } 433 434 435 /*************************************************************************** 436 437 Returns true if the queue is suspended, else false 438 439 ***************************************************************************/ 440 441 public bool suspended ( ) 442 { 443 return this.enabled == false; 444 } 445 446 447 /*************************************************************************** 448 449 resume consuming of the queue 450 451 ***************************************************************************/ 452 453 public void resume ( ) 454 { 455 if (this.enabled == true) 456 { 457 return; 458 } 459 460 this.enabled = true; 461 462 foreach (notifier; this.notifiers[]) 463 { 464 this.notify(); 465 } 466 } 467 468 469 /*************************************************************************** 470 471 pops an element if the queue is enabled 472 473 ***************************************************************************/ 474 475 public void[] pop ( ) 476 { 477 if ( !this.enabled ) 478 { 479 return null; 480 } 481 482 return this.queue.pop(); 483 } 484 485 486 /*************************************************************************** 487 488 Calls the next waiting notification delegate, if queue is enabled. 489 490 ***************************************************************************/ 491 492 private void notify ( ) 493 { 494 if ( this.notifiers.length > 0 && this.enabled ) 495 { 496 auto dg = notifiers.cut(); 497 498 dg(); 499 } 500 } 501 } 502 503 504 /******************************************************************************* 505 506 Templated Notifying Queue implementation 507 508 A concrete client should have an instance of this class and use it 509 to manage the connections and requests 510 511 Note: the stored type T is automatically de/serialized using the 512 StructSerializer. This performs a deep serialization of sub-structs and 513 array members. Union members are shallowly serialized. Delegate and class 514 members cannot be serialized. 515 516 Params: 517 T = type that the queue should store. If it's a struct it is stored 518 using the struct serializer, else it is storing it directly. Note 519 that by default the memory is not gc-aware so you reference 520 something from only the stored object, the gc could collect it. If 521 you desire different behavior pass your own queue instance to the 522 constructor 523 524 *******************************************************************************/ 525 526 class NotifyingQueue ( T ) : NotifyingByteQueue 527 { 528 // add to overload set 529 alias NotifyingByteQueue.push push; 530 531 /*************************************************************************** 532 533 Constructor 534 535 Params: 536 max_bytes = size of the queue in bytes 537 538 ***************************************************************************/ 539 540 public this ( size_t max_bytes ) 541 { 542 super(max_bytes); 543 } 544 545 546 /*************************************************************************** 547 548 Constructor 549 550 Params: 551 queue = instance of the queue implementation that will be used 552 553 ***************************************************************************/ 554 555 public this ( IByteQueue queue ) 556 { 557 super(queue); 558 } 559 560 561 /*************************************************************************** 562 563 Push a new request on the queue 564 565 Params: 566 request = The request to push 567 568 Returns: 569 true if push was successful 570 false if not 571 572 ***************************************************************************/ 573 574 bool push ( ref T request ) 575 { 576 static if ( is(T == struct) ) 577 auto length = Serializer.countRequiredSize(request); 578 else 579 auto length = request.sizeof; 580 581 void filler ( void[] target ) 582 { 583 static if ( is(T == struct) ) 584 Serializer.serialize(request, target); 585 else 586 target[] = (&request)[0..1]; 587 } 588 589 return super.push(length, &filler); 590 } 591 592 static if ( is(T == struct) ) 593 { 594 /*********************************************************************** 595 596 Pops a Request instance from the queue 597 598 Params: 599 cont_buffer = contiguous buffer to deserialize to 600 601 Returns: 602 pointer to the deserialized struct, completely allocated in the 603 given buffer 604 605 ***********************************************************************/ 606 607 T* pop ( ref Contiguous!(T) cont_buffer ) 608 { 609 if ( !this.enabled ) return null; 610 611 T* instance; 612 613 auto data = super.pop(); 614 615 if (data is null) 616 { 617 return null; 618 } 619 620 const(void[]) void_buffer = data; 621 622 Deserializer.deserialize(void_buffer, cont_buffer); 623 624 return cont_buffer.ptr; 625 } 626 } 627 else 628 { 629 /*********************************************************************** 630 631 Pops a Request instance from the queue 632 633 Params: 634 buffer = deserialisation buffer to use 635 636 Returns: 637 pointer to the deserialized item, completely allocated in the 638 given buffer 639 640 ***********************************************************************/ 641 642 T* pop ( ref void[] buffer ) 643 { 644 if ( !this.enabled ) return null; 645 646 T* instance; 647 648 auto data = super.pop(); 649 650 if (data is null) 651 { 652 return null; 653 } 654 655 buffer.copy(data); 656 657 return cast(T*)buffer.ptr; 658 } 659 } 660 } 661 662 unittest 663 { 664 void dg ( ) { } 665 666 auto queue = new NotifyingByteQueue(1024); 667 test(!queue.isRegistered(&dg)); 668 669 queue.ready(&dg); 670 test(queue.isRegistered(&dg)); 671 } 672 673 /// NotifyingQueue with a non-struct type 674 unittest 675 { 676 auto queue = new NotifyingQueue!(char[])(1024); 677 678 char[][] arr = ["foo".dup, "bar".dup]; 679 680 queue.push(arr[0]); 681 queue.push(arr[1]); 682 683 void[] buffer_1; 684 685 auto str_0 = queue.pop(buffer_1); 686 687 test!("==")(*str_0, "foo"); 688 689 void[] buffer_2; 690 691 auto str_1 = queue.pop(buffer_2); 692 693 test!("==")(*str_0, "foo"); // ensure there was no overwrite 694 test!("==")(*str_1, "bar"); 695 } 696 697 /// NotifyingQueue with a struct 698 unittest 699 { 700 struct S { char[] value; } 701 702 S[2] arr = [S("foo".dup), S("bar".dup)]; 703 704 auto queue = new NotifyingQueue!(S)(1024); 705 706 queue.push(arr[0]); 707 queue.push(arr[1]); 708 709 Contiguous!(S) ctg_1; 710 711 auto s0 = queue.pop(ctg_1); 712 713 test!("==")(s0.value, "foo"); 714 715 Contiguous!(S) ctg_2; 716 717 auto s1 = queue.pop(ctg_2); 718 719 test!("==")(s0.value, "foo"); // ensure there was no overwrite 720 test!("==")(s1.value, "bar"); 721 } 722 723 // Make sure NotifyingQueue template is instantinated & compiled 724 unittest 725 { 726 struct Dummy 727 { 728 int a; 729 int b; 730 char[] c; 731 } 732 733 void dg ( ) { } 734 735 auto queue = new NotifyingQueue!(Dummy)(1024); 736 test(!queue.isRegistered(&dg)); 737 738 queue.ready(&dg); 739 test(queue.isRegistered(&dg)); 740 } 741 742 743 unittest 744 { 745 auto q = new NotifyingQueue!(char)(256); 746 }