1 /******************************************************************************* 2 3 Fixed size memory-based ring queue for elements of flexible size. 4 5 Copyright: 6 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 7 All rights reserved. 8 9 License: 10 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 11 Alternatively, this file may be distributed under the terms of the Tango 12 3-Clause BSD License (see LICENSE_BSD.txt for details). 13 14 *******************************************************************************/ 15 16 module ocean.util.container.queue.FlexibleRingQueue; 17 18 19 20 21 import ocean.transition; 22 23 import ocean.util.container.queue.model.IRingQueue; 24 25 import ocean.util.container.queue.model.IByteQueue; 26 27 import ocean.util.container.mem.MemManager; 28 29 import ocean.io.model.IConduit: InputStream, OutputStream; 30 31 import ocean.io.serialize.SimpleStreamSerializer; 32 33 import ocean.text.util.ClassName; 34 35 debug import ocean.io.Stdout; 36 37 38 39 /******************************************************************************* 40 41 Simple ubyte-based ring queue. 42 43 TODO: usage example 44 45 *******************************************************************************/ 46 47 class FlexibleByteRingQueue : IRingQueue!(IByteQueue) 48 { 49 import ocean.core.Verify; 50 import ocean.core.Enforce: enforce; 51 52 import Integer = ocean.text.convert.Integer_tango: toString; 53 private alias Integer.toString itoa; 54 55 /*************************************************************************** 56 57 Location of the gap at the rear end of the data array where the unused 58 space starts. 59 60 ***************************************************************************/ 61 62 private size_t gap; 63 64 65 /*************************************************************************** 66 67 Metadata header for saving/loading the queue state 68 69 ***************************************************************************/ 70 71 public struct ExportMetadata 72 { 73 uint items; 74 } 75 76 77 /*************************************************************************** 78 79 Header for queue items 80 81 ***************************************************************************/ 82 83 private struct Header 84 { 85 size_t length; 86 } 87 88 89 /*************************************************************************** 90 91 Invariant to assert queue position consistency: When the queue is empty, 92 read_from and write_to must both be 0. 93 94 ***************************************************************************/ 95 96 invariant ( ) 97 { 98 debug scope ( failure ) Stderr.formatln 99 ( 100 "{} invariant failed with items = {}, read_from = {}, " ~ 101 "write_to = {}, gap = {}, data.length = {}", 102 classname(this), this.items, this.read_from, this.write_to, 103 this.gap, this.data.length 104 ); 105 106 if (this.items) 107 { 108 assert(this.gap <= this.data.length, "gap out of range"); 109 assert(this.read_from <= this.data.length, "read_from out of range"); 110 assert(this.write_to <= this.data.length, "write_to out of range"); 111 assert(this.write_to, "write_to 0 with non-empty queue"); 112 assert(this.read_from < this.gap, "read_from within gap"); 113 assert((this.gap == this.write_to) || 114 !(this.read_from < this.write_to), 115 "read_from < write_to but gap not write position"); 116 } 117 else 118 { 119 assert(!this.gap, "gap expected to be 0 for empty queue"); 120 assert(!this.read_from, "read_from expected to be 0 for empty queue"); 121 assert(!this.write_to, "write_to expected to be 0 for empty queue"); 122 } 123 } 124 125 126 /*************************************************************************** 127 128 Constructor. The queue's memory buffer is allocated by the GC. 129 130 Params: 131 dimension = size of queue in bytes 132 133 ***************************************************************************/ 134 135 public this ( size_t dimension ) 136 { 137 verify(dimension > 0, typeof(this).stringof ~ ": cannot construct a 0-length queue"); 138 139 super(dimension); 140 } 141 142 143 /*************************************************************************** 144 145 Constructor. Allocates the queue's memory buffer with the provided 146 memory manager. 147 148 Params: 149 mem_manager = memory manager to use to allocate queue's buffer 150 dimension = size of queue in bytes 151 152 ***************************************************************************/ 153 154 public this ( IMemManager mem_manager, size_t dimension ) 155 { 156 super(mem_manager, dimension); 157 } 158 159 160 /*************************************************************************** 161 162 Pushes an item into the queue. 163 164 item.length = 0 is allowed. 165 166 Params: 167 item = data item to push 168 169 Returns: 170 true if the item was pushed successfully, false if it didn't fit 171 172 ***************************************************************************/ 173 174 public bool push ( in void[] item ) 175 { 176 auto data = this.push(item.length); 177 178 if ( data is null ) 179 { 180 return false; 181 } 182 183 data[] = item[]; 184 185 return true; 186 } 187 188 /*************************************************************************** 189 190 Reserves space for an item of <size> bytes on the queue but doesn't 191 fill the content. The caller is expected to fill in the content using 192 the returned slice. 193 194 size = 0 is allowed. 195 196 Params: 197 size = size of the space of the item that should be reserved 198 199 Returns: 200 slice to the reserved space if it was successfully reserved, else 201 null. Returns non-null empty string if size = 0 and the item was 202 successfully pushed. 203 204 Out: 205 The length of the returned array slice is size unless the slice is 206 null. 207 208 ***************************************************************************/ 209 210 public void[] push ( size_t size ) 211 out (slice) 212 { 213 assert(slice is null || slice.length == size, 214 classname(this) ~ "push: length of returned buffer not as requested"); 215 } 216 body 217 { 218 return this.willFit(size) ? this.push_(size) : null; 219 } 220 221 222 /*************************************************************************** 223 224 Pops an item from the queue. 225 226 Returns: 227 item popped from queue, may be null if queue is empty 228 229 ***************************************************************************/ 230 231 public void[] pop ( ) 232 { 233 return this.items ? this.pop_() : null; 234 } 235 236 237 /*************************************************************************** 238 239 Peeks at the item that would be popped next. 240 241 Returns: 242 item that would be popped from queue, 243 may be null if queue is empty 244 245 ***************************************************************************/ 246 247 public void[] peek ( ) 248 { 249 if (this.items) 250 { 251 auto h = this.read_from; 252 auto d = h + Header.sizeof; 253 auto header = cast(Header*) this.data[h .. d].ptr; 254 return this.data[d .. d + header.length]; 255 } 256 else 257 { 258 return null; 259 } 260 } 261 262 263 /*************************************************************************** 264 265 Returns: 266 number of bytes stored in queue 267 268 ***************************************************************************/ 269 270 public override ulong used_space ( ) 271 { 272 if (this.items == 0) 273 { 274 return 0; 275 } 276 277 if (this.write_to > this.read_from) 278 { 279 return this.write_to - this.read_from; 280 } 281 282 return this.gap - this.read_from + this.write_to; 283 } 284 285 286 /*************************************************************************** 287 288 Removes all items from the queue. 289 290 ***************************************************************************/ 291 292 public override void clear ( ) 293 { 294 super.clear(); 295 this.items = 0; 296 this.gap = 0; 297 } 298 299 300 /*************************************************************************** 301 302 Tells how much space an item would take up when written into the queue. 303 (Including the size of the required item header.) 304 305 Params: 306 item = item to calculate push size of 307 308 Returns: 309 number of bytes the item would take up if pushed to the queue 310 311 ***************************************************************************/ 312 313 static public size_t pushSize ( in void[] item ) 314 { 315 return pushSize(item.length); 316 } 317 318 319 /*************************************************************************** 320 321 Tells how much space an item of the specified size would take up when 322 written into the queue. (Including the size of the required item 323 header.) 324 325 Params: 326 bytes = number of bytes to calculate push size of 327 328 Returns: 329 number of bytes the item would take up if pushed to the queue 330 331 ***************************************************************************/ 332 333 static public size_t pushSize ( size_t bytes ) 334 { 335 return Header.sizeof + bytes; 336 } 337 338 339 /*************************************************************************** 340 341 Finds out whether the provided item will fit in the queue. Also 342 considers the need of wrapping. 343 344 Params: 345 item = item to check 346 347 Returns: 348 true if the item fits, else false 349 350 ***************************************************************************/ 351 352 bool willFit ( in void[] item ) 353 { 354 return this.willFit(item.length); 355 } 356 357 358 /*************************************************************************** 359 360 Finds out whether the provided number of bytes will fit in the queue. 361 Also considers the need of wrapping. 362 363 Note that this method internally adds on the extra bytes required for 364 the item header, so it is *not* necessary for the end-user to first 365 calculate the item's push size. 366 367 Params: 368 bytes = size of item to check 369 370 Returns: 371 true if the bytes fits, else false 372 373 ***************************************************************************/ 374 375 public bool willFit ( size_t bytes ) 376 { 377 size_t push_size = this.pushSize(bytes); 378 379 if (this.read_from < this.write_to) 380 { 381 /* 382 * Free space at either 383 * - data[write_to .. $], the end, or 384 * - data[0 .. read_from], the beginning, wrapping around. 385 */ 386 return ((this.data.length - this.write_to) >= push_size) // Fits at the end. 387 || (this.read_from >= push_size); // Fits at the start wrapping around. 388 389 } 390 else if (this.items) 391 { 392 // Free space at data[write_to .. read_from]. 393 return (this.read_from - this.write_to) >= push_size; 394 } 395 else 396 { 397 // Queue empty: data is the free space. 398 return push_size <= this.data.length; 399 } 400 } 401 402 /*************************************************************************** 403 404 Writes the queue's state and contents to the given output stream in the 405 following format: 406 407 - First ExportMetadata.sizeof bytes: Metadata header 408 - Next size_t.sizeof bytes: Number n of bytes of queue data (possibly 0) 409 - Next n bytes: Queue data 410 411 Params: 412 output = output stream to write to 413 414 Returns: 415 number of bytes written to output. 416 417 ***************************************************************************/ 418 419 public size_t save ( OutputStream output ) 420 { 421 size_t bytes = 0; 422 423 this.save((in void[] meta, in void[] head, in void[] tail = null) 424 { 425 bytes += SimpleStreamSerializer.writeData(output, meta); 426 bytes += SimpleStreamSerializer.write(output, head.length + tail.length); 427 if (head.length) bytes += SimpleStreamSerializer.writeData(output, head); 428 if (tail.length) bytes += SimpleStreamSerializer.writeData(output, tail); 429 }); 430 431 return bytes; 432 } 433 434 /*************************************************************************** 435 436 Calls the provided delegate, store(), to store the queue state and 437 contents. 438 439 The caller may concatenate the contents of the three buffers because 440 - it is safe to assume that load() will use the same meta.length and 441 - load() expects to receive the head ~ tail data. 442 443 Example: 444 445 --- 446 447 auto queue = new FlexibleRingQueue(/+ ... +/); 448 449 // Populate the queue... 450 451 void[] queue_save_data; 452 453 // Save the populated queue in queue_save_data. 454 455 queue.save(void[] meta, void[] head, void[] tail) 456 { 457 queue_save_data = meta ~ head ~ tail; 458 } 459 460 // Restore queue from queue_save_data. 461 462 queue.load(void[] meta, void[] data) 463 { 464 // It is safe to assume meta.length is the same as in the 465 // queue.save() callback delegate. 466 meta[] = queue_save_data[0 .. meta.length]; 467 468 void[] queue_load_data = queue_save_data[meta.length .. $]; 469 data[] = queue_load_data[]; 470 return queue_load_data.length; 471 } 472 473 --- 474 475 This method can also be called to poll the amount of space required for 476 storing the current queue content, which is 477 meta.length + head.length + tail.length. 478 479 The data produced by this method is accepted by the load() method of any 480 queue where queue.length >= head.length + tail.length. 481 482 Params: 483 store = output delegate 484 485 ***************************************************************************/ 486 487 public void save ( scope void delegate ( in void[] meta, in void[] head, in void[] tail = null ) store ) 488 { 489 auto meta = ExportMetadata(this.items); 490 491 if (this.read_from < this.write_to) 492 { 493 store((&meta)[0 .. 1], this.data[this.read_from .. this.write_to]); 494 } 495 else 496 { 497 store((&meta)[0 .. 1], 498 this.data[this.read_from .. this.gap], 499 this.data[0 .. this.write_to]); 500 } 501 } 502 503 /*************************************************************************** 504 505 Restores the queue state and contents, reading from input and expecting 506 data previously written by save() to an output stream. 507 508 Assumes that the input data do not exceed the queue capacity and throws 509 if they do. If this is possible and you want to handle this gracefully 510 (rather than getting an exception thrown), use the other overload of 511 this method. 512 513 Params: 514 input = input stream 515 516 Returns: 517 the number of bytes read from input. 518 519 Throws: 520 ValidationError if the input data are inconsistent or exceed the 521 queue capacity. When throwing, the queue remains empty. 522 523 ***************************************************************************/ 524 525 public size_t load ( InputStream input ) 526 { 527 size_t bytes = 0; 528 529 this.load((void[] meta, void[] data) 530 { 531 bytes += SimpleStreamSerializer.readData(input, meta); 532 533 size_t data_length; 534 bytes += SimpleStreamSerializer.read(input, data_length); 535 enforce!(ValidationError)(data_length <= data.length, 536 "Size of loaded data exceeds queue capacity"); 537 bytes += SimpleStreamSerializer.readData(input, data[0 .. data_length]); 538 return data_length; 539 }); 540 541 return bytes; 542 } 543 544 /*************************************************************************** 545 546 Restores the queue state and contents. 547 548 Clears the queue, then calls the provided delegate, restore(), to 549 restore the queue state and contents and validates it. 550 551 restore() should populate the meta and data buffer it receives with data 552 previously obtained from save(): 553 - meta should be populated with the data from the store() meta 554 parameter, 555 - data[0 .. head.length + tail.length] should be populated with the 556 head ~ tail data as received by the store() delegate during save(), 557 - restore() should return head.length + tail.length. 558 559 See the example in the documentation of save(). 560 561 Params: 562 restore = input delegate 563 564 Throws: 565 ValidationError if the input data are inconsistent. When throwing, 566 the queue remains empty. 567 568 ***************************************************************************/ 569 570 public void load ( scope size_t delegate ( void[] meta, void[] data ) restore ) 571 { 572 this.clear(); 573 574 /* 575 * Pass this.data as the destination buffer to restore() and validate 576 * its content after restore() populated it. Should the validation fail, 577 * items, read_from, write_to and gap will remain 0 so the queue is 578 * empty and the invalid data in this.data are not harmful. 579 */ 580 581 ExportMetadata meta; 582 size_t end = restore((&meta)[0 .. 1], this.data); 583 584 verify(end <= this.data.length, 585 idup(classname(this) ~ ".save(): restore callback expected to " ~ 586 "return at most " ~ itoa(this.data.length) ~ ", not" ~ itoa(end))); 587 588 this.validate(meta, this.data[0 .. end]); 589 590 this.items = meta.items; 591 this.write_to = end; 592 this.gap = end; 593 } 594 595 /*************************************************************************** 596 597 Pushes an item into the queue. 598 599 Params: 600 item = data item to push 601 602 ***************************************************************************/ 603 604 private void[] push_ ( size_t size ) 605 out (slice) 606 { 607 assert(slice !is null, 608 classname(this) ~ "push_: returned a null slice"); 609 assert(slice.length == size, 610 classname(this) ~ "push_: length of returned slice not as requested"); 611 assert(this); // invariant 612 } 613 body 614 { 615 assert(this); // invariant 616 verify(this.willFit(size), classname(this) ~ ".push_: item will not fit"); 617 618 auto push_size = this.pushSize(size); 619 620 /* 621 * read_from and write_to can have three different relationships: 622 * 623 * 1. write_to == read_from: The queue is empty, both are 0, the 624 * record goes to data[write_to .. $]. 625 * 626 * 2. write_to < read_from: The record goes in 627 * data[write_to .. read_from]. 628 * 629 * 3. read_from < write_to: The record goes either in 630 * a) data[write_to .. $] if there is enough space or 631 * b) data[0 .. read_from], wrapping around by setting 632 * write_to = 0. 633 * 634 * The destination slice of data in case 3a is equivalent to case 1 635 * and in case 3b to case 2. 636 */ 637 638 if (this.read_from < this.write_to) 639 { 640 verify(this.gap == this.write_to); 641 642 // Case 3: Check if the record fits in data[write_to .. $] ... 643 if (this.data.length - this.write_to < push_size) 644 { 645 /* 646 * ... no, we have to wrap around. The precondition claims 647 * the record does fit so there must be enough space in 648 * data[0 .. read_from]. 649 */ 650 verify(push_size <= this.read_from); 651 this.write_to = 0; 652 } 653 } 654 655 auto start = this.write_to; 656 this.write_to += push_size; 657 658 if (this.write_to > this.read_from) // Case 1 or 3a. 659 { 660 this.gap = this.write_to; 661 } 662 663 this.items++; 664 665 void[] dst = this.data[start .. this.write_to]; 666 *cast(Header*)dst[0 .. Header.sizeof].ptr = Header(size); 667 return dst[Header.sizeof .. $]; 668 } 669 670 671 /*************************************************************************** 672 673 Pops an item from the queue. 674 675 Returns: 676 item popped from queue 677 678 ***************************************************************************/ 679 680 private void[] pop_ ( ) 681 out (buffer) 682 { 683 assert(buffer, classname(this) ~ ".pop_: returned a null buffer"); 684 assert(this); // invariant 685 } 686 body 687 { 688 assert(this); // invariant 689 verify(this.items > 0, classname(this) ~ ".pop_: no items in the queue"); 690 691 auto position = this.read_from; 692 this.read_from += Header.sizeof; 693 694 // TODO: Error if this.data.length < this.read_from. 695 696 auto header = cast(Header*)this.data[position .. this.read_from].ptr; 697 698 // TODO: Error if this.data.length - this.read_from < header.length 699 700 position = this.read_from; 701 this.read_from += header.length; 702 verify(this.read_from <= this.gap); // The invariant ensures that 703 // this.gap is not 0. 704 705 this.items--; // The precondition prevents decrementing 0. 706 707 scope (exit) 708 { 709 if (this.items) 710 { 711 if (this.read_from == this.gap) 712 { 713 /* 714 * End of data, wrap around: 715 * 1. Set the read position to the start of this.data. 716 */ 717 this.read_from = 0; 718 /* 719 * 2. The write position is now the end of the data. 720 * If the queue is now empty, i.e. this.items == 0, 721 * write_to must be 0. 722 */ 723 724 verify(this.items || !this.write_to); 725 this.gap = this.write_to; 726 } 727 } 728 else // Popped the last record. 729 { 730 verify(this.read_from == this.write_to); 731 this.read_from = 0; 732 this.write_to = 0; 733 this.gap = 0; 734 } 735 736 } 737 738 return this.data[position .. this.read_from]; 739 } 740 741 /*************************************************************************** 742 743 Validates queue state and content data. 744 745 Params: 746 meta = queue state data as imported by load() 747 data = queue content data 748 749 Throws: 750 ValidationError if meta and/or data are inconsistent. 751 752 ***************************************************************************/ 753 754 private static void validate ( ExportMetadata meta, in void[] data ) 755 { 756 if (meta.items) 757 { 758 enforce!(ValidationError)( 759 data.length, 760 cast(istring) ("Expected data for a non-empty queue (" 761 ~ itoa(meta.items) ~ " records)") 762 ); 763 764 enforce!(ValidationError)( 765 data.length >= cast(size_t)meta.items * Header.sizeof, 766 cast(istring) ("Queue data shorter than required minimum for " ~ 767 itoa(meta.items) ~ " records (got " ~ itoa(data.length) ~ 768 " bytes)") 769 ); 770 771 size_t pos = 0; 772 773 for (uint i = 0; i < meta.items; i++) 774 { 775 verify(pos <= data.length); 776 777 try 778 { 779 enforce!(ValidationError)(pos != data.length, 780 "Unexpected end of input data"); 781 782 auto start = pos; 783 pos += Header.sizeof; 784 785 enforce!(ValidationError)( 786 pos <= data.length, 787 cast(istring) ("End of queue data in the middle of" ~ 788 " the record header which starts at byte " ~ 789 itoa(start)) 790 ); 791 792 auto header = cast(Const!(Header)*)data[start .. pos].ptr; 793 794 enforce!(ValidationError)( 795 (data.length - pos) >= header.length, 796 cast(istring) ("End of queue data in the middle of the" ~ 797 " queue record, record length = " ~ 798 itoa(header.length)) 799 ); 800 801 pos += header.length; 802 } 803 catch (ValidationError e) 804 { 805 auto msg = "Error reading record " ~ itoa(i + i) ~ "/" ~ 806 itoa(meta.items) ~ ": " ~ e.message(); 807 e.msg = assumeUnique(msg); 808 throw e; 809 } 810 } 811 812 verify(pos <= data.length); 813 814 enforce!(ValidationError)( 815 pos >= data.length, 816 cast(istring) ("Queue data too long (" ~ itoa(meta.items) ~ 817 " records, " ~ itoa(pos) ~ "/" ~ itoa(data.length) ~ 818 " bytes used)") 819 ); 820 } 821 else 822 { 823 enforce!(ValidationError)( 824 !data.length, 825 cast(istring) ("Expected no data for an empty queue, not " ~ 826 itoa(data.length) ~ " bytes") 827 ); 828 } 829 } 830 831 /**************************************************************************/ 832 833 static class ValidationError: Exception 834 { 835 import ocean.core.Exception : DefaultExceptionCtor; 836 mixin DefaultExceptionCtor!(); 837 } 838 } 839 /******************************************************************************* 840 841 UnitTest 842 843 *******************************************************************************/ 844 845 version ( UnitTest ) 846 { 847 import ocean.io.model.IConduit: IConduit; 848 } 849 850 unittest 851 { 852 static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10; 853 854 scope queue = new FlexibleByteRingQueue(queue_size_1); 855 test(queue.free_space >= queue_size_1); 856 test(queue.is_empty); 857 858 test(queue.free_space >= queue_size_1); 859 test(queue.is_empty); 860 861 test(queue.push("Element 1")); 862 test(queue.pop() == "Element 1"); 863 test(queue.get_items == 0); 864 test(!queue.free_space == 0); 865 test(queue.is_empty); 866 test(queue.used_space() == 0); 867 868 test(queue.push("Element 1")); 869 test(queue.push("Element 2")); 870 test(queue.push("Element 3")); 871 test(queue.push("Element 4")); 872 test(queue.push("Element 5")); 873 test(queue.push("Element 6")); 874 test(queue.push("Element 7")); 875 test(queue.push("Element 8")); 876 test(queue.push("Element 9")); 877 test(queue.push("Element10")); 878 879 test(queue.length == 10); 880 test(queue.free_space == 0); 881 test(!queue.is_empty); 882 883 test(!queue.push("more")); 884 test(queue.length == 10); 885 886 scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5); 887 middle.push("1"); 888 middle.push("2"); 889 middle.push("3"); 890 middle.push("4"); 891 test(middle.pop == "1"); 892 test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof); 893 test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4); 894 test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2); 895 896 test(middle.push("5")); 897 test(middle.push("6")); 898 test(middle.free_space() == 0); 899 } 900 901 /******************************************************************************* 902 903 Save/load test. Uses the unittest above, adding save/load sequences in the 904 middle. This test is separate to allow for changing the above push/pop test 905 without breaking the save/load test. 906 907 *******************************************************************************/ 908 909 version ( UnitTest ) 910 { 911 import ocean.io.device.MemoryDevice; 912 } 913 914 unittest 915 { 916 // Buffers and callback functions for the delegate based save() & load(). 917 918 void[] saved_meta, saved_data; 919 920 void store ( in void[] meta, in void[] head, in void[] tail ) 921 { 922 saved_meta = meta.dup; 923 saved_data = head.dup ~ tail; 924 } 925 926 size_t restore ( void[] meta, void[] data ) 927 { 928 meta[] = saved_meta; 929 data[0 .. saved_data.length] = saved_data; 930 return saved_data.length; 931 } 932 933 // Memory I/O stream device for the stream based save() & load(). 934 935 scope backup = new MemoryDevice; 936 937 static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10; 938 939 scope queue = new FlexibleByteRingQueue(queue_size_1); 940 test(queue.free_space >= queue_size_1); 941 test(queue.is_empty); 942 943 queue.save(&store); 944 queue.load(&restore); 945 946 test(queue.free_space >= queue_size_1); 947 test(queue.is_empty); 948 949 test(queue.push("Element 1")); 950 test(queue.pop() == "Element 1"); 951 test(queue.get_items == 0); 952 test(!queue.free_space == 0); 953 test(queue.is_empty); 954 test(queue.used_space() == 0); 955 956 test(queue.push("Element 1")); 957 test(queue.push("Element 2")); 958 test(queue.push("Element 3")); 959 test(queue.push("Element 4")); 960 test(queue.push("Element 5")); 961 test(queue.push("Element 6")); 962 test(queue.push("Element 7")); 963 test(queue.push("Element 8")); 964 test(queue.push("Element 9")); 965 test(queue.push("Element10")); 966 967 // Save and restore the queue status in the middle of a test. 968 969 queue.save(&store); 970 queue.clear(); 971 queue.load(&restore); 972 973 test(queue.length == 10); 974 test(queue.free_space == 0); 975 test(!queue.is_empty); 976 977 test(!queue.push("more")); 978 test(queue.length == 10); 979 980 scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5); 981 middle.push("1"); 982 middle.push("2"); 983 middle.push("3"); 984 middle.push("4"); 985 test(middle.pop == "1"); 986 test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof); 987 test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4); 988 test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2); 989 990 // Save and restore the queue status in the middle of a test. 991 992 middle.save(backup); 993 middle.clear(); 994 backup.seek(0); 995 middle.load(backup); 996 test(backup.read(null) == backup.Eof); 997 backup.close(); 998 999 test(middle.push("5")); 1000 test(middle.push("6")); 1001 test(middle.free_space() == 0); 1002 } 1003 1004 /******************************************************************************* 1005 1006 Test for the corner case of saving the queue state after wrapping around. 1007 1008 *******************************************************************************/ 1009 1010 unittest 1011 { 1012 enum Save {Dont = 0, Dg, Stream} 1013 1014 // Buffers and callback functions for the delegate based save() & load(). 1015 1016 void[] saved_meta, saved_data; 1017 1018 void store ( in void[] meta, in void[] head, in void[] tail ) 1019 { 1020 saved_meta = meta.dup; 1021 saved_data = head.dup ~ tail; 1022 } 1023 1024 size_t restore ( void[] meta, void[] data ) 1025 { 1026 meta[] = saved_meta; 1027 data[0 .. saved_data.length] = saved_data; 1028 return saved_data.length; 1029 } 1030 1031 // Memory I/O stream device for the stream based save() & load(). 1032 1033 scope backup = new MemoryDevice; 1034 1035 void save_wraparound ( Save save ) 1036 { 1037 static immutable Q_SIZE = 20; 1038 FlexibleByteRingQueue q = new FlexibleByteRingQueue(Q_SIZE); 1039 1040 void push(uint n) 1041 in 1042 { 1043 test(n <= ubyte.max); 1044 } 1045 body 1046 { 1047 for (ubyte i = 0; i < n; i++) 1048 { 1049 if (auto push_slice = q.push(1)) 1050 { 1051 push_slice[] = (&i)[0 .. 1]; 1052 } 1053 else 1054 { 1055 break; 1056 } 1057 1058 // Save and restore the queue status after wrapping around. 1059 1060 if (q.get_write_to <= q.get_read_from) switch (save) 1061 { 1062 case save.Dont: break; 1063 1064 case save.Dg: 1065 q.save(&store); 1066 q.clear(); 1067 q.load(&restore); 1068 break; 1069 1070 case save.Stream: 1071 q.save(backup); 1072 q.clear(); 1073 backup.seek(0); 1074 q.load(backup); 1075 test(backup.read(null) == backup.Eof); 1076 backup.close(); 1077 break; 1078 1079 default: assert(false); 1080 } 1081 } 1082 } 1083 1084 void pop(uint n) 1085 { 1086 for (uint i = 0; i < n; i++) 1087 { 1088 if (auto popped = q.pop()) 1089 { 1090 test (popped.length == 1); 1091 static ubyte[] unexpected = [cast(ubyte)Q_SIZE+1]; 1092 test (popped != unexpected); 1093 popped[] = unexpected; 1094 } 1095 else 1096 { 1097 break; 1098 } 1099 } 1100 } 1101 1102 push(2); 1103 pop(1); 1104 push(2); 1105 pop(1); 1106 push(3); 1107 pop(4); 1108 pop(1); 1109 } 1110 save_wraparound(Save.Dont); 1111 save_wraparound(Save.Dg); 1112 save_wraparound(Save.Stream); 1113 } 1114 1115 /******************************************************************************* 1116 1117 Performance test 1118 1119 *******************************************************************************/ 1120 1121 version ( UnitTest ) 1122 { 1123 // Uncomment the next line to see UnitTest output 1124 // version = UnitTestVerbose; 1125 1126 import ocean.core.Test; 1127 1128 import ocean.math.random.Random; 1129 import ocean.time.StopWatch; 1130 import core.memory; 1131 import ocean.io.FilePath; 1132 } 1133 1134 unittest 1135 { 1136 scope random = new Random(); 1137 1138 /*********************************************************************** 1139 1140 Test wrapping 1141 1142 ***********************************************************************/ 1143 1144 { 1145 scope queue = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*3); 1146 1147 test(queue.get_read_from == 0); 1148 test(queue.get_write_to == 0); 1149 // [___] r=0 w=0 1150 test(queue.push("1")); 1151 1152 test(queue.get_read_from == 0); 1153 test(queue.get_write_to == 1+FlexibleByteRingQueue.Header.sizeof); 1154 test(queue.get_items == 1); 1155 test((cast(FlexibleByteRingQueue.Header*) queue.get_data.ptr).length == 1); 1156 1157 { 1158 Const!(void)[] expected = "1"; 1159 test(queue.get_data[FlexibleByteRingQueue.Header.sizeof .. 1160 1+FlexibleByteRingQueue.Header.sizeof] == 1161 expected); 1162 } 1163 1164 // [#__] r=0 w=5 1165 test(queue.push("2")); 1166 1167 // [##_] r=0 w=10 1168 test(queue.push("3")); 1169 1170 // [###] r=0 w=15 1171 test(!queue.push("4")); 1172 test(queue.free_space == 0); 1173 test(queue.pop() == "1"); 1174 1175 // [_##] r=5 w=15 1176 test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof); 1177 test(queue.pop() == "2"); 1178 1179 // [__#] r=10 w=15 1180 test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2); 1181 test(queue.get_write_to == queue.get_data.length); 1182 test(queue.push("1")); 1183 1184 // [#_#] r=10 w=5 1185 test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof); 1186 test(queue.get_write_to == queue.pushSize("2".length)); 1187 test(queue.push("2")); 1188 // Stdout.formatln("gap is {}, free is {}, write is {}", queue.gap, queue.free_space(),queue.write_to); 1189 1190 1191 // [###] r=10 w=10 1192 test(queue.free_space == 0); 1193 test(queue.pop() == "3"); 1194 1195 // [##_] r=15/0 w=10 1196 test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*1); 1197 test(queue.pop() == "1"); 1198 1199 // [_#_] r=5 w=10 1200 test(queue.pop() == "2"); 1201 1202 // [__] r=0 w=0 1203 test(queue.is_empty); 1204 test(queue.push("1")); 1205 1206 // [#__] r=0 w=5 1207 test(queue.push("2#")); 1208 1209 // [#$_] r=0 w=11 ($ = 2 bytes) 1210 test(queue.pop() == "1"); 1211 1212 // [_$_] r=5 w=11 1213 test(queue.push("1")); 1214 1215 // [#$_] r=5 w=5 1216 test(!queue.push("2")); 1217 test(queue.pop() == "2#"); 1218 1219 // [#__] r=11 w=5 1220 test(queue.push("2")); // this needs to be wrapped now 1221 1222 // [##_] r=11 w=10 1223 } 1224 } 1225 1226 1227 // Test for a specific bug that caused garbled loglines due to an old wrong 1228 // willFit() function 1229 unittest 1230 { 1231 auto q = new FlexibleByteRingQueue(45); 1232 1233 // Setup conditions 1234 test(q.push("123456")); // w = 14 1235 test(q.push("12345678")); // w = 30 1236 test(q.push("123456")); // w = 44 1237 test(q.pop() == "123456"); 1238 test(q.pop() == "12345678"); // r == 30 1239 test(q.push("12345678123456781234")); // r = 30, gap = 44 1240 1241 auto test_push = "123456789.....16"; 1242 1243 // Make sure the bugs conditions are present 1244 test!(">")(q.get_read_from, q.get_write_to); 1245 test!("<")(q.get_read_from, q.gap); 1246 test!(">")(q.pushSize(test_push.length) + q.get_write_to, q.get_data.length); 1247 1248 // Do the actual test 1249 test(!q.push(test_push)); 1250 1251 test!("==")(q.pop(), "123456"); 1252 test!("==")(q.pop(), "12345678123456781234"); 1253 }