1 /*******************************************************************************
2 
3     Fixed size memory-based ring queue for elements of flexible size.
4 
5     Copyright:
6         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
7         All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14 *******************************************************************************/
15 
16 module ocean.util.container.queue.FlexibleRingQueue;
17 
18 
19 
20 
21 import ocean.transition;
22 
23 import ocean.util.container.queue.model.IRingQueue;
24 
25 import ocean.util.container.queue.model.IByteQueue;
26 
27 import ocean.util.container.mem.MemManager;
28 
29 import ocean.io.model.IConduit: InputStream, OutputStream;
30 
31 import ocean.io.serialize.SimpleStreamSerializer;
32 
33 import ocean.text.util.ClassName;
34 
35 debug import ocean.io.Stdout;
36 
37 
38 
39 /*******************************************************************************
40 
41     Simple ubyte-based ring queue.
42 
43     TODO: usage example
44 
45 *******************************************************************************/
46 
47 class FlexibleByteRingQueue : IRingQueue!(IByteQueue)
48 {
49     import ocean.core.Verify;
50     import ocean.core.Enforce: enforce;
51 
52     import Integer = ocean.text.convert.Integer_tango: toString;
53     private alias Integer.toString itoa;
54 
55     /***************************************************************************
56 
57         Location of the gap at the rear end of the data array where the unused
58         space starts.
59 
60     ***************************************************************************/
61 
62     private size_t gap;
63 
64 
65     /***************************************************************************
66 
67         Metadata header for saving/loading the queue state
68 
69     ***************************************************************************/
70 
    public struct ExportMetadata
    {
        /// Number of items in the queue: save() fills this field from
        /// this.items and load() assigns it back to this.items after
        /// validating the loaded data.
        uint items;
    }
75 
76 
77     /***************************************************************************
78 
79         Header for queue items
80 
81     ***************************************************************************/
82 
    private struct Header
    {
        /// Length in bytes of the item content that directly follows this
        /// header in the data buffer; the header itself is not included.
        size_t length;
    }
87 
88 
89     /***************************************************************************
90 
91         Invariant to assert queue position consistency: When the queue is empty,
92         read_from and write_to must both be 0.
93 
94     ***************************************************************************/
95 
96     invariant ( )
97     {
98         debug scope ( failure ) Stderr.formatln
99         (
100             "{} invariant failed with items = {}, read_from = {}, " ~
101             "write_to = {}, gap = {}, data.length = {}",
102             classname(this), this.items, this.read_from, this.write_to,
103             this.gap, this.data.length
104         );
105 
106         if (this.items)
107         {
108             assert(this.gap       <= this.data.length, "gap out of range");
109             assert(this.read_from <= this.data.length, "read_from out of range");
110             assert(this.write_to  <= this.data.length, "write_to out of range");
111             assert(this.write_to,                      "write_to 0 with non-empty queue");
112             assert(this.read_from < this.gap,          "read_from within gap");
113             assert((this.gap == this.write_to) ||
114                    !(this.read_from < this.write_to),
115                    "read_from < write_to but gap not write position");
116         }
117         else
118         {
119             assert(!this.gap, "gap expected to be 0 for empty queue");
120             assert(!this.read_from, "read_from expected to be 0 for empty queue");
121             assert(!this.write_to, "write_to expected to be 0 for empty queue");
122         }
123     }
124 
125 
126     /***************************************************************************
127 
128         Constructor. The queue's memory buffer is allocated by the GC.
129 
130         Params:
131             dimension = size of queue in bytes
132 
133     ***************************************************************************/
134 
135     public this ( size_t dimension )
136     {
137         verify(dimension > 0, typeof(this).stringof ~ ": cannot construct a 0-length queue");
138 
139         super(dimension);
140     }
141 
142 
143     /***************************************************************************
144 
145         Constructor. Allocates the queue's memory buffer with the provided
146         memory manager.
147 
148         Params:
149             mem_manager = memory manager to use to allocate queue's buffer
150             dimension = size of queue in bytes
151 
152     ***************************************************************************/
153 
    public this ( IMemManager mem_manager, size_t dimension )
    {
        // Everything is forwarded to the base class constructor.
        // NOTE(review): unlike the constructor taking only a dimension, this
        // one does not verify that dimension is non-zero -- confirm whether
        // that difference is intended.
        super(mem_manager, dimension);
    }
158 
159 
160     /***************************************************************************
161 
162         Pushes an item into the queue.
163 
164         item.length = 0 is allowed.
165 
166         Params:
167             item = data item to push
168 
169         Returns:
170             true if the item was pushed successfully, false if it didn't fit
171 
172     ***************************************************************************/
173 
174     public bool push ( in void[] item )
175     {
176         auto data = this.push(item.length);
177 
178         if ( data is null )
179         {
180             return false;
181         }
182 
183         data[] = item[];
184 
185         return true;
186     }
187 
188     /***************************************************************************
189 
190         Reserves space for an item of <size> bytes on the queue but doesn't
191         fill the content. The caller is expected to fill in the content using
192         the returned slice.
193 
194         size = 0 is allowed.
195 
196         Params:
197             size = size of the space of the item that should be reserved
198 
199         Returns:
200             slice to the reserved space if it was successfully reserved, else
201             null. Returns non-null empty string if size = 0 and the item was
202             successfully pushed.
203 
204         Out:
205             The length of the returned array slice is size unless the slice is
206             null.
207 
208     ***************************************************************************/
209 
210     public void[] push ( size_t size )
211     out (slice)
212     {
213         assert(slice is null || slice.length == size,
214                classname(this) ~ "push: length of returned buffer not as requested");
215     }
216     body
217     {
218         return this.willFit(size) ? this.push_(size) : null;
219     }
220 
221 
222     /***************************************************************************
223 
224         Pops an item from the queue.
225 
226         Returns:
227             item popped from queue, may be null if queue is empty
228 
229     ***************************************************************************/
230 
231     public void[] pop ( )
232     {
233         return this.items ? this.pop_() : null;
234     }
235 
236 
237     /***************************************************************************
238 
239         Peeks at the item that would be popped next.
240 
241         Returns:
242             item that would be popped from queue,
243             may be null if queue is empty
244 
245     ***************************************************************************/
246 
247     public void[] peek ( )
248     {
249         if (this.items)
250         {
251             auto h = this.read_from;
252             auto d = h + Header.sizeof;
253             auto header = cast(Header*) this.data[h .. d].ptr;
254             return this.data[d .. d + header.length];
255         }
256         else
257         {
258             return null;
259         }
260     }
261 
262 
263     /***************************************************************************
264 
265         Returns:
266             number of bytes stored in queue
267 
268     ***************************************************************************/
269 
270     public override ulong used_space ( )
271     {
272         if (this.items == 0)
273         {
274             return 0;
275         }
276 
277         if (this.write_to > this.read_from)
278         {
279             return this.write_to - this.read_from;
280         }
281 
282         return this.gap - this.read_from + this.write_to;
283     }
284 
285 
286     /***************************************************************************
287 
288         Removes all items from the queue.
289 
290     ***************************************************************************/
291 
292     public override void clear ( )
293     {
294         super.clear();
295         this.items = 0;
296         this.gap = 0;
297     }
298 
299 
300     /***************************************************************************
301 
302         Tells how much space an item would take up when written into the queue.
303         (Including the size of the required item header.)
304 
305         Params:
306             item = item to calculate push size of
307 
308         Returns:
309             number of bytes the item would take up if pushed to the queue
310 
311     ***************************************************************************/
312 
313     static public size_t pushSize ( in void[] item )
314     {
315         return pushSize(item.length);
316     }
317 
318 
319     /***************************************************************************
320 
321         Tells how much space an item of the specified size would take up when
322         written into the queue. (Including the size of the required item
323         header.)
324 
325         Params:
326             bytes = number of bytes to calculate push size of
327 
328         Returns:
329             number of bytes the item would take up if pushed to the queue
330 
331     ***************************************************************************/
332 
333     static public size_t pushSize ( size_t bytes )
334     {
335         return Header.sizeof + bytes;
336     }
337 
338 
339     /***************************************************************************
340 
341         Finds out whether the provided item will fit in the queue. Also
342         considers the need of wrapping.
343 
344         Params:
345             item = item to check
346 
347         Returns:
348             true if the item fits, else false
349 
350     ***************************************************************************/
351 
352     bool willFit ( in void[] item )
353     {
354         return this.willFit(item.length);
355     }
356 
357 
358     /***************************************************************************
359 
360         Finds out whether the provided number of bytes will fit in the queue.
361         Also considers the need of wrapping.
362 
363         Note that this method internally adds on the extra bytes required for
364         the item header, so it is *not* necessary for the end-user to first
365         calculate the item's push size.
366 
367         Params:
368             bytes = size of item to check
369 
370         Returns:
371             true if the bytes fits, else false
372 
373     ***************************************************************************/
374 
375     public bool willFit ( size_t bytes )
376     {
377         size_t push_size = this.pushSize(bytes);
378 
379         if (this.read_from < this.write_to)
380         {
381             /*
382              *  Free space at either
383              *  - data[write_to .. $], the end, or
384              *  - data[0 .. read_from], the beginning, wrapping around.
385              */
386             return ((this.data.length - this.write_to) >= push_size) // Fits at the end.
387                    || (this.read_from >= push_size); // Fits at the start wrapping around.
388 
389         }
390         else if (this.items)
391         {
392             // Free space at data[write_to .. read_from].
393             return (this.read_from - this.write_to) >= push_size;
394         }
395         else
396         {
397             // Queue empty: data is the free space.
398             return push_size <= this.data.length;
399         }
400     }
401 
402     /***************************************************************************
403 
404         Writes the queue's state and contents to the given output stream in the
405         following format:
406 
407         - First ExportMetadata.sizeof bytes: Metadata header
408         - Next size_t.sizeof bytes: Number n of bytes of queue data (possibly 0)
409         - Next n bytes: Queue data
410 
411         Params:
412             output = output stream to write to
413 
414         Returns:
415             number of bytes written to output.
416 
417     ***************************************************************************/
418 
419     public size_t save ( OutputStream output )
420     {
421         size_t bytes = 0;
422 
423         this.save((in void[] meta, in void[] head, in void[] tail = null)
424         {
425             bytes += SimpleStreamSerializer.writeData(output, meta);
426             bytes += SimpleStreamSerializer.write(output, head.length + tail.length);
427             if (head.length) bytes += SimpleStreamSerializer.writeData(output, head);
428             if (tail.length) bytes += SimpleStreamSerializer.writeData(output, tail);
429         });
430 
431         return bytes;
432     }
433 
434     /***************************************************************************
435 
436         Calls the provided delegate, store(), to store the queue state and
437         contents.
438 
439         The caller may concatenate the contents of the three buffers because
440           - it is safe to assume that load() will use the same meta.length and
441           - load() expects to receive the head ~ tail data.
442 
443         Example:
444 
445         ---
446 
            auto queue = new FlexibleByteRingQueue(/+ ... +/);

            // Populate the queue...

            void[] queue_save_data;

            // Save the populated queue in queue_save_data.

            queue.save((in void[] meta, in void[] head, in void[] tail)
            {
                queue_save_data = meta.dup ~ head ~ tail;
            });

            // Restore queue from queue_save_data.

            queue.load((void[] meta, void[] data)
            {
                // It is safe to assume meta.length is the same as in the
                // queue.save() callback delegate.
                meta[] = queue_save_data[0 .. meta.length];

                // data is the whole queue buffer, so only its beginning is
                // populated with the saved head ~ tail data.
                void[] queue_load_data = queue_save_data[meta.length .. $];
                data[0 .. queue_load_data.length] = queue_load_data[];
                return queue_load_data.length;
            });
472 
473         ---
474 
475         This method can also be called to poll the amount of space required for
476         storing the current queue content, which is
477         meta.length + head.length + tail.length.
478 
479         The data produced by this method is accepted by the load() method of any
480         queue where queue.length >= head.length + tail.length.
481 
482         Params:
483             store = output delegate
484 
485     ***************************************************************************/
486 
487     public void save ( scope void delegate ( in void[] meta, in void[] head, in void[] tail = null ) store )
488     {
489         auto meta = ExportMetadata(this.items);
490 
491         if (this.read_from < this.write_to)
492         {
493             store((&meta)[0 .. 1], this.data[this.read_from .. this.write_to]);
494         }
495         else
496         {
497             store((&meta)[0 .. 1],
498                   this.data[this.read_from .. this.gap],
499                   this.data[0 .. this.write_to]);
500         }
501     }
502 
503     /***************************************************************************
504 
505         Restores the queue state and contents, reading from input and expecting
506         data previously written by save() to an output stream.
507 
508         Assumes that the input data do not exceed the queue capacity and throws
509         if they do. If this is possible and you want to handle this gracefully
510         (rather than getting an exception thrown), use the other overload of
511         this method.
512 
513         Params:
514             input = input stream
515 
516         Returns:
517             the number of bytes read from input.
518 
519         Throws:
520             ValidationError if the input data are inconsistent or exceed the
521             queue capacity. When throwing, the queue remains empty.
522 
523     ***************************************************************************/
524 
525     public size_t load ( InputStream input )
526     {
527         size_t bytes = 0;
528 
529         this.load((void[] meta, void[] data)
530         {
531             bytes += SimpleStreamSerializer.readData(input, meta);
532 
533             size_t data_length;
534             bytes += SimpleStreamSerializer.read(input, data_length);
535             enforce!(ValidationError)(data_length <= data.length,
536                 "Size of loaded data exceeds queue capacity");
537             bytes += SimpleStreamSerializer.readData(input, data[0 .. data_length]);
538             return data_length;
539         });
540 
541         return bytes;
542     }
543 
544     /***************************************************************************
545 
546         Restores the queue state and contents.
547 
548         Clears the queue, then calls the provided delegate, restore(), to
549         restore the queue state and contents and validates it.
550 
551         restore() should populate the meta and data buffer it receives with data
552         previously obtained from save():
553          - meta should be populated with the data from the store() meta
554            parameter,
555          - data[0 .. head.length + tail.length] should be populated with the
556            head ~ tail data as received by the store() delegate during save(),
557          - restore() should return head.length + tail.length.
558 
559         See the example in the documentation of save().
560 
561         Params:
562             restore = input delegate
563 
564         Throws:
565             ValidationError if the input data are inconsistent. When throwing,
566             the queue remains empty.
567 
568     ***************************************************************************/
569 
570     public void load ( scope size_t delegate ( void[] meta, void[] data ) restore )
571     {
572         this.clear();
573 
574         /*
575          * Pass this.data as the destination buffer to restore() and validate
576          * its content after restore() populated it. Should the validation fail,
577          * items, read_from, write_to and gap will remain 0 so the queue is
578          * empty and the invalid data in this.data are not harmful.
579          */
580 
581         ExportMetadata meta;
582         size_t end = restore((&meta)[0 .. 1], this.data);
583 
584         verify(end <= this.data.length,
585                idup(classname(this) ~ ".save(): restore callback expected to " ~
586                "return at most " ~ itoa(this.data.length) ~ ", not" ~ itoa(end)));
587 
588         this.validate(meta, this.data[0 .. end]);
589 
590         this.items    = meta.items;
591         this.write_to = end;
592         this.gap      = end;
593     }
594 
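    /***************************************************************************

        Reserves space for an item of size bytes in the queue and writes the
        item header in front of it. The item must fit in the queue, see
        willFit().

        Params:
            size = size in bytes of the item to reserve space for

        Returns:
            slice to the reserved space for the item content, its length is
            size

    ***************************************************************************/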
603 
604     private void[] push_ ( size_t size )
605     out (slice)
606     {
607         assert(slice !is null,
608                classname(this) ~ "push_: returned a null slice");
609         assert(slice.length == size,
610                classname(this) ~ "push_: length of returned slice not as requested");
611         assert(this); // invariant
612     }
613     body
614     {
615         assert(this); // invariant
616         verify(this.willFit(size), classname(this) ~ ".push_: item will not fit");
617 
618         auto push_size = this.pushSize(size);
619 
620         /*
621          * read_from and write_to can have three different relationships:
622          *
623          * 1. write_to == read_from: The queue is empty, both are 0, the
624          *    record goes to data[write_to .. $].
625          *
626          * 2. write_to < read_from: The record goes in
627          *    data[write_to .. read_from].
628          *
629          * 3. read_from < write_to: The record goes either in
630          *   a) data[write_to .. $] if there is enough space or
631          *   b) data[0 .. read_from], wrapping around by setting
632          *      write_to = 0.
633          *
634          * The destination slice of data in case 3a is equivalent to case 1
635          * and in case 3b to case 2.
636          */
637 
638         if (this.read_from < this.write_to)
639         {
640             verify(this.gap == this.write_to);
641 
642             // Case 3: Check if the record fits in data[write_to .. $] ...
643             if (this.data.length - this.write_to < push_size)
644             {
645                 /*
646                  * ... no, we have to wrap around. The precondition claims
647                  * the record does fit so there must be enough space in
648                  * data[0 .. read_from].
649                  */
650                 verify(push_size <= this.read_from);
651                 this.write_to = 0;
652             }
653         }
654 
655         auto start = this.write_to;
656         this.write_to += push_size;
657 
658         if (this.write_to > this.read_from) // Case 1 or 3a.
659         {
660             this.gap = this.write_to;
661         }
662 
663         this.items++;
664 
665         void[] dst = this.data[start .. this.write_to];
666         *cast(Header*)dst[0 .. Header.sizeof].ptr = Header(size);
667         return dst[Header.sizeof .. $];
668     }
669 
670 
671     /***************************************************************************
672 
673         Pops an item from the queue.
674 
675         Returns:
676             item popped from queue
677 
678     ***************************************************************************/
679 
680     private void[] pop_ ( )
681     out (buffer)
682     {
683         assert(buffer, classname(this) ~ ".pop_: returned a null buffer");
684         assert(this); // invariant
685     }
686     body
687     {
688         assert(this); // invariant
689         verify(this.items > 0, classname(this) ~ ".pop_: no items in the queue");
690 
691         auto position = this.read_from;
692         this.read_from += Header.sizeof;
693 
694         // TODO: Error if this.data.length < this.read_from.
695 
696         auto header = cast(Header*)this.data[position .. this.read_from].ptr;
697 
698         // TODO: Error if this.data.length - this.read_from < header.length
699 
700         position = this.read_from;
701         this.read_from += header.length;
702         verify(this.read_from <= this.gap); // The invariant ensures that
703                                             // this.gap is not 0.
704 
705         this.items--; // The precondition prevents decrementing 0.
706 
707         scope (exit)
708         {
709             if (this.items)
710             {
711                 if (this.read_from == this.gap)
712                 {
713                     /*
714                      *  End of data, wrap around:
715                      *  1. Set the read position to the start of this.data.
716                      */
717                     this.read_from = 0;
718                     /*
719                      *  2. The write position is now the end of the data.
720                      *     If the queue is now empty, i.e. this.items == 0,
721                      *     write_to must be 0.
722                      */
723 
724                     verify(this.items || !this.write_to);
725                     this.gap = this.write_to;
726                 }
727             }
728             else // Popped the last record.
729             {
730                 verify(this.read_from == this.write_to);
731                 this.read_from = 0;
732                 this.write_to  = 0;
733                 this.gap       = 0;
734             }
735 
736         }
737 
738         return this.data[position .. this.read_from];
739     }
740 
741     /***************************************************************************
742 
743         Validates queue state and content data.
744 
745         Params:
746             meta = queue state data as imported by load()
747             data = queue content data
748 
749         Throws:
750             ValidationError if meta and/or data are inconsistent.
751 
752     ***************************************************************************/
753 
754     private static void validate ( ExportMetadata meta, in void[] data )
755     {
756         if (meta.items)
757         {
758             enforce!(ValidationError)(
759                 data.length,
760                 cast(istring) ("Expected data for a non-empty queue ("
761                     ~ itoa(meta.items) ~ " records)")
762             );
763 
764             enforce!(ValidationError)(
765                 data.length >= cast(size_t)meta.items * Header.sizeof,
766                 cast(istring) ("Queue data shorter than required minimum for " ~
767                     itoa(meta.items) ~ " records (got " ~ itoa(data.length) ~
768                     " bytes)")
769             );
770 
771             size_t pos = 0;
772 
773             for (uint i = 0; i < meta.items; i++)
774             {
775                 verify(pos <= data.length);
776 
777                 try
778                 {
779                     enforce!(ValidationError)(pos != data.length,
780                                               "Unexpected end of input data");
781 
782                     auto start = pos;
783                     pos += Header.sizeof;
784 
785                     enforce!(ValidationError)(
786                         pos <= data.length,
787                         cast(istring) ("End of queue data in the middle of" ~
788                             " the record header which starts at byte " ~
789                             itoa(start))
790                     );
791 
792                     auto header = cast(Const!(Header)*)data[start .. pos].ptr;
793 
794                     enforce!(ValidationError)(
795                         (data.length - pos) >= header.length,
796                         cast(istring) ("End of queue data in the middle of the" ~
797                             " queue record, record length = " ~
798                             itoa(header.length))
799                     );
800 
801                     pos += header.length;
802                 }
803                 catch (ValidationError e)
804                 {
805                     auto msg = "Error reading record " ~ itoa(i + i) ~ "/" ~
806                         itoa(meta.items) ~ ": " ~ e.message();
807                     e.msg = assumeUnique(msg);
808                     throw e;
809                 }
810             }
811 
812             verify(pos <= data.length);
813 
814             enforce!(ValidationError)(
815                 pos >= data.length,
816                 cast(istring) ("Queue data  too long (" ~ itoa(meta.items) ~
817                     " records, " ~ itoa(pos) ~ "/" ~ itoa(data.length) ~
818                     " bytes used)")
819             );
820         }
821         else
822         {
823             enforce!(ValidationError)(
824                 !data.length,
825                 cast(istring) ("Expected no data for an empty queue, not " ~
826                     itoa(data.length) ~ " bytes")
827             );
828         }
829     }
830 
831     /**************************************************************************/
832 
    static class ValidationError: Exception
    {
        // Thrown by validate() if the queue state and content data passed
        // to load() are inconsistent, and by the stream based load() if the
        // size of the loaded data exceeds the queue capacity.

        import ocean.core.Exception : DefaultExceptionCtor;

        // The constructor is provided by the mixed-in template.
        mixin DefaultExceptionCtor!();
    }
838 }
839 /*******************************************************************************
840 
841     UnitTest
842 
843 *******************************************************************************/
844 
845 version ( UnitTest )
846 {
847     import ocean.io.model.IConduit: IConduit;
848 }
849 
unittest
{
    // Size of the header stored in front of each item.
    static immutable header_size = FlexibleByteRingQueue.Header.sizeof;

    // Room for exactly ten items of nine bytes each.
    static immutable queue_size_1 = (9 + header_size) * 10;

    scope queue = new FlexibleByteRingQueue(queue_size_1);
    test(queue.free_space >= queue_size_1);
    test(queue.is_empty);

    test(queue.free_space >= queue_size_1);
    test(queue.is_empty);

    // Pushing and popping a single item leaves the queue empty again.
    test(queue.push("Element 1"));
    test(queue.pop() == "Element 1");
    test(queue.get_items == 0);
    test(queue.free_space != 0);
    test(queue.is_empty);
    test(queue.used_space() == 0);

    // Fill the queue completely.
    foreach (element; ["Element 1", "Element 2", "Element 3", "Element 4",
                       "Element 5", "Element 6", "Element 7", "Element 8",
                       "Element 9", "Element10"])
    {
        test(queue.push(element));
    }

    test(queue.length == 10);
    test(queue.free_space == 0);
    test(!queue.is_empty);

    // A full queue rejects further items and remains unchanged.
    test(!queue.push("more"));
    test(queue.length == 10);

    // Room for exactly five items of one byte each.
    scope middle = new FlexibleByteRingQueue((1 + header_size) * 5);

    foreach (element; ["1", "2", "3", "4"])
    {
        middle.push(element);
    }

    test(middle.pop == "1");
    test(middle.get_read_from == 1 + header_size);
    test(middle.get_write_to == (1 + header_size) * 4);
    test(middle.free_space() == (1 + header_size) * 2);

    // One item fits at the end, the next one wraps around to the start.
    test(middle.push("5"));
    test(middle.push("6"));
    test(middle.free_space() == 0);
}
900 
901 /*******************************************************************************
902 
903     Save/load test. Uses the unittest above, adding save/load sequences in the
904     middle. This test is separate to allow for changing the above push/pop test
905     without breaking the save/load test.
906 
907 *******************************************************************************/
908 
909 version ( UnitTest )
910 {
911     import ocean.io.device.MemoryDevice;
912 }
913 
unittest
{
    // Runs a push/pop sequence on two queues and, at several points, saves
    // the queue state, clears the queue and loads the state back. `queue`
    // uses the delegate based save()/load(), `middle` the stream based ones.

    // Buffers and callback functions for the delegate based save() & load().
    // store() keeps private copies of the slices it is handed; restore()
    // copies them back into the buffers it is given and reports the number
    // of data bytes it wrote.

    void[] saved_meta, saved_data;

    void store ( in void[] meta, in void[] head, in void[] tail )
    {
        saved_meta = meta.dup;
        saved_data = head.dup ~ tail;
    }

    size_t restore ( void[] meta, void[] data )
    {
        meta[] = saved_meta;
        data[0 .. saved_data.length] = saved_data;
        return saved_data.length;
    }

    // Memory I/O stream device for the stream based save() & load().

    scope backup = new MemoryDevice;

    static immutable queue_size_1 = (9+FlexibleByteRingQueue.Header.sizeof)*10;

    scope queue = new FlexibleByteRingQueue(queue_size_1);
    test(queue.free_space >= queue_size_1);
    test(queue.is_empty);

    // Saving and loading an empty queue must leave it empty.

    queue.save(&store);
    queue.load(&restore);

    test(queue.free_space >= queue_size_1);
    test(queue.is_empty);

    test(queue.push("Element 1"));
    test(queue.pop() == "Element 1");
    test(queue.get_items == 0);
    // The original spelled this `!queue.free_space == 0`, which parses as
    // `(!queue.free_space) == 0`; state the intended check directly.
    test(queue.free_space != 0);
    test(queue.is_empty);
    test(queue.used_space() == 0);

    // Ten 9-byte elements fill the queue exactly.

    test(queue.push("Element 1"));
    test(queue.push("Element 2"));
    test(queue.push("Element 3"));
    test(queue.push("Element 4"));
    test(queue.push("Element 5"));
    test(queue.push("Element 6"));
    test(queue.push("Element 7"));
    test(queue.push("Element 8"));
    test(queue.push("Element 9"));
    test(queue.push("Element10"));

    // Save and restore the queue status in the middle of a test.

    queue.save(&store);
    queue.clear();
    queue.load(&restore);

    test(queue.length == 10);
    test(queue.free_space == 0);
    test(!queue.is_empty);

    test(!queue.push("more"));
    test(queue.length == 10);

    scope middle = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*5);
    // Check every push so that a failure is reported where it happens rather
    // than through the position checks below.
    test(middle.push("1"));
    test(middle.push("2"));
    test(middle.push("3"));
    test(middle.push("4"));
    test(middle.pop == "1");
    test(middle.get_read_from == 1 + FlexibleByteRingQueue.Header.sizeof);
    test(middle.get_write_to == (1+FlexibleByteRingQueue.Header.sizeof)*4);
    test(middle.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2);

    // Save and restore the queue status in the middle of a test.

    middle.save(backup);
    middle.clear();
    backup.seek(0);
    middle.load(backup);
    // load() must have consumed everything save() wrote.
    test(backup.read(null) == backup.Eof);
    backup.close();

    test(middle.push("5"));
    test(middle.push("6"));
    test(middle.free_space() == 0);
}
1003 
1004 /*******************************************************************************
1005 
1006     Test for the corner case of saving the queue state after wrapping around.
1007 
1008 *******************************************************************************/
1009 
unittest
{
    // Selects whether, and through which API, the queue state is saved and
    // reloaded once the write position is not ahead of the read position.
    enum Save {Dont = 0, Dg, Stream}

    // Storage and callbacks used by the delegate based save() & load().

    void[] saved_meta, saved_data;

    void store ( in void[] meta, in void[] head, in void[] tail )
    {
        saved_meta = meta.dup;
        saved_data = head.dup ~ tail;
    }

    size_t restore ( void[] meta, void[] data )
    {
        auto stored = saved_data.length;

        meta[] = saved_meta;
        data[0 .. stored] = saved_data;

        return stored;
    }

    // In-memory stream used by the stream based save() & load().

    scope backup = new MemoryDevice;

    void save_wraparound ( Save save )
    {
        static immutable Q_SIZE = 20;
        auto q = new FlexibleByteRingQueue(Q_SIZE);

        // Saves the queue, clears it and loads it back through the API
        // selected by `save`; does nothing for Save.Dont.
        void saveAndReload ( )
        {
            switch (save)
            {
                case Save.Dont:
                    break;

                case Save.Dg:
                    q.save(&store);
                    q.clear();
                    q.load(&restore);
                    break;

                case Save.Stream:
                    q.save(backup);
                    q.clear();
                    backup.seek(0);
                    q.load(backup);
                    test(backup.read(null) == backup.Eof);
                    backup.close();
                    break;

                default: assert(false);
            }
        }

        // Pushes up to n one-byte items holding the values 0, 1, 2, ... and
        // stops early as soon as the queue rejects a push.
        void push(uint n)
        in
        {
            test(n <= ubyte.max);
        }
        body
        {
            for (ubyte value = 0; value < n; ++value)
            {
                if (auto slot = q.push(1))
                {
                    slot[] = (&value)[0 .. 1];
                }
                else
                {
                    break;
                }

                // Save and restore the queue status after wrapping around.

                if (q.get_write_to <= q.get_read_from)
                {
                    saveAndReload();
                }
            }
        }

        // Pops up to n items, checks each one is a single byte that does not
        // carry the marker value, then overwrites it with the marker.
        void pop(uint n)
        {
            static ubyte[] unexpected = [cast(ubyte)Q_SIZE+1];

            foreach (attempt; 0 .. n)
            {
                if (auto item = q.pop())
                {
                    test (item.length == 1);
                    test (item != unexpected);
                    item[] = unexpected;
                }
                else
                {
                    break;
                }
            }
        }

        push(2);
        pop(1);
        push(2);
        pop(1);
        push(3);
        pop(4);
        pop(1);
    }

    foreach (mode; [Save.Dont, Save.Dg, Save.Stream])
    {
        save_wraparound(mode);
    }
}
1114 
1115 /*******************************************************************************
1116 
    Unittest-only imports, followed by the queue wrapping test
1118 
1119 *******************************************************************************/
1120 
1121 version ( UnitTest )
1122 {
1123     // Uncomment the next line to see UnitTest output
1124     // version = UnitTestVerbose;
1125 
1126     import ocean.core.Test;
1127 
1128     import ocean.math.random.Random;
1129     import ocean.time.StopWatch;
1130     import core.memory;
1131     import ocean.io.FilePath;
1132 }
1133 
unittest
{
    /***********************************************************************

        Test wrapping

        Walks a queue with room for exactly three one-byte items through
        filling up, draining and wrapping around the end of the buffer.

        The diagrams show the three slots ('#' = occupied, '_' = free) and
        the read (r) and write (w) positions after the preceding statement.
        The numbers are illustrative: they correspond to
        Header.sizeof == 4, i.e. 5 bytes per one-byte item.

    ***********************************************************************/

    scope queue = new FlexibleByteRingQueue((1+FlexibleByteRingQueue.Header.sizeof)*3);

    test(queue.get_read_from == 0);
    test(queue.get_write_to == 0);
    // [___] r=0 w=0
    test(queue.push("1"));

    test(queue.get_read_from == 0);
    test(queue.get_write_to == 1+FlexibleByteRingQueue.Header.sizeof);
    test(queue.get_items == 1);
    // The buffer starts with the item header, which records the item length.
    test((cast(FlexibleByteRingQueue.Header*) queue.get_data.ptr).length == 1);

    {
        // The payload follows directly after the header.
        Const!(void)[] expected = "1";
        test(queue.get_data[FlexibleByteRingQueue.Header.sizeof ..
                          1+FlexibleByteRingQueue.Header.sizeof] ==
                            expected);
    }

    // [#__] r=0 w=5
    test(queue.push("2"));

    // [##_] r=0 w=10
    test(queue.push("3"));

    // [###] r=0 w=15
    test(!queue.push("4"));
    test(queue.free_space == 0);
    test(queue.pop() == "1");

    // [_##] r=5 w=15
    test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof);
    test(queue.pop() == "2");

    // [__#] r=10 w=15
    test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*2);
    test(queue.get_write_to == queue.get_data.length);
    test(queue.push("1"));

    // [#_#] r=10 w=5
    test(queue.free_space() == 1+FlexibleByteRingQueue.Header.sizeof);
    test(queue.get_write_to == queue.pushSize("2".length));
    test(queue.push("2"));

    // [###] r=10 w=10
    test(queue.free_space == 0);
    test(queue.pop() == "3");

    // [##_] r=15/0 w=10
    test(queue.free_space() == (1+FlexibleByteRingQueue.Header.sizeof)*1);
    test(queue.pop() == "1");

    // [_#_] r=5 w=10
    test(queue.pop() == "2");

    // [___] r=0 w=0
    test(queue.is_empty);
    test(queue.push("1"));

    // [#__] r=0 w=5
    test(queue.push("2#"));

    // [#$_] r=0 w=11 ($ = 2 bytes)
    test(queue.pop() == "1");

    // [_$_] r=5 w=11
    test(queue.push("1"));

    // [#$_] r=5 w=5
    test(!queue.push("2"));
    test(queue.pop() == "2#");

    // [#__] r=11 w=5
    test(queue.push("2")); // this needs to be wrapped now

    // [##_] r=11 w=10
}
1225 
1226 
// Regression test for a specific bug: an old, wrong willFit() function caused
// garbled loglines. Reproduces the queue layout that triggered it and checks
// that the offending push is refused and the queued items stay intact.
unittest
{
    auto six_bytes    = "123456";
    auto eight_bytes  = "12345678";
    auto twenty_bytes = "12345678123456781234";
    auto test_push    = "123456789.....16";

    auto queue = new FlexibleByteRingQueue(45);

    // Bring the queue into the state in which the bug showed up.
    test(queue.push(six_bytes));            // w = 14
    test(queue.push(eight_bytes));          // w = 30
    test(queue.push(six_bytes));            // w = 44
    test(queue.pop() == six_bytes);
    test(queue.pop() == eight_bytes);       // r == 30
    test(queue.push(twenty_bytes));         // r = 30, gap = 44

    // Confirm the preconditions of the bug: the write position is behind the
    // read position, the read position is before the gap, and the item would
    // run past the end of the buffer if appended at the write position.
    test!(">")(queue.get_read_from, queue.get_write_to);
    test!("<")(queue.get_read_from, queue.gap);
    test!(">")(queue.pushSize(test_push.length) + queue.get_write_to,
               queue.get_data.length);

    // The actual check: the push must be rejected ...
    test(!queue.push(test_push));

    // ... and the remaining items must come out unchanged.
    test!("==")(queue.pop(), six_bytes);
    test!("==")(queue.pop(), twenty_bytes);
}