1 /****************************************************************************** 2 3 Fiber/coroutine based buffered non-blocking output select client 4 5 Copyright: 6 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 7 All rights reserved. 8 9 License: 10 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 11 Alternatively, this file may be distributed under the terms of the Tango 12 3-Clause BSD License (see LICENSE_BSD.txt for details). 13 14 ******************************************************************************/ 15 16 module ocean.io.select.protocol.fiber.BufferedFiberSelectWriter; 17 18 import ocean.meta.types.Qualifiers; 19 20 import ocean.core.Verify; 21 22 import ocean.io.select.protocol.fiber.FiberSelectWriter; 23 24 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol; 25 26 import ocean.util.container.AppendBuffer; 27 28 /******************************************************************************/ 29 30 class BufferedFiberSelectWriter : FiberSelectWriter 31 { 32 /************************************************************************** 33 34 Default output buffer size (64 kB) 35 36 **************************************************************************/ 37 38 public static immutable default_buffer_size = 64 * 1024; 39 40 /************************************************************************** 41 42 AppendBuffer instance 43 44 **************************************************************************/ 45 46 private AppendBuffer!(void) buffer; 47 48 /************************************************************************** 49 50 Constructor 51 52 Params: 53 output = output device 54 fiber = output reading fiber 55 warning_e = exception to throw on end-of-flow condition or if the 56 remote hung up 57 error_e = exception to throw on I/O error 58 size = buffer size 59 60 In: 61 The buffer size must not be 0. 62 63 **************************************************************************/ 64 65 public this ( IOutputDevice output, SelectFiber fiber, 66 IOWarning warning_e, IOError error_e, 67 size_t size = default_buffer_size ) 68 { 69 verify(size != 0, "zero input buffer size specified"); 70 71 super(output, fiber, warning_e, error_e); 72 this.buffer = new AppendBuffer!(void)(size, true); 73 } 74 75 /************************************************************************** 76 77 Constructor 78 79 Uses the conduit, fiber and exceptions from the other 80 IFiberSelectProtocol instance. This is useful when this instance shares 81 the conduit and fiber with another IFiberSelectProtocol instance, e.g. 82 a FiberSelectWriter. 83 84 The conduit owned by the other instance must have been downcast from 85 IInputDevice. 86 87 Params: 88 other = other instance of this class 89 size = output buffer size 90 91 In: 92 buffer_size must not be 0. 93 94 **************************************************************************/ 95 96 public this ( IFiberSelectProtocol other, size_t size = default_buffer_size ) 97 { 98 verify(size != 0, "zero input buffer size specified"); 99 100 super(other); 101 this.buffer = new AppendBuffer!(void)(size, true); 102 } 103 104 /************************************************************************** 105 106 Returns: 107 current buffer size 108 109 **************************************************************************/ 110 111 public size_t buffer_size ( ) 112 { 113 return this.buffer.capacity; 114 } 115 116 /************************************************************************** 117 118 Flushes the buffer and sends all pending data. 119 120 Returns: 121 this instance. 122 123 **************************************************************************/ 124 125 public override typeof (this) flush ( ) 126 { 127 this.flushBuffer(); 128 super.flush(); 129 130 return this; 131 } 132 133 /************************************************************************** 134 135 Clears any pending data in the buffer. 136 137 Returns: 138 this instance 139 140 **************************************************************************/ 141 142 public override typeof (this) reset ( ) 143 { 144 this.buffer.clear(); 145 super.reset(); 146 147 return this; 148 } 149 150 /************************************************************************** 151 152 Sets the buffer size to s. If there are currently more than s bytes of 153 data in the buffer, flush() is called before setting the size. 154 155 Params: 156 s = new buffer size 157 158 Returns: 159 new buffer size 160 161 In: 162 The new buffer size must not be 0. 163 164 **************************************************************************/ 165 166 public size_t buffer_size ( size_t s ) 167 out (n) 168 { 169 assert (n == s); 170 } 171 do 172 { 173 verify(s != 0, typeof (this).stringof ~ ".buffer_size: 0 specified"); 174 175 if (s < this.buffer.length) 176 { 177 this.flushBuffer(); 178 } 179 180 return this.buffer.capacity = s; 181 } 182 183 /************************************************************************** 184 185 Sends data_. 186 187 Params: 188 data = data to send 189 190 Returns: 191 this instance. 192 193 **************************************************************************/ 194 195 public override typeof (this) send ( const(void)[] data ) 196 { 197 if (data.length < this.buffer.capacity) 198 { 199 auto dst = this.buffer.extend(data.length); 200 201 dst[] = data[0 .. dst.length]; 202 203 auto left = data[dst.length .. $]; 204 205 if (left.length || this.buffer.length == this.buffer.capacity) 206 { 207 this.flushBuffer(); 208 } 209 210 if (left.length) 211 { 212 this.buffer ~= left; 213 } 214 } 215 else 216 { 217 this.flushBuffer(); 218 super.send(data); 219 } 220 221 return this; 222 } 223 224 /************************************************************************** 225 226 Flushes the buffer. Pending data may not be sent immediately, for 227 example, if the TCP_CORK feature is enabled in the super class. 228 229 **************************************************************************/ 230 231 private void flushBuffer ( ) 232 { 233 super.send(this.buffer.dump()); 234 } 235 }