1 /******************************************************************************* 2 3 Copyright: 4 Copyright (c) 2004 Kris Bell. 5 Some parts copyright (c) 2009-2016 dunnhumby Germany GmbH. 6 All rights reserved. 7 8 License: 9 Tango Dual License: 3-Clause BSD License / Academic Free License v3.0. 10 See LICENSE_TANGO.txt for details. 11 12 Version: 13 Mar 2004: Initial release$(BR) 14 Dec 2006: Outback release 15 16 Authors: Kris Bell 17 18 *******************************************************************************/ 19 20 module ocean.io.stream.Buffered; 21 22 import core.stdc..string; 23 24 import ocean.meta.types.Qualifiers; 25 version (unittest) import ocean.core.Test; 26 import ocean.core.Verify; 27 28 public import ocean.io.model.IConduit; 29 30 import ocean.io.device.Conduit; 31 version (unittest) import ocean.io.device.MemoryDevice; 32 33 34 /// Shorthand aliases 35 public alias BufferedInput Bin; 36 /// ditto 37 public alias BufferedOutput Bout; 38 39 40 /******************************************************************************* 41 42 Buffers the flow of data from a upstream input 43 44 A downstream neighbour can locate and use this buffer instead of creating 45 another instance of their own. 46 47 Note: 48 upstream is closer to the source, and downstream is further away 49 50 *******************************************************************************/ 51 52 public class BufferedInput : InputFilter, InputBuffer 53 { 54 /// Clear/flush are the same 55 public alias flush clear; 56 /// Access the source. 57 public alias InputFilter.input input; 58 59 private void[] data; // The raw data buffer. 60 private size_t index; // Current read position. 61 private size_t extent; // Limit of valid content. 62 63 invariant () 64 { 65 assert(this.index <= this.extent); 66 assert(this.extent <= this.data.length); 67 } 68 69 /*************************************************************************** 70 71 Construct a buffer. 72 73 Construct a Buffer upon the provided input stream. 74 75 Params: 76 stream = An input stream. 77 78 ***************************************************************************/ 79 80 public this (InputStream stream) 81 { 82 verify(stream !is null); 83 this(stream, stream.conduit.bufferSize); 84 } 85 86 /*************************************************************************** 87 88 Construct a buffer. 89 90 Construct a Buffer upon the provided input stream. 91 92 Params: 93 stream = An input stream. 94 capacity = Desired buffer capacity. 95 96 ***************************************************************************/ 97 98 public this (InputStream stream, size_t capacity) 99 { 100 this.set(new ubyte[capacity], 0); 101 super(this.source = stream); 102 } 103 104 /*************************************************************************** 105 106 Attempt to share an upstream Buffer, and create an instance 107 where there's not one available. 108 109 If an upstream Buffer instances is visible, it will be shared. 110 Otherwise, a new instance is created based upon the bufferSize 111 exposed by the stream endpoint (conduit). 112 113 Params: 114 stream = An input stream. 115 116 ***************************************************************************/ 117 118 public static InputBuffer create (InputStream stream) 119 { 120 auto source = stream; 121 auto conduit = source.conduit; 122 while (cast(Mutator) source is null) 123 { 124 auto b = cast(InputBuffer) source; 125 if (b) 126 return b; 127 if (source is conduit) 128 break; 129 source = source.input; 130 verify(source !is null); 131 } 132 133 return new BufferedInput(stream, conduit.bufferSize); 134 } 135 136 /*************************************************************************** 137 138 Place more data from the source stream into this buffer, and returns 139 the number of bytes added. 140 141 This does not compress the current buffer content, so consider doing 142 that explicitly. 143 144 Returns: 145 Number of bytes added, which will be Eof when there is no further 146 input available. 147 Zero is also a valid response, meaning no data was actually added. 148 149 ***************************************************************************/ 150 151 public final size_t populate () 152 { 153 return this.writer(&this.input.read); 154 } 155 156 /*************************************************************************** 157 158 Returns: 159 a void[] slice of the buffer from start to end, 160 where end is exclusive. 161 162 ***************************************************************************/ 163 164 public final void[] opSlice (size_t start, size_t end) 165 { 166 verify(start <= this.extent && end <= this.extent && start <= end); 167 return this.data[start .. end]; 168 } 169 170 /*************************************************************************** 171 172 Retrieve the valid content. 173 174 Returns: 175 A void[] slice of the buffer, from the current position up to 176 the limit of valid content. 177 The content remains in the buffer for future extraction. 178 179 ***************************************************************************/ 180 181 public final void[] slice () 182 { 183 return this.data[this.index .. this.extent]; 184 } 185 186 /*************************************************************************** 187 188 Access buffer content. 189 190 Read a slice of data from the buffer, loading from the 191 conduit as necessary. The specified number of bytes is 192 sliced from the buffer, and marked as having been read 193 when the 'eat' parameter is set true. When 'eat' is set 194 false, the read position is not adjusted. 195 196 The slice cannot be larger than the size of the buffer: use method 197 `fill(void[])` instead where you simply want the content copied, 198 or use `conduit.read()` to extract directly from an attached conduit 199 Also if you need to retain the slice, then it should be `.dup`'d 200 before the buffer is compressed or repopulated. 201 202 Params: 203 size = Number of bytes to access. 204 eat = Whether to consume the content or not. 205 206 Returns: 207 The corresponding buffer slice when successful, or 208 null if there's not enough data available (Eof; Eob). 209 210 Examples: 211 --- 212 // create a buffer with some content 213 auto buffer = new Buffer("hello world"); 214 215 // consume everything unread 216 auto slice = buffer.slice(buffer.this.readable()); 217 --- 218 219 ***************************************************************************/ 220 221 public final void[] slice (size_t size, bool eat = true) 222 { 223 if (size > this.readable()) 224 { 225 // make some space? This will try to leave as much content 226 // in the buffer as possible, such that entire records may 227 // be aliased directly from within. 228 if (size > (this.data.length - this.index)) 229 { 230 if (size <= this.data.length) 231 this.compress(); 232 else 233 this.conduit.error("input buffer is empty"); 234 } 235 236 // populate tail of buffer with new content 237 do { 238 if (this.writer(&this.source.read) is Eof) 239 this.conduit.error("end-of-flow whilst reading"); 240 } while (size > this.readable()); 241 } 242 243 auto i = this.index; 244 if (eat) 245 this.index += size; 246 return this.data[i .. i + size]; 247 } 248 249 /*************************************************************************** 250 251 Read directly from this buffer. 252 253 Exposes the raw data buffer at the current _read position. 254 The delegate is provided with a void[] representing the available 255 data, and should return zero to leave the current _read position 256 intact. 257 258 If the delegate consumes data, it should return the number of 259 bytes consumed; or IConduit.Eof to indicate an error. 260 261 Params: 262 dg = Callback to provide buffer access to. 263 264 Returns: 265 the delegate's return value 266 267 ***************************************************************************/ 268 269 public final size_t reader (scope size_t delegate (const(void)[]) dg) 270 { 271 auto count = dg(this.data[this.index .. this.extent]); 272 273 if (count != Eof) 274 { 275 this.index += count; 276 verify(this.index <= this.extent); 277 } 278 return count; 279 } 280 281 /*************************************************************************** 282 283 Write into this buffer. 284 285 Exposes the raw data buffer at the current _write position, 286 The delegate is provided with a void[] representing space 287 available within the buffer at the current _write position. 288 289 The delegate should return the appropriate number of bytes 290 if it writes valid content, or IConduit.Eof on error. 291 292 Params: 293 dg = The callback to provide buffer access to. 294 295 Returns: 296 the delegate return's value. 297 298 ***************************************************************************/ 299 300 public size_t writer (scope size_t delegate (void[]) dg) 301 { 302 auto count = dg(this.data[this.extent .. $]); 303 304 if (count != Eof) 305 { 306 this.extent += count; 307 verify(this.extent <= this.data.length); 308 } 309 return count; 310 } 311 312 /*************************************************************************** 313 314 Transfer content into the provided dst. 315 316 Populates the provided array with content. We try to 317 satisfy the request from the buffer content, and read 318 directly from an attached conduit when the buffer is empty. 319 320 Params: 321 dst = Destination of the content. 322 323 Returns: 324 the number of bytes read, which may be less than `dst.length`. 325 Eof is returned when no further content is available. 326 327 ***************************************************************************/ 328 329 public final override size_t read (void[] dst) 330 { 331 size_t content = this.readable; 332 if (content) 333 { 334 if (content >= dst.length) 335 content = dst.length; 336 337 // transfer buffer content 338 dst[0 .. content] = this.data[this.index .. this.index + content]; 339 this.index += content; 340 } 341 // pathological cases read directly from conduit 342 else if (dst.length > this.data.length) 343 content = this.source.read(dst); 344 else 345 { 346 if (this.writable is 0) 347 this.index = this.extent = 0; // same as clear, without call-chain 348 349 // keep buffer partially populated 350 if ((content = this.writer(&this.source.read)) != Eof && content > 0) 351 content = this.read(dst); 352 } 353 return content; 354 } 355 356 /************************************************************************** 357 358 Fill the provided buffer 359 360 Returns: 361 the number of bytes actually read, which will be less than 362 `dst.length` when Eof has been reached and Eof thereafter. 363 364 Params: 365 dst = Where data should be placed. 366 exact = Whether to throw an exception when dst is not 367 filled (an Eof occurs first). Defaults to false. 368 369 **************************************************************************/ 370 371 public final size_t fill (void[] dst, bool exact = false) 372 { 373 size_t len = 0; 374 375 while (len < dst.length) 376 { 377 size_t i = this.read(dst[len .. $]); 378 if (i is Eof) 379 { 380 if (exact && len < dst.length) 381 this.conduit.error("end-of-flow whilst reading"); 382 return (len > 0) ? len : Eof; 383 } 384 len += i; 385 } 386 return len; 387 } 388 389 /*************************************************************************** 390 391 Move the current read location. 392 393 Skips ahead by the specified number of bytes, streaming from 394 the associated conduit as necessary. 395 396 Can also reverse the read position by 'size' bytes, when size 397 is negative. This may be used to support lookahead operations. 398 Note that a negative size will fail where there is not sufficient 399 content available in the buffer (can't _skip beyond the beginning). 400 401 Params: 402 size = The number of bytes to move. 403 404 Returns: 405 `true` if successful, `false` otherwise. 406 407 ***************************************************************************/ 408 409 public final bool skip (int size) 410 { 411 if (size < 0) 412 { 413 size = -size; 414 if (this.index >= size) 415 { 416 this.index -= size; 417 return true; 418 } 419 return false; 420 } 421 return this.slice(size) !is null; 422 } 423 424 /*************************************************************************** 425 426 Move the current read location. 427 428 ***************************************************************************/ 429 430 public final override long seek (long offset, Anchor start = Anchor.Begin) 431 { 432 if (start is Anchor.Current) 433 { 434 // handle this specially because we know this is 435 // buffered - we should take into account the buffer 436 // position when seeking 437 offset -= this.readable; 438 auto bpos = offset + this.limit; 439 440 if (bpos >= 0 && bpos < this.limit) 441 { 442 // the new position is within the current 443 // buffer, skip to that position. 444 this.skip(cast(int) bpos - cast(int) position); 445 446 // see if we can return a valid offset 447 auto pos = this.source.seek(0, Anchor.Current); 448 if (pos != Eof) 449 return pos - this.readable(); 450 return Eof; 451 } 452 // else, position is outside the buffer. Do a real 453 // seek using the adjusted position. 454 } 455 456 this.clear(); 457 return this.source.seek(offset, start); 458 } 459 460 /*************************************************************************** 461 462 Iterator support. 463 464 Upon success, the delegate should return the byte-based index of 465 the consumed pattern (tail end of it). 466 Failure to match a pattern should be indicated by returning an Eof 467 468 Each pattern is expected to be stripped of the delimiter. 469 An end-of-file condition causes trailing content to be 470 placed into the token. Requests made beyond Eof result 471 in empty matches (length is zero). 472 473 Additional iterator and/or reader instances 474 will operate in lockstep when bound to a common buffer. 475 476 Params: 477 scan = The delegate to invoke with the current content. 478 479 Returns: 480 `true` if a token was isolated, `false` otherwise. 481 482 ***************************************************************************/ 483 484 public final bool next (scope size_t delegate (const(void)[]) scan) 485 { 486 while (this.reader(scan) is Eof) 487 { 488 // did we start at the beginning? 489 if (this.position) 490 // yep - move partial token to start of buffer 491 this.compress(); 492 // no more space in the buffer? 493 else if (this.writable is 0) 494 this.extend(); 495 496 verify(this.writable() > 0); 497 498 // read another chunk of data 499 if (this.writer(&this.source.read) is Eof) 500 return false; 501 } 502 return true; 503 } 504 505 /*************************************************************************** 506 507 Reserve the specified space within the buffer, compressing 508 existing content as necessary to make room. 509 510 Returns: 511 the current read point, after compression if that was required. 512 513 ***************************************************************************/ 514 515 public final size_t reserve (size_t space) 516 { 517 verify(space < this.data.length); 518 519 if ((this.data.length - this.index) < space) 520 this.compress(); 521 return this.index; 522 } 523 524 /*************************************************************************** 525 526 Compress buffer space. 527 528 Limit is set to the amount of data remaining. 529 Position is always reset to zero. 530 531 If we have some data left after an export, move it to the front of 532 the buffer and set position to be just after the remains. 533 This is for supporting certain conduits which choose to write just 534 the initial portion of a request. 535 536 Returns: 537 The buffer instance. 538 539 ***************************************************************************/ 540 541 public final BufferedInput compress () 542 { 543 auto r = this.readable(); 544 545 if (this.index > 0 && r > 0) 546 // content may overlap ... 547 memmove(&data[0], &data[this.index], r); 548 549 this.index = 0; 550 this.extent = r; 551 return this; 552 } 553 554 /*************************************************************************** 555 556 Drain buffer content to the specific conduit. 557 558 Returns: 559 the number of bytes written, or Eof. 560 561 Note: 562 Write as much of the buffer that the associated conduit can consume. 563 The conduit is not obliged to consume all content, 564 so some may remain within the buffer. 565 566 ***************************************************************************/ 567 568 public final size_t drain (OutputStream dst) 569 { 570 verify(dst !is null); 571 572 size_t ret = this.reader(&dst.write); 573 this.compress(); 574 return ret; 575 } 576 577 /*************************************************************************** 578 579 Access buffer limit. 580 581 Each buffer has a capacity, a limit, and a position. 582 The capacity is the maximum content a buffer can contain, 583 limit represents the extent of valid content, and position marks 584 the current read location. 585 586 Returns: 587 the limit of readable content within this buffer. 588 589 ***************************************************************************/ 590 591 public final size_t limit () 592 { 593 return this.extent; 594 } 595 596 /*************************************************************************** 597 598 Access buffer capacity. 599 600 Each buffer has a capacity, a limit, and a position. 601 The capacity is the maximum content a buffer can contain, limit 602 represents the extent of valid content, and position marks 603 the current read location. 604 605 Returns: 606 the maximum capacity of this buffer. 607 608 ***************************************************************************/ 609 610 public final size_t capacity () 611 { 612 return this.data.length; 613 } 614 615 /*************************************************************************** 616 617 Access buffer read position. 618 619 Each buffer has a capacity, a limit, and a position. 620 The capacity is the maximum content a buffer can contain, limit 621 represents the extent of valid content, and position marks 622 the current read location. 623 624 Returns: 625 the current read-position within this buffer. 626 627 ***************************************************************************/ 628 629 final size_t position () 630 { 631 return this.index; 632 } 633 634 /*************************************************************************** 635 636 Available content. 637 638 Returns: 639 count of _readable bytes remaining in buffer. 640 This is calculated simply as `this.limit() - this.position()`. 641 642 ***************************************************************************/ 643 644 public final size_t readable () 645 { 646 return this.extent - this.index; 647 } 648 649 /*************************************************************************** 650 651 Cast to a target type without invoking the wrath of the 652 runtime checks for misalignment. Instead, we truncate the 653 array length. 654 655 ***************************************************************************/ 656 657 static inout(T)[] convert (T) (inout(void)[] x) 658 { 659 return (cast(inout(T)*) x.ptr) [0 .. (x.length / T.sizeof)]; 660 } 661 662 /*************************************************************************** 663 664 Clear buffer content. 665 666 Note: 667 Reset 'position' and 'limit' to zero. This effectively 668 clears all content from the buffer. 669 670 ***************************************************************************/ 671 672 public final override BufferedInput flush () 673 { 674 this.index = this.extent = 0; 675 676 // clear the filter chain also 677 if (this.source) 678 super.flush(); 679 return this; 680 } 681 682 /*************************************************************************** 683 684 Set the input stream. 685 686 ***************************************************************************/ 687 688 public final void input (InputStream source) 689 { 690 this.source = source; 691 } 692 693 /*************************************************************************** 694 695 Load the bits from a stream, up to an indicated length, and 696 return them all in an array. 697 698 The function may consume more than the indicated size where additional 699 data is available during a block read operation, but will not wait for 700 more than specified. 701 An Eof terminates the operation. 702 703 Returns: 704 an array representing the content 705 706 Throws: 707 `IOException` on error. 708 709 ***************************************************************************/ 710 711 public final override void[] load (size_t max = size_t.max) 712 { 713 this.load(super.input, super.conduit.bufferSize, max); 714 return this.slice; 715 } 716 717 /*************************************************************************** 718 719 Import content from the specified conduit, expanding as necessary 720 up to the indicated maximum or until an Eof occurs. 721 722 Returns: 723 the number of bytes contained. 724 725 ***************************************************************************/ 726 727 private size_t load (InputStream src, size_t increment, size_t max) 728 { 729 size_t len, count; 730 731 // make some room 732 this.compress(); 733 734 // explicitly resize? 735 if (max != max.max) 736 if ((len = this.writable()) < max) 737 increment = max - len; 738 739 while (count < max) 740 { 741 if (!this.writable()) 742 this.data.length = (this.data.length + increment); 743 if ((len = this.writer(&src.read)) is Eof) 744 break; 745 else 746 count += len; 747 } 748 return count; 749 } 750 751 /*************************************************************************** 752 753 Reset the buffer content. 754 755 Set the backing array with some content readable. 756 Writing to this will either flush it to an associated conduit, 757 or raise an Eof condition. 758 Use clear() to reset the content (make it all writable). 759 760 Params: 761 data = The backing array to buffer within. 762 readable = The number of bytes within data considered valid. 763 764 Returns: 765 The buffer instance. 766 767 ***************************************************************************/ 768 769 private final BufferedInput set (void[] data, size_t readable) 770 { 771 this.data = data; 772 this.extent = readable; 773 774 // reset to start of input 775 this.index = 0; 776 777 return this; 778 } 779 780 /*************************************************************************** 781 782 Available space. 783 784 Returns: 785 count of _writable bytes available in buffer. 786 This is calculated simply as `this.capacity() - this.limit()`. 787 788 ***************************************************************************/ 789 790 private final size_t writable () 791 { 792 return this.data.length - this.extent; 793 } 794 795 /*************************************************************************** 796 797 Extend the buffer by half of its size 798 799 ***************************************************************************/ 800 801 private void extend () 802 { 803 this.data.length = this.data.length + (this.data.length / 2); 804 } 805 } 806 807 808 /******************************************************************************* 809 810 Buffers the flow of data from a upstream output. 811 812 A downstream neighbour can locate and use this buffer instead of creating 813 another instance of their own. 814 815 Don't forget to flush() buffered content before closing. 816 817 Note: 818 upstream is closer to the source, and downstream is further away 819 820 *******************************************************************************/ 821 822 public class BufferedOutput : OutputFilter, OutputBuffer 823 { 824 /// access the sink 825 alias OutputFilter.output output; 826 827 private void[] data; // the raw data buffer 828 private size_t index; // current read position 829 private size_t extent; // limit of valid content 830 private size_t dimension; // maximum extent of content 831 832 /// Notifier that will be called on flush. 833 private void delegate() flush_notifier; 834 835 invariant () 836 { 837 assert (this.index <= this.extent); 838 assert (this.extent <= this.dimension); 839 } 840 841 /*************************************************************************** 842 843 Construct a Buffer upon the provided input stream. 844 845 Params: 846 stream = An input stream. 847 flush_notifier = user specified delegate called after the content 848 of the buffer has been flushed to upstream output. 849 850 ***************************************************************************/ 851 852 public this (OutputStream stream, scope void delegate() flush_notifier = null) 853 { 854 verify(stream !is null); 855 this(stream, stream.conduit.bufferSize, flush_notifier); 856 } 857 858 /*************************************************************************** 859 860 Construct a Buffer upon the provided input stream. 861 862 Params: 863 stream = An input stream. 864 capacity = Desired buffer capacity. 865 flush_notifier = user specified delegate called after the content 866 of the buffer has been flushed to upstream output. 867 868 ***************************************************************************/ 869 870 public this (OutputStream stream, size_t capacity, 871 scope void delegate() flush_notifier = null) 872 { 873 this.set(new ubyte[capacity], 0); 874 this.flush_notifier = flush_notifier; 875 super(this.sink = stream); 876 } 877 878 /*************************************************************************** 879 880 Attempts to share an upstream BufferedOutput, and creates a new 881 instance where there's not a shared one available. 882 883 Where an upstream instance is visible it will be returned. 884 Otherwise, a new instance is created based upon the bufferSize 885 exposed by the associated conduit 886 887 Params: 888 stream = An output stream. 889 890 ***************************************************************************/ 891 892 public static OutputBuffer create (OutputStream stream) 893 { 894 auto sink = stream; 895 auto conduit = sink.conduit; 896 while (cast(Mutator) sink is null) 897 { 898 auto b = cast(OutputBuffer) sink; 899 if (b) 900 return b; 901 if (sink is conduit) 902 break; 903 sink = sink.output; 904 verify(sink !is null); 905 } 906 907 return new BufferedOutput(stream, conduit.bufferSize); 908 } 909 910 /*************************************************************************** 911 912 Retrieve the valid content. 913 914 Returns: 915 A void[] slice of the buffer. 916 917 Returns: 918 a slice of the buffer, from the current position up to the limit 919 of valid content. 920 The content remains in the buffer for future extraction. 921 922 ***************************************************************************/ 923 924 public final void[] slice () 925 { 926 return this.data[this.index .. this.extent]; 927 } 928 929 /*************************************************************************** 930 931 Emulate OutputStream.write(). 932 933 Appends src content to the buffer, flushing to an attached conduit 934 as necessary. An IOException is thrown upon write failure. 935 936 Params: 937 src = The content to write. 938 939 Returns: 940 the number of bytes written, which may be less than provided 941 (conceptually). 942 943 ***************************************************************************/ 944 945 public final override size_t write (const(void)[] src) 946 { 947 this.append(src.ptr, src.length); 948 return src.length; 949 } 950 951 /*************************************************************************** 952 953 Append content. 954 955 Append an array to this buffer, flush to the conduit as necessary. 956 This is often used in lieu of a Writer. 957 958 Params: 959 src = The content to _append. 960 961 Returns: 962 a chaining reference if all content was written. 963 964 Throws: 965 an IOException indicating Eof or Eob if not. 966 967 ***************************************************************************/ 968 969 public final BufferedOutput append (const(void)[] src) 970 { 971 return this.append(src.ptr, src.length); 972 } 973 974 /*************************************************************************** 975 976 Append content. 977 978 Append an array to this buffer, flush to the conduit as necessary. 979 This is often used in lieu of a Writer. 980 981 Params: 982 src = The content to _append. 983 length = The number of bytes in src. 984 985 Returns: 986 a chaining reference if all content was written. 987 988 Throws: 989 an IOException indicating Eof or Eob if not. 990 991 ***************************************************************************/ 992 993 public final BufferedOutput append (const(void)* src, size_t length) 994 { 995 if (length > this.writable) 996 { 997 this.flush(); 998 999 // check for pathological case 1000 if (length > this.dimension) 1001 do { 1002 auto written = this.sink.write(src [0 .. length]); 1003 if (written is Eof) 1004 this.conduit.error("end-of-flow whilst writing"); 1005 length -= written; 1006 src += written; 1007 } while (length > this.dimension); 1008 } 1009 1010 // avoid "out of bounds" test on zero length 1011 if (length) 1012 { 1013 // content may overlap ... 1014 memmove(&this.data[this.extent], src, length); 1015 this.extent += length; 1016 } 1017 return this; 1018 } 1019 1020 /*************************************************************************** 1021 1022 Available space. 1023 1024 Returns: 1025 count of _writable bytes available in buffer. 1026 This is calculated as `capacity() - limit()`. 1027 1028 ***************************************************************************/ 1029 1030 public final size_t writable () 1031 { 1032 return this.dimension - this.extent; 1033 } 1034 1035 /*************************************************************************** 1036 1037 Access buffer limit. 1038 1039 Each buffer has a capacity, a limit, and a position. 1040 The capacity is the maximum content a buffer can contain, 1041 limit represents the extent of valid content, and position marks 1042 the current read location. 1043 1044 Returns: 1045 the limit of readable content within this buffer. 1046 1047 ***************************************************************************/ 1048 1049 public final size_t limit () 1050 { 1051 return this.extent; 1052 } 1053 1054 /*************************************************************************** 1055 1056 Access buffer capacity. 1057 1058 Each buffer has a capacity, a limit, and a position. 1059 The capacity is the maximum content a buffer can contain, 1060 limit represents the extent of valid content, and position marks 1061 the current read location. 1062 1063 Returns: 1064 the maximum capacity of this buffer. 1065 1066 ***************************************************************************/ 1067 1068 public final size_t capacity () 1069 { 1070 return this.dimension; 1071 } 1072 1073 /*************************************************************************** 1074 1075 Truncate the buffer within its extent. 1076 1077 Returns: 1078 `true` if the new length is valid, `false` otherwise. 1079 1080 ***************************************************************************/ 1081 1082 public final bool truncate (size_t length) 1083 { 1084 if (length <= this.data.length) 1085 { 1086 this.extent = length; 1087 return true; 1088 } 1089 return false; 1090 } 1091 1092 /*************************************************************************** 1093 1094 Cast to a target type without invoking the wrath of the 1095 runtime checks for misalignment. Instead, we truncate the 1096 array length. 1097 1098 ***************************************************************************/ 1099 1100 static T[] convert(T)(void[] x) 1101 { 1102 return (cast(T*) x.ptr) [0 .. (x.length / T.sizeof)]; 1103 } 1104 1105 /*************************************************************************** 1106 1107 Flush all buffer content to the specific conduit. 1108 1109 Flush the contents of this buffer. 1110 This will block until all content is actually flushed via the associated 1111 conduit, whereas `drain()` will not. 1112 1113 Throws: 1114 an IOException on premature Eof. 1115 1116 ***************************************************************************/ 1117 1118 final override BufferedOutput flush () 1119 { 1120 while (this.readable() > 0) 1121 { 1122 auto ret = this.reader(&this.sink.write); 1123 if (ret is Eof) 1124 this.conduit.error("end-of-flow whilst writing"); 1125 } 1126 1127 // flush the filter chain also 1128 this.clear(); 1129 super.flush; 1130 1131 if (this.flush_notifier) 1132 { 1133 this.flush_notifier(); 1134 } 1135 1136 return this; 1137 } 1138 1139 /*************************************************************************** 1140 1141 Copy content via this buffer from the provided src conduit. 1142 1143 The src conduit has its content transferred through this buffer via 1144 a series of fill & drain operations, 1145 until there is no more content available. 1146 The buffer content should be explicitly flushed by the caller. 1147 1148 Throws: 1149 an IOException on premature Eof. 1150 1151 ***************************************************************************/ 1152 1153 final override BufferedOutput copy (InputStream src, size_t max = -1) 1154 { 1155 size_t chunk, copied; 1156 1157 while (copied < max && (chunk = this.writer(&src.read)) != Eof) 1158 { 1159 copied += chunk; 1160 1161 // don't drain until we actually need to 1162 if (this.writable is 0) 1163 if (this.drain(this.sink) is Eof) 1164 this.conduit.error("end-of-flow whilst writing"); 1165 } 1166 return this; 1167 } 1168 1169 /*************************************************************************** 1170 1171 Flushes the buffer and closes the stream. 1172 1173 ***************************************************************************/ 1174 1175 final override void close ( ) 1176 { 1177 this.flush(); 1178 super.close(); 1179 } 1180 1181 /*************************************************************************** 1182 1183 Drain buffer content to the specific conduit. 1184 1185 Write as much of the buffer that the associated conduit can consume. 1186 The conduit is not obliged to consume all content, 1187 so some may remain within the buffer. 1188 1189 Returns: 1190 the number of bytes written, or Eof. 1191 1192 ***************************************************************************/ 1193 1194 final size_t drain (OutputStream dst) 1195 { 1196 verify(dst !is null); 1197 1198 size_t ret = this.reader(&dst.write); 1199 this.compress(); 1200 return ret; 1201 } 1202 1203 /*************************************************************************** 1204 1205 Clear buffer content. 1206 1207 Reset 'position' and 'limit' to zero. 1208 This effectively clears all content from the buffer. 1209 1210 ***************************************************************************/ 1211 1212 final BufferedOutput clear () 1213 { 1214 this.index = this.extent = 0; 1215 return this; 1216 } 1217 1218 /*************************************************************************** 1219 1220 Set the output stream. 1221 1222 ***************************************************************************/ 1223 1224 final void output (OutputStream sink) 1225 { 1226 this.sink = sink; 1227 } 1228 1229 /*************************************************************************** 1230 1231 Seek within this stream 1232 1233 Any and all buffered output is disposed before the upstream is invoked. 1234 Use an explicit `flush()` to emit content prior to seeking. 1235 1236 ***************************************************************************/ 1237 1238 final override long seek (long offset, Anchor start = Anchor.Begin) 1239 { 1240 this.clear(); 1241 return super.seek(offset, start); 1242 } 1243 1244 /*************************************************************************** 1245 1246 Write into this buffer. 1247 1248 Exposes the raw data buffer at the current _write position, 1249 The delegate is provided with a void[] representing space 1250 available within the buffer at the current _write position. 1251 1252 The delegate should return the appropriate number of bytes 1253 if it writes valid content, or Eof on error. 1254 1255 Params: 1256 dg = The callback to provide buffer access to. 1257 1258 Returns: 1259 the delegate return's value 1260 1261 ***************************************************************************/ 1262 1263 final size_t writer (scope size_t delegate (void[]) dg) 1264 { 1265 auto count = dg (this.data[this.extent..this.dimension]); 1266 1267 if (count != Eof) 1268 { 1269 this.extent += count; 1270 verify(this.extent <= this.dimension); 1271 } 1272 return count; 1273 } 1274 1275 /*************************************************************************** 1276 1277 Read directly from this buffer. 1278 1279 Exposes the raw data buffer at the current _read position. 1280 The delegate is provided with a void[] representing the available data, 1281 and should return zero to leave the current _read position intact. 1282 1283 If the delegate consumes data, it should return the number of 1284 bytes consumed; or Eof to indicate an error. 1285 1286 Params: 1287 dg = Callback to provide buffer access to. 1288 1289 Returns: 1290 the delegate's return value. 1291 1292 ***************************************************************************/ 1293 1294 private final size_t reader (scope size_t delegate (const(void)[]) dg) 1295 { 1296 auto count = dg (this.data[this.index..this.extent]); 1297 1298 if (count != Eof) 1299 { 1300 this.index += count; 1301 verify(this.index <= this.extent); 1302 } 1303 return count; 1304 } 1305 1306 /*************************************************************************** 1307 1308 Available content. 1309 1310 Returns: 1311 count of _readable bytes remaining in buffer. 1312 This is calculated simply as `limit() - position()`. 1313 1314 ***************************************************************************/ 1315 1316 private final size_t readable () 1317 { 1318 return this.extent - this.index; 1319 } 1320 1321 /*************************************************************************** 1322 1323 Reset the buffer content. 1324 1325 Set the backing array with some content readable. 1326 Writing to this will either flush it to an associated conduit, 1327 or raise an Eof condition. 1328 Use clear() to reset the content (make it all writable). 1329 1330 Params: 1331 data = The backing array to buffer within. 1332 readable = The number of bytes within data considered valid. 1333 1334 Returns: 1335 The buffer instance. 1336 1337 ***************************************************************************/ 1338 1339 private final BufferedOutput set (void[] data, size_t readable) 1340 { 1341 this.data = data; 1342 this.extent = readable; 1343 this.dimension = data.length; 1344 1345 // reset to start of input 1346 this.index = 0; 1347 1348 return this; 1349 } 1350 1351 /*************************************************************************** 1352 1353 Compress buffer space. 1354 1355 Limit is set to the amount of data remaining. 1356 Position is always reset to zero. 1357 1358 If we have some data left after an export, move it to front of the 1359 buffer and set position to be just after the remains. 1360 This is for supporting certain conduits which choose to write just 1361 the initial portion of a request. 1362 1363 Returns: 1364 The buffer instance. 1365 1366 ***************************************************************************/ 1367 1368 private final BufferedOutput compress () 1369 { 1370 size_t r = this.readable(); 1371 1372 if (this.index > 0 && r > 0) 1373 // content may overlap ... 1374 memmove(&data[0], &data[this.index], r); 1375 1376 this.index = 0; 1377 this.extent = r; 1378 return this; 1379 } 1380 } 1381 1382 1383 unittest 1384 { 1385 scope device = new MemoryDevice; 1386 scope buffer = new BufferedInput(device, 16); 1387 1388 device.write( 1389 "En 1815, M. Charles-François-Bienvenu Myriel était évêque de Digne. " ~ 1390 "C’était un vieillard d’environ soixante-quinze ans; " ~ 1391 "il occupait le siège de Digne depuis 1806."); 1392 device.seek(0); 1393 1394 size_t finder (const(void)[] raw) 1395 { 1396 auto text = cast(cstring) raw; 1397 if (raw.length >= 5 && raw[$ - 5 .. $] == "1806.") 1398 return raw.length - 5; 1399 return BufferedInput.Eof; 1400 } 1401 1402 test(buffer.next(&finder)); 1403 }