1 /*******************************************************************************
2 
3     Fixed size memory-based ring queue for elements of flexible size.
4 
5     Copyright:
6         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
7         All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14 *******************************************************************************/
15 
16 module ocean.util.container.queue.FlexibleRingQueue;
17 
18 
19 
20 import ocean.core.TypeConvert: assumeUnique;
21 
22 import ocean.meta.types.Qualifiers;
23 
24 import ocean.util.container.queue.model.IRingQueue;
25 
26 import ocean.util.container.queue.model.IByteQueue;
27 
28 import ocean.util.container.mem.MemManager;
29 
30 import ocean.io.model.IConduit: InputStream, OutputStream;
31 
32 import ocean.io.serialize.SimpleStreamSerializer;
33 
34 import ocean.text.util.ClassName;
35 
36 debug import ocean.io.Stdout;
37 
38 
39 
40 /*******************************************************************************
41 
42     Simple ubyte-based ring queue.
43 
44     TODO: usage example
45 
46 *******************************************************************************/
47 
48 class FlexibleByteRingQueue : IRingQueue!(IByteQueue)
49 {
50     import ocean.core.Verify;
51     import ocean.core.Enforce: enforce;
52 
53     import Integer = ocean.text.convert.Integer_tango: toString;
54     private alias Integer.toString itoa;
55 
56     /***************************************************************************
57 
58         Location of the gap at the rear end of the data array where the unused
59         space starts.
60 
61     ***************************************************************************/
62 
63     private size_t gap;
64 
65 
66     /***************************************************************************
67 
68         Metadata header for saving/loading the queue state
69 
70     ***************************************************************************/
71 
72     public struct ExportMetadata
73     {
74         uint items;
75     }
76 
77 
78     /***************************************************************************
79 
80         Header for queue items
81 
82     ***************************************************************************/
83 
84     private struct Header
85     {
86         size_t length;
87     }
88 
89 
90     /***************************************************************************
91 
92         Invariant to assert queue position consistency: When the queue is empty,
93         read_from and write_to must both be 0.
94 
95     ***************************************************************************/
96 
97     invariant ( )
98     {
99         debug scope ( failure ) Stderr.formatln
100         (
101             "{} invariant failed with items = {}, read_from = {}, " ~
102             "write_to = {}, gap = {}, data.length = {}",
103             classname(this), this.items, this.read_from, this.write_to,
104             this.gap, this.data.length
105         );
106 
107         if (this.items)
108         {
109             assert(this.gap       <= this.data.length, "gap out of range");
110             assert(this.read_from <= this.data.length, "read_from out of range");
111             assert(this.write_to  <= this.data.length, "write_to out of range");
112             assert(this.write_to,                      "write_to 0 with non-empty queue");
113             assert(this.read_from < this.gap,          "read_from within gap");
114             assert((this.gap == this.write_to) ||
115                    !(this.read_from < this.write_to),
116                    "read_from < write_to but gap not write position");
117         }
118         else
119         {
120             assert(!this.gap, "gap expected to be 0 for empty queue");
121             assert(!this.read_from, "read_from expected to be 0 for empty queue");
122             assert(!this.write_to, "write_to expected to be 0 for empty queue");
123         }
124     }
125 
126 
127     /***************************************************************************
128 
129         Constructor. The queue's memory buffer is allocated by the GC.
130 
131         Params:
132             dimension = size of queue in bytes
133 
134     ***************************************************************************/
135 
136     public this ( size_t dimension )
137     {
138         verify(dimension > 0, typeof(this).stringof ~ ": cannot construct a 0-length queue");
139 
140         super(dimension);
141     }
142 
143 
144     /***************************************************************************
145 
146         Constructor. Allocates the queue's memory buffer with the provided
147         memory manager.
148 
149         Params:
150             mem_manager = memory manager to use to allocate queue's buffer
151             dimension = size of queue in bytes
152 
153     ***************************************************************************/
154 
155     public this ( IMemManager mem_manager, size_t dimension )
156     {
157         super(mem_manager, dimension);
158     }
159 
160 
161     /***************************************************************************
162 
163         Pushes an item into the queue.
164 
165         item.length = 0 is allowed.
166 
167         Params:
168             item = data item to push
169 
170         Returns:
171             true if the item was pushed successfully, false if it didn't fit
172 
173     ***************************************************************************/
174 
175     public bool push ( in void[] item )
176     {
177         auto data = this.push(item.length);
178 
179         if ( data is null )
180         {
181             return false;
182         }
183 
184         data[] = item[];
185 
186         return true;
187     }
188 
189     /***************************************************************************
190 
191         Reserves space for an item of <size> bytes on the queue but doesn't
192         fill the content. The caller is expected to fill in the content using
193         the returned slice.
194 
195         size = 0 is allowed.
196 
197         Params:
198             size = size of the space of the item that should be reserved
199 
200         Returns:
201             slice to the reserved space if it was successfully reserved, else
202             null. Returns non-null empty string if size = 0 and the item was
203             successfully pushed.
204 
205         Out:
206             The length of the returned array slice is size unless the slice is
207             null.
208 
209     ***************************************************************************/
210 
211     public void[] push ( size_t size )
212     out (slice)
213     {
214         assert(slice is null || slice.length == size,
215                classname(this) ~ "push: length of returned buffer not as requested");
216     }
217     do
218     {
219         return this.willFit(size) ? this.push_(size) : null;
220     }
221 
222 
223     /***************************************************************************
224 
225         Pops an item from the queue.
226 
227         Returns:
228             item popped from queue, may be null if queue is empty
229 
230     ***************************************************************************/
231 
232     public void[] pop ( )
233     {
234         return this.items ? this.pop_() : null;
235     }
236 
237 
238     /***************************************************************************
239 
240         Peeks at the item that would be popped next.
241 
242         Returns:
243             item that would be popped from queue,
244             may be null if queue is empty
245 
246     ***************************************************************************/
247 
248     public void[] peek ( )
249     {
250         if (this.items)
251         {
252             auto h = this.read_from;
253             auto d = h + Header.sizeof;
254             auto header = cast(Header*) this.data[h .. d].ptr;
255             return this.data[d .. d + header.length];
256         }
257         else
258         {
259             return null;
260         }
261     }
262 
263 
264     /***************************************************************************
265 
266         Returns:
267             number of bytes stored in queue
268 
269     ***************************************************************************/
270 
271     public override ulong used_space ( )
272     {
273         if (this.items == 0)
274         {
275             return 0;
276         }
277 
278         if (this.write_to > this.read_from)
279         {
280             return this.write_to - this.read_from;
281         }
282 
283         return this.gap - this.read_from + this.write_to;
284     }
285 
286 
287     /***************************************************************************
288 
289         Removes all items from the queue.
290 
291     ***************************************************************************/
292 
293     public override void clear ( )
294     {
295         super.clear();
296         this.items = 0;
297         this.gap = 0;
298     }
299 
300 
301     /***************************************************************************
302 
303         Tells how much space an item would take up when written into the queue.
304         (Including the size of the required item header.)
305 
306         Params:
307             item = item to calculate push size of
308 
309         Returns:
310             number of bytes the item would take up if pushed to the queue
311 
312     ***************************************************************************/
313 
314     static public size_t pushSize ( in void[] item )
315     {
316         return pushSize(item.length);
317     }
318 
319 
320     /***************************************************************************
321 
322         Tells how much space an item of the specified size would take up when
323         written into the queue. (Including the size of the required item
324         header.)
325 
326         Params:
327             bytes = number of bytes to calculate push size of
328 
329         Returns:
330             number of bytes the item would take up if pushed to the queue
331 
332     ***************************************************************************/
333 
334     static public size_t pushSize ( size_t bytes )
335     {
336         return Header.sizeof + bytes;
337     }
338 
339 
340     /***************************************************************************
341 
342         Finds out whether the provided item will fit in the queue. Also
343         considers the need of wrapping.
344 
345         Params:
346             item = item to check
347 
348         Returns:
349             true if the item fits, else false
350 
351     ***************************************************************************/
352 
353     bool willFit ( in void[] item )
354     {
355         return this.willFit(item.length);
356     }
357 
358 
359     /***************************************************************************
360 
361         Finds out whether the provided number of bytes will fit in the queue.
362         Also considers the need of wrapping.
363 
364         Note that this method internally adds on the extra bytes required for
365         the item header, so it is *not* necessary for the end-user to first
366         calculate the item's push size.
367 
368         Params:
369             bytes = size of item to check
370 
371         Returns:
372             true if the bytes fits, else false
373 
374     ***************************************************************************/
375 
376     public bool willFit ( size_t bytes )
377     {
378         size_t push_size = this.pushSize(bytes);
379 
380         if (this.read_from < this.write_to)
381         {
382             /*
383              *  Free space at either
384              *  - data[write_to .. $], the end, or
385              *  - data[0 .. read_from], the beginning, wrapping around.
386              */
387             return ((this.data.length - this.write_to) >= push_size) // Fits at the end.
388                    || (this.read_from >= push_size); // Fits at the start wrapping around.
389 
390         }
391         else if (this.items)
392         {
393             // Free space at data[write_to .. read_from].
394             return (this.read_from - this.write_to) >= push_size;
395         }
396         else
397         {
398             // Queue empty: data is the free space.
399             return push_size <= this.data.length;
400         }
401     }
402 
403     /***************************************************************************
404 
405         Writes the queue's state and contents to the given output stream in the
406         following format:
407 
408         - First ExportMetadata.sizeof bytes: Metadata header
409         - Next size_t.sizeof bytes: Number n of bytes of queue data (possibly 0)
410         - Next n bytes: Queue data
411 
412         Params:
413             output = output stream to write to
414 
415         Returns:
416             number of bytes written to output.
417 
418     ***************************************************************************/
419 
420     public size_t save ( OutputStream output )
421     {
422         size_t bytes = 0;
423 
424         this.save((in void[] meta, in void[] head, in void[] tail = null)
425         {
426             bytes += SimpleStreamSerializer.writeData(output, meta);
427             bytes += SimpleStreamSerializer.write(output, head.length + tail.length);
428             if (head.length) bytes += SimpleStreamSerializer.writeData(output, head);
429             if (tail.length) bytes += SimpleStreamSerializer.writeData(output, tail);
430         });
431 
432         return bytes;
433     }
434 
435     /***************************************************************************
436 
437         Calls the provided delegate, store(), to store the queue state and
438         contents.
439 
440         The caller may concatenate the contents of the three buffers because
441           - it is safe to assume that load() will use the same meta.length and
442           - load() expects to receive the head ~ tail data.
443 
444         Example:
445 
446         ---
447 
448             auto queue = new FlexibleRingQueue(/+ ... +/);
449 
450             // Populate the queue...
451 
452             void[] queue_save_data;
453 
454             // Save the populated queue in queue_save_data.
455 
456             queue.save(void[] meta, void[] head, void[] tail)
457             {
458                 queue_save_data = meta ~ head ~ tail;
459             }
460 
461             // Restore queue from queue_save_data.
462 
463             queue.load(void[] meta, void[] data)
464             {
465                 // It is safe to assume meta.length is the same as in the
466                 // queue.save() callback delegate.
467                 meta[] = queue_save_data[0 .. meta.length];
468 
469                 void[] queue_load_data = queue_save_data[meta.length .. $];
470                 data[] = queue_load_data[];
471                 return queue_load_data.length;
472             }
473 
474         ---
475 
476         This method can also be called to poll the amount of space required for
477         storing the current queue content, which is
478         meta.length + head.length + tail.length.
479 
480         The data produced by this method is accepted by the load() method of any
481         queue where queue.length >= head.length + tail.length.
482 
483         Params:
484             store = output delegate
485 
486     ***************************************************************************/
487 
488     public void save ( scope void delegate ( in void[] meta, in void[] head, in void[] tail = null ) store )
489     {
490         auto meta = ExportMetadata(this.items);
491 
492         if (this.read_from < this.write_to)
493         {
494             store((&meta)[0 .. 1], this.data[this.read_from .. this.write_to]);
495         }
496         else
497         {
498             store((&meta)[0 .. 1],
499                   this.data[this.read_from .. this.gap],
500                   this.data[0 .. this.write_to]);
501         }
502     }
503 
504     /***************************************************************************
505 
506         Restores the queue state and contents, reading from input and expecting
507         data previously written by save() to an output stream.
508 
509         Assumes that the input data do not exceed the queue capacity and throws
510         if they do. If this is possible and you want to handle this gracefully
511         (rather than getting an exception thrown), use the other overload of
512         this method.
513 
514         Params:
515             input = input stream
516 
517         Returns:
518             the number of bytes read from input.
519 
520         Throws:
521             ValidationError if the input data are inconsistent or exceed the
522             queue capacity. When throwing, the queue remains empty.
523 
524     ***************************************************************************/
525 
526     public size_t load ( InputStream input )
527     {
528         size_t bytes = 0;
529 
530         this.load((void[] meta, void[] data)
531         {
532             bytes += SimpleStreamSerializer.readData(input, meta);
533 
534             size_t data_length;
535             bytes += SimpleStreamSerializer.read(input, data_length);
536             enforce!(ValidationError)(data_length <= data.length,
537                 "Size of loaded data exceeds queue capacity");
538             bytes += SimpleStreamSerializer.readData(input, data[0 .. data_length]);
539             return data_length;
540         });
541 
542         return bytes;
543     }
544 
545     /***************************************************************************
546 
547         Restores the queue state and contents.
548 
549         Clears the queue, then calls the provided delegate, restore(), to
550         restore the queue state and contents and validates it.
551 
552         restore() should populate the meta and data buffer it receives with data
553         previously obtained from save():
554          - meta should be populated with the data from the store() meta
555            parameter,
556          - data[0 .. head.length + tail.length] should be populated with the
557            head ~ tail data as received by the store() delegate during save(),
558          - restore() should return head.length + tail.length.
559 
560         See the example in the documentation of save().
561 
562         Params:
563             restore = input delegate
564 
565         Throws:
566             ValidationError if the input data are inconsistent. When throwing,
567             the queue remains empty.
568 
569     ***************************************************************************/
570 
571     public void load ( scope size_t delegate ( void[] meta, void[] data ) restore )
572     {
573         this.clear();
574 
575         /*
576          * Pass this.data as the destination buffer to restore() and validate
577          * its content after restore() populated it. Should the validation fail,
578          * items, read_from, write_to and gap will remain 0 so the queue is
579          * empty and the invalid data in this.data are not harmful.
580          */
581 
582         ExportMetadata meta;
583         size_t end = restore((&meta)[0 .. 1], this.data);
584 
585         verify(end <= this.data.length,
586                idup(classname(this) ~ ".save(): restore callback expected to " ~
587                "return at most " ~ itoa(this.data.length) ~ ", not" ~ itoa(end)));
588 
589         this.validate(meta, this.data[0 .. end]);
590 
591         this.items    = meta.items;
592         this.write_to = end;
593         this.gap      = end;
594     }
595 
596     /***************************************************************************
597 
598         Pushes an item into the queue.
599 
600         Params:
601             item = data item to push
602 
603     ***************************************************************************/
604 
605     private void[] push_ ( size_t size )
606     out (slice)
607     {
608         assert(slice !is null,
609                classname(this) ~ "push_: returned a null slice");
610         assert(slice.length == size,
611                classname(this) ~ "push_: length of returned slice not as requested");
612         assert(this); // invariant
613     }
614     do
615     {
616         assert(this); // invariant
617         verify(this.willFit(size), classname(this) ~ ".push_: item will not fit");
618 
619         auto push_size = this.pushSize(size);
620 
621         /*
622          * read_from and write_to can have three different relationships:
623          *
624          * 1. write_to == read_from: The queue is empty, both are 0, the
625          *    record goes to data[write_to .. $].
626          *
627          * 2. write_to < read_from: The record goes in
628          *    data[write_to .. read_from].
629          *
630          * 3. read_from < write_to: The record goes either in
631          *   a) data[write_to .. $] if there is enough space or
632          *   b) data[0 .. read_from], wrapping around by setting
633          *      write_to = 0.
634          *
635          * The destination slice of data in case 3a is equivalent to case 1
636          * and in case 3b to case 2.
637          */
638 
639         if (this.read_from < this.write_to)
640         {
641             verify(this.gap == this.write_to);
642 
643             // Case 3: Check if the record fits in data[write_to .. $] ...
644             if (this.data.length - this.write_to < push_size)
645             {
646                 /*
647                  * ... no, we have to wrap around. The precondition claims
648                  * the record does fit so there must be enough space in
649                  * data[0 .. read_from].
650                  */
651                 verify(push_size <= this.read_from);
652                 this.write_to = 0;
653             }
654         }
655 
656         auto start = this.write_to;
657         this.write_to += push_size;
658 
659         if (this.write_to > this.read_from) // Case 1 or 3a.
660         {
661             this.gap = this.write_to;
662         }
663 
664         this.items++;
665 
666         void[] dst = this.data[start .. this.write_to];
667         *cast(Header*)dst[0 .. Header.sizeof].ptr = Header(size);
668         return dst[Header.sizeof .. $];
669     }
670 
671 
672     /***************************************************************************
673 
674         Pops an item from the queue.
675 
676         Returns:
677             item popped from queue
678 
679     ***************************************************************************/
680 
681     private void[] pop_ ( )
682     out (buffer)
683     {
684         assert(buffer, classname(this) ~ ".pop_: returned a null buffer");
685         assert(this); // invariant
686     }
687     do
688     {
689         assert(this); // invariant
690         verify(this.items > 0, classname(this) ~ ".pop_: no items in the queue");
691 
692         auto position = this.read_from;
693         this.read_from += Header.sizeof;
694 
695         // TODO: Error if this.data.length < this.read_from.
696 
697         auto header = cast(Header*)this.data[position .. this.read_from].ptr;
698 
699         // TODO: Error if this.data.length - this.read_from < header.length
700 
701         position = this.read_from;
702         this.read_from += header.length;
703         verify(this.read_from <= this.gap); // The invariant ensures that
704                                             // this.gap is not 0.
705 
706         this.items--; // The precondition prevents decrementing 0.
707 
708         scope (exit)
709         {
710             if (this.items)
711             {
712                 if (this.read_from == this.gap)
713                 {
714                     /*
715                      *  End of data, wrap around:
716                      *  1. Set the read position to the start of this.data.
717                      */
718                     this.read_from = 0;
719                     /*
720                      *  2. The write position is now the end of the data.
721                      *     If the queue is now empty, i.e. this.items == 0,
722                      *     write_to must be 0.
723                      */
724 
725                     verify(this.items || !this.write_to);
726                     this.gap = this.write_to;
727                 }
728             }
729             else // Popped the last record.
730             {
731                 verify(this.read_from == this.write_to);
732                 this.read_from = 0;
733                 this.write_to  = 0;
734                 this.gap       = 0;
735             }
736 
737         }
738 
739         return this.data[position .. this.read_from];
740     }
741 
742     /***************************************************************************
743 
744         Validates queue state and content data.
745 
746         Params:
747             meta = queue state data as imported by load()
748             data = queue content data
749 
750         Throws:
751             ValidationError if meta and/or data are inconsistent.
752 
753     ***************************************************************************/
754 
755     private static void validate ( ExportMetadata meta, in void[] data )
756     {
757         if (meta.items)
758         {
759             enforce!(ValidationError)(
760                 data.length,
761                 cast(istring) ("Expected data for a non-empty queue ("
762                     ~ itoa(meta.items) ~ " records)")
763             );
764 
765             enforce!(ValidationError)(
766                 data.length >= cast(size_t)meta.items * Header.sizeof,
767                 cast(istring) ("Queue data shorter than required minimum for " ~
768                     itoa(meta.items) ~ " records (got " ~ itoa(data.length) ~
769                     " bytes)")
770             );
771 
772             size_t pos = 0;
773 
774             for (uint i = 0; i < meta.items; i++)
775             {
776                 verify(pos <= data.length);
777 
778                 try
779                 {
780                     enforce!(ValidationError)(pos != data.length,
781                                               "Unexpected end of input data");
782 
783                     auto start = pos;
784                     pos += Header.sizeof;
785 
786                     enforce!(ValidationError)(
787                         pos <= data.length,
788                         cast(istring) ("End of queue data in the middle of" ~
789                             " the record header which starts at byte " ~
790                             itoa(start))
791                     );
792 
793                     auto header = cast(const(Header)*)data[start .. pos].ptr;
794 
795                     enforce!(ValidationError)(
796                         (data.length - pos) >= header.length,
797                         cast(istring) ("End of queue data in the middle of the" ~
798                             " queue record, record length = " ~
799                             itoa(header.length))
800                     );
801 
802                     pos += header.length;
803                 }
804                 catch (ValidationError e)
805                 {
806                     auto msg = "Error reading record " ~ itoa(i + i) ~ "/" ~
807                         itoa(meta.items) ~ ": " ~ e.message();
808                     e.msg = assumeUnique(msg);
809                     throw e;
810                 }
811             }
812 
813             verify(pos <= data.length);
814 
815             enforce!(ValidationError)(
816                 pos >= data.length,
817                 cast(istring) ("Queue data  too long (" ~ itoa(meta.items) ~
818                     " records, " ~ itoa(pos) ~ "/" ~ itoa(data.length) ~
819                     " bytes used)")
820             );
821         }
822         else
823         {
824             enforce!(ValidationError)(
825                 !data.length,
826                 cast(istring) ("Expected no data for an empty queue, not " ~
827                     itoa(data.length) ~ " bytes")
828             );
829         }
830     }
831 
832     /**************************************************************************/
833 
834     static class ValidationError: Exception
835     {
836         import ocean.core.Exception : DefaultExceptionCtor;
837         mixin DefaultExceptionCtor!();
838     }
839 }
840 /*******************************************************************************
841 
842     UnitTest
843 
844 *******************************************************************************/
845 
846 version (unittest)
847 {
848     import ocean.io.model.IConduit: IConduit;
849 }
850 
851 unittest
852 {
853     static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10;
854 
855     scope queue = new FlexibleByteRingQueue(queue_size_1);
856     test(queue.free_space >= queue_size_1);
857     test(queue.is_empty);
858 
859     test(queue.free_space >= queue_size_1);
860     test(queue.is_empty);
861 
862     test(queue.push("Element 1"));
863     test(queue.pop() == "Element 1");
864     test(queue.get_items == 0);
865     test(!queue.free_space == 0);
866     test(queue.is_empty);
867     test(queue.used_space() == 0);
868 
869     test(queue.push("Element 1"));
870     test(queue.push("Element 2"));
871     test(queue.push("Element 3"));
872     test(queue.push("Element 4"));
873     test(queue.push("Element 5"));
874     test(queue.push("Element 6"));
875     test(queue.push("Element 7"));
876     test(queue.push("Element 8"));
877     test(queue.push("Element 9"));
878     test(queue.push("Element10"));
879 
880     test(queue.length == 10);
881     test(queue.free_space == 0);
882     test(!queue.is_empty);
883 
884     test(!queue.push("more"));
885     test(queue.length == 10);
886 
887     scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5);
888     middle.push("1");
889     middle.push("2");
890     middle.push("3");
891     middle.push("4");
892     test(middle.pop == "1");
893     test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof);
894     test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4);
895     test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2);
896 
897     test(middle.push("5"));
898     test(middle.push("6"));
899     test(middle.free_space() == 0);
900 }
901 
902 /*******************************************************************************
903 
904     Save/load test. Uses the unittest above, adding save/load sequences in the
905     middle. This test is separate to allow for changing the above push/pop test
906     without breaking the save/load test.
907 
908 *******************************************************************************/
909 
910 version (unittest)
911 {
912     import ocean.io.device.MemoryDevice;
913 }
914 
915 unittest
916 {
917     // Buffers and callback functions for the delegate based save() & load().
918 
919     void[] saved_meta, saved_data;
920 
921     void store ( in void[] meta, in void[] head, in void[] tail )
922     {
923         saved_meta = meta.dup;
924         saved_data = head.dup ~ tail;
925     }
926 
927     size_t restore ( void[] meta, void[] data )
928     {
929         meta[] = saved_meta;
930         data[0 .. saved_data.length] = saved_data;
931         return saved_data.length;
932     }
933 
934     // Memory I/O stream device for the stream based save() & load().
935 
936     scope backup = new MemoryDevice;
937 
938     static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10;
939 
940     scope queue = new FlexibleByteRingQueue(queue_size_1);
941     test(queue.free_space >= queue_size_1);
942     test(queue.is_empty);
943 
944     queue.save(&store);
945     queue.load(&restore);
946 
947     test(queue.free_space >= queue_size_1);
948     test(queue.is_empty);
949 
950     test(queue.push("Element 1"));
951     test(queue.pop() == "Element 1");
952     test(queue.get_items == 0);
953     test(!queue.free_space == 0);
954     test(queue.is_empty);
955     test(queue.used_space() == 0);
956 
957     test(queue.push("Element 1"));
958     test(queue.push("Element 2"));
959     test(queue.push("Element 3"));
960     test(queue.push("Element 4"));
961     test(queue.push("Element 5"));
962     test(queue.push("Element 6"));
963     test(queue.push("Element 7"));
964     test(queue.push("Element 8"));
965     test(queue.push("Element 9"));
966     test(queue.push("Element10"));
967 
968     // Save and restore the queue status in the middle of a test.
969 
970     queue.save(&store);
971     queue.clear();
972     queue.load(&restore);
973 
974     test(queue.length == 10);
975     test(queue.free_space == 0);
976     test(!queue.is_empty);
977 
978     test(!queue.push("more"));
979     test(queue.length == 10);
980 
981     scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5);
982     middle.push("1");
983     middle.push("2");
984     middle.push("3");
985     middle.push("4");
986     test(middle.pop == "1");
987     test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof);
988     test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4);
989     test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2);
990 
991     // Save and restore the queue status in the middle of a test.
992 
993     middle.save(backup);
994     middle.clear();
995     backup.seek(0);
996     middle.load(backup);
997     test(backup.read(null) == backup.Eof);
998     backup.close();
999 
1000     test(middle.push("5"));
1001     test(middle.push("6"));
1002     test(middle.free_space() == 0);
1003 }
1004 
1005 /*******************************************************************************
1006 
1007     Test for the corner case of saving the queue state after wrapping around.
1008 
1009 *******************************************************************************/
1010 
1011 unittest
1012 {
1013     enum Save {Dont = 0, Dg, Stream}
1014 
1015     // Buffers and callback functions for the delegate based save() & load().
1016 
1017     void[] saved_meta, saved_data;
1018 
1019     void store ( in void[] meta, in void[] head, in void[] tail )
1020     {
1021         saved_meta = meta.dup;
1022         saved_data = head.dup ~ tail;
1023     }
1024 
1025     size_t restore ( void[] meta, void[] data )
1026     {
1027         meta[] = saved_meta;
1028         data[0 .. saved_data.length] = saved_data;
1029         return saved_data.length;
1030     }
1031 
1032     // Memory I/O stream device for the stream based save() & load().
1033 
1034     scope backup = new MemoryDevice;
1035 
1036     void save_wraparound ( Save save )
1037     {
1038         static immutable Q_SIZE = 20;
1039         FlexibleByteRingQueue q = new FlexibleByteRingQueue(Q_SIZE);
1040 
1041         void push(uint n)
1042         in
1043         {
1044             test(n <= ubyte.max);
1045         }
1046         do
1047         {
1048             for (ubyte i = 0; i < n; i++)
1049             {
1050                 if (auto push_slice = q.push(1))
1051                 {
1052                     push_slice[] = (&i)[0 .. 1];
1053                 }
1054                 else
1055                 {
1056                     break;
1057                 }
1058 
1059                 // Save and restore the queue status after wrapping around.
1060 
1061                 if (q.get_write_to <= q.get_read_from) switch (save)
1062                 {
1063                     case save.Dont: break;
1064 
1065                     case save.Dg:
1066                         q.save(&store);
1067                         q.clear();
1068                         q.load(&restore);
1069                         break;
1070 
1071                     case save.Stream:
1072                         q.save(backup);
1073                         q.clear();
1074                         backup.seek(0);
1075                         q.load(backup);
1076                         test(backup.read(null) == backup.Eof);
1077                         backup.close();
1078                         break;
1079 
1080                     default: assert(false);
1081                 }
1082             }
1083         }
1084 
1085         void pop(uint n)
1086         {
1087             for (uint i = 0; i < n; i++)
1088             {
1089                 if (auto popped = q.pop())
1090                 {
1091                     test (popped.length == 1);
1092                     static ubyte[] unexpected = [cast(ubyte)Q_SIZE+1];
1093                     test (popped != unexpected);
1094                     popped[] = unexpected;
1095                 }
1096                 else
1097                 {
1098                     break;
1099                 }
1100             }
1101         }
1102 
1103         push(2);
1104         pop(1);
1105         push(2);
1106         pop(1);
1107         push(3);
1108         pop(4);
1109         pop(1);
1110     }
1111     save_wraparound(Save.Dont);
1112     save_wraparound(Save.Dg);
1113     save_wraparound(Save.Stream);
1114 }
1115 
1116 /*******************************************************************************
1117 
1118     Performance test
1119 
1120 *******************************************************************************/
1121 
1122 version (unittest)
1123 {
1124     // Uncomment the next line to see UnitTest output
1125     // version = UnitTestVerbose;
1126 
1127     import ocean.core.Test;
1128 
1129     import ocean.math.random.Random;
1130     import ocean.time.StopWatch;
1131     import core.memory;
1132     import ocean.io.FilePath;
1133 }
1134 
1135 unittest
1136 {
1137      scope random = new Random();
1138 
1139     /***********************************************************************
1140 
1141         Test wrapping
1142 
1143     ***********************************************************************/
1144 
1145     {
1146         scope queue = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*3);
1147 
1148         test(queue.get_read_from == 0);
1149         test(queue.get_write_to == 0);
1150         // [___] r=0 w=0
1151         test(queue.push("1"));
1152 
1153         test(queue.get_read_from == 0);
1154         test(queue.get_write_to == 1+FlexibleByteRingQueue.Header.sizeof);
1155         test(queue.get_items == 1);
1156         test((cast(FlexibleByteRingQueue.Header*) queue.get_data.ptr).length == 1);
1157 
1158         {
1159             const(void)[] expected = "1";
1160             test(queue.get_data[FlexibleByteRingQueue.Header.sizeof ..
1161                               1+FlexibleByteRingQueue.Header.sizeof] ==
1162                                 expected);
1163         }
1164 
1165         // [#__] r=0 w=5
1166         test(queue.push("2"));
1167 
1168         // [##_] r=0 w=10
1169         test(queue.push("3"));
1170 
1171         // [###] r=0 w=15
1172         test(!queue.push("4"));
1173         test(queue.free_space == 0);
1174         test(queue.pop() == "1");
1175 
1176         // [_##] r=5 w=15
1177         test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof);
1178         test(queue.pop() == "2");
1179 
1180         // [__#] r=10 w=15
1181         test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2);
1182         test(queue.get_write_to == queue.get_data.length);
1183         test(queue.push("1"));
1184 
1185         // [#_#] r=10 w=5
1186         test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof);
1187         test(queue.get_write_to == queue.pushSize("2".length));
1188         test(queue.push("2"));
1189        // Stdout.formatln("gap is {}, free is {}, write is {}", queue.gap, queue.free_space(),queue.write_to);
1190 
1191 
1192         // [###] r=10 w=10
1193         test(queue.free_space == 0);
1194         test(queue.pop() == "3");
1195 
1196         // [##_] r=15/0 w=10
1197         test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*1);
1198         test(queue.pop() == "1");
1199 
1200         // [_#_] r=5 w=10
1201         test(queue.pop() == "2");
1202 
1203         // [__] r=0 w=0
1204         test(queue.is_empty);
1205         test(queue.push("1"));
1206 
1207         // [#__] r=0 w=5
1208         test(queue.push("2#"));
1209 
1210         // [#$_] r=0 w=11 ($ = 2 bytes)
1211         test(queue.pop() == "1");
1212 
1213         // [_$_] r=5 w=11
1214         test(queue.push("1"));
1215 
1216         // [#$_] r=5 w=5
1217         test(!queue.push("2"));
1218         test(queue.pop() == "2#");
1219 
1220         // [#__] r=11 w=5
1221         test(queue.push("2")); // this needs to be wrapped now
1222 
1223         // [##_] r=11 w=10
1224     }
1225 }
1226 
1227 
1228 // Test for a specific bug that caused garbled loglines due to an old wrong
1229 // willFit() function
1230 unittest
1231 {
1232     auto q = new FlexibleByteRingQueue(45);
1233 
1234     // Setup conditions
1235     test(q.push("123456")); // w = 14
1236     test(q.push("12345678")); // w = 30
1237     test(q.push("123456")); // w = 44
1238     test(q.pop() == "123456");
1239     test(q.pop() == "12345678"); // r == 30
1240     test(q.push("12345678123456781234")); // r = 30, gap = 44
1241 
1242     auto test_push = "123456789.....16";
1243 
1244     // Make sure the bugs conditions are present
1245     test!(">")(q.get_read_from, q.get_write_to);
1246     test!("<")(q.get_read_from, q.gap);
1247     test!(">")(q.pushSize(test_push.length) + q.get_write_to, q.get_data.length);
1248 
1249     // Do the actual test
1250     test(!q.push(test_push));
1251 
1252     test!("==")(q.pop(), "123456");
1253     test!("==")(q.pop(), "12345678123456781234");
1254 }