/*******************************************************************************

    Direct I/O output and input streams

    This module provides an OutputStream (BufferedDirectWriteFile) and
    an InputStream (BufferedDirectReadFile) to do direct I/O using Linux's
    O_DIRECT flag.

    This kind of I/O is especially (and probably only) useful when you need
    to dump a huge amount of data to disk as some kind of backup, i.e., data
    that you probably won't ever read again (or at least not in the short
    term). In those cases, going through the page cache is not only probably
    slower (because of the extra memory copy), but it can also freeze the
    whole system: once the amount of dirty memory exceeds the vm.dirty_ratio
    sysctl, processes that write are made to flush the page cache to disk
    themselves, in their own time.
    If the data you need to write is small, or you are going to access it in
    the short term, and you are not experiencing freezes, you probably DON'T
    want to use this module.

    Although these objects should arguably derive from File (i.e. be a
    device, and be selectable), this type of I/O is very particular and a lot
    of details need to be taken into account, so they only implement the
    stream interfaces. For example, direct I/O is always blocking unless you
    also use async I/O, because the data is copied directly from your buffer
    to the disk instead of going through the page cache, which is what makes
    non-blocking I/O possible. Some of the stream interface methods are not
    implemented at all (seek(), flush(), copy() and load()). This might
    change in the future, if needed.

    Direct I/O must also transfer complete sectors, and the buffers passed to
    read(2) and write(2) must be aligned to the block size too. This is why
    these classes use internal buffering instead of the caller's memory.
    This makes users' lives easier, since they don't have to worry about
    alignment (if you can, keep your buffers aligned anyway: there are
    optimizations to avoid copies in that case). When instantiating the
    classes, it is safe to pass a buffer you just allocated through the GC,
    since the GC returns memory that's always aligned to the page size
    (4096) when allocating chunks larger than a page.

    Note, though, that whole sectors are always written (512 bytes each), so
    if you write, for example, 100 bytes, the file will still be 512 bytes
    long and the final 412 bytes will contain padding (zero bytes by default,
    see BufferedDirectWriteFile.flushWithPadding()). truncate(2) or
    ftruncate(2) can be used to truncate the file to its real size if
    desired.

    See_Also:
        http://web.archive.org/web/20160317032821/http://www.westnet.com/~gsmith/content/linux-pdflush.htm
        https://www.kernel.org/doc/Documentation/sysctl/vm.txt

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.device.DirectIO;




import ocean.meta.types.Qualifiers;
import ocean.core.Verify;

import core.memory;

import ocean.io.model.IConduit;
import ocean.io.device.File;
import ocean.core.ExceptionDefinitions: IOException;
import ocean.stdc.posix.fcntl : O_DIRECT; // Linux only

version (unittest)
{
    import ocean.core.Test;
}

/*******************************************************************************

    Mixin template for classes that need to have buffers that are aligned to
    a certain block size and don't support some operations.
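
    Example:
    The alignment test and the padding computed by flushWithPadding() boil
    down to simple mask arithmetic, because BLOCK_SIZE is a power of two. The
    sketch below restates it with hypothetical free functions (BLOCK,
    alignedTo() and roundUp() are illustration-only names, not part of this
    module). For instance, roundUp(100) is 512, which is why writing 100
    bytes leaves 412 bytes of padding on disk.

    ---
    // Hypothetical helpers; BLOCK must be a power of two for the masks
    enum size_t BLOCK = 512;

    // Same test as isAligned(): the low log2(BLOCK) bits must all be zero
    bool alignedTo ( size_t addr )
    {
        return (addr & (BLOCK - 1)) == 0;
    }

    // Rounds len up to the next multiple of BLOCK
    size_t roundUp ( size_t len )
    {
        return (len + BLOCK - 1) & ~(BLOCK - 1);
    }
    ---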
*******************************************************************************/

private template AlignedBufferedStream ( )
{

    /***************************************************************************

        Block size.

        Almost every hard disk has a logical block size of 512 bytes, but this
        is not universal: drives with 4096-byte logical sectors exist, and
        O_DIRECT transfers that are only 512-byte aligned are rejected by the
        kernel (EINVAL) on such devices.

    ***************************************************************************/

    public enum { BLOCK_SIZE = 512 }

    /***************************************************************************

        Internal buffer (its length must be a multiple of the block size).

    ***************************************************************************/

    protected ubyte[] buffer;

    /***************************************************************************

        Index of the next free byte in the buffer.

    ***************************************************************************/

    protected size_t free_index;

    /***************************************************************************

        Sets up the internal buffer using an existing array.

        This method checks that the buffer is properly aligned and that its
        length is a multiple of BLOCK_SIZE.

        Params:
            buffer = buffer to re-use for this aligned buffer.
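
        Example:
        If a GC-allocated buffer is not an option, the posix_memalign(3)
        route suggested by setBuffer()'s error message could look roughly
        like this. allocAligned() is a hypothetical helper, not part of this
        module. The memory it returns is not managed by the GC, so it must
        outlive the stream using it and be released with free() afterwards.

        ---
        import core.stdc.stdlib : free;
        import core.sys.posix.stdlib : posix_memalign;

        // Hypothetical helper: returns `blocks` blocks of `block_size` bytes
        // aligned to `block_size`, or null if the allocation fails
        ubyte[] allocAligned ( size_t blocks, size_t block_size = 512 )
        {
            void* ptr;
            auto bytes = blocks * block_size;
            if (posix_memalign(&ptr, block_size, bytes) != 0)
                return null;
            return (cast(ubyte*) ptr)[0 .. bytes];
        }
        ---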
    ***************************************************************************/

    protected void setBuffer ( ubyte[] buffer )
    {
        this.buffer = buffer;
        // Throw an error if the buffer is not aligned to BLOCK_SIZE
        if ( !this.isAligned(buffer.ptr) )
            throw new IOException("Buffer is not aligned to BLOCK_SIZE, maybe "
                    ~ "you should start using posix_memalign(3)");
        // Throw an error if the buffer length is not a multiple of the
        // BLOCK_SIZE
        if ((buffer.length % BLOCK_SIZE) != 0)
            throw new IOException("Buffer length is not a multiple of the "
                    ~ "BLOCK_SIZE");
        this.free_index = 0;
    }

    /***************************************************************************

        Sets up a newly allocated internal buffer of the specified size.

        This method checks that the buffer is properly aligned and that its
        length is a multiple of BLOCK_SIZE.

        Params:
            buffer_blocks = Buffer size in blocks

    ***************************************************************************/

    protected void createBuffer ( size_t buffer_blocks )
    {
        this.setBuffer(createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Allocates a buffer whose length is the specified multiple of
        BLOCK_SIZE. The out contract checks that the buffer is properly
        aligned and that its length is a multiple of BLOCK_SIZE.

        Note on buffer allocation:
        O_DIRECT requires BLOCK_SIZE-aligned memory. BLOCK_SIZE is defined
        as 512 bytes. Conveniently, the current GC implementation always
        aligns memory to 512 bytes, so we can simply rely on that. In D2,
        `new` is not guaranteed to provide block-aligned memory, but
        `GC.malloc` does make that promise in its documentation:
        https://dlang.org/phobos/core_memory.html#.GC.malloc

        (This behaviour is tested with a unittest, in case BLOCK_SIZE or
        the GC's behaviour ever changes.)
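
        Example:
        Should the GC ever stop returning 512-byte-aligned memory, one
        GC-only alternative is to over-allocate by block - 1 bytes and slice
        from the first aligned address. This is only a sketch
        (gcAlignedBuffer() is a hypothetical name); it relies on D's GC
        keeping a block alive as long as an interior pointer into it exists.

        ---
        import core.memory : GC;

        // Hypothetical helper: `blocks` blocks of `block` bytes, aligned to
        // `block` (which must be a power of two)
        ubyte[] gcAlignedBuffer ( size_t blocks, size_t block = 512 )
        {
            auto bytes = blocks * block;
            // The extra block - 1 bytes guarantee room for an aligned start
            auto raw = cast(ubyte*) GC.malloc(bytes + block - 1);
            auto start = (cast(size_t) raw + block - 1) & ~(block - 1);
            auto offset = start - cast(size_t) raw;
            return raw[offset .. offset + bytes];
        }
        ---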
        Params:
            buffer_blocks = Buffer size in blocks

        Returns:
            newly allocated buffer of the specified size

    ***************************************************************************/

    private static ubyte[] createAlignedBuffer ( size_t buffer_blocks )
    out ( buffer )
    {
        assert(isAligned(buffer.ptr));
        assert((buffer.length % BLOCK_SIZE) == 0);
    }
    do
    {
        auto bytes = buffer_blocks * BLOCK_SIZE;
        auto buffer = cast(ubyte[]) GC.malloc(bytes)[0 .. bytes];
        return buffer;
    }

    /***************************************************************************

        Unittest to confirm during testing that the statement above about
        GC.malloc returning 512-byte-aligned buffers holds in all builds.

        If this test ever fails, createAlignedBuffer should be adapted to
        allocate the memory with posix_memalign(3) instead.

    ***************************************************************************/

    unittest
    {
        auto buffer = createAlignedBuffer(1);
        test(isAligned(buffer.ptr));
    }

    /***************************************************************************

        Returns true if the pointer is aligned to the block size.

    ***************************************************************************/

    static public bool isAligned ( const(void)* ptr )
    {
        return (cast(size_t) ptr & (BLOCK_SIZE - 1)) == 0;
    }

    /***************************************************************************

        Throws an IOException because this operation is not implemented.
    ***************************************************************************/

    public override long seek (long offset, Anchor anchor = Anchor.Begin)
    {
        throw new IOException("seek() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Throws an IOException because this operation is not implemented.

    ***************************************************************************/

    public override IOStream flush ()
    {
        throw new IOException("flush() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Throws an IOException because this operation is not implemented.

        This method is only part of OutputStream (the mixin is also used by
        an InputStream), so the override keyword can't be used.

    ***************************************************************************/

    public OutputStream copy (InputStream src, size_t max = -1)
    {
        throw new IOException("copy() not supported by " ~
                this.classinfo.name);
    }

}


/*******************************************************************************

    Buffered file to do direct I/O writes.

    Please read the module documentation for details.

*******************************************************************************/

public class BufferedDirectWriteFile: OutputStream
{

    /***************************************************************************

        File to do direct I/O writes.

        File provides no way to open a file with custom flags other than
        sub-classing it. Bummer!

    ***************************************************************************/

    static protected class DirectWriteFile : File
    {
        /***********************************************************************

            Opens a direct-write file at the specified path.
            Params:
                path = path at which to open file
                style = file open mode

            Throws:
                IOException on error opening the file

        ***********************************************************************/

        override public void open (cstring path, Style style = this.WriteCreate)
        {
            if (!super.open(path, style, O_DIRECT))
                this.error();
        }
    }

    /***************************************************************************

        Direct I/O file device to write to.

    ***************************************************************************/

    private DirectWriteFile file;

    /***************************************************************************

        Constructs a new BufferedDirectWriteFile.

        If a path is specified, the file is opened as well. A good buffer size
        depends mostly on the speed of the disk (and of memory and CPU). If the
        buffer is too big, you will notice that writing seems to happen in
        long bursts, with periods of heavy buffer copying followed by long
        waits while writing to disk. If the buffer is too small, throughput
        drops, resulting in a longer total write time.

        32MiB has proven to be a decent value for a low-end magnetic hard
        drive, according to a few tests.

        Params:
            path = Path of the file to write to.
            buffer = Buffer to use for writing; its length must be a multiple
                of BLOCK_SIZE and its memory must be aligned to BLOCK_SIZE

    ***************************************************************************/

    public this (cstring path, ubyte[] buffer)
    {
        this.setBuffer(buffer);
        this.file = this.newFile();
        if (path.length > 0)
            this.open(path);
    }

    /***************************************************************************

        Instantiates the file object to be used to write to.
        This method may be overridden by derived classes, allowing different
        types of file to be used with this class.

        Returns:
            file object to write to

    ***************************************************************************/

    protected DirectWriteFile newFile ( )
    {
        return new DirectWriteFile;
    }

    /***************************************************************************

        Constructs a new BufferedDirectWriteFile, allocating a new buffer.

        See documentation for this(cstring, ubyte[]) for details.

        Params:
            path = Path of the file to write to.
            buffer_blocks = Buffer size in blocks (default: 32MiB, i.e.
                65536 blocks of 512 bytes)

    ***************************************************************************/

    public this (cstring path = null, size_t buffer_blocks = 32 * 2 * 1024)
    {
        this(path, createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Mixin for common functionality.

    ***************************************************************************/

    mixin AlignedBufferedStream;

    /***************************************************************************

        Opens a BufferedDirectWriteFile file.

        Params:
            path = Path of the file to write to.

    ***************************************************************************/

    public void open (cstring path)
    {
        verify(this.file.fileHandle == -1);
        this.file.open(path);
        this.free_index = 0;
    }

    /***************************************************************************

        Returns:
            the path of the underlying File instance, i.e. what
            File.toString() returns for it.
    ***************************************************************************/

    public cstring path ( )
    {
        return this.file.path();
    }

    /***************************************************************************

        Returns the host conduit.

    ***************************************************************************/

    public IConduit conduit ()
    {
        return this.file;
    }

    /***************************************************************************

        Closes the underlying file, calling flushWithPadding() and sync()
        first.

    ***************************************************************************/

    public void close ()
    {
        if (this.file.fileHandle == -1)
            return;
        this.flushWithPadding();
        this.sync();
        this.file.close();
    }

    /***************************************************************************

        Writes to the stream from a source array. The provided src content
        will be written to the stream.

        Returns the number of bytes written from src, which may be less than
        the quantity provided. Eof is returned when an end-of-flow condition
        arises.

    ***************************************************************************/

    public size_t write (const(void)[] src)
    {
        verify(this.file.fileHandle != -1);

        size_t total = src.length;

        if (src.length == 0)
            return 0;

        // Optimization: avoid an extra copy if src is already aligned to the
        // block size. Writing whole buffer-sized chunks keeps src.ptr
        // aligned, because the buffer length is a multiple of BLOCK_SIZE.
        if (this.free_index == 0 && this.isAligned(src.ptr))
        {
            while (src.length >= this.buffer.length)
            {
                this.file.write(src[0 .. this.buffer.length]);
                src = src[this.buffer.length .. $];
            }
        }

        // Fill and flush the internal buffer as many times as needed
        while (this.free_index + src.length > this.buffer.length)
        {
            auto hole = this.buffer.length - this.free_index;
            this.buffer[this.free_index .. $] = cast(ubyte[]) src[0 .. hole];
            this.free_index = this.buffer.length;
            this.flushWithPadding();
            src = src[hole .. $];
        }

        // Keep the remainder in the internal buffer until the next flush
        this.buffer[this.free_index .. this.free_index + src.length] =
            cast(ubyte[]) src[0 .. $];
        this.free_index = this.free_index + src.length;

        return total;
    }

    /***************************************************************************

        Returns the upstream sink.

    ***************************************************************************/

    public OutputStream output ()
    {
        return file;
    }

    /**************************************************************************

        Writes the current buffer contents, rounding the length up to the
        block size (and setting the padding bytes to padding_byte).

        Params:
            padding_byte = Byte to use to fill the padding.

        Returns:
            Number of bytes that have been flushed, including the padding.

    **************************************************************************/

    public size_t flushWithPadding ( ubyte padding_byte = 0 )
    {
        verify(this.file.fileHandle != -1);

        if (this.free_index == 0)
            return 0;

        if ((this.free_index % this.BLOCK_SIZE) != 0)
        {
            auto hole = BLOCK_SIZE - this.free_index % BLOCK_SIZE;
            this.buffer[this.free_index .. this.free_index+hole] = padding_byte;
            this.free_index += hole;
        }

        // Loop in case the file only accepts part of the data at once
        size_t written = 0;
        while (written < this.free_index)
        {
            written += this.file.write(buffer[written .. this.free_index]);
        }

        this.free_index = 0;

        return written;
    }

    /**************************************************************************

        Instructs the OS to flush its internal buffers to the disk device.
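
        Example:
        A complete write session could look like the sketch below. close()
        pads the last block (with zeros by default) and calls sync() before
        closing, so the file ends up 512 bytes long; truncate(2) then cuts it
        back to the real data size. The path is hypothetical and must be on a
        file system that supports O_DIRECT.

        ---
        import core.sys.posix.sys.types : off_t;
        import core.sys.posix.unistd : truncate;

        auto data = "some data that won't be read again soon";
        auto file = new BufferedDirectWriteFile("/data/backup.bin");
        file.write(data);
        file.close(); // flushWithPadding(), then sync(), then close
        // Drop the padding added to fill the last 512-byte block
        truncate("/data/backup.bin", cast(off_t) data.length);
        ---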
    **************************************************************************/

    public void sync ( )
    {
        verify(this.file.fileHandle != -1);
        this.file.sync();
    }

}


/*******************************************************************************

    Buffered file to do direct I/O reads.

    Please read the module documentation for details.

*******************************************************************************/

public class BufferedDirectReadFile: InputStream
{

    /***************************************************************************

        File to do direct I/O reads.

        File provides no way to open a file with custom flags other than
        sub-classing it. Bummer!

    ***************************************************************************/

    static protected class DirectReadFile : File
    {
        override public void open (cstring path, Style style = this.ReadExisting)
        {
            if (!super.open(path, style, O_DIRECT))
                this.error();
        }
    }

    /***************************************************************************

        Direct I/O file device to read from.

    ***************************************************************************/

    private DirectReadFile file;

    /***************************************************************************

        Index of the first byte of data that was already read from the file
        but is still pending, waiting for a reader.

    ***************************************************************************/

    protected size_t pending_index;

    /***************************************************************************

        Constructs a new BufferedDirectReadFile.

        See notes in BufferedDirectWriteFile about the default buffer size.

        Params:
            path = Path of the file to read from.
            buffer = Buffer to use for reading; its length must be a multiple
                of BLOCK_SIZE and its memory must be aligned to BLOCK_SIZE

    ***************************************************************************/

    public this (cstring path, ubyte[] buffer)
    {
        this.setBuffer(buffer);
        this.pending_index = 0;
        this.file = this.newFile();
        if (path.length > 0)
            this.file.open(path);
    }

    /***************************************************************************

        Instantiates the file object to be used to read from. This method may
        be overridden by derived classes, allowing different types of file to
        be used with this class.

        Returns:
            file object to read from

    ***************************************************************************/

    protected DirectReadFile newFile ( )
    {
        return new DirectReadFile;
    }

    /***************************************************************************

        Constructs a new BufferedDirectReadFile, allocating a new buffer.

        See documentation for this(cstring, ubyte[]) for details.

        Params:
            path = Path of the file to read from.
            buffer_blocks = Buffer size in blocks (default: 32MiB, i.e.
                65536 blocks of 512 bytes)

    ***************************************************************************/

    public this (cstring path = null, size_t buffer_blocks = 32 * 2 * 1024)
    {
        this(path, createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Mixin for common functionality.

    ***************************************************************************/

    mixin AlignedBufferedStream;

    /***************************************************************************

        Opens a BufferedDirectReadFile file.

        Params:
            path = Path of the file to read from.
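
        Example:
        A sketch of reading a whole file through this class. The chunk array
        is much smaller than the internal buffer (and not necessarily
        block-aligned), so read() copies the data through the internal
        buffer; only a BLOCK_SIZE-aligned dst at least as large as the
        internal buffer skips that copy. The path is hypothetical.

        ---
        auto file = new BufferedDirectReadFile; // 32MiB buffer, no file yet
        file.open("/data/backup.bin");

        ubyte[4096] chunk;
        size_t total;
        while (true)
        {
            auto n = file.read(chunk[]);
            if (n == file.Eof)
                break;
            total += n;
        }
        file.close();
        ---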
    ***************************************************************************/

    public void open (cstring path)
    {
        verify(this.file.fileHandle == -1);
        this.file.open(path);
        this.free_index = 0;
        this.pending_index = 0;
    }

    /***************************************************************************

        Returns the host conduit.

    ***************************************************************************/

    public IConduit conduit ()
    {
        return this.file;
    }

    /***************************************************************************

        Closes the underlying file, calling sync() first.

    ***************************************************************************/

    public void close ()
    {
        if (this.file.fileHandle == -1)
            return;
        this.sync();
        this.file.close();
    }

    /***************************************************************************

        Reads from the stream into a destination array. The content read from
        the stream will be stored in the provided dst.

        Returns the number of bytes stored in dst, which may be less than
        dst.length. Eof is returned when an end-of-flow condition arises.

    ***************************************************************************/

    public size_t read (void[] dst)
    {
        verify(this.file.fileHandle != -1);

        if (dst.length == 0)
            return 0;

        size_t bytes_read = 0;

        // Read from pending data (that was read in a previous read())
        auto pending_len = this.free_index - this.pending_index;
        if (pending_len > 0)
        {
            if (dst.length <= pending_len)
            {
                pending_len = dst.length;
            }

            bytes_read += pending_len;
            dst[0 .. pending_len] = this.buffer[this.pending_index ..
                this.pending_index + pending_len];
            this.pending_index += pending_len;
            dst = dst[pending_len .. $];
        }

        // Reset if we don't have pending data to make next read more efficient
        if (this.pending_index == this.free_index)
        {
            this.free_index = 0;
            this.pending_index = 0;
        }

        // If dst still has room at this point, all pending data was consumed
        // and free_index and pending_index were reset to 0 above, so from
        // here on we only work with free_index.

        // Optimization: avoid an extra copy if dst is already aligned to the
        // block size
        if (this.free_index == 0 && this.isAligned(dst.ptr))
        {
            while (dst.length >= this.buffer.length)
            {
                auto r = this.file.read(dst[0 .. this.buffer.length]);

                if (r == this.file.Eof)
                {
                    return bytes_read ? bytes_read : r;
                }

                bytes_read += r;
                dst = dst[r .. $];
            }
        }

        // Read whole buffer chunks as long as needed
        while (dst.length > 0)
        {
            auto r = this.file.read(buffer);

            if (r == this.file.Eof)
            {
                return bytes_read ? bytes_read : r;
            }

            // If we read more than dst.length, hand only dst.length bytes to
            // the caller and keep the rest as pending data for the next read()
            if (r >= dst.length)
            {
                this.pending_index = dst.length;
                this.free_index = r;
                r = dst.length;
            }

            bytes_read += r;
            dst[0 .. r] = buffer[0 .. r];
            dst = dst[r .. $];
        }

        return bytes_read;
    }

    /***************************************************************************

        Throws an IOException because this operation is not implemented.

    ***************************************************************************/

    void[] load (size_t max = -1)
    {
        throw new IOException("load() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Returns the upstream source.
    ***************************************************************************/

    public InputStream input ()
    {
        return file;
    }

    /**************************************************************************

        Instructs the OS to flush its internal buffers to the disk device.

    **************************************************************************/

    public void sync ( )
    {
        verify(this.file.fileHandle != -1);
        this.file.sync();
    }

}
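
/*******************************************************************************

    Suggested test sketch: the constructors should reject buffers that don't
    meet the O_DIRECT requirements checked by setBuffer(). No path is passed,
    so no file is opened and the file system is never touched. The helper
    name rejects() is illustration-only.

*******************************************************************************/

unittest
{
    // Two blocks, aligned by createAlignedBuffer()
    auto buf = BufferedDirectWriteFile.createAlignedBuffer(2);
    auto block = BufferedDirectWriteFile.BLOCK_SIZE;

    // Returns true if constructing a writer with `b` throws an IOException
    bool rejects ( ubyte[] b )
    {
        try
        {
            auto f = new BufferedDirectWriteFile(null, b);
        }
        catch (IOException)
        {
            return true;
        }
        return false;
    }

    test(!rejects(buf));                 // aligned, whole blocks
    test(rejects(buf[1 .. 1 + block]));  // misaligned start
    test(rejects(buf[0 .. 100]));        // length not a multiple of a block
}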