/******************************************************************************

    Fiber/coroutine based non-blocking output select client base class

    Base class for a non-blocking output select client using a fiber/coroutine
    to suspend operation while waiting for the write event (EPOLLOUT) and
    resume on that event.

    The Linux TCP_CORK feature can be used by setting FiberSelectWriter.cork to
    true. This prevents sending the data passed to send() in partial TCP frames.
    Note that, if TCP_CORK is enabled, pending data may not be sent immediately.
    To force sending pending data, call flush().

    @see http://linux.die.net/man/7/tcp

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

 ******************************************************************************/

module ocean.io.select.protocol.fiber.FiberSelectWriter;

import ocean.meta.types.Qualifiers;
import ocean.core.Verify;
import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;
import ocean.io.select.client.model.ISelectClient;
import ocean.io.device.IODevice: IOutputDevice;
import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR;
import core.sys.posix.netinet.in_: IPPROTO_TCP;
import core.sys.posix.sys.socket: setsockopt;
import core.sys.linux.netinet.tcp: TCP_CORK;

debug (Raw) import ocean.io.Stdout: Stderr;



/******************************************************************************/

class FiberSelectWriter : IFiberSelectProtocol
{
    /**************************************************************************

        Set to true to make send() send all data immediately if the TCP_CORK
        feature is enabled. This has the same effect as calling the default
        implementation of flush() after each send().

     **************************************************************************/

    public bool cork_auto_flush = false;

    /**************************************************************************

        Output device

     **************************************************************************/

    public alias .IOutputDevice IOutputDevice;

    private IOutputDevice output;

    /**************************************************************************

        Data buffer (slices the buffer provided to send()). It is null while
        no send() call is in progress.

     **************************************************************************/

    private const(void)[] data_slice = null;

    /**************************************************************************

        Number of bytes of data_slice sent so far

     **************************************************************************/

    protected size_t sent = 0;

    /**************************************************************************

        true if the TCP_CORK feature should be enabled when sending data through
        the socket or false if it should not be enabled.

     **************************************************************************/

    private bool cork_ = false;

    /**************************************************************************

        The number of bytes sent can never exceed the length of the data to
        send.

     **************************************************************************/

    invariant ( )
    {
        assert (this.sent <= this.data_slice.length);
    }

    /**************************************************************************

        Constructor.

        error_e and warning_e may be the same object if distinguishing between
        error and warning is not required.

        Params:
            output    = output device
            fiber     = output writing fiber
            warning_e = exception to throw if the remote hung up
            error_e   = exception to throw on I/O error

     **************************************************************************/

    public this ( IOutputDevice output, SelectFiber fiber,
                  IOWarning warning_e, IOError error_e )
    {
        super(this.output = output, Event.EPOLLOUT, fiber, warning_e, error_e);
    }

    /**************************************************************************

        Constructor

        Uses the conduit, fiber and exceptions from the other
        IFiberSelectProtocol instance. This is useful when this instance shares
        the conduit and fiber with another IFiberSelectProtocol instance, e.g.
        a FiberSelectReader.

        The conduit owned by the other instance must have been downcast from
        IOutputDevice.

        Params:
            other = other IFiberSelectProtocol instance to share the conduit,
                    fiber and exceptions with

     **************************************************************************/

    public this ( typeof (super) other )
    {
        super(other, Event.EPOLLOUT);

        this.output = cast (IOutputDevice) this.conduit;

        verify(this.output !is null, typeof (this).stringof ~ ": the conduit of "
            ~ "the other " ~ typeof (super).stringof ~ " instance must be a "
            ~ IOutputDevice.stringof);
    }

    /**************************************************************************

        Writes data to the output conduit. Whenever the output conduit is not
        ready for writing, the output writing fiber is suspended and continues
        writing on resume.

        If data is empty, nothing is done.

        When this method returns or throws, the internal state (data slice and
        sent counter) has been reset, so the instance can be reused for the
        next send() call.

        Params:
            data = data to send

        Returns:
            this instance

        Throws:
            IOException if the connection is closed or broken:
                - IOWarning if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    public typeof (this) send ( const(void)[] data )
    /* in // FIXME: causes an unknown (i.e. uninvestigated) problem, for example in the queue monitor
    {
        assert (this.data_slice is null);
    }*/
    /*out // FIXME: DMD bug triggered when overriding method with 'out' contract.
    {
        assert (!this.data);
    }*/
    do
    {
        if (data.length)
        {
            this.data_slice = data;

            /*
             * Always return to the idle state, no matter whether enabling
             * TCP_CORK, sending or flushing throws. Otherwise a stale `sent`
             * offset and a slice of the caller's buffer would survive this
             * call and corrupt the next send(). `sent` is cleared first so
             * that the class invariant holds at every point.
             */
            scope (exit)
            {
                this.sent = 0;
                this.data_slice = null;
            }

            if (this.cork_)
            {
                // (Re-)apply TCP_CORK to the socket before writing.
                this.cork = true;
            }

            try
            {
                super.transmitLoop();
            }
            finally
            {
                if (this.cork_ && this.cork_auto_flush)
                {
                    /*
                     * Disabling TCP_CORK is the only way to explicitly flush
                     * the output buffer.
                     */
                    this.setCork(false);
                    this.setCork(true);
                }
            }
        }

        return this;
    }

    /**************************************************************************

        Enables or disables the TCP_CORK feature.

        Note that, if it is enabled, not all data passed to send() may be sent
        immediately; to force sending pending data, call flush() after send() or
        set cork_auto_flush to true before calling send().

        If enabled is false but the TCP_CORK is currently enabled, pending data
        will be sent now.

        Params:
            enabled = true: enable the TCP_CORK feature; false: disable it.

        Returns:
            enabled

        Throws:
            IOError if the TCP_CORK option cannot be set for the socket. If the
            socket was not yet created by socket() or accept() then the request
            will fail with the EBADF "Bad file descriptor" error code.

     **************************************************************************/

    public bool cork ( bool enabled )
    {
        this.setCork(this.cork_ = enabled);

        return this.cork_;
    }

    /**************************************************************************

        Tells whether the TCP_CORK feature is currently enabled.

        Returns:
            true if the TCP_CORK feature is currently enabled or false
            otherwise.

     **************************************************************************/

    public bool cork ( )
    {
        return this.cork_;
    }

    /**************************************************************************

        Sends all pending data immediately.
        May be overridden by a subclass; by default, if the TCP_CORK feature is
        enabled, it is disabled and re-enabled, which makes the socket send
        the pending data. If TCP_CORK is not enabled, nothing is done.

        Returns:
            this instance.

        Throws:
            IOError on I/O error.

     **************************************************************************/

    public typeof (this) flush ( )
    {
        if (this.cork_)
        {
            /*
             * Disabling TCP_CORK is the only way to explicitly flush the
             * output buffer.
             */
            this.setCork(false);
            this.setCork(true);
        }

        return this;
    }

    /**************************************************************************

        Clears any pending data in the cork buffer: if the TCP_CORK feature is
        enabled, it is disabled and re-enabled, ignoring errors from setting
        the socket option.

        Note: as documented for setCork(), disabling TCP_CORK sends the pending
        data, so the data is flushed to the socket rather than discarded.

        Returns:
            this instance

     **************************************************************************/

    public typeof (this) reset ( )
    {
        if (this.cork_)
        {
            /*
             * Disabling TCP_CORK is the only way to explicitly clear the
             * output buffer. Errors are deliberately ignored here.
             */
            this.setCork(false, false);
            this.setCork(true, false);
        }

        return this;
    }

    /**************************************************************************

        Attempts to write data to the output conduit. The output conduit may or
        may not write all elements of data.

        Params:
            events = events reported for the output conduit

        Returns:
            true if there is still data left to send, i.e. this method needs to
            be called again, or false if all data has been sent.

        Throws:
            IOException if the connection is closed or broken:
                - IOWarning if the remote hung up,
                - IOError (IOWarning subclass) on I/O error.

     **************************************************************************/

    protected override bool transmit ( Event events )
    out
    {
        assert (this);
    }
    do
    {
        debug (Raw) Stderr.formatln("[{}] Write {:X2} ({} bytes)",
            super.conduit.fileHandle, this.data_slice, this.data_slice.length);

        if (this.sent < this.data_slice.length)
        {
            .errno = 0;

            output.ssize_t n = this.output.write(this.data_slice[this.sent .. $]);

            if (n >= 0)
            {
                this.sent += n;
            }
            else
            {
                /*
                 * Capture errno right after the failed write(): the checks
                 * below may call other functions that overwrite it.
                 */
                int errnum = .errno;

                this.error_e.checkDeviceError("write error", __FILE__, __LINE__);

                this.warning_e.enforce(!(events & events.EPOLLHUP), "connection hung up");

                switch (errnum)
                {
                    default:
                        throw this.error_e.set(errnum, "write error");

                    /*
                     * EINTR, EAGAIN and EWOULDBLOCK are not treated as errors:
                     * nothing was written this time, so fall out of the switch
                     * and report via the return value that data is still
                     * pending.
                     */
                    case EINTR, EAGAIN:
                        static if ( EAGAIN != EWOULDBLOCK )
                        {
                            case EWOULDBLOCK:
                        }
                }
            }
        }

        return this.sent < this.data_slice.length;
    }


    /**************************************************************************

        Sets the TCP_CORK option. Disabling (enable = 0) sends all pending data.

        Params:
            enable = 0 disables TCP_CORK and flushes if previously enabled, a
                     different value enables TCP_CORK.
            throw_on_error = true: throw IOError if setting the TCP_CORK option
                     failed, false: ignore errors. Ignoring errors is desired to
                     just clear the cork buffer if TCP_CORK seems to be
                     currently enabled.

        Throws:
            IOException on error setting the TCP_CORK option for this.conduit if
            throw_on_error is true.
            In practice this can fail only for one of the following reasons:
                - this.conduit does not contain a valid file descriptor
                  (errno == EBADF). This is the case if the socket was not
                  created by socket() or accept() yet.
                - this.conduit does not refer to a socket (errno == ENOTSOCK)

     **************************************************************************/

    private void setCork ( int enable, bool throw_on_error = true )
    {
        // setsockopt() returns 0 on success; a failure is only reported if
        // throw_on_error is true.
        this.error_e.enforce(!.setsockopt(this.conduit.fileHandle,
                                          .IPPROTO_TCP, .TCP_CORK,
                                          &enable, enable.sizeof) ||
                             !throw_on_error,
                             "error setting TCP_CORK option");
    }
}