1 /*******************************************************************************
2 
3         Copyright:
4             Copyright (c) 2004 Kris Bell.
5             Some parts copyright (c) 2009-2016 dunnhumby Germany GmbH.
6             All rights reserved.
7 
8         License:
9             Tango Dual License: 3-Clause BSD License / Academic Free License v3.0.
10             See LICENSE_TANGO.txt for details.
11 
12         Version: Mar 2004: Initial release
13 
14         Authors: Kris
15 
16 *******************************************************************************/
17 
18 module ocean.io.device.Conduit;
19 
20 import ocean.core.ExceptionDefinitions;
21 import ocean.meta.types.Qualifiers;
22 public import ocean.io.model.IConduit;
23 
24 import core.thread;
25 
26 /*******************************************************************************
27 
        Conduit abstract base-class, implementing interface IConduit.
        Only the conduit-specific toString(), bufferSize(), read(),
        write() and detach() need to be implemented for a concrete
        conduit implementation. See File for an example.
32 
33         Conduits provide virtualized access to external content, and
34         represent things like files or Internet connections. Conduits
35         expose a pair of streams, are modelled by ocean.io.model.IConduit,
36         and are implemented via classes such as File & SocketConduit.
37 
38         Additional kinds of conduit are easy to construct: either subclass
39         ocean.io.device.Conduit, or implement ocean.io.model.IConduit. A
40         conduit typically reads and writes via a Buffer in large chunks,
41         typically the entire buffer. Alternatively, one can invoke method
42         read(dst[]) and/or write(src[]) directly.
43 
44 *******************************************************************************/
45 
class Conduit : IConduit
{
        // Timeout for IO calls in milliseconds, as exposed via timeout().
        // The initialiser -1 converts to uint.max.
        private   uint            duration = -1;

        /***********************************************************************

                Clean up when collected. See method detach().

        ***********************************************************************/

        ~this ()
        {
                this.detach ();
        }

        /***********************************************************************

                Return the name of this conduit.

        ***********************************************************************/

        abstract override istring toString ();

        /***********************************************************************

                Return a preferred size for buffering conduit I/O.

        ***********************************************************************/

        abstract size_t bufferSize ();

        /***********************************************************************

                Read from conduit into a target array. The provided dst
                will be populated with content from the conduit.

                Returns the number of bytes read, which may be less than
                requested in dst. Eof is returned whenever an end-of-flow
                condition arises.

        ***********************************************************************/

        abstract size_t read (void[] dst);

        /***********************************************************************

                Write to conduit from a source array. The provided src
                content will be written to the conduit.

                Returns the number of bytes written from src, which may
                be less than the quantity provided. Eof is returned when
                an end-of-flow condition arises.

        ***********************************************************************/

        abstract size_t write (const(void)[] src);

        /***********************************************************************

                Disconnect this conduit. Note that this may be invoked
                both explicitly by the user (see close()), and implicitly
                by the GC (see the destructor). Be sure to manage multiple
                detachment requests correctly: set a flag, or sentinel
                value as necessary.

        ***********************************************************************/

        abstract void detach ();

        /***********************************************************************

                Set the active timeout period for IO calls (in milliseconds.)

        ***********************************************************************/

        final void timeout (uint millisec)
        {
                duration = millisec;
        }

        /***********************************************************************

                Get the active timeout period for IO calls (in milliseconds.)

        ***********************************************************************/

        final uint timeout ()
        {
                return duration;
        }

        /***********************************************************************

                Is the conduit alive? Default behaviour returns true.

        ***********************************************************************/

        bool isAlive ()
        {
                return true;
        }

        /***********************************************************************

                Return the host. This is part of the Stream interface.

        ***********************************************************************/

        final IConduit conduit ()
        {
                return this;
        }

        /***********************************************************************

                Emit buffered output or reset buffered input. The default
                implementation does nothing and returns this conduit.

        ***********************************************************************/

        IOStream flush ()
        {
                return this;
        }

        /***********************************************************************

                Close this conduit by invoking detach().

                Both input and output are detached, and are no longer usable.

        ***********************************************************************/

        void close ()
        {
                this.detach ();
        }

        /***********************************************************************

                Throw an IOException, with the provided message.

        ***********************************************************************/

        void error (istring msg)
        {
                throw new IOException (msg);
        }

        /*******************************************************************

            Throw an IOException, with the provided message, function name
            and error code.

            Params:
                error_code = error code
                func_name = name of the method that failed
                msg = message description of the error
                file = file where exception is thrown
                line = line where exception is thrown

        *******************************************************************/

        public void error ( int error_code, istring func_name,
                istring msg = "", istring file = __FILE__, long line = __LINE__ )
        {
            throw new IOException (msg, error_code, func_name, file, line);
        }

        /***********************************************************************

                Return the input stream.

        ***********************************************************************/

        final InputStream input ()
        {
                return this;
        }

        /***********************************************************************

                Return the output stream.

        ***********************************************************************/

        final OutputStream output ()
        {
                return this;
        }

        /***********************************************************************

                Emit fixed-length content from 'src' into this conduit,
                throwing an IOException upon Eof.

                The content is only read, never modified, so any array
                that converts to const(void)[] is accepted.

        ***********************************************************************/

        final Conduit put (const(void)[] src)
        {
                put (src, this);
                return this;
        }

        /***********************************************************************

                Consume fixed-length content into 'dst' from this conduit,
                throwing an IOException upon Eof.

        ***********************************************************************/

        final Conduit get (void[] dst)
        {
                get (dst, this);
                return this;
        }

        /***********************************************************************

                Rewind to beginning of file, via seek(0).

        ***********************************************************************/

        final Conduit rewind ()
        {
                seek (0);
                return this;
        }

        /***********************************************************************

                Transfer the content of another conduit to this one. Returns
                the dst OutputStream, or throws IOException on failure.

                At most max bytes are transferred.

        ***********************************************************************/

        OutputStream copy (InputStream src, size_t max = -1)
        {
                // transfer() reports a failed write by returning Eof; turn
                // that into the documented exception rather than dropping it
                if (transfer (src, this, max) is Eof)
                    error ("Conduit.copy :: eof while writing");
                return this;
        }

        /***********************************************************************

                Seek on this stream. Source conduits that don't support
                seeking will throw an IOException, which is what this
                default implementation does.

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                error (this.toString ~ " does not support seek requests");
                return 0;
        }

        /***********************************************************************

                Load text from a stream, and return it all in an array
                of the requested character type T.

                The loaded content is reinterpreted as T[] through a cast;
                no transcoding takes place.

                Returns an array representing the content, and throws
                IOException on error.

        ***********************************************************************/

        T[] text(T=char) (size_t max = -1)
        {
                return cast(T[]) load (max);
        }

        /***********************************************************************

                Load the bits from this conduit, and return them all in
                an array. At most max bytes are loaded.

                Returns an array representing the content, and throws
                IOException on error.

        ***********************************************************************/

        void[] load (size_t max = -1)
        {
                return load (this, max);
        }

        /***********************************************************************

                Load the bits from a stream, and return them all in an
                array. Reading continues until src reports Eof or max
                bytes have been collected.

                When max is specified the destination is sized to max at
                once; otherwise it grows by src.conduit.bufferSize each
                time it fills up.

                Returns an array representing the content, and throws
                IOException on error.

        ***********************************************************************/

        static void[] load (InputStream src, size_t max=-1)
        {
                void[]  dst;
                size_t  i,
                        len,
                        chunk;

                // -1 converts to size_t.max, meaning "no limit requested"
                if (max != -1)
                    chunk = max;
                else
                   chunk = src.conduit.bufferSize;

                while (len < max)
                      {
                      // destination exhausted: make room for another chunk
                      if (dst.length - len is 0)
                          dst.length = len + chunk;

                      if ((i = src.read (dst[len .. $])) is Eof)
                           break;
                      len += i;
                      }

                return dst [0 .. len];
        }

        /***********************************************************************

                Emit fixed-length content from 'src' into 'output',
                throwing an IOException upon Eof.

                Writing is repeated until all of src has been accepted,
                since a single write may consume only part of it.

        ***********************************************************************/

        static void put (const(void)[] src, OutputStream output)
        {
                while (src.length)
                      {
                      auto i = output.write (src);
                      if (i is Eof)
                          output.conduit.error ("Conduit.put :: eof while writing");
                      src = src [i..$];
                      }
        }

        /***********************************************************************

                Consume fixed-length content into 'dst' from 'input',
                throwing an IOException upon Eof.

                Reading is repeated until dst has been filled, since a
                single read may deliver only part of the request.

        ***********************************************************************/

        static void get (void[] dst, InputStream input)
        {
                while (dst.length)
                      {
                      auto i = input.read (dst);
                      if (i is Eof)
                          input.conduit.error ("Conduit.get :: eof while reading");
                      dst = dst [i..$];
                      }
        }

        /***********************************************************************

                Low-level data transfer, where max represents the maximum
                number of bytes to transfer.

                Content is moved through a fixed 8192 byte stack buffer.
                Eof while reading from src ends the transfer normally;
                Eof while writing to dst is treated as a failure.

                Returns Eof on failure, number of bytes copied on success.

        ***********************************************************************/

        static size_t transfer (InputStream src, OutputStream dst, size_t max=-1)
        {
                byte[8192] tmp;
                size_t     done;

                while (max)
                      {
                      // never request more than remains, nor than tmp holds
                      auto len = max;
                      if (len > tmp.length)
                          len = tmp.length;

                      if ((len = src.read(tmp[0 .. len])) is Eof)
                           max = 0;     // source exhausted: leave the loop
                      else
                         {
                         max -= len;
                         done += len;

                         // a write may be partial, so keep going until the
                         // whole of what was just read has been emitted
                         auto p = tmp.ptr;
                         for (size_t j=0; len > 0; len -= j, p += j)
                              if ((j = dst.write (p[0 .. len])) is Eof)
                                   return Eof;
                         }
                      }

                return done;
        }
}
438 
439 
440 /*******************************************************************************
441 
442         Base class for input stream filtering. The provided source stream
443         should generally never be null, though some filters have a need to
444         set this lazily.
445 
446 *******************************************************************************/
447 
class InputFilter : InputStream
{
        // Upstream stream that every operation of this filter forwards to
        protected InputStream source;

        /***********************************************************************

                Construct a filter on top of the given stream. The stream
                should generally be non-null, although some filters need
                to assign it lazily.

        ***********************************************************************/

        this (InputStream source)
        {
                this.source = source;
        }

        /***********************************************************************

                Return the conduit hosting the source stream.

        ***********************************************************************/

        IConduit conduit ()
        {
                return this.source.conduit ();
        }

        /***********************************************************************

                Read from the source stream into dst.

                Returns whatever the source stream returns: the number
                of bytes read, which may be less than dst can hold, or
                Eof upon an end-of-flow condition.

        ***********************************************************************/

        size_t read (void[] dst)
        {
                return this.source.read (dst);
        }

        /***********************************************************************

                Load the bits from this filter via Conduit.load, and
                return them all in an array. At most max bytes are
                loaded.

                Returns an array representing the content, and throws
                IOException on error.

        ***********************************************************************/

        void[] load (size_t max = -1)
        {
                return Conduit.load (this, max);
        }

        /***********************************************************************

                Flush the source stream, then return this filter.

        ***********************************************************************/

        IOStream flush ()
        {
                this.source.flush ();
                return this;
        }

        /***********************************************************************

                Seek on the source stream. Conduits that don't support
                seeking will throw an IOException.

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                return this.source.seek (offset, anchor);
        }

        /***********************************************************************

                Return the upstream stream this filter is attached to.

        ***********************************************************************/

        InputStream input ()
        {
                return this.source;
        }

        /***********************************************************************

                Close the input by closing the source stream.

        ***********************************************************************/

        void close ()
        {
                this.source.close ();
        }
}
554 
555 
556 /*******************************************************************************
557 
558         Base class for output stream filtering. The provided sink stream
559         should generally never be null, though some filters have a need to
560         set this lazily.
561 
562 *******************************************************************************/
563 
class OutputFilter : OutputStream
{
        // Downstream stream that every operation of this filter forwards to
        protected OutputStream sink;

        /***********************************************************************

                Attach to the provided stream. The stream should generally
                never be null, though some filters have a need to set
                this lazily.

        ***********************************************************************/

        this (OutputStream sink)
        {
                this.sink = sink;
        }

        /***********************************************************************

                Return the hosting conduit.

        ***********************************************************************/

        IConduit conduit ()
        {
                return sink.conduit;
        }

        /***********************************************************************

                Write to conduit from a source array. The provided src
                content will be written to the conduit.

                Returns the number of bytes written from src, which may
                be less than the quantity provided. Eof is returned when
                an end-of-flow condition arises.

        ***********************************************************************/

        size_t write (const(void)[] src)
        {
                return sink.write (src);
        }

        /***********************************************************************

                Transfer the content of another conduit to this one. Returns
                a reference to this class, or throws IOException on failure.

                At most max bytes are transferred.

        ***********************************************************************/

        OutputStream copy (InputStream src, size_t max = -1)
        {
                // Conduit.transfer() reports a failed write by returning
                // Eof; raise the documented exception instead of dropping it
                if (Conduit.transfer (src, this, max) is Eof)
                    conduit.error ("OutputFilter.copy :: eof while writing");
                return this;
        }

        /***********************************************************************

                Emit/purge buffered content by flushing the sink, then
                return this filter.

        ***********************************************************************/

        IOStream flush ()
        {
                sink.flush;
                return this;
        }

        /***********************************************************************

                Seek on this stream. Target conduits that don't support
                seeking will throw an IOException.

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                return sink.seek (offset, anchor);
        }

        /***********************************************************************

                Return the upstream host of this filter.

        ***********************************************************************/

        OutputStream output ()
        {
                return sink;
        }

        /***********************************************************************

                Close the output by closing the sink.

        ***********************************************************************/

        void close ()
        {
                sink.close;
        }
}