/*******************************************************************************

    Input range that will read data from a socket and return token-separated
    values. The token will be excluded.

    This struct is an implementation detail of the Collectd socket and not
    intended to be used outside of it.

    Copyright:
        Copyright (c) 2015-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.net.collectd.SocketReader;


import ocean.meta.types.Qualifiers;
import ocean.core.Verify;
import ocean.sys.ErrnoException;
import ocean.sys.socket.model.ISocket;
import ocean.text.util.StringSearch; // locateChar

import core.sys.posix.sys.types; // ssize_t

/*******************************************************************************

    Input range that will read data from a socket and return token-separated
    values. The token will be excluded.

    This struct is allocation-free as it acts as a (specialized) circular buffer.

    Params:
        MAX_FIELD_SIZE  = Maximum length a line can have.
                          As lines can be non-contiguous (if a line starts at
                          the end of the circular buffer and ends at the
                          beginning of it), but we need to provide them
                          contiguous to the reader, a buffer is used and
                          written to in this situation.
        FIELDS          = The maximum number of maximum length fields that
                          the buffer can store.
                          In other words, MAX_FIELD_SIZE * FIELDS == capacity.

*******************************************************************************/

package struct SocketReader (size_t MAX_FIELD_SIZE = 512, size_t FIELDS = 16)
{
    /***************************************************************************

        Get the current element

        The returned data is transient, which means it might get invalidated
        by the next call to popFront or when the range is finalized. Make sure
        to `.dup` it if you need a longer lifetime.

        Returns:
            A transient string to the current item. This will be `null` if
            this range is `empty`.

    ***************************************************************************/

    public cstring front ()
    {
        return this.current_field;
    }


    /***************************************************************************

        Discard the current item (`front`) and process the next field

        This will change the current front. If no complete field (i.e. no
        '\n' delimiter) is available in the buffered data and a non-null
        socket is provided, it will read from this socket, repeatedly if
        needed, until a complete field has been buffered.

        Params:
            socket = If non null, instead of becoming empty, popFront will
                     attempt to read data from the socket
            flags  = Flags to pass to recv in the event of a read from socket

        Returns:
            The total amount of data read from the network during this call
            (sum of the positive `recv` return values), or 0 if no read was
            needed or no socket was provided.

        Throws:
            `ErrnoException` if `recv` returned a non-positive value, that is
            either an error (negative) or 0 bytes read. The range is `empty`
            when this happens. Note that in the 0 case, the global errno may
            not describe the condition.

    ***************************************************************************/

    public ssize_t popFront (ISocket socket = null, int flags = 0)
    {
        ssize_t received = 0;

        while (true)
        {
            auto off = this.locateChar('\n');

            if (off != this.length)
            {
                // Worst case scenario: the field starts at the end of the
                // buffer and continues at the beginning. In this case, we
                // have no choice but to copy data to our field_buffer to
                // get something sane.
                if (this.start_idx + off > this.buffer.length)
                {
                    auto p1len = this.buffer.length - this.start_idx;
                    verify(p1len < off);

                    this.field_buffer[0 .. p1len]
                        = this.buffer[this.start_idx .. this.buffer.length];

                    this.field_buffer[p1len .. off]
                        = this.buffer[0 .. off - p1len];

                    this.current_field = this.field_buffer[0 .. off];
                }
                else
                {
                    // Usual case: We just return a slice to our buffer
                    this.current_field =
                        this.buffer[this.start_idx .. this.start_idx + off];
                }

                // Consume the field and its delimiter. Once all the data has
                // been consumed, rewind to the beginning of the buffer so
                // the next read gets the largest possible linear window.
                this.length -= (off + 1);
                this.start_idx = !this.length
                    ? 0 : this.calc(this.start_idx, off + 1);

                return received;
            }

            if (socket is null)
            {
                // No delimiter in the buffered data, and nowhere to get
                // more from: the range is now empty.
                this.current_field = null;
                return received;
            }

            auto r = this.recv(socket, flags);
            if (r <= 0)
            {
                this.current_field = null;
                throw this.e.useGlobalErrno("recv");
            }

            // New data arrived, look for a delimiter again
            received += r;
        }
    }


    /***************************************************************************

        Tells whether the range is empty (i.e. no more fields can be read)

        Note that empty doesn't mean that no more data is stored in the buffer,
        but rather means no more delimiter (or token) could be found in the
        data.

    ***************************************************************************/

    public bool empty ()
    {
        return this.current_field is null;
    }


    /***************************************************************************

        Read data from the socket

        This function is only called from popFront when a socket is provided.
        Data is written right after the unprocessed data, in the largest
        contiguous free area starting there: up to `start_idx` if the write
        position has wrapped around, up to the end of the buffer otherwise.

        Params:
            socket = An ISocket to read from
            flags  = flags to pass to `ISocket.recv`

        Returns:
            The return value of `ISocket.recv`, which is the quantity of bytes
            read.

    ***************************************************************************/

    private ssize_t recv (ISocket socket, int flags)
    {
        verify(socket !is null, "Cannot recv with a null socket");

        // When the buffer is full, the write position equals `start_idx` and
        // the window computed below would cover unprocessed data: fail
        // before overwriting anything.
        verify(this.length < this.buffer.length,
               "Cannot recv into a full buffer");

        auto start = this.calc(this.start_idx, this.length);
        auto end   = start < this.start_idx
            ? this.start_idx : this.buffer.length;

        ssize_t ret = socket.recv(this.buffer[start .. end], flags);

        // Errors are handled from popFront
        if (ret <= 0)
            return ret;

        this.length += ret;
        verify(this.length <= this.buffer.length);

        return ret;
    }


    /***************************************************************************

        Tells whether the current data in the buffer are linear, or extend
        past the end of the buffer and circle to the beginning

    ***************************************************************************/

    private bool isLinear ()
    {
        return !(this.start_idx + this.length > this.buffer.length);
    }


    /***************************************************************************

        Helper function for locateChar

        Returns:
            An end suitable for 'linear' reading of the buffer, that is,
            an end which is always >= this.start_idx (equal only when no
            data is stored)

    ***************************************************************************/

    private size_t linearEnd ()
    {
        return this.isLinear()
            ? (this.start_idx + this.length)
            : (this.buffer.length);
    }


    /***************************************************************************

        Returns:
            the distance from the write position (first index after the
            unprocessed data) to the end of the buffer if the data are
            linear, or to `start_idx` if they wrap around

    ***************************************************************************/

    private size_t linearSpace ()
    {
        return this.isLinear()
            ? (this.buffer.length - this.calc(this.start_idx, this.length))
            : (this.start_idx - this.calc(this.start_idx, this.length));
    }


    /***************************************************************************

        Find the next occurrence of 'tok' in the string in a non-linear way

        Params:
            tok = a token (character) to search from, starting from start_idx

        Returns:
            The offset to `start_idx` (linear offset) at which the token is,
            or `this.length` if it wasn't found

    ***************************************************************************/

    private size_t locateChar (char tok)
    {
        // First look in the part going from `start_idx` up to the end of the
        // data, or the end of the buffer if the data wrap around.
        auto after = StringSearch!(false).locateChar(
            this.buffer[this.start_idx .. this.linearEnd()], tok);
        if (this.isLinear() || this.start_idx + after < this.buffer.length)
        {
            return after;
        }
        // In this case, after ==> buffer.length - start_idx
        // Nothing in the first part: look in the part that wrapped around
        return after + StringSearch!(false).locateChar(
            this.buffer[0 .. this.length - after],
            tok);
    }


    /***************************************************************************

        Helper function to calculate index in the buffer from offsets

        Params:
            idx = The index to start from
            val = The offset to add

        Returns:
            An index in `this.buffer`. It is always in-bound.

    ***************************************************************************/

    private size_t calc (size_t idx, size_t val)
    {
        return (idx + val) % this.buffer.length;
    }


    /***************************************************************************

        Buffer in which the data will be stored

    ***************************************************************************/

    private char[MAX_FIELD_SIZE * FIELDS] buffer;


    /***************************************************************************

        Internal buffer in which the current line will be copied in the event
        of a line being non-linear (starts at the end of the buffer and
        continues at the beginning).

    ***************************************************************************/

    private char[MAX_FIELD_SIZE] field_buffer;


    /***************************************************************************

        A slice to the data currently being the `front()`

    ***************************************************************************/

    private cstring current_field;


    /***************************************************************************

        Unprocessed data start

    ***************************************************************************/

    private size_t start_idx;


    /***************************************************************************

        Unprocessed data length

    ***************************************************************************/

    private size_t length;


    /***************************************************************************

        Exception to throw on error

        Note:
            It is set from outside, hence the package visibility.

    ***************************************************************************/

    package ErrnoException e;
}

unittest
{
    // Ensure it compiles, and that a fresh reader is an empty range
    SocketReader!() reader;
    assert(reader.empty());
    assert(reader.front() is null);
}

unittest
{
    // Without a socket, fields are extracted from the buffered data only.
    // The unprocessed data is "abc\nd\n", starting 2 bytes before the end
    // of an 8 bytes buffer, so the first field wraps around.
    SocketReader!(4, 2) reader;
    reader.buffer[0 .. 8] = "c\nd\n__ab";
    reader.start_idx = 6;
    reader.length = 6;

    // Wrapping field: copied to the internal field buffer
    assert(reader.popFront() == 0);
    assert(!reader.empty());
    assert(reader.front() == "abc");

    // Linear field: a slice of the main buffer
    assert(reader.popFront() == 0);
    assert(!reader.empty());
    assert(reader.front() == "d");

    // No more delimiter and no socket: the range becomes empty
    assert(reader.popFront() == 0);
    assert(reader.empty());
    assert(reader.front() is null);
}