1 /******************************************************************************
2 3 Fiber/coroutine based non-blocking output select client base class
4 5 Base class for a non-blocking output select client using a fiber/coroutine to
6 suspend operation while waiting for the read event and resume on that event.
7 8 The Linux TCP_CORK feature can be used by setting FiberSelectWriter.cork to
9 true. This prevents sending the data passed to send() in partial TCP frames.
10 Note that, if TCP_CORK is enabled, pending data may not be sent immediately.
11 To force sending pending data, call corkFlush().
12 13 @see http://linux.die.net/man/7/tcp
14 15 Copyright:
16 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17 All rights reserved.
18 19 License:
20 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21 Alternatively, this file may be distributed under the terms of the Tango
22 3-Clause BSD License (see LICENSE_BSD.txt for details).
23 24 ******************************************************************************/25 26 moduleocean.io.select.protocol.fiber.FiberSelectWriter;
27 28 importocean.meta.types.Qualifiers;
29 importocean.core.Verify;
30 importocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
31 importocean.io.select.client.model.ISelectClient;
32 importocean.io.device.IODevice: IOutputDevice;
33 importcore.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
34 importcore.sys.posix.netinet.in_: IPPROTO_TCP;
35 importcore.sys.posix.sys.socket: setsockopt;
36 importcore.sys.linux.netinet.tcp: TCP_CORK;
37 38 debug (Raw) importocean.io.Stdout: Stderr;
39 40 41 42 /******************************************************************************/43 44 classFiberSelectWriter : IFiberSelectProtocol45 {
46 /**************************************************************************
47 48 Set to true to make send() send all data immediately if the TCP_CORK
49 feature is enabled. This has the same effect as calling corkFlush()
50 after each send().
51 52 **************************************************************************/53 54 publicboolcork_auto_flush = false;
55 56 /**************************************************************************
57 58 Output device
59 60 **************************************************************************/61 62 publicalias .IOutputDeviceIOutputDevice;
63 64 privateIOutputDeviceoutput;
65 66 /**************************************************************************
67 68 Data buffer (slices the buffer provided to send())
69 70 **************************************************************************/71 72 privateconst(void)[] data_slice = null;
73 74 /**************************************************************************
75 76 Number of bytes sent so far
77 78 **************************************************************************/79 80 protectedsize_tsent = 0;
81 82 /**************************************************************************
83 84 true if the TCP_CORK feature should be enabled when sending data through
85 the socket or false if it should not be enabled.
86 87 **************************************************************************/88 89 privateboolcork_ = false;
90 91 /**************************************************************************/92 93 invariant ( )
94 {
95 assert (this.sent <= this.data_slice.length);
96 }
97 98 /**************************************************************************
99 100 Constructor.
101 102 error_e and warning_e may be the same object if distinguishing between
103 error and warning is not required.
104 105 Params:
106 output = output device
107 fiber = output writing fiber
108 warning_e = exception to throw if the remote hung up
109 error_e = exception to throw on I/O error
110 111 **************************************************************************/112 113 publicthis ( IOutputDeviceoutput, SelectFiberfiber,
114 IOWarningwarning_e, IOErrorerror_e )
115 {
116 super(this.output = output, Event.EPOLLOUT, fiber, warning_e, error_e);
117 }
118 119 /**************************************************************************
120 121 Constructor
122 123 Uses the conduit, fiber and exceptions from the other
124 IFiberSelectProtocol instance. This is useful when this instance shares
125 the conduit and fiber with another IFiberSelectProtocol instance, e.g.
126 a FiberSelectReader.
127 128 The conduit owned by the other instance must have been downcast from
129 IOuputDevice.
130 131 Params:
132 other = other instance of this class
133 134 **************************************************************************/135 136 publicthis ( typeof (super) other )
137 {
138 super(other, Event.EPOLLOUT);
139 140 this.output = cast (IOutputDevice) this.conduit;
141 142 verify(this.output !isnull, typeof (this).stringof ~ ": the conduit of "143 ~ "the other " ~ typeof (super).stringof ~ " instance must be a "144 ~ IOutputDevice.stringof);
145 }
146 147 /**************************************************************************
148 149 Writes data to the output conduit. Whenever the output conduit is not
150 ready for writing, the output writing fiber is suspended and continues
151 writing on resume.
152 153 Params:
154 data = data to send
155 156 Returns:
157 this instance
158 159 Throws:
160 IOException if the connection is closed or broken:
161 - IOWarning if the remote hung up,
162 - IOError (IOWarning subclass) on I/O error.
163 164 **************************************************************************/165 166 publictypeof (this) send ( const(void)[] data )
167 /* in // FIXME: causes an unknown (i.e. uninvestigated) problem, for example in the queue monitor
168 {
169 assert (this.data_slice is null);
170 }*/171 /*out // FIXME: DMD bug triggered when overriding method with 'out' contract.
172 {
173 assert (!this.data);
174 }*/175 do176 {
177 if (data.length)
178 {
179 this.data_slice = data;
180 181 if (this.cork_)
182 {
183 this.cork = true;
184 }
185 186 try187 {
188 super.transmitLoop();
189 }
190 finally191 {
192 if (this.cork_ && this.cork_auto_flush)
193 {
194 /*
195 * Disabling TCP_CORK is the only way to explicitly flush
196 * the output buffer.
197 */198 this.setCork(false);
199 this.setCork(true);
200 }
201 202 this.data_slice = null;
203 this.sent = 0;
204 }
205 }
206 207 returnthis;
208 }
209 210 /**************************************************************************
211 212 Enables or disables the TCP_CORK feature.
213 214 Note that, if is enabled, not all data passed to send() may be sent
215 immediately; to force sending pending data, call flush() after send() or
216 set cork_auto_flush to true before calling send().
217 218 If enabled is false but the TCP_CORK is currently enabled, pending data
219 will be sent now.
220 221 Params:
222 enabled = true: enable the TCP_CORK feature; false: disable it.
223 224 Returns:
225 enabled
226 227 Throws:
228 IOError if the TCP_CORK option cannot be set for the socket. If the
229 socket was not yet created by socket() or accept() then the request
230 will fail with the EBADF "Bad file descriptor" error code.
231 232 **************************************************************************/233 234 publicboolcork ( boolenabled )
235 {
236 this.setCork(this.cork_ = enabled);
237 238 returnthis.cork_;
239 }
240 241 /**************************************************************************
242 243 Tells whether the TCP_CORK feature is currently enabled.
244 245 Returns:
246 true if the TCP_CORK feature is currently enabled or false
247 otherwise.
248 249 **************************************************************************/250 251 publicboolcork ( )
252 {
253 returnthis.cork_;
254 }
255 256 /**************************************************************************
257 258 Sends all pending data immediately.
259 May be overridden by a subclass; calls corkFlush() by default.
260 261 Returns:
262 this instance.
263 264 Throws:
265 IOError on I/O error.
266 267 **************************************************************************/268 269 publictypeof (this) flush ( )
270 {
271 if (this.cork_)
272 {
273 /*
274 * Disabling TCP_CORK is the only way to explicitly flush the
275 * output buffer.
276 */277 this.setCork(false);
278 this.setCork(true);
279 }
280 281 returnthis;
282 }
283 284 /**************************************************************************
285 286 Clears any pending data in the buffer.
287 288 Returns:
289 this instance
290 291 **************************************************************************/292 293 publictypeof (this) reset ( )
294 {
295 if (this.cork_)
296 {
297 /*
298 * Disabling TCP_CORK is the only way to explicitly clear the
299 * output buffer.
300 */301 this.setCork(false, false);
302 this.setCork(true, false);
303 }
304 305 returnthis;
306 }
307 308 /**************************************************************************
309 310 Attempts to write data to the output conduit. The output conduit may or
311 may not write all elements of data.
312 313 Params:
314 events = events reported for the output conduit
315 316 Returns:
317 true if all data has been sent or false to try again.
318 319 Throws:
320 IOException if the connection is closed or broken:
321 - IOWarning if the remote hung up,
322 - IOError (IOWarning subclass) on I/O error.
323 324 **************************************************************************/325 326 protectedoverridebooltransmit ( Eventevents )
327 out328 {
329 assert (this);
330 }
331 do332 {
333 debug (Raw) Stderr.formatln("[{}] Write {:X2} ({} bytes)",
334 super.conduit.fileHandle, this.data_slice, this.data_slice.length);
335 336 if (this.sent < this.data_slice.length)
337 {
338 .errno = 0;
339 340 output.ssize_tn = this.output.write(this.data_slice[this.sent .. $]);
341 342 if (n >= 0)
343 {
344 this.sent += n;
345 }
346 else347 {
348 this.error_e.checkDeviceError("write error", __FILE__, __LINE__);
349 350 this.warning_e.enforce(!(events & events.EPOLLHUP), "connection hung up");
351 352 interrnum = .errno;
353 354 switch (errnum)
355 {
356 default:
357 throwthis.error_e.set(errnum, "write error");
358 359 caseEINTR, EAGAIN:
360 staticif ( EAGAIN != EWOULDBLOCK )
361 {
362 caseEWOULDBLOCK:
363 }
364 }
365 }
366 }
367 368 returnthis.sent < this.data_slice.length;
369 }
370 371 372 /**************************************************************************
373 374 Sets the TCP_CORK option. Disabling (enable = 0) sends all pending data.
375 376 Params:
377 enable = 0 disables TCP_CORK and flushes if previously enabled, a
378 different value enables TCP_CORK.
379 throw_on_error = true: throw IOError if setting the TCP_CORK option
380 failed, false: ignore errors. Ignoring errors is desired to
381 just clear the cork buffer if TCP_CORK seems to be
382 currently enabled.
383 384 Throws:
385 IOException on error setting the TCP_CORK option for this.conduit if
386 throw_on_error is true.
387 In practice this can fail only for one of the following reasons:
388 - this.conduit does not contain a valid file descriptor
389 (errno == EBADF). This is the case if the socket was not created
390 by socket() or accept() yet.
391 - this.conduit does not refer to a socket (errno == ENOTSOCK)
392 393 **************************************************************************/394 395 privatevoidsetCork ( intenable, boolthrow_on_error = true )
396 {
397 this.error_e.enforce(!.setsockopt(this.conduit.fileHandle,
398 .IPPROTO_TCP, .TCP_CORK,
399 &enable, enable.sizeof) ||
400 !throw_on_error,
401 "error setting TCP_CORK option");
402 }
403 }