1 /******************************************************************************* 2 3 Input range that will read data from a socket and return token-separated 4 values. The token will be excluded. 5 6 This struct is an implementation detail of the Collectd socket and not 7 intended to be used outside of it. 8 9 Copyright: 10 Copyright (c) 2015-2016 dunnhumby Germany GmbH. 11 All rights reserved. 12 13 License: 14 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 15 Alternatively, this file may be distributed under the terms of the Tango 16 3-Clause BSD License (see LICENSE_BSD.txt for details). 17 18 *******************************************************************************/ 19 20 module ocean.net.collectd.SocketReader; 21 22 23 24 import ocean.transition; 25 import ocean.core.Verify; 26 import ocean.stdc.posix.sys.types; // ssize_t 27 import ocean.sys.ErrnoException; 28 import ocean.sys.socket.model.ISocket; 29 import ocean.text.util.StringSearch; // locateChar 30 31 32 /******************************************************************************* 33 34 Input range that will read data from a socket and return token-separated 35 values. The token will be excluded. 36 37 This struct is allocation-free as it acts as a (specialized) circular buffer. 38 39 Params: 40 MAX_FIELD_SIZE = Maximum length a line can have. 41 As lines can be non-contiguous (if a line starts at 42 the end of the circular buffer and ends at the 43 beginning of it), but we need to provide them 44 contiguous to the reader, a buffer is used and 45 written to in this situation. 46 FIELDS = The maximum number of maximum length fields that 47 the buffer can store. 48 In other words, MAX_FIELD_SIZE * FIELDS == capacity. 49 50 *******************************************************************************/ 51 52 package struct SocketReader (size_t MAX_FIELD_SIZE = 512, size_t FIELDS = 16) 53 { 54 /*************************************************************************** 55 56 Get the current element 57 58 The returned data is transient, which means it might get invalidated 59 by the next call to popFront or when the range is finalized. Make sure 60 to `.dup` it if you need a longer lifetime. 61 62 Returns: 63 A transient string to the current item. This will be `null` if 64 this range is `empty`. 65 66 ***************************************************************************/ 67 68 public cstring front () 69 { 70 return (&this).current_field; 71 } 72 73 74 /*************************************************************************** 75 76 Discard the current item (`front`) and process the next field 77 78 This will change the current front. If not enough data is available, 79 and a non-null socket is provided, it will attempt to read as much as 80 possible data from this socket. 81 82 Params: 83 socket = If non null, instead of becoming empty, popFront will 84 attempt to read data from the socket 85 flags = Flags to pass to recv in the event of a read from socket 86 87 Returns: 88 The amount of data read from the network, if any (recv return value) 89 90 Throws: 91 `ErrnoException` if `recv` returned a negative value 92 93 ***************************************************************************/ 94 95 public ssize_t popFront (ISocket socket = null, int flags = 0) 96 { 97 auto off = (&this).locateChar('\n'); 98 99 if (off != (&this).length) 100 { 101 // Worst case scenario: the field starts at the end of the buffer 102 // and continues at the beginning. In this case, we have no choice 103 // but to copy data to our field_buffer to get something sane. 104 if ((&this).start_idx + off > (&this).buffer.length) 105 { 106 auto p1len = (&this).buffer.length - (&this).start_idx; 107 verify(p1len < off); 108 109 (&this).field_buffer[0 .. p1len] 110 = (&this).buffer[(&this).start_idx .. (&this).buffer.length]; 111 112 (&this).field_buffer[p1len .. off] = (&this).buffer[0 .. off - p1len]; 113 114 (&this).current_field = (&this).field_buffer[0 .. off]; 115 } 116 else 117 { 118 // Usual case: We just return a slice to our buffer 119 (&this).current_field = (&this).buffer[(&this).start_idx .. (&this).start_idx + off]; 120 } 121 (&this).length -= (off + 1); 122 (&this).start_idx = !(&this).length ? 0 : (&this).calc((&this).start_idx, off + 1); 123 } 124 else if (socket !is null) 125 { 126 auto r = (&this).recv(socket, flags); 127 if (r <= 0) 128 { 129 (&this).current_field = null; 130 throw (&this).e.useGlobalErrno("recv"); 131 } 132 (&this).popFront(socket, flags); 133 } 134 else 135 { 136 (&this).current_field = null; 137 } 138 return 0; 139 } 140 141 142 /*************************************************************************** 143 144 Tells whenever the range is empty (i.e. no more fields can be read) 145 146 Note that empty doesn't mean that no more data is stored in the buffer, 147 but rather mean no more delimiter (or token) could be found in the data 148 149 ***************************************************************************/ 150 151 public bool empty () 152 { 153 return (&this).current_field is null; 154 } 155 156 157 /*************************************************************************** 158 159 Read data from the socket 160 161 This function is only called from popFront when a socket is provided. 162 163 Params: 164 socket = An ISocket to read from 165 flags = flags to pass to `ISocket.recv` 166 167 Returns: 168 The return value of `ISocket.recv`, which is the quantity of bytes 169 read. 170 171 ***************************************************************************/ 172 173 private ssize_t recv (ISocket socket, int flags) 174 { 175 verify(socket !is null, "Cannot recv with a null socket"); 176 177 auto start = (&this).calc((&this).start_idx, (&this).length); 178 auto end = start < (&this).start_idx ? (&this).start_idx : (&this).buffer.length; 179 180 ssize_t ret = socket.recv((&this).buffer[start .. end], flags); 181 182 // Errors are handled from popFront 183 if (ret <= 0) 184 return ret; 185 186 (&this).length += ret; 187 verify((&this).length <= (&this).buffer.length); 188 189 return ret; 190 } 191 192 193 /*************************************************************************** 194 195 Tell whenever the current data in the buffer are linear, or extend 196 past the end of the buffer and circle to the beginning 197 198 ***************************************************************************/ 199 200 private bool isLinear () 201 { 202 return !((&this).start_idx + (&this).length > (&this).buffer.length); 203 } 204 205 206 /*************************************************************************** 207 208 Helper function for locateChar 209 210 Returns: 211 An end suitable for 'linear' reading of the buffer, that is, 212 an end which is always > this.start_idx 213 214 ***************************************************************************/ 215 216 private size_t linearEnd () 217 { 218 return (&this).isLinear() 219 ? ((&this).start_idx + (&this).length) 220 : ((&this).buffer.length); 221 } 222 223 224 /************************************************************************** 225 * 226 Returns: the maximum amount of data we can read 227 228 ***************************************************************************/ 229 230 private size_t linearSpace () 231 { 232 return (&this).isLinear() 233 ? ((&this).buffer.length - (&this).calc((&this).start_idx, (&this).length)) 234 : ((&this).start_idx - (&this).calc((&this).start_idx, (&this).length)); 235 } 236 237 238 /*************************************************************************** 239 240 Find the next occurrence of 'tok' in the string in a non-linear way 241 242 Params: 243 tok = a token (character) to search from, starting from start_idx 244 245 Returns: 246 The offset to `start_idx` (linear offset) at which the token is, 247 or `this.length` if it wasn't found 248 249 ***************************************************************************/ 250 251 private size_t locateChar (char tok) 252 { 253 auto after = StringSearch!(false).locateChar( 254 (&this).buffer[(&this).start_idx .. (&this).linearEnd()], tok); 255 if ((&this).isLinear() || (&this).start_idx + after < (&this).buffer.length) 256 { 257 return after; 258 } 259 // In this case, after ==> buffer.length - start_idx 260 return after + StringSearch!(false).locateChar( 261 (&this).buffer[0 .. (&this).length - after], 262 tok); 263 } 264 265 266 /*************************************************************************** 267 268 Helper function to calculate index in the buffer from offsets 269 270 Params: 271 idx = The index to start from 272 val = The offset to add 273 274 Returns: 275 An index in `this.buffer`. It is always in-bound. 276 277 ***************************************************************************/ 278 279 private size_t calc (size_t idx, size_t val) 280 { 281 return (idx + val) % (&this).buffer.length; 282 } 283 284 285 /*************************************************************************** 286 287 Buffer in which the data will be stored 288 289 ***************************************************************************/ 290 291 private char[MAX_FIELD_SIZE * FIELDS] buffer; 292 293 294 /*************************************************************************** 295 296 Internal buffer in which the current line will be copied in the event 297 of a line being non-linear (starts at the end of the buffer and 298 continue at the beginning). 299 300 ***************************************************************************/ 301 302 private char[MAX_FIELD_SIZE] field_buffer; 303 304 305 /*************************************************************************** 306 307 A slice to the data currently being the `front()` 308 309 ***************************************************************************/ 310 311 private cstring current_field; 312 313 314 /*************************************************************************** 315 316 Unprocessed data start 317 318 ***************************************************************************/ 319 320 private size_t start_idx; 321 322 323 /*************************************************************************** 324 325 Unprocessed data length 326 327 ***************************************************************************/ 328 329 private size_t length; 330 331 332 /*************************************************************************** 333 334 Exception to throw on error 335 336 Note: 337 It is set from outside, hence the package visibility. 338 339 ***************************************************************************/ 340 341 package ErrnoException e; 342 } 343 344 unittest 345 { 346 // Ensure it compiles 347 SocketReader!() reader; 348 }