1 /*******************************************************************************
2 
3     Direct I/O output and input streams
4 
    This module provides an OutputStream (BufferedDirectWriteFile) and
    an InputStream (BufferedDirectReadFile) to do direct I/O using Linux's
    O_DIRECT flag.
8 
    This kind of I/O is especially (and probably only) useful when you need to
    dump a huge amount of data to disk as some kind of backup, i.e., data that
    you probably won't ever read again (or at least not in the short term). In
    those cases, going through the page cache is not only probably slower
    (because you are doing an extra memory copy), but it can potentially
    freeze the whole system if the sysctl vm.dirty_ratio is exceeded, in which
    case processes will spend their own time writing the page cache to disk.
    If the data you need to write is small, or you are going to access it in
    the short term, and you are not experiencing freezes, you probably DON'T
    want to use this module.
19 
    Even though these kinds of objects should probably derive from File (i.e.
    be a device, and be selectable), this type of I/O is very particular and
    a lot of details need to be taken into account, so they just implement
    the stream interfaces separately. For example, direct I/O is supposed to
    be always blocking (unless you do async I/O too), because the data is
    copied directly from your buffer to the disk instead of going through the
    page cache, which is what makes non-blocking I/O possible. Some of the
    stream interface methods are not even implemented (seek(), flush(), copy()
    and load()). This might change in the future, if needed.
29 
    Direct I/O also must write complete sectors. This means buffers passed to
    write(2) must be aligned to the block size too. This is why these classes
    use internal buffering instead of using the original memory. This is just
    to make users' lives easier, so they don't have to worry about alignment
    (if you can, try to keep your buffers aligned though; there are
    optimizations to avoid copies in those cases). When instantiating the
    classes, it is safe to pass as buffer memory you just allocated through the
    GC, since the GC returns memory that's always aligned to the page size
    (4096) when allocating chunks larger than a page.
39 
    Users must note, though, that whole sectors will be written (512 bytes
    each), so if they write, for example, 100 bytes, the file will still be 512
    bytes long and the final 412 bytes will contain padding (the padding byte
    passed to flushWithPadding(), zero by default). truncate(2) or
    ftruncate(2) might be used to truncate the file to its real size if desired.
44 
45     See_Also:
46         http://web.archive.org/web/20160317032821/http://www.westnet.com/~gsmith/content/linux-pdflush.htm
47         https://www.kernel.org/doc/Documentation/sysctl/vm.txt
48 
49     Copyright:
50         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
51         All rights reserved.
52 
53     License:
54         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
55         Alternatively, this file may be distributed under the terms of the Tango
56         3-Clause BSD License (see LICENSE_BSD.txt for details).
57 
58 *******************************************************************************/
59 
60 module ocean.io.device.DirectIO;
61 
62 
63 
64 
65 import ocean.meta.types.Qualifiers;
66 import ocean.core.Verify;
67 
68 import core.memory;
69 
70 import ocean.io.model.IConduit;
71 import ocean.io.device.File;
72 import ocean.core.ExceptionDefinitions: IOException;
73 import ocean.stdc.posix.fcntl : O_DIRECT; // Linux only
74 
75 version (unittest)
76 {
77     import ocean.core.Test;
78 }
79 
80 /*******************************************************************************
81 
82     Mixin template for classes that need to have buffers that are aligned to
83     a certain block size and don't support some operations.
84 
85 *******************************************************************************/
86 
private template AlignedBufferedStream ( )
{

    /***************************************************************************

        Block size.

        Almost every HDD out there has a block size of 512. But we should be
        careful about this...

    ***************************************************************************/

    public enum { BLOCK_SIZE = 512 }

    /***************************************************************************

        Internal buffer. Its length must be a non-zero multiple of BLOCK_SIZE
        and its memory must be aligned to BLOCK_SIZE (enforced by setBuffer()).

    ***************************************************************************/

    protected ubyte[] buffer;

    /***************************************************************************

        Index of the next free byte of the buffer (i.e. the number of bytes of
        the buffer currently in use).

    ***************************************************************************/

    protected size_t free_index;

    /***************************************************************************

        Sets the internal buffer to an existing buffer.

        The buffer is validated before it is stored, so a rejected buffer does
        not replace the current one. On success, free_index is reset to 0.

        Params:
            buffer = buffer to re-use for this aligned buffer.

        Throws:
            IOException if the buffer is empty, is not aligned to BLOCK_SIZE,
            or its length is not a multiple of BLOCK_SIZE.

    ***************************************************************************/

    protected void setBuffer ( ubyte[] buffer )
    {
        // An empty buffer can never make progress: the buffering loops that
        // fill and flush it would spin forever.
        if ( buffer.length == 0 )
            throw new IOException("Buffer must not be empty");
        // Throw an error if the buffer is not aligned to BLOCK_SIZE
        if ( !isAligned(buffer.ptr) )
            throw new IOException("Buffer is not aligned to BLOCK_SIZE, maybe "
                    ~ "you should start using posix_memalign(3)");
        // Throw an error if the buffer length is not a multiple of the
        // BLOCK_SIZE
        if ((buffer.length % BLOCK_SIZE) != 0)
            throw new IOException("Buffer length is not multiple of the "
                    ~ "BLOCK_SIZE");

        // Only store the buffer once it is known to be valid
        this.buffer = buffer;
        this.free_index = 0;
    }

    /***************************************************************************

        Construct the buffer with a specified size.

        The newly allocated buffer is passed to setBuffer(), which checks that
        it is properly aligned and that its length is a multiple of BLOCK_SIZE.

        Params:
            buffer_blocks = Buffer size in blocks

        Throws:
            IOException if the resulting buffer is rejected by setBuffer()
            (e.g. when buffer_blocks is 0).

    ***************************************************************************/

    protected void createBuffer ( size_t buffer_blocks )
    {
        this.setBuffer(createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Construct a buffer with length equal to the specified multiple of
        BLOCK_SIZE. The out contract checks the buffer is properly aligned and
        the length is a multiple of BLOCK_SIZE.

        Note on buffer allocation:
            O_DIRECT requires BLOCK_SIZE-aligned memory (BLOCK_SIZE is 512
            bytes). In D2, `new` is not guaranteed to provide block-aligned
            memory, so GC.malloc is used instead:
                https://dlang.org/phobos/core_memory.html#.GC.malloc

            In practice the current GC returns memory aligned to at least 512
            bytes for allocations whose size is a multiple of 512 (allocations
            larger than a page are page-aligned). This relies on the GC's
            implementation, which is why it is checked by the out contract
            and by the unittest below, in case BLOCK_SIZE or the GC's
            behaviour ever changes.

        Params:
            buffer_blocks = Buffer size in blocks

        Returns:
            newly allocated buffer of the specified size

    ***************************************************************************/

    private static ubyte[] createAlignedBuffer ( size_t buffer_blocks )
    out ( buffer )
    {
        assert(isAligned(buffer.ptr));
        assert((buffer.length % BLOCK_SIZE) == 0);
    }
    do
    {
        auto bytes = buffer_blocks * BLOCK_SIZE;
        auto buffer = cast(ubyte[]) GC.malloc(bytes)[0 .. bytes];
        return buffer;
    }

    /***************************************************************************

        Unittest to confirm during testing that the statement above about the
        implementation of GC malloc returning 512-byte-aligned buffers is
        correct in all builds.

        If this test ever fails, we should adapt createAlignedBuffer to use
        posix_memalign(3) instead to allocate the memory.

    ***************************************************************************/

    unittest
    {
        auto buffer = createAlignedBuffer(1);
        test(isAligned(buffer.ptr));
    }

    /***************************************************************************

        Tells whether a pointer is aligned to the block size.

        Params:
            ptr = pointer to check

        Returns:
            true if ptr is a multiple of BLOCK_SIZE (BLOCK_SIZE is a power of
            two, so masking with BLOCK_SIZE - 1 yields the remainder).

    ***************************************************************************/

    static public bool isAligned ( const(void)* ptr )
    {
        return (cast(size_t) ptr & (BLOCK_SIZE - 1)) == 0;
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

    ***************************************************************************/

    public override long seek (long offset, Anchor anchor = Anchor.Begin)
    {
        throw new IOException("seek() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

    ***************************************************************************/

    public override IOStream flush ()
    {
        throw new IOException("flush() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

        Only present in OutputStream, so we can't use the override keyword.

    ***************************************************************************/

    public OutputStream copy (InputStream src, size_t max = -1)
    {
        throw new IOException("copy() not supported by " ~
                this.classinfo.name);
    }

}
266 
267 
268 /*******************************************************************************
269 
270     Buffered file to do direct I/O writes.
271 
272     Please read the module documentation for details.
273 
274 *******************************************************************************/
275 
public class BufferedDirectWriteFile: OutputStream
{

    /***************************************************************************

        File to do direct IO writes.

        Actually there is no way to open files with File specifying custom
        flags that is not sub-classing. Bummer!

    ***************************************************************************/

    static protected class DirectWriteFile : File
    {
        /***********************************************************************

            Opens a direct-write file at the specified path.

            Params:
                path = path at which to open file
                style = file open mode

            Throws:
                IOException on error opening the file

        ***********************************************************************/

        override public void open (cstring path, Style style = this.WriteCreate)
        {
            if (!super.open(path, style, O_DIRECT))
                this.error();
        }
    }

    /***************************************************************************

        Direct I/O file device to write to.

    ***************************************************************************/

    private DirectWriteFile file;

    /***************************************************************************

        Constructs a new BufferedDirectWriteFile.

        If a path is specified, the file is opened too. A good buffer size
        depends mostly on the speed of the disk (memory and CPU). If the buffer
        is too big, you will notice that writing seems to happen in long
        bursts, with periods of a lot of buffer copying, and long wait periods
        writing to disk. If the buffer is too small, the throughput will be too
        small, resulting in bigger total write time.

        32MiB have shown to be a decent value for a low end magnetic hard
        drive, according to a few tests.

        Params:
            path = Path of the file to write to.
            buffer = Buffer to use for writing, the length must be multiple of
                     the BLOCK_SIZE and the memory must be aligned to the
                     BLOCK_SIZE

    ***************************************************************************/

    public this (cstring path, ubyte[] buffer)
    {
        this.setBuffer(buffer);
        this.file = this.newFile();
        if (path.length > 0)
            this.open(path);
    }

    /***************************************************************************

        Instantiates the file object to be used to write to. This method may be
        overridden by derived classes, allowing different types of file to be
        used with this class.

        Returns:
            file object to write to

    ***************************************************************************/

    protected DirectWriteFile newFile ( )
    {
        return new DirectWriteFile;
    }

    /***************************************************************************

        Constructs a new BufferedDirectWriteFile allocating a new buffer.

        See documentation for this(cstring, ubyte[]) for details.

        Params:
            path = Path of the file to write to.
            buffer_blocks = Buffer size in blocks (default 32MiB)

    ***************************************************************************/

    public this (cstring path = null, size_t buffer_blocks = 32 * 2 * 1024)
    {
        this(path, createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Mixin for common functionality.

    ***************************************************************************/

    mixin AlignedBufferedStream;

    /***************************************************************************

        Open a BufferedDirectWriteFile file. The file must not be open already.

        Params:
            path = Path of the file to write to.

    ***************************************************************************/

    public void open (cstring path)
    {
        verify(this.file.fileHandle == -1);
        this.file.open(path);
        this.free_index = 0;
    }

    /***************************************************************************

        Returns:
            the path of the underlying File instance, as returned by
            File.path().

    ***************************************************************************/

    public cstring path ( )
    {
        return this.file.path();
    }

    /***************************************************************************

        Return the host conduit.

    ***************************************************************************/

    public IConduit conduit ()
    {
        return this.file;
    }

    /***************************************************************************

        Close the underlying file, after calling flushWithPadding() and sync().
        Does nothing if the file is not open.

    ***************************************************************************/

    public void close ()
    {
        if (this.file.fileHandle == -1)
            return;
        this.flushWithPadding();
        this.sync();
        this.file.close();
    }

    /***************************************************************************

        Write to stream from a source array. The provided src content is
        accumulated in the internal buffer, which is written to the file each
        time it becomes full. Data remaining in the buffer is only written by
        flushWithPadding() (also called by close()).

        Params:
            src = data to write

        Returns:
            the number of bytes consumed from src, which is always src.length.

        Throws:
            IOException if writing to the underlying file fails.

    ***************************************************************************/

    public size_t write (const(void)[] src)
    {
        verify(this.file.fileHandle != -1);

        size_t total = src.length;

        if (src.length == 0)
            return 0;

        // Optimization: when nothing is buffered and src is already aligned to
        // the block size, write whole buffer-sized chunks straight from src to
        // avoid an extra copy. buffer.length is a multiple of BLOCK_SIZE, so
        // src stays aligned from one chunk to the next. An unaligned src must
        // go through the internal buffer instead (O_DIRECT rejects it).
        if (this.free_index == 0 && this.isAligned(src.ptr))
        {
            while (src.length >= this.buffer.length)
            {
                this.writeAll(src[0 .. this.buffer.length]);
                src = src[this.buffer.length .. $];
            }
        }

        // Fill the buffer, flushing it every time it becomes full
        while (this.free_index + src.length > this.buffer.length)
        {
            auto hole = this.buffer.length - this.free_index;
            this.buffer[this.free_index .. $] =
                    cast(const(ubyte)[]) src[0 .. hole];
            this.free_index = this.buffer.length;
            this.flushWithPadding();
            src = src[hole .. $];
        }

        // Whatever is left fits in the buffer
        this.buffer[this.free_index .. this.free_index + src.length] =
                cast(const(ubyte)[]) src;
        this.free_index += src.length;

        return total;
    }

    /***************************************************************************

        Return the upstream sink.

    ***************************************************************************/

    public OutputStream output ()
    {
        return file;
    }

    /**************************************************************************

        Write the current buffer rounding to the block size (and setting the
        padding bytes to padding_byte).

        Params:
            padding_byte = Byte to use to fill the padding.

        Returns:
            Number of bytes that have been flushed (including the padding), or
            0 if the buffer was empty.

        Throws:
            IOException if writing to the underlying file fails.

    **************************************************************************/

    public size_t flushWithPadding ( ubyte padding_byte = 0 )
    {
        verify(this.file.fileHandle != -1);

        if (this.free_index == 0)
            return 0;

        // Direct I/O can only write whole blocks: pad up to the next boundary.
        // buffer.length is a multiple of BLOCK_SIZE, so this stays in bounds.
        auto remainder = this.free_index % BLOCK_SIZE;
        if (remainder != 0)
        {
            auto padded_end = this.free_index + (BLOCK_SIZE - remainder);
            this.buffer[this.free_index .. padded_end] = padding_byte;
            this.free_index = padded_end;
        }

        auto flushed = this.free_index;
        this.writeAll(this.buffer[0 .. flushed]);

        this.free_index = 0;

        return flushed;
    }

    /**************************************************************************

        Instructs the OS to flush its internal buffers to the disk device.

    **************************************************************************/

    public void sync ( )
    {
        verify(this.file.fileHandle != -1);
        this.file.sync();
    }

    /**************************************************************************

        Writes all of data to the underlying file, retrying after partial
        writes.

        Params:
            data = data to write; should be block-aligned for O_DIRECT

        Throws:
            IOException if the file reports end-of-flow or makes no progress
            (either would otherwise make the loop spin or slice out of bounds).

    **************************************************************************/

    private void writeAll ( const(void)[] data )
    {
        while (data.length > 0)
        {
            auto written = this.file.write(data);
            if (written == this.file.Eof || written == 0)
                throw new IOException("write() failed in " ~
                        this.classinfo.name);
            data = data[written .. $];
        }
    }

}
555 
556 
557 /*******************************************************************************
558 
559     Buffered file to do direct IO reads.
560 
561     Please read the module documentation for details.
562 
563 *******************************************************************************/
564 
public class BufferedDirectReadFile: InputStream
{

    /***************************************************************************

        File to do direct IO reads.

        Actually there is no way to open files with File specifying custom
        flags that is not sub-classing. Bummer!

    ***************************************************************************/

    static protected class DirectReadFile : File
    {
        /***********************************************************************

            Opens a direct-read file at the specified path.

            Params:
                path = path at which to open file
                style = file open mode

            Throws:
                IOException on error opening the file (raised via
                File.error())

        ***********************************************************************/

        override public void open (cstring path, Style style = this.ReadExisting)
        {
            if (!super.open(path, style, O_DIRECT))
                this.error();
        }
    }

    /***************************************************************************

        Direct I/O file device to read from.

    ***************************************************************************/

    private DirectReadFile file;

    /***************************************************************************

        Index of the first byte in the buffer that was already read from the
        file but not yet handed to a reader. Pending data is
        buffer[pending_index .. free_index].

    ***************************************************************************/

    protected size_t pending_index;

    /***************************************************************************

        Constructs a new BufferedDirectReadFile.

        If a path is specified, the file is opened too. See notes in
        BufferedDirectWriteFile about the default buffer size.

        Params:
            path = Path of the file to read from.
            buffer = Buffer to use for reading, the length must be multiple of
                     the BLOCK_SIZE and the memory must be aligned to the
                     BLOCK_SIZE

    ***************************************************************************/

    public this (cstring path, ubyte[] buffer)
    {
        this.setBuffer(buffer);
        this.pending_index = 0;
        this.file = this.newFile();
        if (path.length > 0)
            this.file.open(path);
    }

    /***************************************************************************

        Instantiates the file object to be used to read from. This method may be
        overridden by derived classes, allowing different types of file to be
        used with this class.

        Returns:
            file object to read from

    ***************************************************************************/

    protected DirectReadFile newFile ( )
    {
        return new DirectReadFile;
    }

    /***************************************************************************

        Constructs a new BufferedDirectReadFile allocating a new buffer.

        See documentation for this(cstring, ubyte[]) for details.

        Params:
            path = Path of the file to read from.
            buffer_blocks = Buffer size in blocks (default 32MiB)

    ***************************************************************************/

    public this (cstring path = null, size_t buffer_blocks = 32 * 2 * 1024)
    {
        this(path, createAlignedBuffer(buffer_blocks));
    }

    /***************************************************************************

        Mixin for common functionality.

    ***************************************************************************/

    mixin AlignedBufferedStream;

    /***************************************************************************

        Open a BufferedDirectReadFile file. The file must not be open already.
        Any buffered data is discarded.

        Params:
            path = Path of the file to read from.

    ***************************************************************************/

    public void open (cstring path)
    {
        verify(this.file.fileHandle == -1);
        this.file.open(path);
        this.free_index = 0;
        this.pending_index = 0;
    }

    /***************************************************************************

        Return the host conduit.

    ***************************************************************************/

    public IConduit conduit ()
    {
        return this.file;
    }

    /***************************************************************************

        Close the underlying file, after calling sync() first. Does nothing if
        the file is not open.

    ***************************************************************************/

    public void close ()
    {
        if (this.file.fileHandle == -1)
            return;
        this.sync();
        this.file.close();
    }

    /***************************************************************************

        Read from stream to a destination array. The content read from the
        stream will be stored in the provided dst.

        Data already buffered by a previous call is returned first. Then, if
        dst is block-aligned, whole buffer-sized chunks are read directly into
        it. The rest goes through the internal buffer, and any surplus is kept
        as pending data for the next call.

        Params:
            dst = destination array

        Returns:
            the number of bytes written to dst, which may be less than
            dst.length. Eof is returned when an end-of-flow condition arises
            before any byte was read.

    ***************************************************************************/

    public size_t read (void[] dst)
    {
        verify(this.file.fileHandle != -1);

        if (dst.length == 0)
            return 0;

        size_t bytes_read = 0;

        // Read from pending data (that was read in a previous read())
        auto pending_len = this.free_index - this.pending_index;
        if (pending_len > 0)
        {
            if (dst.length <= pending_len)
            {
                pending_len = dst.length;
            }

            bytes_read += pending_len;
            dst[0 .. pending_len] = this.buffer[this.pending_index ..
                                                this.pending_index + pending_len];
            this.pending_index += pending_len;
            dst = dst[pending_len .. $];
        }

        // Reset if we don't have pending data to make next read more efficient
        if (this.pending_index == this.free_index)
        {
            this.free_index = 0;
            this.pending_index = 0;
        }

        // If dst still has room at this point, all pending data was consumed
        // and both indices were reset to 0 above, so from here on we only work
        // with free_index. (If pending data remains, dst is empty and the
        // loops below do nothing.)

        // Optimization: avoid extra copy if dst is already aligned to the
        // block size
        if (this.free_index == 0 && this.isAligned(dst.ptr))
        {
            while (dst.length >= this.buffer.length)
            {
                auto r = this.file.read(dst[0 .. this.buffer.length]);

                if (r == this.file.Eof)
                {
                    return bytes_read ? bytes_read : r;
                }

                bytes_read += r;
                dst = dst[r .. $];

                // A short read (typically at the end of the file) may leave
                // dst no longer aligned to BLOCK_SIZE, which an O_DIRECT read
                // cannot use; continue through the internal buffer instead.
                if (r < this.buffer.length)
                    break;
            }
        }

        // Read whole buffer chunks as long as needed
        while (dst.length > 0)
        {
            auto r = this.file.read(buffer);

            if (r == this.file.Eof)
            {
                return bytes_read ? bytes_read : r;
            }

            // Pass to the upper-level as if we just had read dst.length if we
            // read more (and set the internal pending data state properly)
            if (r >= dst.length)
            {
                this.pending_index = dst.length;
                this.free_index = r;
                r = dst.length;
            }

            bytes_read += r;
            dst[0 .. r] = buffer[0 .. r];
            dst = dst[r .. $];
        }

        return bytes_read;
    }

    /***************************************************************************

        Throws an IOException because it is not implemented.

    ***************************************************************************/

    void[] load (size_t max = -1)
    {
        throw new IOException("load() not supported by " ~
                this.classinfo.name);
    }

    /***************************************************************************

        Return the upstream source.

    ***************************************************************************/

    public InputStream input ()
    {
        return file;
    }

    /**************************************************************************

        Instructs the OS to flush its internal buffers to the disk device.

    **************************************************************************/

    public void sync ( )
    {
        verify(this.file.fileHandle != -1);
        this.file.sync();
    }

}