1 /******************************************************************************* 2 3 Fixed size memory-based ring queue for elements of flexible size. 4 5 Copyright: 6 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 7 All rights reserved. 8 9 License: 10 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 11 Alternatively, this file may be distributed under the terms of the Tango 12 3-Clause BSD License (see LICENSE_BSD.txt for details). 13 14 *******************************************************************************/ 15 16 module ocean.util.container.queue.FlexibleRingQueue; 17 18 19 20 import ocean.core.TypeConvert: assumeUnique; 21 22 import ocean.meta.types.Qualifiers; 23 24 import ocean.util.container.queue.model.IRingQueue; 25 26 import ocean.util.container.queue.model.IByteQueue; 27 28 import ocean.util.container.mem.MemManager; 29 30 import ocean.io.model.IConduit: InputStream, OutputStream; 31 32 import ocean.io.serialize.SimpleStreamSerializer; 33 34 import ocean.text.util.ClassName; 35 36 debug import ocean.io.Stdout; 37 38 39 40 /******************************************************************************* 41 42 Simple ubyte-based ring queue. 43 44 TODO: usage example 45 46 *******************************************************************************/ 47 48 class FlexibleByteRingQueue : IRingQueue!(IByteQueue) 49 { 50 import ocean.core.Verify; 51 import ocean.core.Enforce: enforce; 52 53 import Integer = ocean.text.convert.Integer_tango: toString; 54 private alias Integer.toString itoa; 55 56 /*************************************************************************** 57 58 Location of the gap at the rear end of the data array where the unused 59 space starts. 60 61 ***************************************************************************/ 62 63 private size_t gap; 64 65 66 /*************************************************************************** 67 68 Metadata header for saving/loading the queue state 69 70 ***************************************************************************/ 71 72 public struct ExportMetadata 73 { 74 uint items; 75 } 76 77 78 /*************************************************************************** 79 80 Header for queue items 81 82 ***************************************************************************/ 83 84 private struct Header 85 { 86 size_t length; 87 } 88 89 90 /*************************************************************************** 91 92 Invariant to assert queue position consistency: When the queue is empty, 93 read_from and write_to must both be 0. 94 95 ***************************************************************************/ 96 97 invariant ( ) 98 { 99 debug scope ( failure ) Stderr.formatln 100 ( 101 "{} invariant failed with items = {}, read_from = {}, " ~ 102 "write_to = {}, gap = {}, data.length = {}", 103 classname(this), this.items, this.read_from, this.write_to, 104 this.gap, this.data.length 105 ); 106 107 if (this.items) 108 { 109 assert(this.gap <= this.data.length, "gap out of range"); 110 assert(this.read_from <= this.data.length, "read_from out of range"); 111 assert(this.write_to <= this.data.length, "write_to out of range"); 112 assert(this.write_to, "write_to 0 with non-empty queue"); 113 assert(this.read_from < this.gap, "read_from within gap"); 114 assert((this.gap == this.write_to) || 115 !(this.read_from < this.write_to), 116 "read_from < write_to but gap not write position"); 117 } 118 else 119 { 120 assert(!this.gap, "gap expected to be 0 for empty queue"); 121 assert(!this.read_from, "read_from expected to be 0 for empty queue"); 122 assert(!this.write_to, "write_to expected to be 0 for empty queue"); 123 } 124 } 125 126 127 /*************************************************************************** 128 129 Constructor. The queue's memory buffer is allocated by the GC. 130 131 Params: 132 dimension = size of queue in bytes 133 134 ***************************************************************************/ 135 136 public this ( size_t dimension ) 137 { 138 verify(dimension > 0, typeof(this).stringof ~ ": cannot construct a 0-length queue"); 139 140 super(dimension); 141 } 142 143 144 /*************************************************************************** 145 146 Constructor. Allocates the queue's memory buffer with the provided 147 memory manager. 148 149 Params: 150 mem_manager = memory manager to use to allocate queue's buffer 151 dimension = size of queue in bytes 152 153 ***************************************************************************/ 154 155 public this ( IMemManager mem_manager, size_t dimension ) 156 { 157 super(mem_manager, dimension); 158 } 159 160 161 /*************************************************************************** 162 163 Pushes an item into the queue. 164 165 item.length = 0 is allowed. 166 167 Params: 168 item = data item to push 169 170 Returns: 171 true if the item was pushed successfully, false if it didn't fit 172 173 ***************************************************************************/ 174 175 public bool push ( in void[] item ) 176 { 177 auto data = this.push(item.length); 178 179 if ( data is null ) 180 { 181 return false; 182 } 183 184 data[] = item[]; 185 186 return true; 187 } 188 189 /*************************************************************************** 190 191 Reserves space for an item of <size> bytes on the queue but doesn't 192 fill the content. The caller is expected to fill in the content using 193 the returned slice. 194 195 size = 0 is allowed. 196 197 Params: 198 size = size of the space of the item that should be reserved 199 200 Returns: 201 slice to the reserved space if it was successfully reserved, else 202 null. Returns non-null empty string if size = 0 and the item was 203 successfully pushed. 204 205 Out: 206 The length of the returned array slice is size unless the slice is 207 null. 208 209 ***************************************************************************/ 210 211 public void[] push ( size_t size ) 212 out (slice) 213 { 214 assert(slice is null || slice.length == size, 215 classname(this) ~ "push: length of returned buffer not as requested"); 216 } 217 do 218 { 219 return this.willFit(size) ? this.push_(size) : null; 220 } 221 222 223 /*************************************************************************** 224 225 Pops an item from the queue. 226 227 Returns: 228 item popped from queue, may be null if queue is empty 229 230 ***************************************************************************/ 231 232 public void[] pop ( ) 233 { 234 return this.items ? this.pop_() : null; 235 } 236 237 238 /*************************************************************************** 239 240 Peeks at the item that would be popped next. 241 242 Returns: 243 item that would be popped from queue, 244 may be null if queue is empty 245 246 ***************************************************************************/ 247 248 public void[] peek ( ) 249 { 250 if (this.items) 251 { 252 auto h = this.read_from; 253 auto d = h + Header.sizeof; 254 auto header = cast(Header*) this.data[h .. d].ptr; 255 return this.data[d .. d + header.length]; 256 } 257 else 258 { 259 return null; 260 } 261 } 262 263 264 /*************************************************************************** 265 266 Returns: 267 number of bytes stored in queue 268 269 ***************************************************************************/ 270 271 public override ulong used_space ( ) 272 { 273 if (this.items == 0) 274 { 275 return 0; 276 } 277 278 if (this.write_to > this.read_from) 279 { 280 return this.write_to - this.read_from; 281 } 282 283 return this.gap - this.read_from + this.write_to; 284 } 285 286 287 /*************************************************************************** 288 289 Removes all items from the queue. 290 291 ***************************************************************************/ 292 293 public override void clear ( ) 294 { 295 super.clear(); 296 this.items = 0; 297 this.gap = 0; 298 } 299 300 301 /*************************************************************************** 302 303 Tells how much space an item would take up when written into the queue. 304 (Including the size of the required item header.) 305 306 Params: 307 item = item to calculate push size of 308 309 Returns: 310 number of bytes the item would take up if pushed to the queue 311 312 ***************************************************************************/ 313 314 static public size_t pushSize ( in void[] item ) 315 { 316 return pushSize(item.length); 317 } 318 319 320 /*************************************************************************** 321 322 Tells how much space an item of the specified size would take up when 323 written into the queue. (Including the size of the required item 324 header.) 325 326 Params: 327 bytes = number of bytes to calculate push size of 328 329 Returns: 330 number of bytes the item would take up if pushed to the queue 331 332 ***************************************************************************/ 333 334 static public size_t pushSize ( size_t bytes ) 335 { 336 return Header.sizeof + bytes; 337 } 338 339 340 /*************************************************************************** 341 342 Finds out whether the provided item will fit in the queue. Also 343 considers the need of wrapping. 344 345 Params: 346 item = item to check 347 348 Returns: 349 true if the item fits, else false 350 351 ***************************************************************************/ 352 353 bool willFit ( in void[] item ) 354 { 355 return this.willFit(item.length); 356 } 357 358 359 /*************************************************************************** 360 361 Finds out whether the provided number of bytes will fit in the queue. 362 Also considers the need of wrapping. 363 364 Note that this method internally adds on the extra bytes required for 365 the item header, so it is *not* necessary for the end-user to first 366 calculate the item's push size. 367 368 Params: 369 bytes = size of item to check 370 371 Returns: 372 true if the bytes fits, else false 373 374 ***************************************************************************/ 375 376 public bool willFit ( size_t bytes ) 377 { 378 size_t push_size = this.pushSize(bytes); 379 380 if (this.read_from < this.write_to) 381 { 382 /* 383 * Free space at either 384 * - data[write_to .. $], the end, or 385 * - data[0 .. read_from], the beginning, wrapping around. 386 */ 387 return ((this.data.length - this.write_to) >= push_size) // Fits at the end. 388 || (this.read_from >= push_size); // Fits at the start wrapping around. 389 390 } 391 else if (this.items) 392 { 393 // Free space at data[write_to .. read_from]. 394 return (this.read_from - this.write_to) >= push_size; 395 } 396 else 397 { 398 // Queue empty: data is the free space. 399 return push_size <= this.data.length; 400 } 401 } 402 403 /*************************************************************************** 404 405 Writes the queue's state and contents to the given output stream in the 406 following format: 407 408 - First ExportMetadata.sizeof bytes: Metadata header 409 - Next size_t.sizeof bytes: Number n of bytes of queue data (possibly 0) 410 - Next n bytes: Queue data 411 412 Params: 413 output = output stream to write to 414 415 Returns: 416 number of bytes written to output. 417 418 ***************************************************************************/ 419 420 public size_t save ( OutputStream output ) 421 { 422 size_t bytes = 0; 423 424 this.save((in void[] meta, in void[] head, in void[] tail = null) 425 { 426 bytes += SimpleStreamSerializer.writeData(output, meta); 427 bytes += SimpleStreamSerializer.write(output, head.length + tail.length); 428 if (head.length) bytes += SimpleStreamSerializer.writeData(output, head); 429 if (tail.length) bytes += SimpleStreamSerializer.writeData(output, tail); 430 }); 431 432 return bytes; 433 } 434 435 /*************************************************************************** 436 437 Calls the provided delegate, store(), to store the queue state and 438 contents. 439 440 The caller may concatenate the contents of the three buffers because 441 - it is safe to assume that load() will use the same meta.length and 442 - load() expects to receive the head ~ tail data. 443 444 Example: 445 446 --- 447 448 auto queue = new FlexibleRingQueue(/+ ... +/); 449 450 // Populate the queue... 451 452 void[] queue_save_data; 453 454 // Save the populated queue in queue_save_data. 455 456 queue.save(void[] meta, void[] head, void[] tail) 457 { 458 queue_save_data = meta ~ head ~ tail; 459 } 460 461 // Restore queue from queue_save_data. 462 463 queue.load(void[] meta, void[] data) 464 { 465 // It is safe to assume meta.length is the same as in the 466 // queue.save() callback delegate. 467 meta[] = queue_save_data[0 .. meta.length]; 468 469 void[] queue_load_data = queue_save_data[meta.length .. $]; 470 data[] = queue_load_data[]; 471 return queue_load_data.length; 472 } 473 474 --- 475 476 This method can also be called to poll the amount of space required for 477 storing the current queue content, which is 478 meta.length + head.length + tail.length. 479 480 The data produced by this method is accepted by the load() method of any 481 queue where queue.length >= head.length + tail.length. 482 483 Params: 484 store = output delegate 485 486 ***************************************************************************/ 487 488 public void save ( scope void delegate ( in void[] meta, in void[] head, in void[] tail = null ) store ) 489 { 490 auto meta = ExportMetadata(this.items); 491 492 if (this.read_from < this.write_to) 493 { 494 store((&meta)[0 .. 1], this.data[this.read_from .. this.write_to]); 495 } 496 else 497 { 498 store((&meta)[0 .. 1], 499 this.data[this.read_from .. this.gap], 500 this.data[0 .. this.write_to]); 501 } 502 } 503 504 /*************************************************************************** 505 506 Restores the queue state and contents, reading from input and expecting 507 data previously written by save() to an output stream. 508 509 Assumes that the input data do not exceed the queue capacity and throws 510 if they do. If this is possible and you want to handle this gracefully 511 (rather than getting an exception thrown), use the other overload of 512 this method. 513 514 Params: 515 input = input stream 516 517 Returns: 518 the number of bytes read from input. 519 520 Throws: 521 ValidationError if the input data are inconsistent or exceed the 522 queue capacity. When throwing, the queue remains empty. 523 524 ***************************************************************************/ 525 526 public size_t load ( InputStream input ) 527 { 528 size_t bytes = 0; 529 530 this.load((void[] meta, void[] data) 531 { 532 bytes += SimpleStreamSerializer.readData(input, meta); 533 534 size_t data_length; 535 bytes += SimpleStreamSerializer.read(input, data_length); 536 enforce!(ValidationError)(data_length <= data.length, 537 "Size of loaded data exceeds queue capacity"); 538 bytes += SimpleStreamSerializer.readData(input, data[0 .. data_length]); 539 return data_length; 540 }); 541 542 return bytes; 543 } 544 545 /*************************************************************************** 546 547 Restores the queue state and contents. 548 549 Clears the queue, then calls the provided delegate, restore(), to 550 restore the queue state and contents and validates it. 551 552 restore() should populate the meta and data buffer it receives with data 553 previously obtained from save(): 554 - meta should be populated with the data from the store() meta 555 parameter, 556 - data[0 .. head.length + tail.length] should be populated with the 557 head ~ tail data as received by the store() delegate during save(), 558 - restore() should return head.length + tail.length. 559 560 See the example in the documentation of save(). 561 562 Params: 563 restore = input delegate 564 565 Throws: 566 ValidationError if the input data are inconsistent. When throwing, 567 the queue remains empty. 568 569 ***************************************************************************/ 570 571 public void load ( scope size_t delegate ( void[] meta, void[] data ) restore ) 572 { 573 this.clear(); 574 575 /* 576 * Pass this.data as the destination buffer to restore() and validate 577 * its content after restore() populated it. Should the validation fail, 578 * items, read_from, write_to and gap will remain 0 so the queue is 579 * empty and the invalid data in this.data are not harmful. 580 */ 581 582 ExportMetadata meta; 583 size_t end = restore((&meta)[0 .. 1], this.data); 584 585 verify(end <= this.data.length, 586 idup(classname(this) ~ ".save(): restore callback expected to " ~ 587 "return at most " ~ itoa(this.data.length) ~ ", not" ~ itoa(end))); 588 589 this.validate(meta, this.data[0 .. end]); 590 591 this.items = meta.items; 592 this.write_to = end; 593 this.gap = end; 594 } 595 596 /*************************************************************************** 597 598 Pushes an item into the queue. 599 600 Params: 601 item = data item to push 602 603 ***************************************************************************/ 604 605 private void[] push_ ( size_t size ) 606 out (slice) 607 { 608 assert(slice !is null, 609 classname(this) ~ "push_: returned a null slice"); 610 assert(slice.length == size, 611 classname(this) ~ "push_: length of returned slice not as requested"); 612 assert(this); // invariant 613 } 614 do 615 { 616 assert(this); // invariant 617 verify(this.willFit(size), classname(this) ~ ".push_: item will not fit"); 618 619 auto push_size = this.pushSize(size); 620 621 /* 622 * read_from and write_to can have three different relationships: 623 * 624 * 1. write_to == read_from: The queue is empty, both are 0, the 625 * record goes to data[write_to .. $]. 626 * 627 * 2. write_to < read_from: The record goes in 628 * data[write_to .. read_from]. 629 * 630 * 3. read_from < write_to: The record goes either in 631 * a) data[write_to .. $] if there is enough space or 632 * b) data[0 .. read_from], wrapping around by setting 633 * write_to = 0. 634 * 635 * The destination slice of data in case 3a is equivalent to case 1 636 * and in case 3b to case 2. 637 */ 638 639 if (this.read_from < this.write_to) 640 { 641 verify(this.gap == this.write_to); 642 643 // Case 3: Check if the record fits in data[write_to .. $] ... 644 if (this.data.length - this.write_to < push_size) 645 { 646 /* 647 * ... no, we have to wrap around. The precondition claims 648 * the record does fit so there must be enough space in 649 * data[0 .. read_from]. 650 */ 651 verify(push_size <= this.read_from); 652 this.write_to = 0; 653 } 654 } 655 656 auto start = this.write_to; 657 this.write_to += push_size; 658 659 if (this.write_to > this.read_from) // Case 1 or 3a. 660 { 661 this.gap = this.write_to; 662 } 663 664 this.items++; 665 666 void[] dst = this.data[start .. this.write_to]; 667 *cast(Header*)dst[0 .. Header.sizeof].ptr = Header(size); 668 return dst[Header.sizeof .. $]; 669 } 670 671 672 /*************************************************************************** 673 674 Pops an item from the queue. 675 676 Returns: 677 item popped from queue 678 679 ***************************************************************************/ 680 681 private void[] pop_ ( ) 682 out (buffer) 683 { 684 assert(buffer, classname(this) ~ ".pop_: returned a null buffer"); 685 assert(this); // invariant 686 } 687 do 688 { 689 assert(this); // invariant 690 verify(this.items > 0, classname(this) ~ ".pop_: no items in the queue"); 691 692 auto position = this.read_from; 693 this.read_from += Header.sizeof; 694 695 // TODO: Error if this.data.length < this.read_from. 696 697 auto header = cast(Header*)this.data[position .. this.read_from].ptr; 698 699 // TODO: Error if this.data.length - this.read_from < header.length 700 701 position = this.read_from; 702 this.read_from += header.length; 703 verify(this.read_from <= this.gap); // The invariant ensures that 704 // this.gap is not 0. 705 706 this.items--; // The precondition prevents decrementing 0. 707 708 scope (exit) 709 { 710 if (this.items) 711 { 712 if (this.read_from == this.gap) 713 { 714 /* 715 * End of data, wrap around: 716 * 1. Set the read position to the start of this.data. 717 */ 718 this.read_from = 0; 719 /* 720 * 2. The write position is now the end of the data. 721 * If the queue is now empty, i.e. this.items == 0, 722 * write_to must be 0. 723 */ 724 725 verify(this.items || !this.write_to); 726 this.gap = this.write_to; 727 } 728 } 729 else // Popped the last record. 730 { 731 verify(this.read_from == this.write_to); 732 this.read_from = 0; 733 this.write_to = 0; 734 this.gap = 0; 735 } 736 737 } 738 739 return this.data[position .. this.read_from]; 740 } 741 742 /*************************************************************************** 743 744 Validates queue state and content data. 745 746 Params: 747 meta = queue state data as imported by load() 748 data = queue content data 749 750 Throws: 751 ValidationError if meta and/or data are inconsistent. 752 753 ***************************************************************************/ 754 755 private static void validate ( ExportMetadata meta, in void[] data ) 756 { 757 if (meta.items) 758 { 759 enforce!(ValidationError)( 760 data.length, 761 cast(istring) ("Expected data for a non-empty queue (" 762 ~ itoa(meta.items) ~ " records)") 763 ); 764 765 enforce!(ValidationError)( 766 data.length >= cast(size_t)meta.items * Header.sizeof, 767 cast(istring) ("Queue data shorter than required minimum for " ~ 768 itoa(meta.items) ~ " records (got " ~ itoa(data.length) ~ 769 " bytes)") 770 ); 771 772 size_t pos = 0; 773 774 for (uint i = 0; i < meta.items; i++) 775 { 776 verify(pos <= data.length); 777 778 try 779 { 780 enforce!(ValidationError)(pos != data.length, 781 "Unexpected end of input data"); 782 783 auto start = pos; 784 pos += Header.sizeof; 785 786 enforce!(ValidationError)( 787 pos <= data.length, 788 cast(istring) ("End of queue data in the middle of" ~ 789 " the record header which starts at byte " ~ 790 itoa(start)) 791 ); 792 793 auto header = cast(const(Header)*)data[start .. pos].ptr; 794 795 enforce!(ValidationError)( 796 (data.length - pos) >= header.length, 797 cast(istring) ("End of queue data in the middle of the" ~ 798 " queue record, record length = " ~ 799 itoa(header.length)) 800 ); 801 802 pos += header.length; 803 } 804 catch (ValidationError e) 805 { 806 auto msg = "Error reading record " ~ itoa(i + i) ~ "/" ~ 807 itoa(meta.items) ~ ": " ~ e.message(); 808 e.msg = assumeUnique(msg); 809 throw e; 810 } 811 } 812 813 verify(pos <= data.length); 814 815 enforce!(ValidationError)( 816 pos >= data.length, 817 cast(istring) ("Queue data too long (" ~ itoa(meta.items) ~ 818 " records, " ~ itoa(pos) ~ "/" ~ itoa(data.length) ~ 819 " bytes used)") 820 ); 821 } 822 else 823 { 824 enforce!(ValidationError)( 825 !data.length, 826 cast(istring) ("Expected no data for an empty queue, not " ~ 827 itoa(data.length) ~ " bytes") 828 ); 829 } 830 } 831 832 /**************************************************************************/ 833 834 static class ValidationError: Exception 835 { 836 import ocean.core.Exception : DefaultExceptionCtor; 837 mixin DefaultExceptionCtor!(); 838 } 839 } 840 /******************************************************************************* 841 842 UnitTest 843 844 *******************************************************************************/ 845 846 version (unittest) 847 { 848 import ocean.io.model.IConduit: IConduit; 849 } 850 851 unittest 852 { 853 static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10; 854 855 scope queue = new FlexibleByteRingQueue(queue_size_1); 856 test(queue.free_space >= queue_size_1); 857 test(queue.is_empty); 858 859 test(queue.free_space >= queue_size_1); 860 test(queue.is_empty); 861 862 test(queue.push("Element 1")); 863 test(queue.pop() == "Element 1"); 864 test(queue.get_items == 0); 865 test(!queue.free_space == 0); 866 test(queue.is_empty); 867 test(queue.used_space() == 0); 868 869 test(queue.push("Element 1")); 870 test(queue.push("Element 2")); 871 test(queue.push("Element 3")); 872 test(queue.push("Element 4")); 873 test(queue.push("Element 5")); 874 test(queue.push("Element 6")); 875 test(queue.push("Element 7")); 876 test(queue.push("Element 8")); 877 test(queue.push("Element 9")); 878 test(queue.push("Element10")); 879 880 test(queue.length == 10); 881 test(queue.free_space == 0); 882 test(!queue.is_empty); 883 884 test(!queue.push("more")); 885 test(queue.length == 10); 886 887 scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5); 888 middle.push("1"); 889 middle.push("2"); 890 middle.push("3"); 891 middle.push("4"); 892 test(middle.pop == "1"); 893 test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof); 894 test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4); 895 test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2); 896 897 test(middle.push("5")); 898 test(middle.push("6")); 899 test(middle.free_space() == 0); 900 } 901 902 /******************************************************************************* 903 904 Save/load test. Uses the unittest above, adding save/load sequences in the 905 middle. This test is separate to allow for changing the above push/pop test 906 without breaking the save/load test. 907 908 *******************************************************************************/ 909 910 version (unittest) 911 { 912 import ocean.io.device.MemoryDevice; 913 } 914 915 unittest 916 { 917 // Buffers and callback functions for the delegate based save() & load(). 918 919 void[] saved_meta, saved_data; 920 921 void store ( in void[] meta, in void[] head, in void[] tail ) 922 { 923 saved_meta = meta.dup; 924 saved_data = head.dup ~ tail; 925 } 926 927 size_t restore ( void[] meta, void[] data ) 928 { 929 meta[] = saved_meta; 930 data[0 .. saved_data.length] = saved_data; 931 return saved_data.length; 932 } 933 934 // Memory I/O stream device for the stream based save() & load(). 935 936 scope backup = new MemoryDevice; 937 938 static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10; 939 940 scope queue = new FlexibleByteRingQueue(queue_size_1); 941 test(queue.free_space >= queue_size_1); 942 test(queue.is_empty); 943 944 queue.save(&store); 945 queue.load(&restore); 946 947 test(queue.free_space >= queue_size_1); 948 test(queue.is_empty); 949 950 test(queue.push("Element 1")); 951 test(queue.pop() == "Element 1"); 952 test(queue.get_items == 0); 953 test(!queue.free_space == 0); 954 test(queue.is_empty); 955 test(queue.used_space() == 0); 956 957 test(queue.push("Element 1")); 958 test(queue.push("Element 2")); 959 test(queue.push("Element 3")); 960 test(queue.push("Element 4")); 961 test(queue.push("Element 5")); 962 test(queue.push("Element 6")); 963 test(queue.push("Element 7")); 964 test(queue.push("Element 8")); 965 test(queue.push("Element 9")); 966 test(queue.push("Element10")); 967 968 // Save and restore the queue status in the middle of a test. 969 970 queue.save(&store); 971 queue.clear(); 972 queue.load(&restore); 973 974 test(queue.length == 10); 975 test(queue.free_space == 0); 976 test(!queue.is_empty); 977 978 test(!queue.push("more")); 979 test(queue.length == 10); 980 981 scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5); 982 middle.push("1"); 983 middle.push("2"); 984 middle.push("3"); 985 middle.push("4"); 986 test(middle.pop == "1"); 987 test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof); 988 test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4); 989 test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2); 990 991 // Save and restore the queue status in the middle of a test. 992 993 middle.save(backup); 994 middle.clear(); 995 backup.seek(0); 996 middle.load(backup); 997 test(backup.read(null) == backup.Eof); 998 backup.close(); 999 1000 test(middle.push("5")); 1001 test(middle.push("6")); 1002 test(middle.free_space() == 0); 1003 } 1004 1005 /******************************************************************************* 1006 1007 Test for the corner case of saving the queue state after wrapping around. 1008 1009 *******************************************************************************/ 1010 1011 unittest 1012 { 1013 enum Save {Dont = 0, Dg, Stream} 1014 1015 // Buffers and callback functions for the delegate based save() & load(). 1016 1017 void[] saved_meta, saved_data; 1018 1019 void store ( in void[] meta, in void[] head, in void[] tail ) 1020 { 1021 saved_meta = meta.dup; 1022 saved_data = head.dup ~ tail; 1023 } 1024 1025 size_t restore ( void[] meta, void[] data ) 1026 { 1027 meta[] = saved_meta; 1028 data[0 .. saved_data.length] = saved_data; 1029 return saved_data.length; 1030 } 1031 1032 // Memory I/O stream device for the stream based save() & load(). 1033 1034 scope backup = new MemoryDevice; 1035 1036 void save_wraparound ( Save save ) 1037 { 1038 static immutable Q_SIZE = 20; 1039 FlexibleByteRingQueue q = new FlexibleByteRingQueue(Q_SIZE); 1040 1041 void push(uint n) 1042 in 1043 { 1044 test(n <= ubyte.max); 1045 } 1046 do 1047 { 1048 for (ubyte i = 0; i < n; i++) 1049 { 1050 if (auto push_slice = q.push(1)) 1051 { 1052 push_slice[] = (&i)[0 .. 1]; 1053 } 1054 else 1055 { 1056 break; 1057 } 1058 1059 // Save and restore the queue status after wrapping around. 1060 1061 if (q.get_write_to <= q.get_read_from) switch (save) 1062 { 1063 case save.Dont: break; 1064 1065 case save.Dg: 1066 q.save(&store); 1067 q.clear(); 1068 q.load(&restore); 1069 break; 1070 1071 case save.Stream: 1072 q.save(backup); 1073 q.clear(); 1074 backup.seek(0); 1075 q.load(backup); 1076 test(backup.read(null) == backup.Eof); 1077 backup.close(); 1078 break; 1079 1080 default: assert(false); 1081 } 1082 } 1083 } 1084 1085 void pop(uint n) 1086 { 1087 for (uint i = 0; i < n; i++) 1088 { 1089 if (auto popped = q.pop()) 1090 { 1091 test (popped.length == 1); 1092 static ubyte[] unexpected = [cast(ubyte)Q_SIZE+1]; 1093 test (popped != unexpected); 1094 popped[] = unexpected; 1095 } 1096 else 1097 { 1098 break; 1099 } 1100 } 1101 } 1102 1103 push(2); 1104 pop(1); 1105 push(2); 1106 pop(1); 1107 push(3); 1108 pop(4); 1109 pop(1); 1110 } 1111 save_wraparound(Save.Dont); 1112 save_wraparound(Save.Dg); 1113 save_wraparound(Save.Stream); 1114 } 1115 1116 /******************************************************************************* 1117 1118 Performance test 1119 1120 *******************************************************************************/ 1121 1122 version (unittest) 1123 { 1124 // Uncomment the next line to see UnitTest output 1125 // version = UnitTestVerbose; 1126 1127 import ocean.core.Test; 1128 1129 import ocean.math.random.Random; 1130 import ocean.time.StopWatch; 1131 import core.memory; 1132 import ocean.io.FilePath; 1133 } 1134 1135 unittest 1136 { 1137 scope random = new Random(); 1138 1139 /*********************************************************************** 1140 1141 Test wrapping 1142 1143 ***********************************************************************/ 1144 1145 { 1146 scope queue = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*3); 1147 1148 test(queue.get_read_from == 0); 1149 test(queue.get_write_to == 0); 1150 // [___] r=0 w=0 1151 test(queue.push("1")); 1152 1153 test(queue.get_read_from == 0); 1154 test(queue.get_write_to == 1+FlexibleByteRingQueue.Header.sizeof); 1155 test(queue.get_items == 1); 1156 test((cast(FlexibleByteRingQueue.Header*) queue.get_data.ptr).length == 1); 1157 1158 { 1159 const(void)[] expected = "1"; 1160 test(queue.get_data[FlexibleByteRingQueue.Header.sizeof .. 1161 1+FlexibleByteRingQueue.Header.sizeof] == 1162 expected); 1163 } 1164 1165 // [#__] r=0 w=5 1166 test(queue.push("2")); 1167 1168 // [##_] r=0 w=10 1169 test(queue.push("3")); 1170 1171 // [###] r=0 w=15 1172 test(!queue.push("4")); 1173 test(queue.free_space == 0); 1174 test(queue.pop() == "1"); 1175 1176 // [_##] r=5 w=15 1177 test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof); 1178 test(queue.pop() == "2"); 1179 1180 // [__#] r=10 w=15 1181 test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2); 1182 test(queue.get_write_to == queue.get_data.length); 1183 test(queue.push("1")); 1184 1185 // [#_#] r=10 w=5 1186 test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof); 1187 test(queue.get_write_to == queue.pushSize("2".length)); 1188 test(queue.push("2")); 1189 // Stdout.formatln("gap is {}, free is {}, write is {}", queue.gap, queue.free_space(),queue.write_to); 1190 1191 1192 // [###] r=10 w=10 1193 test(queue.free_space == 0); 1194 test(queue.pop() == "3"); 1195 1196 // [##_] r=15/0 w=10 1197 test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*1); 1198 test(queue.pop() == "1"); 1199 1200 // [_#_] r=5 w=10 1201 test(queue.pop() == "2"); 1202 1203 // [__] r=0 w=0 1204 test(queue.is_empty); 1205 test(queue.push("1")); 1206 1207 // [#__] r=0 w=5 1208 test(queue.push("2#")); 1209 1210 // [#$_] r=0 w=11 ($ = 2 bytes) 1211 test(queue.pop() == "1"); 1212 1213 // [_$_] r=5 w=11 1214 test(queue.push("1")); 1215 1216 // [#$_] r=5 w=5 1217 test(!queue.push("2")); 1218 test(queue.pop() == "2#"); 1219 1220 // [#__] r=11 w=5 1221 test(queue.push("2")); // this needs to be wrapped now 1222 1223 // [##_] r=11 w=10 1224 } 1225 } 1226 1227 1228 // Test for a specific bug that caused garbled loglines due to an old wrong 1229 // willFit() function 1230 unittest 1231 { 1232 auto q = new FlexibleByteRingQueue(45); 1233 1234 // Setup conditions 1235 test(q.push("123456")); // w = 14 1236 test(q.push("12345678")); // w = 30 1237 test(q.push("123456")); // w = 44 1238 test(q.pop() == "123456"); 1239 test(q.pop() == "12345678"); // r == 30 1240 test(q.push("12345678123456781234")); // r = 30, gap = 44 1241 1242 auto test_push = "123456789.....16"; 1243 1244 // Make sure the bugs conditions are present 1245 test!(">")(q.get_read_from, q.get_write_to); 1246 test!("<")(q.get_read_from, q.gap); 1247 test!(">")(q.pushSize(test_push.length) + q.get_write_to, q.get_data.length); 1248 1249 // Do the actual test 1250 test(!q.push(test_push)); 1251 1252 test!("==")(q.pop(), "123456"); 1253 test!("==")(q.pop(), "12345678123456781234"); 1254 }