1 /******************************************************************************
2 
3     Fiber/coroutine based non-blocking output select client base class
4 
5     Base class for a non-blocking output select client using a fiber/coroutine to
6     suspend operation while waiting for the write event and resume on that event.
7 
8     The Linux TCP_CORK feature can be used by setting FiberSelectWriter.cork to
9     true. This prevents sending the data passed to send() in partial TCP frames.
10     Note that, if TCP_CORK is enabled, pending data may not be sent immediately.
11     To force sending pending data, call flush().
12 
13     @see http://linux.die.net/man/7/tcp
14 
15     Copyright:
16         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17         All rights reserved.
18 
19     License:
20         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21         Alternatively, this file may be distributed under the terms of the Tango
22         3-Clause BSD License (see LICENSE_BSD.txt for details).
23 
24  ******************************************************************************/
25 
26 module ocean.io.select.protocol.fiber.FiberSelectWriter;
27 
28 import ocean.transition;
29 
30 import ocean.core.Verify;
31 
32 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
33 
34 import ocean.io.select.client.model.ISelectClient;
35 
36 import ocean.io.device.IODevice: IOutputDevice;
37 
38 import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
39 
40 import ocean.stdc.posix.sys.socket: setsockopt;
41 
42 import ocean.stdc.posix.netinet.in_: IPPROTO_TCP;
43 
44 static if (__VERSION__ >= 2000 && __VERSION__ < 2073)
45     enum { TCP_CORK = 3 }
46 else static if (__VERSION__ >= 2077)
47     import core.sys.linux.netinet.tcp: TCP_CORK;
48 else
49     import core.sys.linux.sys.netinet.tcp: TCP_CORK;
50 
51 debug (Raw) import ocean.io.Stdout: Stderr;
52 
53 
54 
55 /******************************************************************************/
56 
class FiberSelectWriter : IFiberSelectProtocol
{
    /**************************************************************************

        Set to true to make send() send all data immediately if the TCP_CORK
        feature is enabled. This has the same effect as calling the default
        implementation of flush() after each send().

     **************************************************************************/

    public bool cork_auto_flush = false;

    /**************************************************************************

        Output device

     **************************************************************************/

    public alias .IOutputDevice IOutputDevice;

    private IOutputDevice output;

    /**************************************************************************

        Data buffer (slices the buffer provided to send()). It is null
        whenever no send() call is in progress.

     **************************************************************************/

    private Const!(void)[] data_slice = null;

    /**************************************************************************

        Number of bytes of data_slice sent so far

     **************************************************************************/

    protected size_t sent = 0;

    /**************************************************************************

        true if the TCP_CORK feature should be enabled when sending data through
        the socket or false if it should not be enabled.

        This flag records the requested setting; cork(bool) stores it before
        attempting to apply it to the socket, and send() re-applies it on each
        call.

     **************************************************************************/

    private bool cork_ = false;

    /**************************************************************************

        The number of bytes sent can never exceed the length of the data to
        send.

     **************************************************************************/

    invariant ( )
    {
        assert (this.sent <= this.data_slice.length);
    }

    /**************************************************************************

        Constructor.

        error_e and warning_e may be the same object if distinguishing between
        error and warning is not required.

        Params:
            output    = output device
            fiber     = output writing fiber
            warning_e = exception to throw if the remote hung up
            error_e   = exception to throw on I/O error

     **************************************************************************/

    public this ( IOutputDevice output, SelectFiber fiber,
           IOWarning warning_e, IOError error_e )
    {
        super(this.output = output, Event.EPOLLOUT, fiber, warning_e, error_e);
    }

    /**************************************************************************

        Constructor

        Uses the conduit, fiber and exceptions from the other
        IFiberSelectProtocol instance. This is useful when this instance shares
        the conduit and fiber with another IFiberSelectProtocol instance, e.g.
        a FiberSelectReader.

        The conduit owned by the other instance must have been downcast from
        IOutputDevice; this is verified after the downcast.

        Params:
            other       = other instance of this class

     **************************************************************************/

    public this ( typeof (super) other )
    {
        super(other, Event.EPOLLOUT);

        this.output = cast (IOutputDevice) this.conduit;

        verify(this.output !is null, typeof (this).stringof ~ ": the conduit of "
                ~ "the other " ~ typeof (super).stringof ~ " instance must be a "
                ~ IOutputDevice.stringof);
    }

    /**************************************************************************

        Writes data to the output conduit. Whenever the output conduit is not
        ready for writing, the output writing fiber is suspended and continues
        writing on resume.

        If the TCP_CORK feature was requested via cork(true), the option is
        (re-)applied to the socket before sending. If, in addition,
        cork_auto_flush is true, the cork buffer is flushed after sending, even
        if sending failed.

        The internal send state (data slice and sent byte counter) is always
        reset when this method returns or throws, including when enabling or
        flushing TCP_CORK fails, so that a failed call cannot affect the next
        one.

        Params:
            data = data to send

        Returns:
            this instance

        Throws:
            IOException if the connection is closed or broken:
                - IOWarning if the remote hung up,
                - IOError (IOWarning subclass) on I/O error, which includes
                  failure to set the TCP_CORK option.

     **************************************************************************/

    public typeof (this) send ( Const!(void)[] data )
/*    in // FIXME: causes an unknown (i.e. uninvestigated) problem, for example in the queue monitor
    {
        assert (this.data_slice is null);
    }*/
    /*out // FIXME: DMD bug triggered when overriding method with 'out' contract.
    {
        assert (!this.data);
    }*/
    body
    {
        if (data.length)
        {
            this.data_slice = data;

            /*
             * Registered right after data_slice is set so that the state is
             * reset on every way out of this scope. Enabling and flushing
             * TCP_CORK can throw, too, and must not leave a stale sent counter
             * or data slice behind. sent is cleared first so that
             * sent <= data_slice.length holds at every step.
             */
            scope (exit)
            {
                this.sent = 0;
                this.data_slice = null;
            }

            if (this.cork_)
            {
                // Re-apply the requested setting to the socket.
                this.cork = true;
            }

            try
            {
                super.transmitLoop();
            }
            finally
            {
                if (this.cork_auto_flush)
                {
                    this.flushCork();
                }
            }
        }

        return this;
    }

    /**************************************************************************

        Enables or disables the TCP_CORK feature.

        Note that, if it is enabled, not all data passed to send() may be sent
        immediately; to force sending pending data, call flush() after send() or
        set cork_auto_flush to true before calling send().

        If enabled is false but the TCP_CORK is currently enabled, pending data
        will be sent now.

        The requested setting is stored before the socket option is set, so it
        is remembered even if setting the option fails and this method throws.

        Params:
            enabled = true: enable the TCP_CORK feature; false: disable it.

        Returns:
            enabled

        Throws:
            IOError if the TCP_CORK option cannot be set for the socket. If the
            socket was not yet created by socket() or accept() then the request
            will fail with the EBADF "Bad file descriptor" error code.

     **************************************************************************/

    public bool cork ( bool enabled )
    {
        this.setCork(this.cork_ = enabled);

        return this.cork_;
    }

    /**************************************************************************

        Tells whether the TCP_CORK feature was requested to be enabled.

        Returns:
            true if the TCP_CORK feature was requested to be enabled (the value
            last passed to cork(bool)) or false otherwise.

     **************************************************************************/

    public bool cork ( )
    {
        return this.cork_;
    }

    /**************************************************************************

        Sends all pending data immediately.
        May be overridden by a subclass; by default flushes the TCP_CORK buffer
        if the TCP_CORK feature is enabled and does nothing otherwise.

        Returns:
            this instance.

        Throws:
            IOError on I/O error.

     **************************************************************************/

    public typeof (this) flush ( )
    {
        this.flushCork();

        return this;
    }

    /**************************************************************************

        Clears any pending data in the buffer.

        If the TCP_CORK feature is enabled, the option is disabled and
        re-enabled; errors setting the option are ignored.

        Returns:
            this instance

     **************************************************************************/

    public typeof (this) reset ( )
    {
        if (this.cork_)
        {
            /*
             * Disabling TCP_CORK is the only way to explicitly clear the
             * output buffer.
             */
            this.setCork(false, false);
            this.setCork(true, false);
        }

        return this;
    }

    /**************************************************************************

        Attempts to write data to the output conduit. The output conduit may or
        may not write all elements of data.

        If writing fails with EINTR, EAGAIN or EWOULDBLOCK, nothing is sent in
        this call and no exception is thrown for that error code.

        Params:
            events = events reported for the output conduit

        Returns:
            true if there is still unsent data after this attempt, i.e. another
            attempt is needed, or false if all data has been sent.

        Throws:
            IOException if the connection is closed or broken:
                - IOWarning if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    protected override bool transmit ( Event events )
    out
    {
        assert (this);
    }
    body
    {
        debug (Raw) Stderr.formatln("[{}] Write {:X2} ({} bytes)",
            super.conduit.fileHandle, this.data_slice, this.data_slice.length);

        if (this.sent < this.data_slice.length)
        {
            // Clear errno so that the value read below stems from write().
            .errno = 0;

            output.ssize_t n = this.output.write(this.data_slice[this.sent .. $]);

            if (n >= 0)
            {
                this.sent += n;
            }
            else
            {
                this.error_e.checkDeviceError("write error", __FILE__, __LINE__);

                this.warning_e.enforce(!(events & events.EPOLLHUP), "connection hung up");

                int errnum = .errno;

                switch (errnum)
                {
                    default:
                        throw this.error_e.set(errnum, "write error");

                    case EINTR, EAGAIN:
                        static if ( EAGAIN != EWOULDBLOCK )
                        {
                            case EWOULDBLOCK:
                        }

                        // Not fatal: nothing was written this time, so the
                        // return value below reports remaining data.
                        break;
                }
            }
        }

        return this.sent < this.data_slice.length;
    }

    /**************************************************************************

        Flushes the TCP_CORK buffer by disabling and re-enabling the TCP_CORK
        option if the TCP_CORK feature is enabled; does nothing otherwise.

        Throws:
            IOError if setting the TCP_CORK option failed.

     **************************************************************************/

    private void flushCork ( )
    {
        if (this.cork_)
        {
            /*
             * Disabling TCP_CORK is the only way to explicitly flush the
             * output buffer.
             */
            this.setCork(false);
            this.setCork(true);
        }
    }

    /**************************************************************************

        Sets the TCP_CORK option. Disabling (enable = 0) sends all pending data.

        Params:
            enable = 0 disables TCP_CORK and flushes if previously enabled, a
                     different value enables TCP_CORK.
            throw_on_error = true: throw IOError if setting the TCP_CORK option
                     failed, false: ignore errors. Ignoring errors is desired to
                     just clear the cork buffer if TCP_CORK seems to be
                     currently enabled.

        Throws:
            IOException on error setting the TCP_CORK option for this.conduit if
            throw_on_error is true.
            In practice this can fail only for one of the following reasons:
             - this.conduit does not contain a valid file descriptor
               (errno == EBADF). This is the case if the socket was not created
               by socket() or accept() yet.
             - this.conduit does not refer to a socket (errno == ENOTSOCK)

     **************************************************************************/

    private void setCork ( int enable, bool throw_on_error = true )
    {
        // setsockopt() returns 0 on success, so the enforced condition holds
        // if the call succeeded or errors are to be ignored.
        this.error_e.enforce(!.setsockopt(this.conduit.fileHandle,
                                          .IPPROTO_TCP, .TCP_CORK,
                                          &enable, enable.sizeof) ||
                             !throw_on_error,
                             "error setting TCP_CORK option");
    }
}