1 /******************************************************************************
2 
3     Fiber/coroutine based non-blocking output select client base class
4 
5     Base class for a non-blocking output select client using a fiber/coroutine to
6     suspend operation while waiting for the read event and resume on that event.
7 
8     The Linux TCP_CORK feature can be used by setting FiberSelectWriter.cork to
9     true. This prevents sending the data passed to send() in partial TCP frames.
10     Note that, if TCP_CORK is enabled, pending data may not be sent immediately.
11     To force sending pending data, call corkFlush().
12 
13     @see http://linux.die.net/man/7/tcp
14 
15     Copyright:
16         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17         All rights reserved.
18 
19     License:
20         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21         Alternatively, this file may be distributed under the terms of the Tango
22         3-Clause BSD License (see LICENSE_BSD.txt for details).
23 
24  ******************************************************************************/
25 
26 module ocean.io.select.protocol.fiber.FiberSelectWriter;
27 
28 import ocean.meta.types.Qualifiers;
29 import ocean.core.Verify;
30 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
31 import ocean.io.select.client.model.ISelectClient;
32 import ocean.io.device.IODevice: IOutputDevice;
33 import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
34 import core.sys.posix.netinet.in_: IPPROTO_TCP;
35 import core.sys.posix.sys.socket: setsockopt;
36 import core.sys.linux.netinet.tcp: TCP_CORK;
37 
38 debug (Raw) import ocean.io.Stdout: Stderr;
39 
40 
41 
42 /******************************************************************************/
43 
44 class FiberSelectWriter : IFiberSelectProtocol
45 {
46     /**************************************************************************
47 
48         Set to true to make send() send all data immediately if the TCP_CORK
49         feature is enabled. This has the same effect as calling corkFlush()
50         after each send().
51 
52      **************************************************************************/
53 
54     public bool cork_auto_flush = false;
55 
56     /**************************************************************************
57 
58         Output device
59 
60      **************************************************************************/
61 
62     public alias .IOutputDevice IOutputDevice;
63 
64     private IOutputDevice output;
65 
66     /**************************************************************************
67 
68         Data buffer (slices the buffer provided to send())
69 
70      **************************************************************************/
71 
72     private const(void)[] data_slice = null;
73 
74     /**************************************************************************
75 
76         Number of bytes sent so far
77 
78      **************************************************************************/
79 
80     protected size_t sent = 0;
81 
82     /**************************************************************************
83 
84         true if the TCP_CORK feature should be enabled when sending data through
85         the socket or false if it should not be enabled.
86 
87      **************************************************************************/
88 
89     private bool cork_ = false;
90 
91     /**************************************************************************/
92 
93     invariant ( )
94     {
95         assert (this.sent <= this.data_slice.length);
96     }
97 
98     /**************************************************************************
99 
100         Constructor.
101 
102         error_e and warning_e may be the same object if distinguishing between
103         error and warning is not required.
104 
105         Params:
106             output    = output device
107             fiber     = output writing fiber
108             warning_e = exception to throw if the remote hung up
109             error_e   = exception to throw on I/O error
110 
111      **************************************************************************/
112 
113     public this ( IOutputDevice output, SelectFiber fiber,
114            IOWarning warning_e, IOError error_e )
115     {
116         super(this.output = output, Event.EPOLLOUT, fiber, warning_e, error_e);
117     }
118 
119     /**************************************************************************
120 
121         Constructor
122 
123         Uses the conduit, fiber and exceptions from the other
124         IFiberSelectProtocol instance. This is useful when this instance shares
125         the conduit and fiber with another IFiberSelectProtocol instance, e.g.
126         a FiberSelectReader.
127 
128         The conduit owned by the other instance must have been downcast from
129         IOuputDevice.
130 
131         Params:
132             other       = other instance of this class
133 
134      **************************************************************************/
135 
136     public this ( typeof (super) other )
137     {
138         super(other, Event.EPOLLOUT);
139 
140         this.output = cast (IOutputDevice) this.conduit;
141 
142         verify(this.output !is null, typeof (this).stringof ~ ": the conduit of "
143                 ~ "the other " ~ typeof (super).stringof ~ " instance must be a "
144                 ~ IOutputDevice.stringof);
145     }
146 
147     /**************************************************************************
148 
149         Writes data to the output conduit. Whenever the output conduit is not
150         ready for writing, the output writing fiber is suspended and continues
151         writing on resume.
152 
153         Params:
154             data = data to send
155 
156         Returns:
157             this instance
158 
159         Throws:
160             IOException if the connection is closed or broken:
161                 - IOWarning if the remote hung up,
162                 - IOError (IOWarning subclass) on I/O error.
163 
164      **************************************************************************/
165 
166     public typeof (this) send ( const(void)[] data )
167 /*    in // FIXME: causes an unknown (i.e. uninvestigated) problem, for example in the queue monitor
168     {
169         assert (this.data_slice is null);
170     }*/
171     /*out // FIXME: DMD bug triggered when overriding method with 'out' contract.
172     {
173         assert (!this.data);
174     }*/
175     do
176     {
177         if (data.length)
178         {
179             this.data_slice = data;
180 
181             if (this.cork_)
182             {
183                 this.cork = true;
184             }
185 
186             try
187             {
188                 super.transmitLoop();
189             }
190             finally
191             {
192                 if (this.cork_ && this.cork_auto_flush)
193                 {
194                     /*
195                      * Disabling TCP_CORK is the only way to explicitly flush
196                      * the output buffer.
197                      */
198                     this.setCork(false);
199                     this.setCork(true);
200                 }
201 
202                 this.data_slice = null;
203                 this.sent = 0;
204             }
205         }
206 
207         return this;
208     }
209 
210     /**************************************************************************
211 
212         Enables or disables the TCP_CORK feature.
213 
214         Note that, if is enabled, not all data passed to send() may be sent
215         immediately; to force sending pending data, call flush() after send() or
216         set cork_auto_flush to true before calling send().
217 
218         If enabled is false but the TCP_CORK is currently enabled, pending data
219         will be sent now.
220 
221         Params:
222             enabled = true: enable the TCP_CORK feature; false: disable it.
223 
224         Returns:
225             enabled
226 
227         Throws:
228             IOError if the TCP_CORK option cannot be set for the socket. If the
229             socket was not yet created by socket() or accept() then the request
230             will fail with the EBADF "Bad file descriptor" error code.
231 
232      **************************************************************************/
233 
234     public bool cork ( bool enabled )
235     {
236         this.setCork(this.cork_ = enabled);
237 
238         return this.cork_;
239     }
240 
241     /**************************************************************************
242 
243         Tells whether the TCP_CORK feature is currently enabled.
244 
245         Returns:
246             true if the TCP_CORK feature is currently enabled or false
247             otherwise.
248 
249      **************************************************************************/
250 
251     public bool cork ( )
252     {
253         return this.cork_;
254     }
255 
256     /**************************************************************************
257 
258         Sends all pending data immediately.
259         May be overridden by a subclass; calls corkFlush() by default.
260 
261         Returns:
262             this instance.
263 
264         Throws:
265             IOError on I/O error.
266 
267      **************************************************************************/
268 
269     public typeof (this) flush ( )
270     {
271         if (this.cork_)
272         {
273             /*
274              * Disabling TCP_CORK is the only way to explicitly flush the
275              * output buffer.
276              */
277             this.setCork(false);
278             this.setCork(true);
279         }
280 
281         return this;
282     }
283 
284     /**************************************************************************
285 
286         Clears any pending data in the buffer.
287 
288         Returns:
289             this instance
290 
291      **************************************************************************/
292 
293     public typeof (this) reset ( )
294     {
295         if (this.cork_)
296         {
297             /*
298              * Disabling TCP_CORK is the only way to explicitly clear the
299              * output buffer.
300              */
301             this.setCork(false, false);
302             this.setCork(true, false);
303         }
304 
305         return this;
306     }
307 
308     /**************************************************************************
309 
310         Attempts to write data to the output conduit. The output conduit may or
311         may not write all elements of data.
312 
313         Params:
314             events = events reported for the output conduit
315 
316         Returns:
317             true if all data has been sent or false to try again.
318 
319         Throws:
320             IOException if the connection is closed or broken:
321                 - IOWarning if the remote hung up,
322                 - IOError (IOWarning subclass) on I/O error.
323 
324      **************************************************************************/
325 
326     protected override bool transmit ( Event events )
327     out
328     {
329         assert (this);
330     }
331     do
332     {
333         debug (Raw) Stderr.formatln("[{}] Write {:X2} ({} bytes)",
334             super.conduit.fileHandle, this.data_slice, this.data_slice.length);
335 
336         if (this.sent < this.data_slice.length)
337         {
338             .errno = 0;
339 
340             output.ssize_t n = this.output.write(this.data_slice[this.sent .. $]);
341 
342             if (n >= 0)
343             {
344                 this.sent += n;
345             }
346             else
347             {
348                 this.error_e.checkDeviceError("write error", __FILE__, __LINE__);
349 
350                 this.warning_e.enforce(!(events & events.EPOLLHUP), "connection hung up");
351 
352                 int errnum = .errno;
353 
354                 switch (errnum)
355                 {
356                     default:
357                         throw this.error_e.set(errnum, "write error");
358 
359                     case EINTR, EAGAIN:
360                         static if ( EAGAIN != EWOULDBLOCK )
361                         {
362                             case EWOULDBLOCK:
363                         }
364                 }
365             }
366         }
367 
368         return this.sent < this.data_slice.length;
369     }
370 
371 
372     /**************************************************************************
373 
374         Sets the TCP_CORK option. Disabling (enable = 0) sends all pending data.
375 
376         Params:
377             enable = 0 disables TCP_CORK and flushes if previously enabled, a
378                      different value enables TCP_CORK.
379             throw_on_error = true: throw IOError if setting the TCP_CORK option
380                      failed, false: ignore errors. Ignoring errors is desired to
381                      just clear the cork buffer if TCP_CORK seems to be
382                      currently enabled.
383 
384         Throws:
385             IOException on error setting the TCP_CORK option for this.conduit if
386             throw_on_error is true.
387             In practice this can fail only for one of the following reasons:
388              - this.conduit does not contain a valid file descriptor
389                (errno == EBADF). This is the case if the socket was not created
390                by socket() or accept() yet.
391              - this.conduit does not refer to a socket (errno == ENOTSOCK)
392 
393      **************************************************************************/
394 
395     private void setCork ( int enable, bool throw_on_error = true )
396     {
397         this.error_e.enforce(!.setsockopt(this.conduit.fileHandle,
398                                           .IPPROTO_TCP, .TCP_CORK,
399                                           &enable, enable.sizeof) ||
400                              !throw_on_error,
401                              "error setting TCP_CORK option");
402     }
403 }