1 /******************************************************************************* 2 3 File-based queue implementation. 4 5 The flexible file queue can be set to either open any existing files it 6 finds, or always delete existing files when it is created using the 7 open_existing parameter in the contructor. 8 9 Note that the queue file is deleted in the following cases: 10 1. Upon calling the clear() method. 11 2. Upon calling pop() on an empty queue. 12 13 Copyright: 14 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 15 All rights reserved. 16 17 License: 18 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 19 Alternatively, this file may be distributed under the terms of the Tango 20 3-Clause BSD License (see LICENSE_BSD.txt for details). 21 22 *******************************************************************************/ 23 24 module ocean.util.container.queue.FlexibleFileQueue; 25 26 27 import ocean.meta.types.Qualifiers; 28 import ocean.core.Buffer; 29 import ocean.util.container.queue.FlexibleRingQueue; 30 import ocean.util.container.queue.model.IByteQueue; 31 import ocean.util.container.queue.model.IQueueInfo; 32 import ocean.util.log.Logger; 33 import ocean.io.device.File; 34 import Filesystem = ocean.io.Path; 35 import ocean.io.stream.Buffered; 36 37 38 private Logger log; 39 static this ( ) 40 { 41 log = Log.lookup("ocean.util.container.queue.FlexibleFileQueue"); 42 } 43 44 45 46 public class FlexibleFileQueue : IByteQueue 47 { 48 import ocean.core.Verify; 49 50 /*************************************************************************** 51 52 Header for queue items 53 54 ***************************************************************************/ 55 56 private struct Header 57 { 58 size_t length; 59 60 static Header* fromSlice ( void[] slice ) 61 { 62 return cast(Header*) slice.ptr; 63 } 64 } 65 66 /*************************************************************************** 67 68 Buffer used to support const(void)[] push ( size_t ) 69 70 ***************************************************************************/ 71 72 private Buffer!(void) slice_push_buffer; 73 74 /*************************************************************************** 75 76 Path to write to 77 78 ***************************************************************************/ 79 80 private mstring path; 81 82 /*************************************************************************** 83 84 Extension of the index file (appended to `path`) 85 86 ***************************************************************************/ 87 88 private static immutable istring IndexExtension = ".index"; 89 90 /*************************************************************************** 91 92 Path to the index file to write to if we want to be able to reopen 93 any saved data files 94 95 ***************************************************************************/ 96 97 private mstring index_path; 98 99 /*************************************************************************** 100 101 External file that is being written to 102 103 ***************************************************************************/ 104 105 private File file_out; 106 107 /*************************************************************************** 108 109 External file that is being read from 110 111 ***************************************************************************/ 112 113 private File file_in; 114 115 /*************************************************************************** 116 117 External index file that is being read from 118 119 ***************************************************************************/ 120 121 private File file_index; 122 123 /*************************************************************************** 124 125 Buffered output stream to write to the file 126 127 ***************************************************************************/ 128 129 private BufferedOutput ext_out; 130 131 /*************************************************************************** 132 133 Buffered input stream to write to the file 134 135 ***************************************************************************/ 136 137 private BufferedInput ext_in; 138 139 /*************************************************************************** 140 141 Unread bytes in the file 142 143 ***************************************************************************/ 144 145 private size_t bytes_in_file; 146 147 /*************************************************************************** 148 149 Unread items in the file 150 151 ***************************************************************************/ 152 153 private size_t items_in_file; 154 155 /*************************************************************************** 156 157 Size of the file read buffer. It is not possible to push items which are 158 larger than this buffer size. 159 160 ***************************************************************************/ 161 162 private size_t size; 163 164 /*************************************************************************** 165 166 bool set if the files are currently open 167 168 ***************************************************************************/ 169 170 private bool files_open; 171 172 /*************************************************************************** 173 174 bool set if we want to reopen any saved data files if we restart the 175 application. 176 177 ***************************************************************************/ 178 179 private bool open_existing; 180 181 /*************************************************************************** 182 183 buffer used to read in the index file 184 185 ***************************************************************************/ 186 187 private void[] content; 188 189 /*************************************************************************** 190 191 Constructor. Creates and opens the files and buffered inputs and 192 outputs. Moves the file pointers to the correct position in the files 193 and marks the files as open. 194 195 Params: 196 path = path to the file that will be used to swap the queue 197 size = size of file read buffer (== maximum item size) 198 open_existing = do we reopen any existing file queues 199 200 ***************************************************************************/ 201 202 public this ( cstring path, size_t size, bool open_existing = false ) 203 { 204 this.path = path.dup; 205 this.index_path = path.dup ~ IndexExtension; 206 this.size = size; 207 this.open_existing = open_existing; 208 209 if (this.open_existing && this.exists(this.path)) 210 { 211 this.file_out = new File(this.path, File.WriteAppending); 212 this.file_index = new File(this.index_path, File.ReadWriteExisting); 213 this.readIndex(); 214 } 215 else 216 { 217 this.file_out = new File(this.path, File.WriteCreate); 218 if (this.open_existing) 219 { 220 this.file_index = new File(this.index_path, File.ReadWriteCreate); 221 } 222 } 223 224 this.file_in = new File(this.path, File.ReadExisting); 225 226 this.ext_out = new BufferedOutput(this.file_out); 227 this.ext_out.seek(this.file_out.length); 228 this.ext_in = new BufferedInput(this.file_in, this.size+Header.sizeof); 229 this.ext_in.seek(this.file_out.length - this.bytes_in_file); 230 231 this.files_open = true; 232 } 233 234 235 /*************************************************************************** 236 237 Checks whether a file and the associated index file exist at the 238 specified path. 239 240 Note that this function allocates on every call! It is only intended to 241 be called during application startup. 242 243 Params: 244 path = path to check (".index" is appended to check for the 245 corresponding index file) 246 247 Returns: 248 true if the specified file and index file exist 249 250 ***************************************************************************/ 251 252 public static bool exists ( cstring path ) 253 { 254 return Filesystem.exists(path) 255 && Filesystem.exists(path ~ IndexExtension); 256 } 257 258 259 /*************************************************************************** 260 261 Pushes an item into the queue. 262 263 Params: 264 item = data item to push 265 266 Returns: 267 true if the item was pushed successfully, false if it didn't fit 268 269 ***************************************************************************/ 270 271 public bool push ( in void[] item ) 272 { 273 verify ( item.length <= this.size, 274 "Read buffer too small to process this item"); 275 276 this.handleSliceBuffer(); 277 278 if (item.length == 0) return false; 279 280 return this.filePush(item); 281 } 282 283 284 /*************************************************************************** 285 286 Reserves space for an item of <size> bytes on the queue but doesn't 287 fill the content. The caller is expected to fill in the content using 288 the returned slice. 289 290 Params: 291 size = size of the space of the item that should be reserved 292 293 Returns: 294 slice to the reserved space if it was successfully reserved, 295 else null 296 297 ***************************************************************************/ 298 299 public void[] push ( size_t size ) 300 { 301 this.handleSliceBuffer(); 302 303 this.slice_push_buffer.length = size; 304 305 return this.slice_push_buffer[]; 306 } 307 308 309 /*************************************************************************** 310 311 Reads an item from the queue. 312 313 Params: 314 eat = whether to remove the item from the queue 315 316 Returns: 317 item read from queue, may be null if queue is empty 318 319 ***************************************************************************/ 320 321 private void[] getItem ( bool eat = true ) 322 { 323 this.handleSliceBuffer(); 324 325 if (this.bytes_in_file == 0 && this.files_open) 326 { 327 this.closeExternal(); 328 return null; 329 } 330 331 if (this.bytes_in_file > 0) try 332 { 333 try this.ext_out.flush(); 334 catch (Exception e) 335 { 336 log.error("## ERROR: Can't flush file buffer: {}", e.message()); 337 return null; 338 } 339 340 Header h; 341 342 if (this.ext_in.readable() <= Header.sizeof && this.fill() == 0) 343 { 344 return null; 345 } 346 347 h = *Header.fromSlice(this.ext_in.slice(Header.sizeof, false)); 348 349 verify (h.length <= this.size, "Unrealistic size"); 350 351 if (h.length + Header.sizeof > this.ext_in.readable() && 352 this.fill() == 0) 353 { 354 return null; 355 } 356 357 if (eat) 358 { 359 this.items_in_file -= 1; 360 this.bytes_in_file -= Header.sizeof + h.length; 361 362 this.writeIndex(); 363 } 364 365 return this.ext_in.slice(Header.sizeof + h.length, 366 eat)[Header.sizeof .. $]; 367 } 368 catch (Exception e) 369 { 370 log.error("## ERROR: Failsafe catch triggered by exception: {} ({}:{})", 371 e.message(), e.file, e.line); 372 } 373 374 return null; 375 } 376 377 378 /*************************************************************************** 379 380 Popps the next element 381 382 Returns: 383 item read from queue, may be null if queue is empty 384 385 ***************************************************************************/ 386 387 public void[] pop ( ) 388 { 389 return this.getItem(); 390 } 391 392 393 /*************************************************************************** 394 395 Returns the element that would be popped next, without poppin' it 396 397 ***************************************************************************/ 398 399 public void[] peek ( ) 400 { 401 return this.getItem(false); 402 } 403 404 405 /*************************************************************************** 406 407 Fills the read buffer 408 409 Returns: 410 How many new bytes were read from the file 411 412 ***************************************************************************/ 413 414 private size_t fill ( ) 415 { 416 this.ext_in.compress(); 417 418 auto bytes = this.ext_in.populate(); 419 auto readable = this.ext_in.readable; 420 421 if ((bytes == 0 || bytes == File.Eof) && readable == 0) 422 { 423 this.closeExternal(); 424 return 0; 425 } 426 427 return bytes; 428 } 429 430 431 /*************************************************************************** 432 433 Finds out whether the provided number of bytes will fit in the queue. 434 435 Due to the file swap, we have unlimited space, so always return true. 436 437 Params: 438 bytes = size of item to check 439 440 Returns: 441 always true 442 443 ***************************************************************************/ 444 445 public bool willFit ( size_t bytes ) 446 { 447 return true; 448 } 449 450 451 /*************************************************************************** 452 453 Returns: 454 total number of bytes used by queue (used space + free space) 455 456 ***************************************************************************/ 457 458 public ulong total_space ( ) 459 { 460 return 0; 461 } 462 463 464 /*************************************************************************** 465 466 Returns: 467 number of bytes stored in queue 468 469 ***************************************************************************/ 470 471 public ulong used_space ( ) 472 { 473 return this.bytes_in_file + this.slice_push_buffer.length; 474 } 475 476 477 /*************************************************************************** 478 479 Returns: 480 number of bytes free in queue 481 482 ***************************************************************************/ 483 484 public size_t free_space ( ) 485 { 486 return 0; 487 } 488 489 490 /*************************************************************************** 491 492 Returns: 493 the number of items in the queue 494 495 ***************************************************************************/ 496 497 public size_t length ( ) 498 { 499 return this.items_in_file + (this.slice_push_buffer.length > 0 ? 1 : 0); 500 } 501 502 503 /*************************************************************************** 504 505 Tells whether the queue is empty. 506 507 Returns: 508 true if the queue is empty 509 510 ***************************************************************************/ 511 512 public bool is_empty ( ) 513 { 514 return this.items_in_file == 0 && this.slice_push_buffer.length == 0; 515 } 516 517 518 /*************************************************************************** 519 520 Deletes all items 521 522 ***************************************************************************/ 523 524 public void clear ( ) 525 { 526 this.items_in_file = this.bytes_in_file = 0; 527 this.slice_push_buffer.length = 0; 528 this.closeExternal(); 529 } 530 531 532 533 /*************************************************************************** 534 535 Pushes the buffered slice from a previous slice-push operation 536 537 ***************************************************************************/ 538 539 private void handleSliceBuffer ( ) 540 { 541 if (this.slice_push_buffer.length != 0) 542 { 543 if (!this.files_open) 544 this.openExternal(); 545 546 this.filePush(this.slice_push_buffer[]); 547 this.slice_push_buffer.length = 0; 548 } 549 } 550 551 552 /*************************************************************************** 553 554 Pushes item into file. If the file queue is set to re-open then flush 555 the write buffer after each push so that the files and index do not get 556 out of sync. 557 558 Params: 559 item = data to push 560 561 Returns: 562 true when write was successful, else false 563 564 ***************************************************************************/ 565 566 private bool filePush ( in void[] item ) 567 { 568 verify(item.length <= this.size, "Pushed item will not fit read buffer"); 569 verify(item.length > 0, "denied push of item of size zero"); 570 571 try 572 { 573 if (!this.files_open) 574 this.openExternal(); 575 576 Header h = Header(item.length); 577 578 this.ext_out.write((&h)[0 .. 1]); 579 this.ext_out.write(item); 580 581 if (this.open_existing) 582 { 583 this.ext_out.flush(); 584 } 585 586 this.bytes_in_file += Header.sizeof + item.length; 587 this.items_in_file += 1; 588 589 this.writeIndex(); 590 591 return true; 592 } 593 catch (Exception e) 594 { 595 log.error("## ERROR: Exception happened while writing to disk: {}", e.message()); 596 return false; 597 } 598 } 599 600 601 /*************************************************************************** 602 603 If the file queue is set to re-open, write the current index position 604 in the file to the index file. 605 606 ***************************************************************************/ 607 608 private void writeIndex ( ) 609 { 610 if (this.open_existing) 611 { 612 this.file_index.seek(0); 613 this.file_index.write(cast(void[])(&this.bytes_in_file) 614 [0..this.bytes_in_file.sizeof]); 615 this.file_index.write(cast(void[])(&this.items_in_file) 616 [0..this.items_in_file.sizeof]); 617 } 618 } 619 620 621 /*************************************************************************** 622 623 If the file queue is set to re-open, read the saved index in to memory. 624 Set the void array to the correct length to read the result, seek to 625 the start of the index file, then cast the results from void[] arrays 626 to size_t values. 627 628 ***************************************************************************/ 629 630 private void readIndex ( ) 631 { 632 if (this.open_existing) 633 { 634 void[] content; 635 content.length = this.bytes_in_file.sizeof + 636 this.items_in_file.sizeof; 637 638 this.file_index.seek(0); 639 640 if (this.file_index.read(content) 641 == (this.bytes_in_file.sizeof + this.items_in_file.sizeof)) 642 { 643 this.bytes_in_file = 644 *cast(size_t*)content[0..this.bytes_in_file.sizeof].ptr; 645 this.items_in_file = 646 *cast(size_t*)content[this.bytes_in_file.sizeof..$].ptr; 647 } 648 } 649 } 650 651 652 /*************************************************************************** 653 654 Opens the files and associated buffers. Only open the index file if 655 this file queue is set to be able to reopen. Mark the files open. 656 657 ***************************************************************************/ 658 659 private void openExternal ( ) 660 { 661 this.file_out.open(this.path, File.WriteCreate); 662 this.file_in.open(this.path, File.ReadExisting); 663 664 if (this.open_existing) 665 { 666 this.file_index.open(this.index_path, File.WriteCreate); 667 } 668 669 this.files_open = true; 670 } 671 672 673 /*************************************************************************** 674 675 Closes the files and clear the related buffers. Mark the files closed. 676 677 ***************************************************************************/ 678 679 private void closeExternal ( ) 680 { 681 verify(this.ext_in.readable() == 0, 682 "Still unread data in input buffer"); 683 684 verify(this.bytes_in_file - this.ext_in.readable() == 0, 685 "Still bytes in the file"); 686 687 verify(this.items_in_file - this.ext_in.readable() == 0, 688 "Still items in the file"); 689 690 this.ext_in.clear(); 691 this.ext_out.clear(); 692 this.file_out.close(); 693 this.file_in.close(); 694 695 Filesystem.remove(this.path); 696 697 if (this.open_existing) 698 { 699 this.file_index.close(); 700 Filesystem.remove(this.index_path); 701 } 702 703 this.files_open = false; 704 } 705 }