1 /******************************************************************************
2 
3     Fiber/coroutine based buffered non-blocking output select client
4 
5     Copyright:
6         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
7         All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14  ******************************************************************************/
15 
16 module ocean.io.select.protocol.fiber.BufferedFiberSelectWriter;
17 
18 import ocean.meta.types.Qualifiers;
19 
20 import ocean.core.Verify;
21 
22 import ocean.io.select.protocol.fiber.FiberSelectWriter;
23 
24 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
25 
26 import ocean.util.container.AppendBuffer;
27 
28 /******************************************************************************/
29 
30 class BufferedFiberSelectWriter : FiberSelectWriter
31 {
32     /**************************************************************************
33 
34         Default output buffer size (64 kB)
35 
36      **************************************************************************/
37 
38     public static immutable default_buffer_size = 64 * 1024;
39 
40     /**************************************************************************
41 
42         AppendBuffer instance
43 
44      **************************************************************************/
45 
46     private AppendBuffer!(void) buffer;
47 
48     /**************************************************************************
49 
50         Constructor
51 
52         Params:
53             output = output device
54             fiber = output reading fiber
55             warning_e = exception to throw on end-of-flow condition or if the
56                 remote hung up
57             error_e = exception to throw on I/O error
58             size = buffer size
59 
60         In:
61             The buffer size must not be 0.
62 
63      **************************************************************************/
64 
65     public this ( IOutputDevice output, SelectFiber fiber,
66                   IOWarning warning_e, IOError error_e,
67                   size_t size = default_buffer_size )
68     {
69         verify(size != 0, "zero input buffer size specified");
70 
71         super(output, fiber, warning_e, error_e);
72         this.buffer = new AppendBuffer!(void)(size, true);
73     }
74 
75     /**************************************************************************
76 
77         Constructor
78 
79         Uses the conduit, fiber and exceptions from the other
80         IFiberSelectProtocol instance. This is useful when this instance shares
81         the conduit and fiber with another IFiberSelectProtocol instance, e.g.
82         a FiberSelectWriter.
83 
84         The conduit owned by the other instance must have been downcast from
85         IInputDevice.
86 
87         Params:
88             other       = other instance of this class
89             size        = output buffer size
90 
91         In:
92             buffer_size must not be 0.
93 
94      **************************************************************************/
95 
96     public this ( IFiberSelectProtocol other, size_t size = default_buffer_size )
97     {
98         verify(size != 0, "zero input buffer size specified");
99 
100         super(other);
101         this.buffer = new AppendBuffer!(void)(size, true);
102     }
103 
104     /**************************************************************************
105 
106         Returns:
107             current buffer size
108 
109      **************************************************************************/
110 
111     public size_t buffer_size ( )
112     {
113         return this.buffer.capacity;
114     }
115 
116     /**************************************************************************
117 
118         Flushes the buffer and sends all pending data.
119 
120         Returns:
121             this instance.
122 
123      **************************************************************************/
124 
125     public override typeof (this) flush ( )
126     {
127         this.flushBuffer();
128         super.flush();
129 
130         return this;
131     }
132 
133     /**************************************************************************
134 
135         Clears any pending data in the buffer.
136 
137         Returns:
138             this instance
139 
140      **************************************************************************/
141 
142     public override typeof (this) reset ( )
143     {
144         this.buffer.clear();
145         super.reset();
146 
147         return this;
148     }
149 
150     /**************************************************************************
151 
152         Sets the buffer size to s. If there are currently more than s bytes of
153         data in the buffer, flush() is called before setting the size.
154 
155         Params:
156             s = new buffer size
157 
158         Returns:
159             new buffer size
160 
161         In:
162             The new buffer size must not be 0.
163 
164      **************************************************************************/
165 
166     public size_t buffer_size ( size_t s )
167     out (n)
168     {
169         assert (n == s);
170     }
171     do
172     {
173         verify(s != 0, typeof (this).stringof ~ ".buffer_size: 0 specified");
174 
175         if (s < this.buffer.length)
176         {
177             this.flushBuffer();
178         }
179 
180         return this.buffer.capacity = s;
181     }
182 
183     /**************************************************************************
184 
185         Sends data_.
186 
187         Params:
188             data = data to send
189 
190         Returns:
191             this instance.
192 
193      **************************************************************************/
194 
195     public override typeof (this) send ( const(void)[] data )
196     {
197         if (data.length < this.buffer.capacity)
198         {
199             auto dst = this.buffer.extend(data.length);
200 
201             dst[] = data[0 .. dst.length];
202 
203             auto left = data[dst.length .. $];
204 
205             if (left.length || this.buffer.length == this.buffer.capacity)
206             {
207                 this.flushBuffer();
208             }
209 
210             if (left.length)
211             {
212                 this.buffer ~= left;
213             }
214         }
215         else
216         {
217             this.flushBuffer();
218             super.send(data);
219         }
220 
221         return this;
222     }
223 
224     /**************************************************************************
225 
226         Flushes the buffer. Pending data may not be sent immediately, for
227         example, if the TCP_CORK feature is enabled in the super class.
228 
229      **************************************************************************/
230 
231     private void flushBuffer ( )
232     {
233         super.send(this.buffer.dump());
234     }
235 }