1 /*******************************************************************************
2 3 Direct I/O output and input streams
4 5 This module provides an OutputStream (BufferedDirectWriteFile) and
    an InputStream (BufferedDirectReadFile) to do direct I/O using Linux's
7 O_DIRECT flag.

    This kind of I/O is especially (and probably only) useful when you need to
    dump a huge amount of data to disk as some kind of backup, i.e., data that
    you probably won't ever read again (or at least not in the short term). In
    those cases, going through the page cache is not only probably slower
    (because you are doing an extra memory copy), but it can also
    potentially freeze the whole system if the sysctl vm.dirty_ratio is
    exceeded, because from then on programs will use their own time to write
    the page cache to disk. If the data you need to write is small, or you are
    going to access it in the short term and you are not experiencing freezes,
    you probably DON'T want to use this module.

    Even though this kind of object should probably derive from File (i.e. be
21 a device, and be selectable), given this type of I/O is very particular, and
22 a lot of details need to be taken into account, they are just implementing
23 the stream interfaces, and separately (for example, direct I/O is supposed
24 to be always blocking, unless you do async I/O too, because the data is
25 being copied directly from your buffer to the disk, is not going through the
26 page cache, which makes non-blocking I/O possible). Even some of the stream
27 interface methods are not implemented (seek(), flush(), copy() and load()).
28 This might change in the future, if needed.
29 30 Direct I/O also must write complete sectors. This means buffers passed to
31 write(2) must be aligned to the block size too. This is why this class uses
32 internal buffering instead of using the original memory. This is just to
33 make user's lives easier, so they don't have to worry about alignment (if
34 you can, try to keep your buffers aligned though, there are optimizations to
35 avoid copies in those cases). When instantiating the classes, it is safe to
36 pass as buffer memory you just allocated through the GC, since the GC
37 returns memory that's always aligned to the page size (4096) when allocating
38 chunks larger than a page.

    Users must notice, though, that whole sectors will be written (512 bytes
    each), so if they write, for example, 100 bytes, the file will still be 512
    bytes long and the final 412 bytes will contain padding (the padding byte
    passed to flushWithPadding(), zero by default). truncate(2) or
    ftruncate(2) might be used to truncate the file to its real size if desired.
44 45 See_Also:
46 http://web.archive.org/web/20160317032821/http://www.westnet.com/~gsmith/content/linux-pdflush.htm
47 https://www.kernel.org/doc/Documentation/sysctl/vm.txt
48 49 Copyright:
50 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
51 All rights reserved.
52 53 License:
54 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
55 Alternatively, this file may be distributed under the terms of the Tango
56 3-Clause BSD License (see LICENSE_BSD.txt for details).
57 58 *******************************************************************************/59 60 moduleocean.io.device.DirectIO;
61 62 63 64 65 importocean.meta.types.Qualifiers;
66 importocean.core.Verify;
67 68 importcore.memory;
69 70 importocean.io.model.IConduit;
71 importocean.io.device.File;
72 importocean.core.ExceptionDefinitions: IOException;
73 importocean.stdc.posix.fcntl : O_DIRECT; // Linux only74 75 version (unittest)
76 {
77 importocean.core.Test;
78 }
/*******************************************************************************

    Mixin template for classes that need to have buffers that are aligned to
    a certain block size and don't support some operations.

    The template provides the aligned buffer itself, helpers to install or
    allocate it, and implementations of seek(), flush() and copy() that
    always throw.

*******************************************************************************/

private template AlignedBufferedStream ( )
{

    /***************************************************************************

        Block size.

        Almost every HDD out there has a block size of 512. But we should be
        careful about this...

    ***************************************************************************/

    public enum { BLOCK_SIZE = 512 }

    /***************************************************************************

        Internal buffer (the size needs to be multiple of the block size).

    ***************************************************************************/

    protected ubyte[] buffer;

    /***************************************************************************

        Internal pointer to the next byte of the buffer that is free.

    ***************************************************************************/

    protected size_t free_index;

    /***************************************************************************

        Construct the buffer using an existing buffer.

        This method checks the buffer is not empty, is properly aligned and
        that its length is a multiple of BLOCK_SIZE. All checks are done
        before the buffer is installed, so a rejected buffer never replaces
        the one currently in use.

        Params:
            buffer = buffer to re-use for this aligned buffer.

        Throws:
            IOException if the buffer is empty, is not aligned to BLOCK_SIZE
            or its length is not a multiple of BLOCK_SIZE.

    ***************************************************************************/

    protected void setBuffer ( ubyte[] buffer )
    {
        // An empty buffer passes the alignment and length checks below, but
        // the buffered read/write loops can never make progress with it.
        if ( buffer.length == 0 )
            throw new IOException("Buffer is empty, it must hold at least "
                    ~ "one BLOCK_SIZE block");
        // Throw an error if the buffer is not aligned to BLOCK_SIZE
        if ( !this.isAligned(buffer.ptr) )
            throw new IOException("Buffer is not aligned to BLOCK_SIZE, maybe "
                    ~ "you should start using posix_memalign(3)");
        // Throw an error if the buffer length is not a multiple of the
        // BLOCK_SIZE
        if ((buffer.length % BLOCK_SIZE) != 0)
            throw new IOException("Buffer length is not multiple of the "
                    ~ "BLOCK_SIZE");
        // Only install the buffer once it is known to be usable
        this.buffer = buffer;
        this.free_index = 0;
    }

    /***************************************************************************

        Construct the buffer with a specified size.

        This method checks the buffer is properly aligned and the length is
        a multiple of BLOCK_SIZE too.

        Params:
            buffer_blocks = Buffer size in blocks

    ***************************************************************************/

    protected void createBuffer ( size_t buffer_blocks )
    {
        this.setBuffer(createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Construct a buffer with length equal to the specified multiple of
        BLOCK_SIZE. This method checks the buffer is properly aligned and the
        length is a multiple of BLOCK_SIZE too.

        Note on buffer allocation:
            O_DIRECT requires BLOCK_SIZE-aligned memory. BLOCK_SIZE is defined
            as 512 bytes. Conveniently, the current GC implementation always
            aligns memory to 512 bytes, so we can simply rely on that. In D2,
            `new` is not guaranteed to provide block-aligned memory, but
            `GC.malloc` does make that promise in its documentation:
            https://dlang.org/phobos/core_memory.html#.GC.malloc

            (This behaviour is tested with a unittest, in case BLOCK_SIZE or
            the GC's behaviour ever changes.)

            The block is requested with GC.BlkAttr.NO_SCAN: it only ever
            holds raw file bytes, so there is no reason for the GC to scan
            it for pointers.

        Params:
            buffer_blocks = Buffer size in blocks

        Returns:
            newly allocated buffer of the specified size

    ***************************************************************************/

    private static ubyte[] createAlignedBuffer ( size_t buffer_blocks )
    out ( buffer )
    {
        assert(isAligned(buffer.ptr));
        assert((buffer.length % BLOCK_SIZE) == 0);
    }
    do
    {
        auto bytes = buffer_blocks * BLOCK_SIZE;
        return cast(ubyte[]) GC.malloc(bytes, GC.BlkAttr.NO_SCAN)[0 .. bytes];
    }

    /***************************************************************************

        Unittest to confirm during testing that the statement above about the
        implementation of GC malloc returning 512-byte-aligned buffers is
        correct in all builds.

        If this test ever fails, we should adapt createAlignedBuffer to use
        posix_memalign(3) instead to allocate the memory.

    ***************************************************************************/

    unittest
    {
        auto buffer = createAlignedBuffer(1);
        test(isAligned(buffer.ptr));
    }

    /***************************************************************************

        Params:
            ptr = pointer to check

        Returns:
            true if the pointer is aligned to the block size.

    ***************************************************************************/

    static public bool isAligned ( const(void)* ptr )
    {
        // BLOCK_SIZE is a power of two, so masking the low bits is enough
        return (cast(size_t) ptr & (BLOCK_SIZE - 1)) == 0;
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

    ***************************************************************************/

    public override long seek (long offset, Anchor anchor = Anchor.Begin)
    {
        throw new IOException("seek() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

    ***************************************************************************/

    public override IOStream flush ()
    {
        throw new IOException("flush() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

        Only present in OutputStream, so we can't use the override keyword.

    ***************************************************************************/

    public OutputStream copy (InputStream src, size_t max = -1)
    {
        throw new IOException("copy() not supported by " ~
                this.classinfo.name);
    }

}
/*******************************************************************************

    Buffered file to do direct I/O writes.

    Data passed to write() is accumulated in a block-aligned internal buffer
    and handed to the underlying O_DIRECT file in whole blocks. Source data
    that is already block-aligned and at least one buffer long is written
    straight from the caller's memory.

    Please read the module documentation for details.

*******************************************************************************/

public class BufferedDirectWriteFile: OutputStream
{

    /***************************************************************************

        File to do direct IO writes.

        Actually there is no way to open files with File specifying custom
        flags that is not sub-classing. Bummer!

    ***************************************************************************/

    static protected class DirectWriteFile : File
    {
        /***********************************************************************

            Opens a direct-write file at the specified path.

            Params:
                path = path at which to open file
                style = file open mode

            Throws:
                IOException on error opening the file

        ***********************************************************************/

        override public void open (cstring path, Style style = this.WriteCreate)
        {
            if (!super.open(path, style, O_DIRECT))
                this.error();
        }
    }

    /***************************************************************************

        Direct I/O file device to write to.

    ***************************************************************************/

    private DirectWriteFile file;

    /***************************************************************************

        Constructs a new BufferedDirectWriteFile.

        If a path is specified, the file is open too. A good buffer size depends
        mostly on the speed of the disk (memory and CPU). If the buffer is too
        big, you will notice that writing seems to happen in long bursts, with
        periods of a lot of buffer copying, and long wait periods writing to
        disk. If the buffer is too small, the throughput will be too small,
        resulting in bigger total write time.

        32MiB have shown to be a decent value for a low end magnetic hard
        drive, according to a few tests.

        Params:
            path = Path of the file to write to.
            buffer = Buffer to use for writing, the length must be multiple of
                     the BLOCK_SIZE and the memory must be aligned to the
                     BLOCK_SIZE

    ***************************************************************************/

    public this (cstring path, ubyte[] buffer)
    {
        this.setBuffer(buffer);
        this.file = this.newFile();
        if (path.length > 0)
            this.open(path);
    }

    /***************************************************************************

        Instantiates the file object to be used to write to. This method may be
        overridden by derived classes, allowing different types of file to be
        used with this class.

        Returns:
            file object to write to

    ***************************************************************************/

    protected DirectWriteFile newFile ( )
    {
        return new DirectWriteFile;
    }

    /***************************************************************************

        Constructs a new BufferedDirectWriteFile allocating a new buffer.

        See documentation for this(cstring, ubyte[]) for details.

        Params:
            path = Path of the file to write to.
            buffer_blocks = Buffer size in blocks (default 32MiB)

    ***************************************************************************/

    public this (cstring path = null, size_t buffer_blocks = 32 * 2 * 1024)
    {
        this(path, createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Mixin for common functionality.

    ***************************************************************************/

    mixin AlignedBufferedStream;

    /***************************************************************************

        Open a BufferedDirectWriteFile file.

        The file must not be open already. Any data left in the internal
        buffer is discarded.

        Params:
            path = Path of the file to write to.

    ***************************************************************************/

    public void open (cstring path)
    {
        verify(this.file.fileHandle == -1);
        this.file.open(path);
        this.free_index = 0;
    }

    /***************************************************************************

        Returns:
            what path() returns for the underlying File instance.

    ***************************************************************************/

    public cstring path ( )
    {
        return this.file.path();
    }

    /***************************************************************************

        Return the host conduit.

    ***************************************************************************/

    public IConduit conduit ()
    {
        return this.file;
    }

    /***************************************************************************

        Close the underlying file, but calling flushWithPadding() and sync()
        first. Does nothing if the file is not open.

    ***************************************************************************/

    public void close ()
    {
        if (this.file.fileHandle == -1)
            return;
        this.flushWithPadding();
        this.sync();
        this.file.close();
    }

    /***************************************************************************

        Write to stream from a source array. The provided src content will be
        written to the stream.

        Data is either written directly from src (when the internal buffer
        is empty, src is aligned to the block size and at least one buffer
        long) or copied into the internal buffer, which is flushed every time
        it fills up. Data left in the internal buffer when this method
        returns is only written by a later write(), flushWithPadding() or
        close().

        Params:
            src = data to write

        Returns:
            the number of bytes taken from src (src.length), or Eof if the
            underlying file reports an end-of-flow condition.

    ***************************************************************************/

    public size_t write (const(void)[] src)
    {
        verify(this.file.fileHandle != -1);

        size_t total = src.length;

        if (src.length == 0)
            return 0;

        // Optimization: avoid extra copy if src is already aligned to the
        // block size
        if (this.free_index == 0)
        {
            // The alignment test is part of the loop condition: unaligned
            // data must fall through to the buffered path below instead of
            // spinning here forever.
            while (src.length >= this.buffer.length
                && this.isAligned(src.ptr))
            {
                auto written = this.file.write(src[0 .. this.buffer.length]);

                if (written == this.file.Eof)
                    return written;

                // Only skip what was really written. If a short write leaves
                // src unaligned, the buffered path takes over.
                src = src[written .. $];
            }
        }

        // Fill the internal buffer and flush it for as long as the remaining
        // data does not fit in the free space
        while (this.free_index + src.length > this.buffer.length)
        {
            auto hole = this.buffer.length - this.free_index;
            this.buffer[this.free_index .. $] = cast(ubyte[]) src[0 .. hole];
            this.free_index = this.buffer.length;

            // The buffer is full (a multiple of BLOCK_SIZE), so no padding
            // is actually added here
            if (this.flushWithPadding() == this.file.Eof)
                return this.file.Eof;

            src = src[hole .. $];
        }

        // What is left fits in the buffer, keep it for a later flush
        this.buffer[this.free_index .. this.free_index + src.length] =
                cast(ubyte[]) src[0 .. $];
        this.free_index = this.free_index + src.length;

        return total;
    }

    /***************************************************************************

        Return the upstream sink.

    ***************************************************************************/

    public OutputStream output ()
    {
        return file;
    }

    /**************************************************************************

        Write the current buffer rounding to the block size (and setting the
        padding bytes to padding_byte).

        The internal buffer is empty when this method returns.

        Params:
            padding_byte = Byte to use to fill the padding.

        Returns:
            Number of bytes that have been flushed (padding included), 0 if
            the buffer was empty, or Eof if the underlying file reports an
            end-of-flow condition.

    **************************************************************************/

    public size_t flushWithPadding ( ubyte padding_byte = 0 )
    {
        verify(this.file.fileHandle != -1);

        if (this.free_index == 0)
            return 0;

        // Round the used part of the buffer up to a whole block
        if ((this.free_index % this.BLOCK_SIZE) != 0)
        {
            auto hole = BLOCK_SIZE - this.free_index % BLOCK_SIZE;
            this.buffer[this.free_index .. this.free_index+hole] = padding_byte;
            this.free_index += hole;
        }

        size_t written = 0;
        while (written < this.free_index)
        {
            auto r = this.file.write(this.buffer[written .. this.free_index]);

            if (r == this.file.Eof)
            {
                written = r;
                break;
            }

            // Accumulate, so that a short write is resumed at the right
            // offset instead of restarting from the last chunk size
            written += r;
        }

        this.free_index = 0;

        return written;
    }

    /**************************************************************************

        Instructs the OS to flush its internal buffers to the disk device.

    **************************************************************************/

    public void sync ( )
    {
        verify(this.file.fileHandle != -1);
        this.file.sync();
    }

}
/*******************************************************************************

    Buffered file to do direct IO reads.

    Data is read from the underlying O_DIRECT file in buffer-sized chunks.
    Whatever was read but not yet requested is kept in the internal buffer
    and served by the following read() calls.

    Please read the module documentation for details.

*******************************************************************************/

public class BufferedDirectReadFile: InputStream
{

    /***************************************************************************

        File to do direct IO reads.

        Actually there is no way to open files with File specifying custom
        flags that is not sub-classing. Bummer!

    ***************************************************************************/

    static protected class DirectReadFile : File
    {
        /***********************************************************************

            Opens the file at the specified path adding the O_DIRECT flag,
            calling error() if the file could not be opened.

            Params:
                path = path at which to open file
                style = file open mode

        ***********************************************************************/

        override public void open (cstring path, Style style = this.ReadExisting)
        {
            auto opened = super.open(path, style, O_DIRECT);

            if (!opened)
            {
                this.error();
            }
        }
    }

    /***************************************************************************

        Direct I/O file device to read from.

    ***************************************************************************/

    private DirectReadFile file;

    /***************************************************************************

        Index in the internal buffer of the first byte that was read from
        the file but not handed to a reader yet. The pending data ends at
        free_index.

    ***************************************************************************/

    protected size_t pending_index;

    /***************************************************************************

        Constructs a new BufferedDirectReadFile.

        If a non-empty path is given, the file is opened too.

        See notes in BufferedDirectWriteFile about the default buffer size.

        Params:
            path = Path of the file to read from.
            buffer = Buffer to use for reading, the length must be multiple of
                     the BLOCK_SIZE and the memory must be aligned to the
                     BLOCK_SIZE

    ***************************************************************************/

    public this (cstring path, ubyte[] buffer)
    {
        this.setBuffer(buffer);
        this.pending_index = 0;
        this.file = this.newFile();

        if (path.length != 0)
        {
            this.file.open(path);
        }
    }

    /***************************************************************************

        Factory for the file object used to read from. Derived classes may
        override it to use a different type of file with this class.

        Returns:
            file object to read from

    ***************************************************************************/

    protected DirectReadFile newFile ( )
    {
        return new DirectReadFile;
    }

    /***************************************************************************

        Constructs a new BufferedDirectReadFile allocating a new buffer.

        See documentation for this(cstring, ubyte[]) for details.

        Params:
            path = Path of the file to read from.
            buffer_blocks = Buffer size in blocks (default 32MiB)

    ***************************************************************************/

    public this (cstring path = null, size_t buffer_blocks = 32 * 2 * 1024)
    {
        this(path, createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Mixin for common functionality.

    ***************************************************************************/

    mixin AlignedBufferedStream;

    /***************************************************************************

        Open a BufferedDirectReadFile file.

        The file must not be open already. Any pending data in the internal
        buffer is discarded.

        Params:
            path = Path of the file to read from.

    ***************************************************************************/

    public void open (cstring path)
    {
        verify(this.file.fileHandle == -1);

        this.file.open(path);

        this.free_index = 0;
        this.pending_index = 0;
    }

    /***************************************************************************

        Return the host conduit.

    ***************************************************************************/

    public IConduit conduit ()
    {
        return this.file;
    }

    /***************************************************************************

        Close the underlying file, but calling sync() first. Does nothing if
        the file is not open.

    ***************************************************************************/

    public void close ()
    {
        if (this.file.fileHandle != -1)
        {
            this.sync();
            this.file.close();
        }
    }

    /***************************************************************************

        Read from stream to a destination array. The content read from the
        stream will be stored in the provided dst.

        Pending data from a previous call is served first. After that, dst
        is filled either directly from the file (when dst is aligned to the
        block size and at least one buffer long) or through the internal
        buffer, keeping any surplus as pending data for the next call.

        Params:
            dst = destination of the data

        Returns:
            the number of bytes stored in dst, which may be less than
            dst.length, or Eof if the end of the flow was reached before
            any byte could be stored.

    ***************************************************************************/

    public size_t read (void[] dst)
    {
        verify(this.file.fileHandle != -1);

        if (dst.length == 0)
        {
            return 0;
        }

        size_t total = 0;

        // First hand out what a previous read() left in the buffer
        auto available = this.free_index - this.pending_index;

        if (available > 0)
        {
            auto take = (dst.length <= available) ? dst.length : available;
            auto start = this.pending_index;

            dst[0 .. take] = this.buffer[start .. start + take];
            this.pending_index = start + take;
            total += take;
            dst = dst[take .. $];
        }

        // With nothing pending, rewind the buffer so the next read from the
        // file can use all of it
        if (this.pending_index == this.free_index)
        {
            this.pending_index = 0;
            this.free_index = 0;
        }

        // From here on there is no pending data to care about, only
        // free_index is relevant

        // Optimization: read straight into dst, skipping the internal
        // buffer, if dst is aligned to the block size
        if (this.free_index == 0 && this.isAligned(dst.ptr))
        {
            while (dst.length >= this.buffer.length)
            {
                auto got = this.file.read(dst[0 .. this.buffer.length]);

                if (got == this.file.Eof)
                {
                    return total ? total : got;
                }

                total += got;
                dst = dst[got .. $];
            }
        }

        // Fill the rest of dst going through the internal buffer
        while (dst.length > 0)
        {
            auto got = this.file.read(this.buffer);

            if (got == this.file.Eof)
            {
                return total ? total : got;
            }

            // If more than requested was read, report only dst.length bytes
            // and remember the surplus as pending data
            if (got >= dst.length)
            {
                this.pending_index = dst.length;
                this.free_index = got;
                got = dst.length;
            }

            total += got;
            dst[0 .. got] = this.buffer[0 .. got];
            dst = dst[got .. $];
        }

        return total;
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

    ***************************************************************************/

    void[] load (size_t max = -1)
    {
        throw new IOException("load() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Return the upstream source (the underlying file).

    ***************************************************************************/

    public InputStream input ()
    {
        return file;
    }

    /**************************************************************************

        Instructs the OS to flush its internal buffers to the disk device.

    **************************************************************************/

    public void sync ( )
    {
        verify(this.file.fileHandle != -1);
        this.file.sync();
    }

}