/*******************************************************************************

    Copyright:
        Copyright (c) 2004 Kris Bell.
        Some parts copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Tango Dual License: 3-Clause BSD License / Academic Free License v3.0.
        See LICENSE_TANGO.txt for details.

    Version: Mar 2004: Initial release

    Authors: Kris

*******************************************************************************/

module ocean.io.device.Conduit;

import ocean.core.ExceptionDefinitions;
import ocean.meta.types.Qualifiers;
public import ocean.io.model.IConduit;

import core.thread;

/*******************************************************************************

    Conduit abstract base-class, implementing interface IConduit.
    Only the conduit-specific toString(), read(), write(), detach() and
    bufferSize() need to be implemented for a concrete conduit
    implementation. See File for an example.

    Conduits provide virtualized access to external content, and
    represent things like files or Internet connections. Conduits
    expose a pair of streams, are modelled by ocean.io.model.IConduit,
    and are implemented via classes such as File & SocketConduit.

    Additional kinds of conduit are easy to construct: either subclass
    ocean.io.device.Conduit, or implement ocean.io.model.IConduit. A
    conduit typically reads and writes via a Buffer in large chunks,
    typically the entire buffer. Alternatively, one can invoke method
    read(dst[]) and/or write(src[]) directly.

*******************************************************************************/

class Conduit : IConduit
{
    // Scheduling timeout in milliseconds. The -1 initialiser wraps to
    // uint.max, which is the value timeout() reports until one is set.
    private uint duration = -1;

    /***********************************************************************

        Clean up when collected. See method detach().

    ***********************************************************************/

    ~this ()
    {
        detach;
    }

    /***********************************************************************

        Return the name of this conduit.

    ***********************************************************************/

    abstract override istring toString ();

    /***********************************************************************

        Return a preferred size for buffering conduit I/O.

    ***********************************************************************/

    abstract size_t bufferSize ();

    /***********************************************************************

        Read from conduit into a target array. The provided dst
        will be populated with content from the conduit.

        Returns the number of bytes read, which may be less than
        requested in dst. Eof is returned whenever an end-of-flow
        condition arises.

    ***********************************************************************/

    abstract size_t read (void[] dst);

    /***********************************************************************

        Write to conduit from a source array. The provided src
        content will be written to the conduit.

        Returns the number of bytes written from src, which may
        be less than the quantity provided. Eof is returned when
        an end-of-flow condition arises.

    ***********************************************************************/

    abstract size_t write (const(void)[] src);

    /***********************************************************************

        Disconnect this conduit. Note that this may be invoked
        both explicitly by the user, and implicitly by the GC
        (the destructor calls it). Be sure to manage multiple
        detachment requests correctly: set a flag, or sentinel
        value as necessary.

    ***********************************************************************/

    abstract void detach ();

    /***********************************************************************

        Set the active timeout period for IO calls (in milliseconds.)

        Params:
            millisec = new timeout value

    ***********************************************************************/

    final void timeout (uint millisec)
    {
        duration = millisec;
    }

    /***********************************************************************

        Get the active timeout period for IO calls (in milliseconds.)

        Returns:
            the value last passed to timeout(uint), or uint.max if no
            timeout has been set

    ***********************************************************************/

    final uint timeout ()
    {
        return duration;
    }

    /***********************************************************************

        Is the conduit alive? Default behaviour returns true.

    ***********************************************************************/

    bool isAlive ()
    {
        return true;
    }

    /***********************************************************************

        Return the host. This is part of the Stream interface.

        A Conduit is its own host, so this returns `this`.

    ***********************************************************************/

    final IConduit conduit ()
    {
        return this;
    }

    /***********************************************************************

        Emit buffered output or reset buffered input.

        The base implementation performs no work and returns `this`.

    ***********************************************************************/

    IOStream flush ()
    {
        return this;
    }

    /***********************************************************************

        Close this conduit.

        Both input and output are detached, and are no longer usable.

    ***********************************************************************/

    void close ()
    {
        this.detach;
    }

    /***********************************************************************

        Throw an IOException, with the provided message.

        Params:
            msg = message description of the error

    ***********************************************************************/

    void error (istring msg)
    {
        throw new IOException (msg);
    }

    /*******************************************************************

        Throw an IOException, with the provided message, function name
        and error code.

        Params:
            error_code = error code
            func_name = name of the method that failed
            msg = message description of the error
            file = file where exception is thrown
            line = line where exception is thrown

    *******************************************************************/

    public void error ( int error_code, istring func_name,
            istring msg = "", istring file = __FILE__, long line = __LINE__ )
    {
        throw new IOException (msg, error_code, func_name, file, line);
    }

    /***********************************************************************

        Return the input stream.

    ***********************************************************************/

    final InputStream input ()
    {
        return this;
    }

    /***********************************************************************

        Return the output stream.

    ***********************************************************************/

    final OutputStream output ()
    {
        return this;
    }

    /***********************************************************************

        Emit fixed-length content from 'src' into this conduit,
        throwing an IOException upon Eof.

        The content is only read, so const and immutable arrays are
        accepted as well as mutable ones.

        Params:
            src = content to write in its entirety

        Returns:
            this conduit, to allow call chaining

    ***********************************************************************/

    final Conduit put (const(void)[] src)
    {
        put (src, this);
        return this;
    }

    /***********************************************************************

        Consume fixed-length content into 'dst' from this conduit,
        throwing an IOException upon Eof.

        Params:
            dst = array to fill completely

        Returns:
            this conduit, to allow call chaining

    ***********************************************************************/

    final Conduit get (void[] dst)
    {
        get (dst, this);
        return this;
    }

    /***********************************************************************

        Rewind to beginning of file.

        Implemented as seek(0), so the outcome depends on the seek()
        implementation of the concrete conduit.

    ***********************************************************************/

    final Conduit rewind ()
    {
        seek (0);
        return this;
    }

    /***********************************************************************

        Transfer the content of another conduit to this one. Returns
        the dst OutputStream, or throws IOException on failure.

        Note that the value returned by transfer() (byte count or Eof)
        is not inspected here; only exceptions raised by the underlying
        read/write calls propagate to the caller.

        Params:
            src = stream to copy from
            max = maximum number of bytes to transfer

    ***********************************************************************/

    OutputStream copy (InputStream src, size_t max = -1)
    {
        transfer (src, this, max);
        return this;
    }

    /***********************************************************************

        Seek on this stream. Source conduits that don't support
        seeking will throw an IOException.

        The base implementation always reports the error; conduits that
        do support seeking override this method.

    ***********************************************************************/

    long seek (long offset, Anchor anchor = Anchor.Begin)
    {
        error (this.toString ~ " does not support seek requests");
        // Not reached while error() throws, as the implementation in
        // this class does.
        return 0;
    }

    /***********************************************************************

        Load text from a stream, and return them all in an
        array.

        The loaded bytes are reinterpreted as T[]; no decoding or
        validation is performed.

        Params:
            T = element type of the returned array (char by default)
            max = maximum number of bytes to load

        Returns an array representing the content, and throws
        IOException on error.

    ***********************************************************************/

    T[] text(T=char) (size_t max = -1)
    {
        return cast(T[]) load (max);
    }

    /***********************************************************************

        Load the bits from this conduit, and return them all in an
        array.

        Params:
            max = maximum number of bytes to load

        Returns an array representing the content, and throws
        IOException on error.

    ***********************************************************************/

    void[] load (size_t max = -1)
    {
        return load (this, max);
    }

    /***********************************************************************

        Load the bits from a stream, and return them all in an
        array. The array is allocated here and expanded as necessary
        to consume the input: in steps of the conduit's bufferSize when
        no limit is given, or as a single allocation of 'max' bytes
        otherwise.

        Params:
            src = stream to read from
            max = maximum number of bytes to load

        Returns an array representing the content, and throws
        IOException on error.

    ***********************************************************************/

    static void[] load (InputStream src, size_t max=-1)
    {
        void[]  dst;
        size_t  i,
                len,
                chunk;

        if (max != -1)
            chunk = max;
        else
            chunk = src.conduit.bufferSize;

        while (len < max)
        {
            // grow only once the previously allocated space is full
            if (dst.length - len is 0)
                dst.length = len + chunk;

            if ((i = src.read (dst[len .. $])) is Eof)
                break;
            len += i;
        }

        // trim the unused tail of the last allocation
        return dst [0 .. len];
    }

    /***********************************************************************

        Emit fixed-length content from 'src' into 'output',
        throwing an IOException upon Eof.

        write() may consume only part of what it is given, so it is
        called repeatedly until nothing remains.

        Params:
            src = content to write in its entirety
            output = stream to write to

    ***********************************************************************/

    static void put (const(void)[] src, OutputStream output)
    {
        while (src.length)
        {
            auto i = output.write (src);
            if (i is Eof)
                output.conduit.error ("Conduit.put :: eof while writing");
            src = src [i..$];
        }
    }

    /***********************************************************************

        Consume fixed-length content into 'dst' from 'input',
        throwing an IOException upon Eof.

        read() may deliver less than requested, so it is called
        repeatedly until 'dst' has been filled.

        Params:
            dst = array to fill completely
            input = stream to read from

    ***********************************************************************/

    static void get (void[] dst, InputStream input)
    {
        while (dst.length)
        {
            auto i = input.read (dst);
            if (i is Eof)
                input.conduit.error ("Conduit.get :: eof while reading");
            dst = dst [i..$];
        }
    }

    /***********************************************************************

        Low-level data transfer, where max represents the maximum
        number of bytes to transfer.

        Data is moved through a fixed 8192-byte stack buffer. Eof from
        the source ends the transfer normally; Eof from the destination
        aborts it.

        Params:
            src = stream to read from
            dst = stream to write to
            max = maximum number of bytes to transfer

        Returns Eof on failure, number of bytes copied on success.

    ***********************************************************************/

    static size_t transfer (InputStream src, OutputStream dst, size_t max=-1)
    {
        byte[8192] tmp;
        size_t     done;

        while (max)
        {
            // never request more than the buffer can hold
            auto len = max;
            if (len > tmp.length)
                len = tmp.length;

            if ((len = src.read(tmp[0 .. len])) is Eof)
                max = 0;        // source exhausted: leave the loop
            else
            {
                max -= len;
                done += len;

                // write() may be partial: keep going until the whole
                // chunk has been handed over
                auto p = tmp.ptr;
                for (size_t j=0; len > 0; len -= j, p += j)
                    if ((j = dst.write (p[0 .. len])) is Eof)
                        return Eof;
            }
        }

        return done;
    }
}


/*******************************************************************************

    Base class for input stream filtering. The provided source stream
    should generally never be null, though some filters have a need to
    set this lazily.

    Every method forwards to the source without a null check.

*******************************************************************************/

class InputFilter : InputStream
{
    protected InputStream source;

    /***********************************************************************

        Attach to the provided stream. The provided source stream
        should generally never be null, though some filters have a
        need to set this lazily.

        Params:
            source = upstream to read from

    ***********************************************************************/

    this (InputStream source)
    {
        this.source = source;
    }

    /***********************************************************************

        Return the hosting conduit.

    ***********************************************************************/

    IConduit conduit ()
    {
        return source.conduit;
    }

    /***********************************************************************

        Read from conduit into a target array. The provided dst
        will be populated with content from the conduit.

        Returns the number of bytes read, which may be less than
        requested in dst. Eof is returned whenever an end-of-flow
        condition arises.

    ***********************************************************************/

    size_t read (void[] dst)
    {
        return source.read (dst);
    }

    /***********************************************************************

        Load the bits from this stream, and return them all in an
        array. Reading goes through this filter's own read(), so
        subclass filtering is applied to the loaded content.

        Params:
            max = maximum number of bytes to load

        Returns an array representing the content, and throws
        IOException on error.

    ***********************************************************************/

    void[] load (size_t max = -1)
    {
        return Conduit.load (this, max);
    }

    /***********************************************************************

        Clear any buffered content.

        Forwards to the source's flush() and returns this filter.

    ***********************************************************************/

    IOStream flush ()
    {
        source.flush;
        return this;
    }

    /***********************************************************************

        Seek on this stream. Target conduits that don't support
        seeking will throw an IOException.

    ***********************************************************************/

    long seek (long offset, Anchor anchor = Anchor.Begin)
    {
        return source.seek (offset, anchor);
    }

    /***********************************************************************

        Return the upstream host of this filter.

    ***********************************************************************/

    InputStream input ()
    {
        return source;
    }

    /***********************************************************************

        Close the input.

        This closes the source stream this filter is attached to.

    ***********************************************************************/

    void close ()
    {
        source.close;
    }
}


/*******************************************************************************

    Base class for output stream filtering. The provided sink stream
    should generally never be null, though some filters have a need to
    set this lazily.

    Every method forwards to the sink without a null check.

*******************************************************************************/

class OutputFilter : OutputStream
{
    protected OutputStream sink;

    /***********************************************************************

        Attach to the provided stream.

        Params:
            sink = downstream to write to

    ***********************************************************************/

    this (OutputStream sink)
    {
        this.sink = sink;
    }

    /***********************************************************************

        Return the hosting conduit.

    ***********************************************************************/

    IConduit conduit ()
    {
        return sink.conduit;
    }

    /***********************************************************************

        Write to conduit from a source array. The provided src
        content will be written to the conduit.

        Returns the number of bytes written from src, which may
        be less than the quantity provided. Eof is returned when
        an end-of-flow condition arises.

    ***********************************************************************/

    size_t write (const(void)[] src)
    {
        return sink.write (src);
    }

    /***********************************************************************

        Transfer the content of another conduit to this one. Returns
        a reference to this class, or throws IOException on failure.

        Writing goes through this filter's own write(). The value
        returned by Conduit.transfer() (byte count or Eof) is not
        inspected here.

        Params:
            src = stream to copy from
            max = maximum number of bytes to transfer

    ***********************************************************************/

    OutputStream copy (InputStream src, size_t max = -1)
    {
        Conduit.transfer (src, this, max);
        return this;
    }

    /***********************************************************************

        Emit/purge buffered content.

        Forwards to the sink's flush() and returns this filter.

    ***********************************************************************/

    IOStream flush ()
    {
        sink.flush;
        return this;
    }

    /***********************************************************************

        Seek on this stream. Target conduits that don't support
        seeking will throw an IOException.

    ***********************************************************************/

    long seek (long offset, Anchor anchor = Anchor.Begin)
    {
        return sink.seek (offset, anchor);
    }

    /***********************************************************************

        Return the upstream host of this filter.

    ***********************************************************************/

    OutputStream output ()
    {
        return sink;
    }

    /***********************************************************************

        Close the output.

        This closes the sink stream this filter is attached to.

    ***********************************************************************/

    void close ()
    {
        sink.close;
    }
}