/******************************************************************************

    Fiber/coroutine based non-blocking output select client base class

    Base class for a non-blocking output select client using a fiber/coroutine
    to suspend operation while waiting for the write event and resume on that
    event.

    The Linux TCP_CORK feature can be used by setting FiberSelectWriter.cork to
    true. This prevents sending the data passed to send() in partial TCP frames.
    Note that, if TCP_CORK is enabled, pending data may not be sent immediately.
    To force sending pending data, call flush() after send() or set
    cork_auto_flush to true.

    @see http://linux.die.net/man/7/tcp

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

 ******************************************************************************/

module ocean.io.select.protocol.fiber.FiberSelectWriter;

import ocean.transition;

import ocean.core.Verify;

import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;

import ocean.io.select.client.model.ISelectClient;

import ocean.io.device.IODevice: IOutputDevice;

import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;

import ocean.stdc.posix.sys.socket: setsockopt;

import ocean.stdc.posix.netinet.in_: IPPROTO_TCP;

static if (__VERSION__ >= 2000 && __VERSION__ < 2073)
    enum { TCP_CORK = 3 }
else static if (__VERSION__ >= 2077)
    import core.sys.linux.netinet.tcp: TCP_CORK;
else
    import core.sys.linux.sys.netinet.tcp: TCP_CORK;

debug (Raw) import ocean.io.Stdout: Stderr;



/******************************************************************************

    Writes data to an output device, registering for the EPOLLOUT event and
    optionally managing the TCP_CORK socket option.

 ******************************************************************************/

class FiberSelectWriter : IFiberSelectProtocol
{
    /**************************************************************************

        Set to true to make send() send all data immediately if the TCP_CORK
        feature is enabled. This has the same effect as toggling TCP_CORK off
        and on again after each send(), which is what flush() does by default.

     **************************************************************************/

    public bool cork_auto_flush = false;

    /**************************************************************************

        Output device

     **************************************************************************/

    public alias .IOutputDevice IOutputDevice;

    private IOutputDevice output;

    /**************************************************************************

        Data buffer (slices the buffer provided to send())

     **************************************************************************/

    private Const!(void)[] data_slice = null;

    /**************************************************************************

        Number of bytes sent so far

     **************************************************************************/

    protected size_t sent = 0;

    /**************************************************************************

        true if the TCP_CORK feature should be enabled when sending data through
        the socket or false if it should not be enabled.

     **************************************************************************/

    private bool cork_ = false;

    /**************************************************************************

        The number of bytes sent can never exceed the length of the data
        currently being sent.

     **************************************************************************/

    invariant ( )
    {
        assert (this.sent <= this.data_slice.length);
    }

    /**************************************************************************

        Constructor.

        error_e and warning_e may be the same object if distinguishing between
        error and warning is not required.

        Params:
            output    = output device
            fiber     = output writing fiber
            warning_e = exception to throw if the remote hung up
            error_e   = exception to throw on I/O error

     **************************************************************************/

    public this ( IOutputDevice output, SelectFiber fiber,
                  IOWarning warning_e, IOError error_e )
    {
        super(this.output = output, Event.EPOLLOUT, fiber, warning_e, error_e);
    }

    /**************************************************************************

        Constructor

        Uses the conduit, fiber and exceptions from the other
        IFiberSelectProtocol instance. This is useful when this instance shares
        the conduit and fiber with another IFiberSelectProtocol instance, e.g.
        a FiberSelectReader.

        The conduit owned by the other instance must have been downcast from
        IOutputDevice.

        Params:
            other = other IFiberSelectProtocol instance whose conduit, fiber
                    and exceptions are shared with this instance

     **************************************************************************/

    public this ( typeof (super) other )
    {
        super(other, Event.EPOLLOUT);

        // The shared conduit must really be an output device; the cast yields
        // null otherwise, which is rejected right below.
        this.output = cast (IOutputDevice) this.conduit;

        verify(this.output !is null, typeof (this).stringof ~ ": the conduit of "
            ~ "the other " ~ typeof (super).stringof ~ " instance must be a "
            ~ IOutputDevice.stringof);
    }

    /**************************************************************************

        Writes data to the output conduit. Whenever the output conduit is not
        ready for writing, the output writing fiber is suspended and continues
        writing on resume.

        If the TCP_CORK feature is enabled, the socket option is (re-)applied
        before sending. If cork_auto_flush is set as well, pending data is
        flushed after sending, regardless of whether sending succeeded.

        Calling this method with empty data does nothing.

        Params:
            data = data to send

        Returns:
            this instance

        Throws:
            IOException if the connection is closed or broken:
                - IOWarning if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    public typeof (this) send ( Const!(void)[] data )
    /* in // FIXME: causes an unknown (i.e. uninvestigated) problem, for example in the queue monitor
    {
        assert (this.data_slice is null);
    }*/
    /*out // FIXME: DMD bug triggered when overriding method with 'out' contract.
    {
        assert (!this.data);
    }*/
    body
    {
        if (data.length)
        {
            this.data_slice = data;

            if (this.cork_)
            {
                // Re-apply the option on the socket; this.cork_ only records
                // the requested state.
                this.cork = true;
            }

            try
            {
                super.transmitLoop();
            }
            finally
            {
                if (this.cork_auto_flush)
                {
                    this.corkFlush();
                }

                // Always return to the idle state so that the invariant holds
                // and no reference to the caller's buffer is kept.
                this.data_slice = null;
                this.sent = 0;
            }
        }

        return this;
    }

    /**************************************************************************

        Enables or disables the TCP_CORK feature.

        Note that, if TCP_CORK is enabled, not all data passed to send() may be
        sent immediately; to force sending pending data, call flush() after
        send() or set cork_auto_flush to true before calling send().

        If enabled is false but the TCP_CORK is currently enabled, pending data
        will be sent now.

        The requested state is stored before the socket option is set, so it is
        remembered even if setting the option throws.

        Params:
            enabled = true: enable the TCP_CORK feature; false: disable it.

        Returns:
            enabled

        Throws:
            IOError if the TCP_CORK option cannot be set for the socket. If the
            socket was not yet created by socket() or accept() then the request
            will fail with the EBADF "Bad file descriptor" error code.

     **************************************************************************/

    public bool cork ( bool enabled )
    {
        this.setCork(this.cork_ = enabled);

        return this.cork_;
    }

    /**************************************************************************

        Tells whether the TCP_CORK feature is currently enabled.

        Returns:
            true if the TCP_CORK feature is currently enabled or false
            otherwise.

     **************************************************************************/

    public bool cork ( )
    {
        return this.cork_;
    }

    /**************************************************************************

        Sends all pending data immediately.
        May be overridden by a subclass; by default flushes the TCP_CORK
        buffer if the TCP_CORK feature is enabled and does nothing otherwise.

        Returns:
            this instance.

        Throws:
            IOError on I/O error.

     **************************************************************************/

    public typeof (this) flush ( )
    {
        this.corkFlush();

        return this;
    }

    /**************************************************************************

        Clears any pending data in the buffer.

        If the TCP_CORK feature is enabled, the option is switched off and on
        again, ignoring any error from setting it; otherwise nothing is done.

        NOTE(review): according to setCork() and tcp(7), disabling TCP_CORK
        makes the pending data be sent rather than discarded, so "clears"
        appears to mean "pushes out of the cork buffer" -- confirm that this is
        the intended semantics.

        Returns:
            this instance

     **************************************************************************/

    public typeof (this) reset ( )
    {
        if (this.cork_)
        {
            /*
             * Disabling TCP_CORK is the only way to explicitly clear the
             * output buffer.
             */
            this.setCork(false, false);
            this.setCork(true, false);
        }

        return this;
    }

    /**************************************************************************

        Attempts to write data to the output conduit. The output conduit may or
        may not write all elements of data.

        Params:
            events = events reported for the output conduit

        Returns:
            true if some of the data is still unsent, so that another attempt
            is needed, or false if all data has been sent.

        Throws:
            IOException if the connection is closed or broken:
                - IOWarning if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    protected override bool transmit ( Event events )
    out
    {
        assert (this);
    }
    body
    {
        debug (Raw) Stderr.formatln("[{}] Write {:X2} ({} bytes)",
            super.conduit.fileHandle, this.data_slice, this.data_slice.length);

        if (this.sent < this.data_slice.length)
        {
            // Clear errno so that the value read below stems from this write.
            .errno = 0;

            output.ssize_t n = this.output.write(this.data_slice[this.sent .. $]);

            if (n >= 0)
            {
                this.sent += n;
            }
            else
            {
                // NOTE(review): presumably throws if the device reports a
                // pending error -- see IOError.checkDeviceError.
                this.error_e.checkDeviceError("write error", __FILE__, __LINE__);

                this.warning_e.enforce(!(events & events.EPOLLHUP), "connection hung up");

                int errnum = .errno;

                switch (errnum)
                {
                    default:
                        throw this.error_e.set(errnum, "write error");

                    case EINTR, EAGAIN:
                        static if ( EAGAIN != EWOULDBLOCK )
                        {
                            case EWOULDBLOCK:
                        }

                        // Not a failure: nothing was written this time. Leave
                        // this.sent untouched so that the return value below
                        // requests another attempt.
                }
            }
        }

        return this.sent < this.data_slice.length;
    }


    /**************************************************************************

        Flushes the TCP_CORK buffer if the TCP_CORK feature is enabled; does
        nothing otherwise. Shared by send() (for cork_auto_flush) and flush().

        Throws:
            IOError if the TCP_CORK option cannot be set for the socket.

     **************************************************************************/

    private void corkFlush ( )
    {
        if (this.cork_)
        {
            /*
             * Disabling TCP_CORK is the only way to explicitly flush the
             * output buffer.
             */
            this.setCork(false);
            this.setCork(true);
        }
    }

    /**************************************************************************

        Sets the TCP_CORK option. Disabling (enable = 0) sends all pending data.

        Params:
            enable = 0 disables TCP_CORK and flushes if previously enabled, a
                     different value enables TCP_CORK.
            throw_on_error = true: throw IOError if setting the TCP_CORK option
                     failed, false: ignore errors. Ignoring errors is desired to
                     just clear the cork buffer if TCP_CORK seems to be
                     currently enabled.

        Throws:
            IOException on error setting the TCP_CORK option for this.conduit if
            throw_on_error is true.
            In practice this can fail only for one of the following reasons:
                - this.conduit does not contain a valid file descriptor
                  (errno == EBADF). This is the case if the socket was not created
                  by socket() or accept() yet.
                - this.conduit does not refer to a socket (errno == ENOTSOCK)

     **************************************************************************/

    private void setCork ( int enable, bool throw_on_error = true )
    {
        // setsockopt() returns 0 on success. It is always called; only the
        // reaction to a failure depends on throw_on_error.
        this.error_e.enforce(!.setsockopt(this.conduit.fileHandle,
                                          .IPPROTO_TCP, .TCP_CORK,
                                          &enable, enable.sizeof) ||
                             !throw_on_error,
                             "error setting TCP_CORK option");
    }
}