1 /*******************************************************************************
2 
3   Copyright:
4       Copyright (c) 2006 Juan Jose Comellas.
5       Some parts copyright (c) 2009-2016 dunnhumby Germany GmbH.
6       All rights reserved.
7 
8   License:
9       Tango Dual License: 3-Clause BSD License / Academic Free License v3.0.
10       See LICENSE_TANGO.txt for details.
11 
12   Authors: Juan Jose Comellas <juanjo@comellas.com.ar>
13 
14 *******************************************************************************/
15 
16 module ocean.sys.Pipe;
17 
18 import ocean.meta.types.Qualifiers;
19 
20 import ocean.sys.Common;
21 import ocean.io.device.Device;
22 
23 import ocean.core.ExceptionDefinitions;
24 
25 import core.sys.posix.unistd;
26 
27 debug (PipeConduit)
28 {
29     import ocean.io.Stdout;
30 }
31 
32 private enum {DefaultBufferSize = 8 * 1024}
33 
34 
35 /**
36  * Conduit for pipes.
37  *
38  * Each PipeConduit can only read or write, depending on the way it has been
39  * created.
40  */
41 
42 class PipeConduit : Device
43 {
44     private uint _bufferSize;
45 
46 
47     /**
48      * Create a PipeConduit with the provided handle and access permissions.
49      *
50      * Params:
51      * handle       = handle of the operating system pipe we will wrap inside
52      *                the PipeConduit.
53      * style        = access flags for the pipe (readable, writable, etc.).
54      * bufferSize   = buffer size.
55      */
56     private this(Handle handle, uint bufferSize = DefaultBufferSize)
57     {
58         this.handle = handle;
59         _bufferSize = bufferSize;
60     }
61 
62     /**
63      * Destructor.
64      */
65     public ~this()
66     {
67         close();
68     }
69 
70     /**
71      * Returns the buffer size for the PipeConduit.
72      */
73     public override size_t bufferSize()
74     {
75         return _bufferSize;
76     }
77 
78     /**
79      * Returns the name of the device.
80      */
81     public override istring toString()
82     {
83         return "<pipe>";
84     }
85 }
86 
87 /**
88  * Factory class for Pipes.
89  */
90 class Pipe
91 {
92     private PipeConduit _source;
93     private PipeConduit _sink;
94 
95     /**
96      * Create a Pipe.
97      */
98     public this(uint bufferSize = DefaultBufferSize)
99     {
100         int[2] fd;
101 
102         if (pipe(fd) == 0)
103         {
104             _source = new PipeConduit(cast(ISelectable.Handle) fd[0], bufferSize);
105             _sink = new PipeConduit(cast(ISelectable.Handle) fd[1], bufferSize);
106         }
107         else
108         {
109             error();
110         }
111     }
112 
113     /* Replaces the old pipe with a new one. No memory allocation is performed.
114     */
115     public void recreate(uint bufferSize = DefaultBufferSize)
116     {
117         int[2] fd;
118 
119         if (pipe(fd) == 0)
120         {
121             _source.reopen(cast(ISelectable.Handle) fd[0]);
122             _source._bufferSize = bufferSize;
123             _sink.reopen(cast(ISelectable.Handle) fd[1]);
124             _sink._bufferSize = bufferSize;
125         }
126         else
127         {
128             error();
129         }
130     }
131 
132     /**
133      * Return the PipeConduit that you can write to.
134      */
135     public PipeConduit sink()
136     {
137         return _sink;
138     }
139 
140     /**
141      * Return the PipeConduit that you can read from.
142      */
143     public PipeConduit source()
144     {
145         return _source;
146     }
147 
148     /**
149      * Closes source and sink conduits.
150      */
151     public void close()
152     {
153         _source.close();
154         _sink.close();
155     }
156 
157     /**
158      *
159      */
160     private final void error ()
161     {
162         throw new IOException("Pipe error: " ~ SysError.lastMsg);
163     }
164 }
165