1 /*******************************************************************************
2 
3     Input range that will read data from a socket and return token-separated
4     values. The token will be excluded.
5 
6     This struct is an implementation detail of the Collectd socket and not
7     intended to be used outside of it.
8 
9     Copyright:
10         Copyright (c) 2015-2016 dunnhumby Germany GmbH.
11         All rights reserved.
12 
13     License:
14         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
15         Alternatively, this file may be distributed under the terms of the Tango
16         3-Clause BSD License (see LICENSE_BSD.txt for details).
17 
18 *******************************************************************************/
19 
20 module ocean.net.collectd.SocketReader;
21 
22 
23 
24 import ocean.transition;
25 import ocean.core.Verify;
26 import ocean.stdc.posix.sys.types; // ssize_t
27 import ocean.sys.ErrnoException;
28 import ocean.sys.socket.model.ISocket;
29 import ocean.text.util.StringSearch; // locateChar
30 
31 
32 /*******************************************************************************
33 
34     Input range that will read data from a socket and return token-separated
35     values. The token will be excluded.
36 
37     This struct is allocation-free as it acts as a (specialized) circular buffer.
38 
39     Params:
40         MAX_FIELD_SIZE  = Maximum length a line can have.
41                           As lines can be non-contiguous (if a line starts at
42                           the end of the circular buffer and ends at the
43                           beginning of it), but we need to provide them
44                           contiguous to the reader, a buffer is used and
45                           written to in this situation.
46         FIELDS          = The maximum number of maximum length fields that
47                           the buffer can store.
48                           In other words, MAX_FIELD_SIZE * FIELDS == capacity.
49 
50 *******************************************************************************/
51 
package struct SocketReader (size_t MAX_FIELD_SIZE = 512, size_t FIELDS = 16)
{
    // `calc` takes a modulo by the buffer length, so a zero capacity can
    // never work; reject it with a readable message.
    static assert(MAX_FIELD_SIZE * FIELDS > 0,
        "SocketReader requires a non-empty buffer");


    /***************************************************************************

        Get the current element

        The returned data is transient, which means it might get invalidated
        by the next call to popFront or when the range is finalized. Make sure
        to `.dup` it if you need a longer lifetime.

        Returns:
            A transient string to the current item. This will be `null` if
            this range is `empty`.

    ***************************************************************************/

    public cstring front ()
    {
        return (&this).current_field;
    }


    /***************************************************************************

        Discard the current item (`front`) and process the next field

        This will change the current front. If no complete field is buffered
        and a non-null socket is provided, data is read from the socket
        (possibly several times) until a delimiter shows up.

        Params:
            socket = If non null, instead of becoming empty, popFront will
                        attempt to read data from the socket
            flags  = Flags to pass to recv in the event of a read from socket

        Returns:
            The total amount of data read from the network during this call
            (sum of the `recv` return values), 0 if nothing had to be read.

        Throws:
            `ErrnoException` if `recv` returned a non-positive value, that is
            on error (negative) or when nothing could be read (0). In the
            latter case the global errno may not describe the condition.

    ***************************************************************************/

    public ssize_t popFront (ISocket socket = null, int flags = 0)
    {
        ssize_t received = 0;

        while (true)
        {
            auto off = (&this).locateChar('\n');

            if (off != (&this).length)
            {
                // A delimiter is buffered: expose the field and consume it.
                (&this).extractField(off);
                return received;
            }

            if (socket is null)
            {
                // No complete field and nowhere to get more data from.
                (&this).current_field = null;
                return received;
            }

            auto r = (&this).recv(socket, flags);
            if (r <= 0)
            {
                (&this).current_field = null;
                throw (&this).e.useGlobalErrno("recv");
            }
            received += r;
        }
    }


    /***************************************************************************

        Tells whether the range is empty (i.e. no more fields can be read)

        Note that empty doesn't mean that no more data is stored in the buffer,
        but rather that no more delimiter (or token) could be found in the data

    ***************************************************************************/

    public bool empty ()
    {
        return (&this).current_field is null;
    }


    /***************************************************************************

        Make the field of length `off` starting at `start_idx` the current
        front, then drop it and its trailing delimiter from the unprocessed
        data.

        Params:
            off = Offset of the delimiter relative to `start_idx`; must be
                  strictly less than `this.length`

    ***************************************************************************/

    private void extractField (size_t off)
    {
        auto field_end = (&this).start_idx + off;

        if (field_end > (&this).buffer.length)
        {
            // Worst case scenario: the field starts at the end of the buffer
            // and continues at the beginning. In this case, we have no choice
            // but to copy data to our field_buffer to get something sane.
            auto p1len = (&this).buffer.length - (&this).start_idx;
            verify(p1len < off);
            // Without this check an oversized field would write past the
            // end of field_buffer when bounds checking is disabled.
            verify(off <= (&this).field_buffer.length,
                "Field is longer than MAX_FIELD_SIZE");

            (&this).field_buffer[0 .. p1len]
                = (&this).buffer[(&this).start_idx .. (&this).buffer.length];

            (&this).field_buffer[p1len .. off]
                = (&this).buffer[0 .. off - p1len];

            (&this).current_field = (&this).field_buffer[0 .. off];
        }
        else
        {
            // Usual case: We just return a slice to our buffer
            (&this).current_field
                = (&this).buffer[(&this).start_idx .. field_end];
        }

        // Consume the field and its delimiter. When nothing is left, rewind
        // to the beginning so the next read gets the whole buffer.
        (&this).length -= (off + 1);
        (&this).start_idx = (&this).length
            ? (&this).calc((&this).start_idx, off + 1)
            : 0;
    }


    /***************************************************************************

        Read data from the socket

        This function is only called from popFront when a socket is provided.
        Data is written into the free contiguous region following the
        unprocessed data.

        Params:
            socket = An ISocket to read from
            flags  = flags to pass to `ISocket.recv`

        Returns:
            The return value of `ISocket.recv`, which is the quantity of bytes
            read.

    ***************************************************************************/

    private ssize_t recv (ISocket socket, int flags)
    {
        verify(socket !is null, "Cannot recv with a null socket");

        // Check before writing: reading into a full buffer would overwrite
        // data that has not been processed yet.
        auto space = (&this).linearSpace();
        verify(space > 0,
            "SocketReader buffer is full but contains no delimiter");

        auto start = (&this).calc((&this).start_idx, (&this).length);

        ssize_t ret = socket.recv((&this).buffer[start .. start + space], flags);

        // Errors are handled from popFront
        if (ret <= 0)
            return ret;

        (&this).length += ret;
        verify((&this).length <= (&this).buffer.length);

        return ret;
    }


    /***************************************************************************

        Tell whether the current data in the buffer are linear, or extend
        past the end of the buffer and circle to the beginning

    ***************************************************************************/

    private bool isLinear ()
    {
        return (&this).start_idx + (&this).length <= (&this).buffer.length;
    }


    /***************************************************************************

        Helper function for locateChar

        Returns:
            An end suitable for 'linear' reading of the buffer, that is,
            an end which is always >= this.start_idx and never past the end
            of the buffer

    ***************************************************************************/

    private size_t linearEnd ()
    {
        return (&this).isLinear()
            ? ((&this).start_idx + (&this).length)
            : ((&this).buffer.length);
    }


    /***************************************************************************

        Returns:
            the maximum amount of data we can read in one go, i.e. the size
            of the free contiguous region that starts right after the
            unprocessed data. This is 0 when the buffer is full.

    ***************************************************************************/

    private size_t linearSpace ()
    {
        if ((&this).length >= (&this).buffer.length)
            return 0;

        auto write_idx = (&this).calc((&this).start_idx, (&this).length);

        // If the write position wrapped to before start_idx, the free region
        // stops at start_idx; otherwise it runs to the end of the buffer.
        return write_idx < (&this).start_idx
            ? ((&this).start_idx - write_idx)
            : ((&this).buffer.length - write_idx);
    }


    /***************************************************************************

        Find the next occurrence of 'tok' in the unprocessed data, following
        it across the end of the buffer if it wraps around

        Params:
            tok = a token (character) to search from, starting from start_idx

        Returns:
            The offset to `start_idx` (linear offset) at which the token is,
            or `this.length` if it wasn't found

    ***************************************************************************/

    private size_t locateChar (char tok)
    {
        auto after = StringSearch!(false).locateChar(
            (&this).buffer[(&this).start_idx .. (&this).linearEnd()], tok);
        if ((&this).isLinear() || (&this).start_idx  + after < (&this).buffer.length)
        {
            return after;
        }
        // In this case, after ==> buffer.length - start_idx: the first part
        // holds no token, continue with the part stored at the beginning.
        return after + StringSearch!(false).locateChar(
            (&this).buffer[0 .. (&this).length - after],
            tok);
    }


    /***************************************************************************

        Helper function to calculate index in the buffer from offsets

        Params:
            idx = The index to start from
            val = The offset to add

        Returns:
            An index in `this.buffer`. It is always in-bound.

    ***************************************************************************/

    private size_t calc (size_t idx, size_t val)
    {
        return (idx + val) % (&this).buffer.length;
    }


    /***************************************************************************

        Buffer in which the data will be stored

    ***************************************************************************/

    private char[MAX_FIELD_SIZE * FIELDS] buffer;


    /***************************************************************************

        Internal buffer in which the current line will be copied in the event
        of a line being non-linear (starts at the end of the buffer and
        continue at the beginning).

    ***************************************************************************/

    private char[MAX_FIELD_SIZE] field_buffer;


    /***************************************************************************

        A slice to the data currently being the `front()`

    ***************************************************************************/

    private cstring current_field;


    /***************************************************************************

        Unprocessed data start

    ***************************************************************************/

    private size_t start_idx;


    /***************************************************************************

        Unprocessed data length

    ***************************************************************************/

    private size_t length;


    /***************************************************************************

        Exception to throw on error

        Note:
            It is set from outside, hence the package visibility.

    ***************************************************************************/

    package ErrnoException e;
}
343 
unittest
{
    // Compile-only smoke test: declaring a reader with the default template
    // arguments forces the whole struct to be instantiated.
    alias DefaultReader = SocketReader!();
    DefaultReader reader;
}