1 /*******************************************************************************
2 
3         Copyright:
4             Copyright (c) 2004 Kris Bell.
5             Some parts copyright (c) 2009-2016 dunnhumby Germany GmbH.
6             All rights reserved.
7 
8         License:
9             Tango Dual License: 3-Clause BSD License / Academic Free License v3.0.
10             See LICENSE_TANGO.txt for details.
11 
12         Version:
13             Mar 2004: Initial release$(BR)
14             Dec 2006: Outback release
15 
16         Authors: Kris Bell
17 
18 *******************************************************************************/
19 
20 module ocean.io.stream.Buffered;
21 
22 import core.stdc..string;
23 
24 import ocean.meta.types.Qualifiers;
25 version (unittest)  import ocean.core.Test;
26 import ocean.core.Verify;
27 
28 public import ocean.io.model.IConduit;
29 
30 import ocean.io.device.Conduit;
31 version (unittest) import ocean.io.device.MemoryDevice;
32 
33 
34 /// Shorthand aliases
35 public alias BufferedInput  Bin;
36 /// ditto
37 public alias BufferedOutput Bout;
38 
39 
40 /*******************************************************************************
41 
42     Buffers the flow of data from a upstream input
43 
44     A downstream neighbour can locate and use this buffer instead of creating
45     another instance of their own.
46 
47     Note:
48         upstream is closer to the source, and downstream is further away
49 
50 *******************************************************************************/
51 
52 public class BufferedInput : InputFilter, InputBuffer
53 {
54     /// Clear/flush are the same
55     public alias flush             clear;
56     /// Access the source.
57     public alias InputFilter.input input;
58 
59     private void[]        data;             // The raw data buffer.
60     private size_t        index;            // Current read position.
61     private size_t        extent;           // Limit of valid content.
62 
63     invariant ()
64     {
65         assert(this.index <= this.extent);
66         assert(this.extent <= this.data.length);
67     }
68 
69     /***************************************************************************
70 
71         Construct a buffer.
72 
73         Construct a Buffer upon the provided input stream.
74 
75         Params:
76             stream = An input stream.
77 
78     ***************************************************************************/
79 
80     public this (InputStream stream)
81     {
82         verify(stream !is null);
83         this(stream, stream.conduit.bufferSize);
84     }
85 
86     /***************************************************************************
87 
88         Construct a buffer.
89 
90         Construct a Buffer upon the provided input stream.
91 
92         Params:
93             stream = An input stream.
94             capacity = Desired buffer capacity.
95 
96     ***************************************************************************/
97 
98     public this (InputStream stream, size_t capacity)
99     {
100         this.set(new ubyte[capacity], 0);
101         super(this.source = stream);
102     }
103 
104     /***************************************************************************
105 
106         Attempt to share an upstream Buffer, and create an instance
107         where there's not one available.
108 
109         If an upstream Buffer instances is visible, it will be shared.
110         Otherwise, a new instance is created based upon the bufferSize
111         exposed by the stream endpoint (conduit).
112 
113         Params:
114             stream = An input stream.
115 
116     ***************************************************************************/
117 
118     public static InputBuffer create (InputStream stream)
119     {
120         auto source = stream;
121         auto conduit = source.conduit;
122         while (cast(Mutator) source is null)
123         {
124             auto b = cast(InputBuffer) source;
125             if (b)
126                 return b;
127             if (source is conduit)
128                 break;
129             source = source.input;
130             verify(source !is null);
131         }
132 
133         return new BufferedInput(stream, conduit.bufferSize);
134     }
135 
136     /***************************************************************************
137 
138         Place more data from the source stream into this buffer, and returns
139         the number of bytes added.
140 
141         This does not compress the current buffer content, so consider doing
142         that explicitly.
143 
144         Returns:
145             Number of bytes added, which will be Eof when there is no further
146             input available.
147             Zero is also a valid response, meaning no data was actually added.
148 
149     ***************************************************************************/
150 
151     public final size_t populate ()
152     {
153         return this.writer(&this.input.read);
154     }
155 
156     /***************************************************************************
157 
158         Returns:
159             a void[] slice of the buffer from start to end,
160             where end is exclusive.
161 
162     ***************************************************************************/
163 
164     public final void[] opSlice (size_t start, size_t end)
165     {
166         verify(start <= this.extent && end <= this.extent && start <= end);
167         return this.data[start .. end];
168     }
169 
170     /***************************************************************************
171 
172         Retrieve the valid content.
173 
174         Returns:
175             A void[] slice of the buffer, from the current position up to
176             the limit of valid content.
177             The content remains in the buffer for future extraction.
178 
179     ***************************************************************************/
180 
181     public final void[] slice ()
182     {
183         return  this.data[this.index .. this.extent];
184     }
185 
186     /***************************************************************************
187 
188         Access buffer content.
189 
190         Read a slice of data from the buffer, loading from the
191         conduit as necessary. The specified number of bytes is
192         sliced from the buffer, and marked as having been read
193         when the 'eat' parameter is set true. When 'eat' is set
194         false, the read position is not adjusted.
195 
196         The slice cannot be larger than the size of the buffer: use method
197         `fill(void[])` instead where you simply want the content copied,
198         or use `conduit.read()` to extract directly from an attached conduit
199         Also if you need to retain the slice, then it should be `.dup`'d
200         before the buffer is compressed or repopulated.
201 
202         Params:
203             size =  Number of bytes to access.
204             eat =   Whether to consume the content or not.
205 
206         Returns:
207             The corresponding buffer slice when successful, or
208             null if there's not enough data available (Eof; Eob).
209 
210         Examples:
211         ---
212             // create a buffer with some content
213             auto buffer = new Buffer("hello world");
214 
215             // consume everything unread
216             auto slice = buffer.slice(buffer.this.readable());
217         ---
218 
219     ***************************************************************************/
220 
221     public final void[] slice (size_t size, bool eat = true)
222     {
223         if (size > this.readable())
224         {
225             // make some space? This will try to leave as much content
226             // in the buffer as possible, such that entire records may
227             // be aliased directly from within.
228             if (size > (this.data.length - this.index))
229             {
230                 if (size <= this.data.length)
231                     this.compress();
232                 else
233                     this.conduit.error("input buffer is empty");
234             }
235 
236             // populate tail of buffer with new content
237             do {
238                 if (this.writer(&this.source.read) is Eof)
239                     this.conduit.error("end-of-flow whilst reading");
240             } while (size > this.readable());
241         }
242 
243         auto i = this.index;
244         if (eat)
245             this.index += size;
246         return this.data[i .. i + size];
247     }
248 
249     /***************************************************************************
250 
251         Read directly from this buffer.
252 
253         Exposes the raw data buffer at the current _read position.
254         The delegate is provided with a void[] representing the available
255         data, and should return zero to leave the current _read position
256         intact.
257 
258         If the delegate consumes data, it should return the number of
259         bytes consumed; or IConduit.Eof to indicate an error.
260 
261         Params:
262             dg = Callback to provide buffer access to.
263 
264         Returns:
265             the delegate's return value
266 
267     ***************************************************************************/
268 
269     public final size_t reader (scope size_t delegate (const(void)[]) dg)
270     {
271         auto count = dg(this.data[this.index .. this.extent]);
272 
273         if (count != Eof)
274         {
275             this.index += count;
276             verify(this.index <= this.extent);
277         }
278         return count;
279     }
280 
281     /***************************************************************************
282 
283         Write into this buffer.
284 
285         Exposes the raw data buffer at the current _write position,
286         The delegate is provided with a void[] representing space
287         available within the buffer at the current _write position.
288 
289         The delegate should return the appropriate number of bytes
290         if it writes valid content, or IConduit.Eof on error.
291 
292         Params:
293             dg = The callback to provide buffer access to.
294 
295         Returns:
296             the delegate return's value.
297 
298     ***************************************************************************/
299 
300     public size_t writer (scope size_t delegate (void[]) dg)
301     {
302         auto count = dg(this.data[this.extent .. $]);
303 
304         if (count != Eof)
305         {
306             this.extent += count;
307             verify(this.extent <= this.data.length);
308         }
309         return count;
310     }
311 
312     /***************************************************************************
313 
314         Transfer content into the provided dst.
315 
316         Populates the provided array with content. We try to
317         satisfy the request from the buffer content, and read
318         directly from an attached conduit when the buffer is empty.
319 
320         Params:
321             dst = Destination of the content.
322 
323         Returns:
324             the number of bytes read, which may be less than `dst.length`.
325             Eof is returned when no further content is available.
326 
327     ***************************************************************************/
328 
329     public final override size_t read (void[] dst)
330     {
331         size_t content = this.readable;
332         if (content)
333         {
334             if (content >= dst.length)
335                 content = dst.length;
336 
337             // transfer buffer content
338             dst[0 .. content] = this.data[this.index .. this.index + content];
339             this.index += content;
340         }
341         // pathological cases read directly from conduit
342         else if (dst.length > this.data.length)
343             content = this.source.read(dst);
344         else
345         {
346             if (this.writable is 0)
347                 this.index = this.extent = 0;  // same as clear, without call-chain
348 
349             // keep buffer partially populated
350             if ((content = this.writer(&this.source.read)) != Eof && content > 0)
351                 content = this.read(dst);
352         }
353         return content;
354     }
355 
356     /**************************************************************************
357 
358         Fill the provided buffer
359 
360         Returns:
361             the number of bytes actually read, which will be less than
362             `dst.length` when Eof has been reached and Eof thereafter.
363 
364         Params:
365             dst = Where data should be placed.
366             exact = Whether to throw an exception when dst is not
367                     filled (an Eof occurs first). Defaults to false.
368 
369     **************************************************************************/
370 
371     public final size_t fill (void[] dst, bool exact = false)
372     {
373         size_t len = 0;
374 
375         while (len < dst.length)
376         {
377             size_t i = this.read(dst[len .. $]);
378             if (i is Eof)
379             {
380                 if (exact && len < dst.length)
381                     this.conduit.error("end-of-flow whilst reading");
382                 return (len > 0) ? len : Eof;
383             }
384             len += i;
385         }
386         return len;
387     }
388 
389     /***************************************************************************
390 
391         Move the current read location.
392 
393         Skips ahead by the specified number of bytes, streaming from
394         the associated conduit as necessary.
395 
396         Can also reverse the read position by 'size' bytes, when size
397         is negative. This may be used to support lookahead operations.
398         Note that a negative size will fail where there is not sufficient
399         content available in the buffer (can't _skip beyond the beginning).
400 
401         Params:
402             size = The number of bytes to move.
403 
404         Returns:
405             `true` if successful, `false` otherwise.
406 
407     ***************************************************************************/
408 
409     public final bool skip (int size)
410     {
411         if (size < 0)
412         {
413             size = -size;
414             if (this.index >= size)
415             {
416                 this.index -= size;
417                 return true;
418             }
419             return false;
420         }
421         return this.slice(size) !is null;
422     }
423 
424     /***************************************************************************
425 
426         Move the current read location.
427 
428     ***************************************************************************/
429 
430     public final override long seek (long offset, Anchor start = Anchor.Begin)
431     {
432         if (start is Anchor.Current)
433         {
434             // handle this specially because we know this is
435             // buffered - we should take into account the buffer
436             // position when seeking
437             offset -= this.readable;
438             auto bpos = offset + this.limit;
439 
440             if (bpos >= 0 && bpos < this.limit)
441             {
442                 // the new position is within the current
443                 // buffer, skip to that position.
444                 this.skip(cast(int) bpos - cast(int) position);
445 
446                 // see if we can return a valid offset
447                 auto pos = this.source.seek(0, Anchor.Current);
448                 if (pos != Eof)
449                     return pos - this.readable();
450                 return Eof;
451             }
452             // else, position is outside the buffer. Do a real
453             // seek using the adjusted position.
454         }
455 
456         this.clear();
457         return this.source.seek(offset, start);
458     }
459 
460     /***************************************************************************
461 
462         Iterator support.
463 
464         Upon success, the delegate should return the byte-based index of
465         the consumed pattern (tail end of it).
466         Failure to match a pattern should be indicated by returning an Eof
467 
468         Each pattern is expected to be stripped of the delimiter.
469         An end-of-file condition causes trailing content to be
470         placed into the token. Requests made beyond Eof result
471         in empty matches (length is zero).
472 
473         Additional iterator and/or reader instances
474         will operate in lockstep when bound to a common buffer.
475 
476         Params:
477             scan = The delegate to invoke with the current content.
478 
479         Returns:
480             `true` if a token was isolated, `false` otherwise.
481 
482     ***************************************************************************/
483 
484     public final bool next (scope size_t delegate (const(void)[]) scan)
485     {
486         while (this.reader(scan) is Eof)
487         {
488             // did we start at the beginning?
489             if (this.position)
490                 // yep - move partial token to start of buffer
491                 this.compress();
492             // no more space in the buffer?
493             else if (this.writable is 0)
494                 this.extend();
495 
496             verify(this.writable() > 0);
497 
498             // read another chunk of data
499             if (this.writer(&this.source.read) is Eof)
500                 return false;
501         }
502         return true;
503     }
504 
505     /***************************************************************************
506 
507         Reserve the specified space within the buffer, compressing
508         existing content as necessary to make room.
509 
510         Returns:
511             the current read point, after compression if that was required.
512 
513     ***************************************************************************/
514 
515     public final size_t reserve (size_t space)
516     {
517         verify(space < this.data.length);
518 
519         if ((this.data.length - this.index) < space)
520             this.compress();
521         return this.index;
522     }
523 
524     /***************************************************************************
525 
526         Compress buffer space.
527 
528         Limit is set to the amount of data remaining.
529         Position is always reset to zero.
530 
531         If we have some data left after an export, move it to the front of
532         the buffer and set position to be just after the remains.
533         This is for supporting certain conduits which choose to write just
534         the initial portion of a request.
535 
536         Returns:
537             The buffer instance.
538 
539     ***************************************************************************/
540 
541     public final BufferedInput compress ()
542     {
543         auto r = this.readable();
544 
545         if (this.index > 0 && r > 0)
546             // content may overlap ...
547             memmove(&data[0], &data[this.index], r);
548 
549         this.index = 0;
550         this.extent = r;
551         return this;
552     }
553 
554     /***************************************************************************
555 
556         Drain buffer content to the specific conduit.
557 
558         Returns:
559             the number of bytes written, or Eof.
560 
561         Note:
562             Write as much of the buffer that the associated conduit can consume.
563             The conduit is not obliged to consume all content,
564             so some may remain within the buffer.
565 
566     ***************************************************************************/
567 
568     public final size_t drain (OutputStream dst)
569     {
570         verify(dst !is null);
571 
572         size_t ret = this.reader(&dst.write);
573         this.compress();
574         return ret;
575     }
576 
577     /***************************************************************************
578 
579         Access buffer limit.
580 
581         Each buffer has a capacity, a limit, and a position.
582         The capacity is the maximum content a buffer can contain,
583         limit represents the extent of valid content, and position marks
584         the current read location.
585 
586         Returns:
587             the limit of readable content within this buffer.
588 
589     ***************************************************************************/
590 
591     public final size_t limit ()
592     {
593         return this.extent;
594     }
595 
596    /***************************************************************************
597 
598         Access buffer capacity.
599 
600         Each buffer has a capacity, a limit, and a position.
601         The capacity is the maximum content a buffer can contain, limit
602         represents the extent of valid content, and position marks
603         the current read location.
604 
605         Returns:
606             the maximum capacity of this buffer.
607 
608    ***************************************************************************/
609 
610     public final size_t capacity ()
611     {
612         return this.data.length;
613     }
614 
615     /***************************************************************************
616 
617         Access buffer read position.
618 
619         Each buffer has a capacity, a limit, and a position.
620         The capacity is the maximum content a buffer can contain, limit
621         represents the extent of valid content, and position marks
622         the current read location.
623 
624         Returns:
625             the current read-position within this buffer.
626 
627     ***************************************************************************/
628 
629     final size_t position ()
630     {
631         return this.index;
632     }
633 
634     /***************************************************************************
635 
636         Available content.
637 
638         Returns:
639             count of _readable bytes remaining in buffer.
640             This is calculated simply as `this.limit() - this.position()`.
641 
642     ***************************************************************************/
643 
644     public final size_t readable ()
645     {
646         return this.extent - this.index;
647     }
648 
649     /***************************************************************************
650 
651         Cast to a target type without invoking the wrath of the
652         runtime checks for misalignment. Instead, we truncate the
653         array length.
654 
655     ***************************************************************************/
656 
657     static inout(T)[] convert (T) (inout(void)[] x)
658     {
659         return (cast(inout(T)*) x.ptr) [0 .. (x.length / T.sizeof)];
660     }
661 
662     /***************************************************************************
663 
664         Clear buffer content.
665 
666         Note:
667             Reset 'position' and 'limit' to zero. This effectively
668             clears all content from the buffer.
669 
670     ***************************************************************************/
671 
672     public final override BufferedInput flush ()
673     {
674         this.index = this.extent = 0;
675 
676         // clear the filter chain also
677         if (this.source)
678             super.flush();
679         return this;
680     }
681 
682     /***************************************************************************
683 
684         Set the input stream.
685 
686     ***************************************************************************/
687 
688     public final void input (InputStream source)
689     {
690         this.source = source;
691     }
692 
693     /***************************************************************************
694 
695         Load the bits from a stream, up to an indicated length, and
696         return them all in an array.
697 
698         The function may consume more than the indicated size where additional
699         data is available during a block read operation, but will not wait for
700         more than specified.
701         An Eof terminates the operation.
702 
703         Returns:
704             an array representing the content
705 
706         Throws:
707             `IOException` on error.
708 
709     ***************************************************************************/
710 
711     public final override void[] load (size_t max = size_t.max)
712     {
713         this.load(super.input, super.conduit.bufferSize, max);
714         return this.slice;
715     }
716 
717     /***************************************************************************
718 
719         Import content from the specified conduit, expanding as necessary
720         up to the indicated maximum or until an Eof occurs.
721 
722         Returns:
723             the number of bytes contained.
724 
725     ***************************************************************************/
726 
727     private size_t load (InputStream src, size_t increment, size_t max)
728     {
729         size_t len, count;
730 
731         // make some room
732         this.compress();
733 
734         // explicitly resize?
735         if (max != max.max)
736             if ((len = this.writable()) < max)
737                 increment = max - len;
738 
739         while (count < max)
740         {
741             if (!this.writable())
742                 this.data.length = (this.data.length + increment);
743             if ((len = this.writer(&src.read)) is Eof)
744                 break;
745             else
746                 count += len;
747         }
748         return count;
749     }
750 
751     /***************************************************************************
752 
753         Reset the buffer content.
754 
755         Set the backing array with some content readable.
756         Writing to this will either flush it to an associated conduit,
757         or raise an Eof condition.
758         Use clear() to reset the content (make it all writable).
759 
760         Params:
761             data =     The backing array to buffer within.
762             readable = The number of bytes within data considered valid.
763 
764         Returns:
765             The buffer instance.
766 
767     ***************************************************************************/
768 
769     private final BufferedInput set (void[] data, size_t readable)
770     {
771         this.data = data;
772         this.extent = readable;
773 
774         // reset to start of input
775         this.index = 0;
776 
777         return this;
778     }
779 
780     /***************************************************************************
781 
782         Available space.
783 
784         Returns:
785             count of _writable bytes available in buffer.
786             This is calculated simply as `this.capacity() - this.limit()`.
787 
788     ***************************************************************************/
789 
790     private final size_t writable ()
791     {
792         return this.data.length - this.extent;
793     }
794 
795     /***************************************************************************
796 
797         Extend the buffer by half of its size
798 
799     ***************************************************************************/
800 
801     private void extend ()
802     {
803         this.data.length = this.data.length + (this.data.length / 2);
804     }
805 }
806 
807 
808 /*******************************************************************************
809 
810     Buffers the flow of data from a upstream output.
811 
812     A downstream neighbour can locate and use this buffer instead of creating
813     another instance of their own.
814 
815     Don't forget to flush() buffered content before closing.
816 
817     Note:
818         upstream is closer to the source, and downstream is further away
819 
820 *******************************************************************************/
821 
822 public class BufferedOutput : OutputFilter, OutputBuffer
823 {
824     /// access the sink
825     alias OutputFilter.output output;
826 
827     private void[]        data;             // the raw data buffer
828     private size_t        index;            // current read position
829     private size_t        extent;           // limit of valid content
830     private size_t        dimension;        // maximum extent of content
831 
832     /// Notifier that will be called on flush.
833     private void delegate() flush_notifier;
834 
835     invariant ()
836     {
837         assert (this.index <= this.extent);
838         assert (this.extent <= this.dimension);
839     }
840 
841     /***************************************************************************
842 
843         Construct a Buffer upon the provided input stream.
844 
845         Params:
846             stream = An input stream.
847             flush_notifier = user specified delegate called after the content
848                              of the buffer has been flushed to upstream output.
849 
850     ***************************************************************************/
851 
852     public this (OutputStream stream, scope void delegate() flush_notifier = null)
853     {
854         verify(stream !is null);
855         this(stream, stream.conduit.bufferSize, flush_notifier);
856     }
857 
858     /***************************************************************************
859 
860         Construct a Buffer upon the provided input stream.
861 
862         Params:
863             stream = An input stream.
864             capacity = Desired buffer capacity.
865             flush_notifier = user specified delegate called after the content
866                              of the buffer has been flushed to upstream output.
867 
868     ***************************************************************************/
869 
870     public this (OutputStream stream, size_t capacity,
871                  scope void delegate() flush_notifier = null)
872     {
873         this.set(new ubyte[capacity], 0);
874         this.flush_notifier = flush_notifier;
875         super(this.sink = stream);
876     }
877 
878     /***************************************************************************
879 
880         Attempts to share an upstream BufferedOutput, and creates a new
881         instance where there's not a shared one available.
882 
883         Where an upstream instance is visible it will be returned.
884         Otherwise, a new instance is created based upon the bufferSize
885         exposed by the associated conduit
886 
887         Params:
888             stream = An output stream.
889 
890     ***************************************************************************/
891 
892     public static OutputBuffer create (OutputStream stream)
893     {
894         auto sink = stream;
895         auto conduit = sink.conduit;
896         while (cast(Mutator) sink is null)
897         {
898             auto b = cast(OutputBuffer) sink;
899             if (b)
900                 return b;
901             if (sink is conduit)
902                 break;
903             sink = sink.output;
904             verify(sink !is null);
905         }
906 
907         return new BufferedOutput(stream, conduit.bufferSize);
908     }
909 
910     /***************************************************************************
911 
912         Retrieve the valid content.
913 
914         Returns:
915             A void[] slice of the buffer.
916 
917         Returns:
918             a slice of the buffer, from the current position up to the limit
919             of valid content.
920             The content remains in the buffer for future extraction.
921 
922     ***************************************************************************/
923 
924     public final void[] slice ()
925     {
926         return this.data[this.index .. this.extent];
927     }
928 
929     /***************************************************************************
930 
931         Emulate OutputStream.write().
932 
933         Appends src content to the buffer, flushing to an attached conduit
934         as necessary. An IOException is thrown upon write failure.
935 
936         Params:
937             src = The content to write.
938 
939         Returns:
940             the number of bytes written, which may be less than provided
941             (conceptually).
942 
943     ***************************************************************************/
944 
945     public final override size_t write (const(void)[] src)
946     {
947         this.append(src.ptr, src.length);
948         return src.length;
949     }
950 
951     /***************************************************************************
952 
953         Append content.
954 
955         Append an array to this buffer, flush to the conduit as necessary.
956         This is often used in lieu of a Writer.
957 
958         Params:
959             src = The content to _append.
960 
961         Returns:
962             a chaining reference if all content was written.
963 
964         Throws:
965             an IOException indicating Eof or Eob if not.
966 
967     ***************************************************************************/
968 
969     public final BufferedOutput append (const(void)[] src)
970     {
971         return this.append(src.ptr, src.length);
972     }
973 
974     /***************************************************************************
975 
976         Append content.
977 
978         Append an array to this buffer, flush to the conduit as necessary.
979         This is often used in lieu of a Writer.
980 
981         Params:
982             src = The content to _append.
983             length = The number of bytes in src.
984 
985         Returns:
986             a chaining reference if all content was written.
987 
988         Throws:
989             an IOException indicating Eof or Eob if not.
990 
991     ***************************************************************************/
992 
993     public final BufferedOutput append (const(void)* src, size_t length)
994     {
995         if (length > this.writable)
996         {
997             this.flush();
998 
999             // check for pathological case
1000             if (length > this.dimension)
1001                 do {
1002                     auto written = this.sink.write(src [0 .. length]);
1003                     if (written is Eof)
1004                         this.conduit.error("end-of-flow whilst writing");
1005                     length -= written;
1006                     src += written;
1007                 } while (length > this.dimension);
1008         }
1009 
1010         // avoid "out of bounds" test on zero length
1011         if (length)
1012         {
1013             // content may overlap ...
1014             memmove(&this.data[this.extent], src, length);
1015             this.extent += length;
1016         }
1017         return this;
1018     }
1019 
1020     /***************************************************************************
1021 
1022         Available space.
1023 
1024         Returns:
1025             count of _writable bytes available in buffer.
1026             This is calculated as `capacity() - limit()`.
1027 
1028     ***************************************************************************/
1029 
1030     public final size_t writable ()
1031     {
1032         return this.dimension - this.extent;
1033     }
1034 
1035     /***************************************************************************
1036 
1037         Access buffer limit.
1038 
1039         Each buffer has a capacity, a limit, and a position.
1040         The capacity is the maximum content a buffer can contain,
1041         limit represents the extent of valid content, and position marks
1042         the current read location.
1043 
1044         Returns:
1045             the limit of readable content within this buffer.
1046 
1047     ***************************************************************************/
1048 
1049     public final size_t limit ()
1050     {
1051         return this.extent;
1052     }
1053 
1054     /***************************************************************************
1055 
1056         Access buffer capacity.
1057 
1058         Each buffer has a capacity, a limit, and a position.
1059         The capacity is the maximum content a buffer can contain,
1060         limit represents the extent of valid content, and position marks
1061         the current read location.
1062 
1063         Returns:
1064             the maximum capacity of this buffer.
1065 
1066     ***************************************************************************/
1067 
1068     public final size_t capacity ()
1069     {
1070         return this.dimension;
1071     }
1072 
1073     /***************************************************************************
1074 
1075        Truncate the buffer within its extent.
1076 
1077         Returns:
1078             `true` if the new length is valid, `false` otherwise.
1079 
1080     ***************************************************************************/
1081 
1082     public final bool truncate (size_t length)
1083     {
1084         if (length <= this.data.length)
1085         {
1086             this.extent = length;
1087             return true;
1088         }
1089         return false;
1090     }
1091 
1092     /***************************************************************************
1093 
1094         Cast to a target type without invoking the wrath of the
1095         runtime checks for misalignment. Instead, we truncate the
1096         array length.
1097 
1098     ***************************************************************************/
1099 
1100     static T[] convert(T)(void[] x)
1101     {
1102         return (cast(T*) x.ptr) [0 .. (x.length / T.sizeof)];
1103     }
1104 
1105     /***************************************************************************
1106 
1107         Flush all buffer content to the specific conduit.
1108 
1109         Flush the contents of this buffer.
1110         This will block until all content is actually flushed via the associated
1111         conduit, whereas `drain()` will not.
1112 
1113        Throws:
1114             an IOException on premature Eof.
1115 
1116     ***************************************************************************/
1117 
1118     final override BufferedOutput flush ()
1119     {
1120         while (this.readable() > 0)
1121         {
1122             auto ret = this.reader(&this.sink.write);
1123             if (ret is Eof)
1124                 this.conduit.error("end-of-flow whilst writing");
1125         }
1126 
1127         // flush the filter chain also
1128         this.clear();
1129         super.flush;
1130 
1131         if (this.flush_notifier)
1132         {
1133             this.flush_notifier();
1134         }
1135 
1136         return this;
1137     }
1138 
1139     /***************************************************************************
1140 
1141         Copy content via this buffer from the provided src conduit.
1142 
1143         The src conduit has its content transferred through this buffer via
1144         a series of fill & drain operations,
1145         until there is no more content available.
1146         The buffer content should be explicitly flushed by the caller.
1147 
1148        Throws:
1149             an IOException on premature Eof.
1150 
1151     ***************************************************************************/
1152 
1153     final override BufferedOutput copy (InputStream src, size_t max = -1)
1154     {
1155         size_t chunk, copied;
1156 
1157         while (copied < max && (chunk = this.writer(&src.read)) != Eof)
1158         {
1159             copied += chunk;
1160 
1161             // don't drain until we actually need to
1162             if (this.writable is 0)
1163                 if (this.drain(this.sink) is Eof)
1164                     this.conduit.error("end-of-flow whilst writing");
1165         }
1166         return this;
1167     }
1168 
1169     /***************************************************************************
1170 
1171         Flushes the buffer and closes the stream.
1172 
1173     ***************************************************************************/
1174 
1175     final override void close ( )
1176     {
1177         this.flush();
1178         super.close();
1179     }
1180 
1181     /***************************************************************************
1182 
1183         Drain buffer content to the specific conduit.
1184 
1185         Write as much of the buffer that the associated conduit can consume.
1186         The conduit is not obliged to consume all content,
1187         so some may remain within the buffer.
1188 
1189         Returns:
1190             the number of bytes written, or Eof.
1191 
1192    ***************************************************************************/
1193 
1194     final size_t drain (OutputStream dst)
1195     {
1196         verify(dst !is null);
1197 
1198         size_t ret = this.reader(&dst.write);
1199         this.compress();
1200         return ret;
1201     }
1202 
1203     /***************************************************************************
1204 
1205         Clear buffer content.
1206 
1207         Reset 'position' and 'limit' to zero.
1208         This effectively clears all content from the buffer.
1209 
1210     ***************************************************************************/
1211 
1212     final BufferedOutput clear ()
1213     {
1214         this.index = this.extent = 0;
1215         return this;
1216     }
1217 
1218     /***************************************************************************
1219 
1220         Set the output stream.
1221 
1222     ***************************************************************************/
1223 
1224     final void output (OutputStream sink)
1225     {
1226         this.sink = sink;
1227     }
1228 
1229     /***************************************************************************
1230 
1231         Seek within this stream
1232 
1233         Any and all buffered output is disposed before the upstream is invoked.
1234         Use an explicit `flush()` to emit content prior to seeking.
1235 
1236     ***************************************************************************/
1237 
1238     final override long seek (long offset, Anchor start = Anchor.Begin)
1239     {
1240         this.clear();
1241         return super.seek(offset, start);
1242     }
1243 
1244     /***************************************************************************
1245 
1246         Write into this buffer.
1247 
1248         Exposes the raw data buffer at the current _write position,
1249         The delegate is provided with a void[] representing space
1250         available within the buffer at the current _write position.
1251 
1252         The delegate should return the appropriate number of bytes
1253         if it writes valid content, or Eof on error.
1254 
1255         Params:
1256             dg = The callback to provide buffer access to.
1257 
1258         Returns:
1259             the delegate return's value
1260 
1261    ***************************************************************************/
1262 
1263     final size_t writer (scope size_t delegate (void[]) dg)
1264     {
1265         auto count = dg (this.data[this.extent..this.dimension]);
1266 
1267         if (count != Eof)
1268         {
1269             this.extent += count;
1270             verify(this.extent <= this.dimension);
1271         }
1272         return count;
1273     }
1274 
1275     /***************************************************************************
1276 
1277         Read directly from this buffer.
1278 
1279         Exposes the raw data buffer at the current _read position.
1280         The delegate is provided with a void[] representing the available data,
1281         and should return zero to leave the current _read position intact.
1282 
1283         If the delegate consumes data, it should return the number of
1284         bytes consumed; or Eof to indicate an error.
1285 
1286         Params:
1287             dg = Callback to provide buffer access to.
1288 
1289         Returns:
1290             the delegate's return value.
1291 
1292    ***************************************************************************/
1293 
1294     private final size_t reader (scope size_t delegate (const(void)[]) dg)
1295     {
1296         auto count = dg (this.data[this.index..this.extent]);
1297 
1298         if (count != Eof)
1299         {
1300             this.index += count;
1301             verify(this.index <= this.extent);
1302         }
1303         return count;
1304     }
1305 
1306     /***************************************************************************
1307 
1308         Available content.
1309 
1310         Returns:
1311             count of _readable bytes remaining in buffer.
1312             This is calculated simply as `limit() - position()`.
1313 
1314     ***************************************************************************/
1315 
1316     private final size_t readable ()
1317     {
1318         return this.extent - this.index;
1319     }
1320 
1321     /***************************************************************************
1322 
1323         Reset the buffer content.
1324 
1325         Set the backing array with some content readable.
1326         Writing to this will either flush it to an associated conduit,
1327         or raise an Eof condition.
1328         Use clear() to reset the content (make it all writable).
1329 
1330         Params:
1331             data =     The backing array to buffer within.
1332             readable = The number of bytes within data considered valid.
1333 
1334         Returns:
1335             The buffer instance.
1336 
1337    ***************************************************************************/
1338 
1339     private final BufferedOutput set (void[] data, size_t readable)
1340     {
1341         this.data = data;
1342         this.extent = readable;
1343         this.dimension = data.length;
1344 
1345         // reset to start of input
1346         this.index = 0;
1347 
1348         return this;
1349     }
1350 
1351     /***************************************************************************
1352 
1353         Compress buffer space.
1354 
1355         Limit is set to the amount of data remaining.
1356         Position is always reset to zero.
1357 
1358         If we have some data left after an export, move it to front of the
1359         buffer and set position to be just after the remains.
1360         This is for supporting certain conduits which choose to write just
1361         the initial portion of a request.
1362 
1363         Returns:
1364             The buffer instance.
1365 
1366    ***************************************************************************/
1367 
1368     private final BufferedOutput compress ()
1369     {
1370         size_t r = this.readable();
1371 
1372         if (this.index > 0 && r > 0)
1373             // content may overlap ...
1374             memmove(&data[0], &data[this.index], r);
1375 
1376         this.index = 0;
1377         this.extent = r;
1378         return this;
1379     }
1380 }
1381 
1382 
1383 unittest
1384 {
1385     scope device = new MemoryDevice;
1386     scope buffer = new BufferedInput(device, 16);
1387 
1388     device.write(
1389         "En 1815, M. Charles-François-Bienvenu Myriel était évêque de Digne. " ~
1390         "C’était un vieillard d’environ soixante-quinze ans; " ~
1391         "il occupait le siège de Digne depuis 1806.");
1392     device.seek(0);
1393 
1394     size_t finder (const(void)[] raw)
1395     {
1396         auto text = cast(cstring) raw;
1397         if (raw.length >= 5 && raw[$ - 5 .. $] == "1806.")
1398             return raw.length - 5;
1399         return BufferedInput.Eof;
1400     }
1401 
1402     test(buffer.next(&finder));
1403 }