1 /*******************************************************************************
2 
3     File-based queue implementation.
4 
5     The flexible file queue can be set to either open any existing files it
6     finds, or always delete existing files when it is created using the
    open_existing parameter in the constructor.
8 
9     Note that the queue file is deleted in the following cases:
10         1. Upon calling the clear() method.
        2. Upon calling pop() or peek() on an empty queue.
12 
13     Copyright:
14         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
15         All rights reserved.
16 
17     License:
18         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
19         Alternatively, this file may be distributed under the terms of the Tango
20         3-Clause BSD License (see LICENSE_BSD.txt for details).
21 
22 *******************************************************************************/
23 
24 module ocean.util.container.queue.FlexibleFileQueue;
25 
26 
27 import ocean.meta.types.Qualifiers;
28 import ocean.core.Buffer;
29 import ocean.util.container.queue.FlexibleRingQueue;
30 import ocean.util.container.queue.model.IByteQueue;
31 import ocean.util.container.queue.model.IQueueInfo;
32 import ocean.util.log.Logger;
33 import ocean.io.device.File;
34 import Filesystem = ocean.io.Path;
35 import ocean.io.stream.Buffered;
36 
37 
// Module-level logger, used by FlexibleFileQueue to report I/O errors that
// are caught instead of being propagated to the caller.
private Logger log;

// Module constructor: looks the logger up under the module's name.
static this ( )
{
    log = Log.lookup("ocean.util.container.queue.FlexibleFileQueue");
}
43 
44 
45 
/*******************************************************************************

    Byte queue which stores its items in a file.

    Every item is written to the file as a Header (containing the item's
    length) followed by the item's data. If the queue was constructed with
    open_existing set, the number of unread bytes and items is additionally
    kept in an index file (the queue file path plus ".index").

    The queue file (and the index file, if used) is deleted when the queue is
    cleared or when an item is requested from an empty queue; the files are
    re-created on the next push.

*******************************************************************************/

public class FlexibleFileQueue : IByteQueue
{
    import ocean.core.Verify;

    /***************************************************************************

        Header for queue items

    ***************************************************************************/

    private struct Header
    {
        /// Length of the item data which follows the header, in bytes
        size_t length;

        /// Reinterprets the start of `slice` as a Header (no length check is
        /// done here, the caller must pass at least Header.sizeof bytes)
        static Header* fromSlice ( void[] slice )
        {
            return cast(Header*) slice.ptr;
        }
    }

    /***************************************************************************

        Buffer used to support void[] push ( size_t ). The content is written
        to the file by the next queue operation (see handleSliceBuffer()).

    ***************************************************************************/

    private Buffer!(void) slice_push_buffer;

    /***************************************************************************

        Path to write to

    ***************************************************************************/

    private mstring path;

    /***************************************************************************

        Extension of the index file (appended to `path`)

    ***************************************************************************/

    private static immutable istring IndexExtension = ".index";

    /***************************************************************************

        Path to the index file to write to if we want to be able to reopen
        any saved data files

    ***************************************************************************/

    private mstring index_path;

    /***************************************************************************

        External file that is being written to

    ***************************************************************************/

    private File file_out;

    /***************************************************************************

        External file that is being read from

    ***************************************************************************/

    private File file_in;

    /***************************************************************************

        External index file (only created if open_existing is set)

    ***************************************************************************/

    private File file_index;

    /***************************************************************************

        Buffered output stream to write to the file

    ***************************************************************************/

    private BufferedOutput ext_out;

    /***************************************************************************

        Buffered input stream to read from the file

    ***************************************************************************/

    private BufferedInput ext_in;

    /***************************************************************************

        Unread bytes in the file

    ***************************************************************************/

    private size_t bytes_in_file;

    /***************************************************************************

        Unread items in the file

    ***************************************************************************/

    private size_t items_in_file;

    /***************************************************************************

        Size of the file read buffer. It is not possible to push items which are
        larger than this buffer size.

    ***************************************************************************/

    private size_t size;

    /***************************************************************************

        bool set if the files are currently open

    ***************************************************************************/

    private bool files_open;

    /***************************************************************************

        bool set if we want to reopen any saved data files if we restart the
        application.

    ***************************************************************************/

    private bool open_existing;

    /***************************************************************************

        buffer used to read in the index file

    ***************************************************************************/

    private void[] content;

    /***************************************************************************

        Constructor. Creates and opens the files and buffered inputs and
        outputs. Moves the file pointers to the correct position in the files
        and marks the files as open.

        If open_existing is set and both the queue file and its index file
        exist, the queue file is opened for appending and the counters are
        restored from the index file. In all other cases the queue file is
        created anew (an existing file is replaced).

        Params:
            path  = path to the file that will be used to swap the queue
            size = size of file read buffer (== maximum item size)
            open_existing = do we reopen any existing file queues

    ***************************************************************************/

    public this ( cstring path, size_t size, bool open_existing = false )
    {
        this.path  = path.dup;
        this.index_path = path.dup ~ IndexExtension;
        this.size = size;
        this.open_existing = open_existing;

        if (this.open_existing && exists(this.path))
        {
            this.file_out = new File(this.path, File.WriteAppending);
            this.file_index = new File(this.index_path, File.ReadWriteExisting);
            this.readIndex();
        }
        else
        {
            this.file_out = new File(this.path, File.WriteCreate);
            if (this.open_existing)
            {
                this.file_index = new File(this.index_path, File.ReadWriteCreate);
            }
        }

        this.file_in = new File(this.path, File.ReadExisting);

        // Writing continues at the end of the file; reading starts at the
        // first unread byte, i.e. bytes_in_file bytes before the end.
        this.ext_out = new BufferedOutput(this.file_out);
        this.ext_out.seek(this.file_out.length);
        this.ext_in = new BufferedInput(this.file_in, this.size+Header.sizeof);
        this.ext_in.seek(this.file_out.length - this.bytes_in_file);

        this.files_open = true;
    }


    /***************************************************************************

        Checks whether a file and the associated index file exist at the
        specified path.

        Note that this function allocates on every call! It is only intended to
        be called during application startup.

        Params:
            path = path to check (".index" is appended to check for the
                corresponding index file)

        Returns:
            true if the specified file and index file exist

    ***************************************************************************/

    public static bool exists ( cstring path )
    {
        return Filesystem.exists(path)
            && Filesystem.exists(path ~ IndexExtension);
    }


    /***************************************************************************

        Pushes an item into the queue.

        Params:
            item = data item to push (must not be longer than the read buffer
                size passed to the constructor)

        Returns:
            true if the item was pushed successfully, false if the item is
            empty or writing it to the file failed

    ***************************************************************************/

    public bool push ( in void[] item )
    {
        verify ( item.length <= this.size,
                 "Read buffer too small to process this item");

        this.handleSliceBuffer();

        if (item.length == 0) return false;

        return this.filePush(item);
    }


    /***************************************************************************

        Reserves space for an item of <size> bytes on the queue but doesn't
        fill the content. The caller is expected to fill in the content using
        the returned slice.

        The reserved item is kept in an internal buffer and is written to the
        file by the next queue operation (push, pop or peek).

        Params:
            size = size of the space of the item that should be reserved

        Returns:
            slice to the reserved space if it was successfully reserved,
            else null (the requested size exceeds the read buffer size)

    ***************************************************************************/

    public void[] push ( size_t size )
    {
        this.handleSliceBuffer();

        // An item larger than the read buffer could never be written by
        // filePush(): its verify() would throw from the *next* queue
        // operation and, as the buffer would never be reset, from every
        // operation after that. Refuse the reservation instead.
        if (size > this.size)
        {
            return null;
        }

        this.slice_push_buffer.length = size;

        return this.slice_push_buffer[];
    }


    /***************************************************************************

        Reads an item from the queue.

        If the queue is empty and the files are open, the files are closed and
        deleted. Exceptions raised while reading are logged and result in null
        being returned.

        Params:
            eat = whether to remove the item from the queue

        Returns:
            item read from queue (a slice into the read buffer), may be null
            if queue is empty or an error occurred

    ***************************************************************************/

    private void[] getItem ( bool eat = true )
    {
        this.handleSliceBuffer();

        if (this.bytes_in_file == 0)
        {
            // Nothing left to read: release and delete the files, unless
            // that has already happened.
            if (this.files_open)
            {
                this.closeExternal();
            }

            return null;
        }

        try
        {
            // Make sure everything pushed so far is visible to the reader
            try this.ext_out.flush();
            catch (Exception e)
            {
                log.error("## ERROR: Can't flush file buffer: {}", e.message());
                return null;
            }

            Header h;

            if (this.ext_in.readable() <= Header.sizeof && this.fill() == 0)
            {
                return null;
            }

            // Peek at the header without consuming it
            h = *Header.fromSlice(this.ext_in.slice(Header.sizeof, false));

            verify (h.length <= this.size, "Unrealistic size");

            if (h.length + Header.sizeof > this.ext_in.readable() &&
                this.fill() == 0)
            {
                return null;
            }

            // Fetch the item before touching the counters, so that a failing
            // read cannot leave the counters out of sync with the file.
            auto record = this.ext_in.slice(Header.sizeof + h.length, eat);

            if (eat)
            {
                this.items_in_file -= 1;
                this.bytes_in_file -= Header.sizeof + h.length;

                this.writeIndex();
            }

            return record[Header.sizeof .. $];
        }
        catch (Exception e)
        {
            log.error("## ERROR: Failsafe catch triggered by exception: {} ({}:{})",
                      e.message(), e.file, e.line);
        }

        return null;
    }


    /***************************************************************************

        Pops the next element

        Returns:
            item read from queue, may be null if queue is empty

    ***************************************************************************/

    public void[] pop ( )
    {
        return this.getItem();
    }


    /***************************************************************************

        Returns the element that would be popped next, without popping it

        Returns:
            next item in the queue, may be null if queue is empty

    ***************************************************************************/

    public void[] peek ( )
    {
        return this.getItem(false);
    }


    /***************************************************************************

        Fills the read buffer. If nothing could be read and the buffer holds
        no unread data, the files are closed and deleted.

        Returns:
            How many new bytes were read from the file

    ***************************************************************************/

    private size_t fill ( )
    {
        this.ext_in.compress();

        auto bytes    = this.ext_in.populate();
        auto readable = this.ext_in.readable;

        if ((bytes == 0 || bytes == File.Eof) && readable == 0)
        {
            this.closeExternal();
            return 0;
        }

        return bytes;
    }


    /***************************************************************************

        Finds out whether the provided number of bytes will fit in the queue.

        Due to the file swap, we have unlimited space, so always return true.

        Params:
            bytes = size of item to check

        Returns:
            always true

    ***************************************************************************/

    public bool willFit ( size_t bytes )
    {
        return true;
    }


    /***************************************************************************

        Returns:
            total number of bytes used by queue (used space + free space);
            this implementation always returns 0

    ***************************************************************************/

    public ulong total_space ( )
    {
        return 0;
    }


    /***************************************************************************

        Returns:
            number of bytes stored in queue (unread bytes in the file plus the
            length of a pending slice-push item)

    ***************************************************************************/

    public ulong used_space ( )
    {
        return this.bytes_in_file + this.slice_push_buffer.length;
    }


    /***************************************************************************

        Returns:
            number of bytes free in queue; this implementation always
            returns 0

    ***************************************************************************/

    public size_t free_space ( )
    {
        return 0;
    }


    /***************************************************************************

        Returns:
            the number of items in the queue (a pending slice-push item counts
            as one item)

    ***************************************************************************/

    public size_t length ( )
    {
        return this.items_in_file + (this.slice_push_buffer.length > 0 ? 1 : 0);
    }


    /***************************************************************************

        Tells whether the queue is empty.

        Returns:
            true if the queue is empty

    ***************************************************************************/

    public bool is_empty ( )
    {
        return this.items_in_file == 0 && this.slice_push_buffer.length == 0;
    }


    /***************************************************************************

        Deletes all items. The queue files are closed and deleted.

    ***************************************************************************/

    public void clear ( )
    {
        this.items_in_file = this.bytes_in_file = 0;
        this.slice_push_buffer.length = 0;

        // Items which were already read into the input buffer are discarded
        // as well; without this, closeExternal() would refuse to close the
        // files because of the unread buffer content.
        this.ext_in.clear();

        this.closeExternal();
    }



    /***************************************************************************

        Pushes the buffered slice from a previous slice-push operation

    ***************************************************************************/

    private void handleSliceBuffer ( )
    {
        if (this.slice_push_buffer.length != 0)
        {
            if (!this.files_open)
                this.openExternal();

            // A failed write is logged by filePush(); the buffered item is
            // dropped in that case.
            this.filePush(this.slice_push_buffer[]);
            this.slice_push_buffer.length = 0;
        }
    }


    /***************************************************************************

        Pushes item into file. If the file queue is set to re-open then flush
        the write buffer after each push so that the files and index do not get
        out of sync.

        Params:
            item = data to push (must not be empty or longer than the read
                buffer size)

        Returns:
            true when write was successful, else false

    ***************************************************************************/

    private bool filePush ( in void[] item )
    {
        verify(item.length <= this.size, "Pushed item will not fit read buffer");
        verify(item.length > 0, "denied push of item of size zero");

        try
        {
            if (!this.files_open)
                this.openExternal();

            Header h = Header(item.length);

            this.ext_out.write((&h)[0 .. 1]);
            this.ext_out.write(item);

            if (this.open_existing)
            {
                this.ext_out.flush();
            }

            this.bytes_in_file += Header.sizeof + item.length;
            this.items_in_file += 1;

            this.writeIndex();

            return true;
        }
        catch (Exception e)
        {
            log.error("## ERROR: Exception happened while writing to disk: {}", e.message());
            return false;
        }
    }


    /***************************************************************************

        If the file queue is set to re-open, write the current index position
        in the file to the index file. The index consists of bytes_in_file
        followed by items_in_file, written at the start of the index file.

    ***************************************************************************/

    private void writeIndex ( )
    {
        if (this.open_existing)
        {
            this.file_index.seek(0);
            this.file_index.write(cast(void[])(&this.bytes_in_file)
                [0..this.bytes_in_file.sizeof]);
            this.file_index.write(cast(void[])(&this.items_in_file)
                [0..this.items_in_file.sizeof]);
        }
    }


    /***************************************************************************

        If the file queue is set to re-open, read the saved index in to memory.
        Set the read buffer to the correct length to read the result, seek to
        the start of the index file, then interpret the data read as two
        size_t values (bytes_in_file followed by items_in_file).

        If the index file does not deliver a complete index, the counters are
        left untouched.

    ***************************************************************************/

    private void readIndex ( )
    {
        if (this.open_existing)
        {
            auto index_size = this.bytes_in_file.sizeof +
                this.items_in_file.sizeof;

            // Use the member buffer (instead of a fresh local array)
            this.content.length = index_size;

            this.file_index.seek(0);

            if (this.file_index.read(this.content) == index_size)
            {
                auto counters = cast(size_t[]) this.content;

                this.bytes_in_file = counters[0];
                this.items_in_file = counters[1];
            }
        }
    }


    /***************************************************************************

        Opens the files and associated buffers. Only open the index file if
        this file queue is set to be able to reopen. Mark the files open.

    ***************************************************************************/

    private void openExternal ( )
    {
        this.file_out.open(this.path, File.WriteCreate);
        this.file_in.open(this.path, File.ReadExisting);

        if (this.open_existing)
        {
            this.file_index.open(this.index_path, File.WriteCreate);
        }

        this.files_open = true;
    }


    /***************************************************************************

        Closes the files, clears the related buffers and deletes the files.
        Marks the files closed.

        Must only be called when the queue holds no unread data.

    ***************************************************************************/

    private void closeExternal ( )
    {
        verify(this.ext_in.readable() == 0,
               "Still unread data in input buffer");

        verify(this.bytes_in_file == 0,
               "Still bytes in the file");

        verify(this.items_in_file == 0,
               "Still items in the file");

        this.ext_in.clear();
        this.ext_out.clear();
        this.file_out.close();
        this.file_in.close();

        Filesystem.remove(this.path);

        if (this.open_existing)
        {
            this.file_index.close();
            Filesystem.remove(this.index_path);
        }

        this.files_open = false;
    }
}