1 /*******************************************************************************
2 
3     Input range that will read data from a socket and return token-separated
4     values. The token will be excluded.
5 
6     This struct is an implementation detail of the Collectd socket and not
7     intended to be used outside of it.
8 
9     Copyright:
10         Copyright (c) 2015-2016 dunnhumby Germany GmbH.
11         All rights reserved.
12 
13     License:
14         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
15         Alternatively, this file may be distributed under the terms of the Tango
16         3-Clause BSD License (see LICENSE_BSD.txt for details).
17 
18 *******************************************************************************/
19 
20 module ocean.net.collectd.SocketReader;
21 
22 
23 import ocean.meta.types.Qualifiers;
24 import ocean.core.Verify;
25 import ocean.sys.ErrnoException;
26 import ocean.sys.socket.model.ISocket;
27 import ocean.text.util.StringSearch; // locateChar
28 
29 import core.sys.posix.sys.types; // ssize_t
30 
31 /*******************************************************************************
32 
33     Input range that will read data from a socket and return token-separated
34     values. The token will be excluded.
35 
36     This struct is allocation-free as it acts as a (specialized) circular buffer.
37 
38     Params:
39         MAX_FIELD_SIZE  = Maximum length a line can have.
40                           As lines can be non-contiguous (if a line starts at
41                           the end of the circular buffer and ends at the
42                           beginning of it), but we need to provide them
43                           contiguous to the reader, a buffer is used and
44                           written to in this situation.
45         FIELDS          = The maximum number of maximum length fields that
46                           the buffer can store.
47                           In other words, MAX_FIELD_SIZE * FIELDS == capacity.
48 
49 *******************************************************************************/
50 
51 package struct SocketReader (size_t MAX_FIELD_SIZE = 512, size_t FIELDS = 16)
52 {
53     /***************************************************************************
54 
55         Get the current element
56 
57         The returned data is transient, which means it might get invalidated
58         by the next call to popFront or when the range is finalized. Make sure
59         to `.dup` it if you need a longer lifetime.
60 
61         Returns:
62             A transient string to the current item. This will be `null` if
63             this range is `empty`.
64 
65     ***************************************************************************/
66 
67     public cstring front ()
68     {
69         return this.current_field;
70     }
71 
72 
73     /***************************************************************************
74 
75         Discard the current item (`front`) and process the next field
76 
77         This will change the current front. If not enough data is available,
78         and a non-null socket is provided, it will attempt to read as much as
79         possible data from this socket.
80 
81         Params:
82             socket = If non null, instead of becoming empty, popFront will
83                         attempt to read data from the socket
84             flags  = Flags to pass to recv in the event of a read from socket
85 
86         Returns:
87             The amount of data read from the network, if any (recv return value)
88 
89         Throws:
90             `ErrnoException` if `recv` returned a negative value
91 
92     ***************************************************************************/
93 
94     public ssize_t popFront (ISocket socket = null, int flags = 0)
95     {
96         auto off = this.locateChar('\n');
97 
98         if (off != this.length)
99         {
100             // Worst case scenario: the field starts at the end of the buffer
101             // and continues at the beginning. In this case, we have no choice
102             // but to copy data to our field_buffer to get something sane.
103             if (this.start_idx + off > this.buffer.length)
104             {
105                 auto p1len = this.buffer.length - this.start_idx;
106                 verify(p1len < off);
107 
108                 this.field_buffer[0 .. p1len]
109                     = this.buffer[this.start_idx .. this.buffer.length];
110 
111                 this.field_buffer[p1len .. off] = this.buffer[0 .. off - p1len];
112 
113                 this.current_field = this.field_buffer[0 .. off];
114             }
115             else
116             {
117                 // Usual case: We just return a slice to our buffer
118                 this.current_field = this.buffer[this.start_idx .. this.start_idx + off];
119             }
120             this.length -= (off + 1);
121             this.start_idx = !this.length ? 0 : this.calc(this.start_idx, off + 1);
122         }
123         else if (socket !is null)
124         {
125             auto r = this.recv(socket, flags);
126             if (r <= 0)
127             {
128                 this.current_field = null;
129                 throw this.e.useGlobalErrno("recv");
130             }
131             this.popFront(socket, flags);
132         }
133         else
134         {
135             this.current_field = null;
136         }
137         return 0;
138     }
139 
140 
141     /***************************************************************************
142 
143         Tells whenever the range is empty (i.e. no more fields can be read)
144 
145         Note that empty doesn't mean that no more data is stored in the buffer,
146         but rather mean no more delimiter (or token) could be found in the data
147 
148     ***************************************************************************/
149 
150     public bool empty ()
151     {
152         return this.current_field is null;
153     }
154 
155 
156     /***************************************************************************
157 
158         Read data from the socket
159 
160         This function is only called from popFront when a socket is provided.
161 
162         Params:
163             socket = An ISocket to read from
164             flags  = flags to pass to `ISocket.recv`
165 
166         Returns:
167             The return value of `ISocket.recv`, which is the quantity of bytes
168             read.
169 
170     ***************************************************************************/
171 
172     private ssize_t recv (ISocket socket, int flags)
173     {
174         verify(socket !is null, "Cannot recv with a null socket");
175 
176         auto start = this.calc(this.start_idx, this.length);
177         auto end   = start < this.start_idx ? this.start_idx : this.buffer.length;
178 
179         ssize_t ret = socket.recv(this.buffer[start .. end], flags);
180 
181         // Errors are handled from popFront
182         if (ret <= 0)
183             return ret;
184 
185         this.length += ret;
186         verify(this.length <= this.buffer.length);
187 
188         return ret;
189     }
190 
191 
192     /***************************************************************************
193 
194         Tell whenever the current data in the buffer are linear, or extend
195         past the end of the buffer and circle to the beginning
196 
197     ***************************************************************************/
198 
199     private bool isLinear ()
200     {
201         return !(this.start_idx + this.length > this.buffer.length);
202     }
203 
204 
205     /***************************************************************************
206 
207         Helper function for locateChar
208 
209         Returns:
210             An end suitable for 'linear' reading of the buffer, that is,
211             an end which is always > this.start_idx
212 
213     ***************************************************************************/
214 
215     private size_t linearEnd ()
216     {
217         return this.isLinear()
218             ? (this.start_idx + this.length)
219             : (this.buffer.length);
220     }
221 
222 
223     /**************************************************************************
224 
225         Returns: the maximum amount of data we can read
226 
227     ***************************************************************************/
228 
229     private size_t linearSpace ()
230     {
231         return this.isLinear()
232             ? (this.buffer.length - this.calc(this.start_idx, this.length))
233             : (this.start_idx - this.calc(this.start_idx, this.length));
234     }
235 
236 
237     /***************************************************************************
238 
239         Find the next occurrence of 'tok' in the string in a non-linear way
240 
241         Params:
242             tok = a token (character) to search from, starting from start_idx
243 
244         Returns:
245             The offset to `start_idx` (linear offset) at which the token is,
246             or `this.length` if it wasn't found
247 
248     ***************************************************************************/
249 
250     private size_t locateChar (char tok)
251     {
252         auto after = StringSearch!(false).locateChar(
253             this.buffer[this.start_idx .. this.linearEnd()], tok);
254         if (this.isLinear() || this.start_idx  + after < this.buffer.length)
255         {
256             return after;
257         }
258         // In this case, after ==> buffer.length - start_idx
259         return after + StringSearch!(false).locateChar(
260             this.buffer[0 .. this.length - after],
261             tok);
262     }
263 
264 
265     /***************************************************************************
266 
267         Helper function to calculate index in the buffer from offsets
268 
269         Params:
270             idx = The index to start from
271             val = The offset to add
272 
273         Returns:
274             An index in `this.buffer`. It is always in-bound.
275 
276     ***************************************************************************/
277 
278     private size_t calc (size_t idx, size_t val)
279     {
280         return (idx + val) % this.buffer.length;
281     }
282 
283 
284     /***************************************************************************
285 
286         Buffer in which the data will be stored
287 
288     ***************************************************************************/
289 
290     private char[MAX_FIELD_SIZE * FIELDS] buffer;
291 
292 
293     /***************************************************************************
294 
295         Internal buffer in which the current line will be copied in the event
296         of a line being non-linear (starts at the end of the buffer and
297         continue at the beginning).
298 
299     ***************************************************************************/
300 
301     private char[MAX_FIELD_SIZE] field_buffer;
302 
303 
304     /***************************************************************************
305 
306         A slice to the data currently being the `front()`
307 
308     ***************************************************************************/
309 
310     private cstring current_field;
311 
312 
313     /***************************************************************************
314 
315         Unprocessed data start
316 
317     ***************************************************************************/
318 
319     private size_t start_idx;
320 
321 
322     /***************************************************************************
323 
324         Unprocessed data length
325 
326     ***************************************************************************/
327 
328     private size_t length;
329 
330 
331     /***************************************************************************
332 
333         Exception to throw on error
334 
335         Note:
336             It is set from outside, hence the package visibility.
337 
338     ***************************************************************************/
339 
340     package ErrnoException e;
341 }
342 
343 unittest
344 {
345     // Ensure it compiles
346     SocketReader!() reader;
347 }