1 /******************************************************************************
2 3 Fiber/coroutine based non-blocking output select client base class
4 5 Base class for a non-blocking output select client using a fiber/coroutine to
6 suspend operation while waiting for the read event and resume on that event.
7 8 The Linux TCP_CORK feature can be used by setting FiberSelectWriter.cork to
9 true. This prevents sending the data passed to send() in partial TCP frames.
10 Note that, if TCP_CORK is enabled, pending data may not be sent immediately.
11 To force sending pending data, call corkFlush().
12 13 @see http://linux.die.net/man/7/tcp
14 15 Copyright:
16 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17 All rights reserved.
18 19 License:
20 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21 Alternatively, this file may be distributed under the terms of the Tango
22 3-Clause BSD License (see LICENSE_BSD.txt for details).
23 24 ******************************************************************************/25 26 moduleocean.io.select.protocol.fiber.FiberSelectWriter;
27 28 importocean.transition;
29 30 importocean.core.Verify;
31 32 importocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
33 34 importocean.io.select.client.model.ISelectClient;
35 36 importocean.io.device.IODevice: IOutputDevice;
37 38 importcore.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
39 40 importocean.stdc.posix.sys.socket: setsockopt;
41 42 importocean.stdc.posix.netinet.in_: IPPROTO_TCP;
43 44 staticif (__VERSION__ >= 2000 && __VERSION__ < 2073)
45 enum { TCP_CORK = 3 }
46 elsestaticif (__VERSION__ >= 2077)
47 importcore.sys.linux.netinet.tcp: TCP_CORK;
48 else49 importcore.sys.linux.sys.netinet.tcp: TCP_CORK;
50 51 debug (Raw) importocean.io.Stdout: Stderr;
52 53 54 55 /******************************************************************************/56 57 classFiberSelectWriter : IFiberSelectProtocol58 {
59 /**************************************************************************
60 61 Set to true to make send() send all data immediately if the TCP_CORK
62 feature is enabled. This has the same effect as calling corkFlush()
63 after each send().
64 65 **************************************************************************/66 67 publicboolcork_auto_flush = false;
68 69 /**************************************************************************
70 71 Output device
72 73 **************************************************************************/74 75 publicalias .IOutputDeviceIOutputDevice;
76 77 privateIOutputDeviceoutput;
78 79 /**************************************************************************
80 81 Data buffer (slices the buffer provided to send())
82 83 **************************************************************************/84 85 privateConst!(void)[] data_slice = null;
86 87 /**************************************************************************
88 89 Number of bytes sent so far
90 91 **************************************************************************/92 93 protectedsize_tsent = 0;
94 95 /**************************************************************************
96 97 true if the TCP_CORK feature should be enabled when sending data through
98 the socket or false if it should not be enabled.
99 100 **************************************************************************/101 102 privateboolcork_ = false;
103 104 /**************************************************************************/105 106 invariant ( )
107 {
108 assert (this.sent <= this.data_slice.length);
109 }
110 111 /**************************************************************************
112 113 Constructor.
114 115 error_e and warning_e may be the same object if distinguishing between
116 error and warning is not required.
117 118 Params:
119 output = output device
120 fiber = output writing fiber
121 warning_e = exception to throw if the remote hung up
122 error_e = exception to throw on I/O error
123 124 **************************************************************************/125 126 publicthis ( IOutputDeviceoutput, SelectFiberfiber,
127 IOWarningwarning_e, IOErrorerror_e )
128 {
129 super(this.output = output, Event.EPOLLOUT, fiber, warning_e, error_e);
130 }
131 132 /**************************************************************************
133 134 Constructor
135 136 Uses the conduit, fiber and exceptions from the other
137 IFiberSelectProtocol instance. This is useful when this instance shares
138 the conduit and fiber with another IFiberSelectProtocol instance, e.g.
139 a FiberSelectReader.
140 141 The conduit owned by the other instance must have been downcast from
142 IOuputDevice.
143 144 Params:
145 other = other instance of this class
146 147 **************************************************************************/148 149 publicthis ( typeof (super) other )
150 {
151 super(other, Event.EPOLLOUT);
152 153 this.output = cast (IOutputDevice) this.conduit;
154 155 verify(this.output !isnull, typeof (this).stringof ~ ": the conduit of "156 ~ "the other " ~ typeof (super).stringof ~ " instance must be a "157 ~ IOutputDevice.stringof);
158 }
159 160 /**************************************************************************
161 162 Writes data to the output conduit. Whenever the output conduit is not
163 ready for writing, the output writing fiber is suspended and continues
164 writing on resume.
165 166 Params:
167 data = data to send
168 169 Returns:
170 this instance
171 172 Throws:
173 IOException if the connection is closed or broken:
174 - IOWarning if the remote hung up,
175 - IOError (IOWarning subclass) on I/O error.
176 177 **************************************************************************/178 179 publictypeof (this) send ( Const!(void)[] data )
180 /* in // FIXME: causes an unknown (i.e. uninvestigated) problem, for example in the queue monitor
181 {
182 assert (this.data_slice is null);
183 }*/184 /*out // FIXME: DMD bug triggered when overriding method with 'out' contract.
185 {
186 assert (!this.data);
187 }*/188 body189 {
190 if (data.length)
191 {
192 this.data_slice = data;
193 194 if (this.cork_)
195 {
196 this.cork = true;
197 }
198 199 try200 {
201 super.transmitLoop();
202 }
203 finally204 {
205 if (this.cork_ && this.cork_auto_flush)
206 {
207 /*
208 * Disabling TCP_CORK is the only way to explicitly flush
209 * the output buffer.
210 */211 this.setCork(false);
212 this.setCork(true);
213 }
214 215 this.data_slice = null;
216 this.sent = 0;
217 }
218 }
219 220 returnthis;
221 }
222 223 /**************************************************************************
224 225 Enables or disables the TCP_CORK feature.
226 227 Note that, if is enabled, not all data passed to send() may be sent
228 immediately; to force sending pending data, call flush() after send() or
229 set cork_auto_flush to true before calling send().
230 231 If enabled is false but the TCP_CORK is currently enabled, pending data
232 will be sent now.
233 234 Params:
235 enabled = true: enable the TCP_CORK feature; false: disable it.
236 237 Returns:
238 enabled
239 240 Throws:
241 IOError if the TCP_CORK option cannot be set for the socket. If the
242 socket was not yet created by socket() or accept() then the request
243 will fail with the EBADF "Bad file descriptor" error code.
244 245 **************************************************************************/246 247 publicboolcork ( boolenabled )
248 {
249 this.setCork(this.cork_ = enabled);
250 251 returnthis.cork_;
252 }
253 254 /**************************************************************************
255 256 Tells whether the TCP_CORK feature is currently enabled.
257 258 Returns:
259 true if the TCP_CORK feature is currently enabled or false
260 otherwise.
261 262 **************************************************************************/263 264 publicboolcork ( )
265 {
266 returnthis.cork_;
267 }
268 269 /**************************************************************************
270 271 Sends all pending data immediately.
272 May be overridden by a subclass; calls corkFlush() by default.
273 274 Returns:
275 this instance.
276 277 Throws:
278 IOError on I/O error.
279 280 **************************************************************************/281 282 publictypeof (this) flush ( )
283 {
284 if (this.cork_)
285 {
286 /*
287 * Disabling TCP_CORK is the only way to explicitly flush the
288 * output buffer.
289 */290 this.setCork(false);
291 this.setCork(true);
292 }
293 294 returnthis;
295 }
296 297 /**************************************************************************
298 299 Clears any pending data in the buffer.
300 301 Returns:
302 this instance
303 304 **************************************************************************/305 306 publictypeof (this) reset ( )
307 {
308 if (this.cork_)
309 {
310 /*
311 * Disabling TCP_CORK is the only way to explicitly clear the
312 * output buffer.
313 */314 this.setCork(false, false);
315 this.setCork(true, false);
316 }
317 318 returnthis;
319 }
320 321 /**************************************************************************
322 323 Attempts to write data to the output conduit. The output conduit may or
324 may not write all elements of data.
325 326 Params:
327 events = events reported for the output conduit
328 329 Returns:
330 true if all data has been sent or false to try again.
331 332 Throws:
333 IOException if the connection is closed or broken:
334 - IOWarning if the remote hung up,
335 - IOError (IOWarning subclass) on I/O error.
336 337 **************************************************************************/338 339 protectedoverridebooltransmit ( Eventevents )
340 out341 {
342 assert (this);
343 }
344 body345 {
346 debug (Raw) Stderr.formatln("[{}] Write {:X2} ({} bytes)",
347 super.conduit.fileHandle, this.data_slice, this.data_slice.length);
348 349 if (this.sent < this.data_slice.length)
350 {
351 .errno = 0;
352 353 output.ssize_tn = this.output.write(this.data_slice[this.sent .. $]);
354 355 if (n >= 0)
356 {
357 this.sent += n;
358 }
359 else360 {
361 this.error_e.checkDeviceError("write error", __FILE__, __LINE__);
362 363 this.warning_e.enforce(!(events & events.EPOLLHUP), "connection hung up");
364 365 interrnum = .errno;
366 367 switch (errnum)
368 {
369 default:
370 throwthis.error_e.set(errnum, "write error");
371 372 caseEINTR, EAGAIN:
373 staticif ( EAGAIN != EWOULDBLOCK )
374 {
375 caseEWOULDBLOCK:
376 }
377 }
378 }
379 }
380 381 returnthis.sent < this.data_slice.length;
382 }
383 384 385 /**************************************************************************
386 387 Sets the TCP_CORK option. Disabling (enable = 0) sends all pending data.
388 389 Params:
390 enable = 0 disables TCP_CORK and flushes if previously enabled, a
391 different value enables TCP_CORK.
392 throw_on_error = true: throw IOError if setting the TCP_CORK option
393 failed, false: ignore errors. Ignoring errors is desired to
394 just clear the cork buffer if TCP_CORK seems to be
395 currently enabled.
396 397 Throws:
398 IOException on error setting the TCP_CORK option for this.conduit if
399 throw_on_error is true.
400 In practice this can fail only for one of the following reasons:
401 - this.conduit does not contain a valid file descriptor
402 (errno == EBADF). This is the case if the socket was not created
403 by socket() or accept() yet.
404 - this.conduit does not refer to a socket (errno == ENOTSOCK)
405 406 **************************************************************************/407 408 privatevoidsetCork ( intenable, boolthrow_on_error = true )
409 {
410 this.error_e.enforce(!.setsockopt(this.conduit.fileHandle,
411 .IPPROTO_TCP, .TCP_CORK,
412 &enable, enable.sizeof) ||
413 !throw_on_error,
414 "error setting TCP_CORK option");
415 }
416 }