1 /*******************************************************************************
2 
    A utility class to interact with the collectd-unixsock plugin
4 
5     This class is a simple wrapper around Collectd's functionalities,
6     providing parsing and communication means.
7 
8     Most users will not want to use this module directly and should prefer
9     the high-level stats API provided in `ocean.util.log.Stats`.
10 
11     See_Also:
12         https://collectd.org/documentation/manpages/collectd-unixsock.5.shtml
13 
14     Copyright:
15         Copyright (c) 2015-2016 dunnhumby Germany GmbH.
16         All rights reserved.
17 
18     License:
19         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
20         Alternatively, this file may be distributed under the terms of the Tango
21         3-Clause BSD License (see LICENSE_BSD.txt for details).
22 
23 *******************************************************************************/
24 
25 module ocean.net.collectd.Collectd;
26 
27 
28 /*******************************************************************************
29 
30     Usage example
31 
32 *******************************************************************************/
33 
unittest
{
    void sendCollectdData ()
    {
        // An `Identifier` names the series a value belongs to. Every request
        // sent to collectd, except `listval`, takes one (see Collectd's
        // documentation for details). This one would typically be shared
        // by the whole application.
        Identifier id =
        {
            host:               "example.com",
            plugin:             "http_server",
            type:               "requests",       // how much traffic it handles
            plugin_instance:    "1",              // the instance number
            type_instance:      "worker-1"
        };

        // An identifier in Collectd's textual form can be parsed with
        // `Identifier.create`. The overload used here throws on error;
        // another one returns a message when the parsing failed.
        auto parsed_id = Identifier.create(
            "sociomantic.com/http_server-1/requests-worker-1");

        // The constructor takes the path of the Collectd socket and
        // connect()s to it right away. An `ErrnoException` is thrown if
        // the socket cannot be created or connected.
        auto collectd = new Collectd("/var/run/collectd.socket");

        // The instance can now be used to talk to the daemon.
        // It uses a rotating buffer internally to minimize memory
        // allocations, so once a function returning a set of data was
        // called (e.g. `listval`), its result must be fully processed
        // before another function is called. Starting a new request too
        // early results in a `CollectdException`.

        // A value is written as an aggregate whose fields match the
        // definition of `id.type` in your `types.db` file, documented at:
        // https://collectd.org/documentation/manpages/types.db.5.shtml
        //
        // The name of the aggregate is irrelevant, only the content of `id`
        // matters. To keep the example short, a single gauge is used, which
        // corresponds to a definition of the form `<type> value:GAUGE:0:U`.
        static struct Charge { double value; }
        auto charge = Charge(42.0);

        // Sends a single line of the form
        // `PUTVAL <identifier> <current unix timestamp>:<value>`
        // and checks Collectd's answer.
        collectd.putval(id, charge);
    }
}
87 
88 
89 
90 import ocean.transition;
91 import ocean.core.Enforce;
92 import ocean.core.Verify;
93 import ocean.core.Exception;
94 import ocean.stdc.posix.sys.un;
95 import core.stdc.time; // time
96 import ocean.stdc.posix.sys.types; // time_t
97 import ocean.sys.ErrnoException;
98 import core.sys.posix.sys.socket;  // SOCK_DGRAM
99 import ocean.sys.socket.UnixSocket;
100 import ocean.text.Util;
101 import Float = ocean.text.convert.Float;
102 import ocean.text.convert.Formatter;
103 import ocean.text.convert.Integer;
104 import Conv = ocean.util.Convert;
105 import ocean.text.util.StringSearch; // locateChar
106 
107 import ocean.net.collectd.SocketReader;
108 public import ocean.net.collectd.Identifier;
109 
110 version (UnitTest)
111 {
112     import ocean.core.Test;
113     import ocean.io.Stdout : Stdout;
114 }
115 
116 
117 /*******************************************************************************
118 
119     Collectd wrapper class
120 
121     Encapsulate communication with the Collectd socket, as well as parsing
122     of its messages.
123 
124     Note:
        You must be careful when mixing calls. Returned data is transient
        (it sits in an internal buffer) and might be invalidated by the next
        call to a member function.
128 
129         For example, don't do:
130 
131         ````
132         Collectd inst = ...;
133         foreach (v; inst.listval())
134         {
135             inst.getval!(Counter)(v);
136         }
137         ````
138         Because this might invalidate the data returned by `listval()` on the
139         first call to `getval()`
140 
141     Note:
142         PUTNOTIF is not implemented
143 
144 *******************************************************************************/
145 
146 public final class Collectd
147 {
148     /***************************************************************************
149 
150         Values returned by 'listval'
151 
152     ***************************************************************************/
153 
154     public static struct Value
155     {
156         /***********************************************************************
157 
158             The timestamp - as a floating point value - of the last update
159 
160         ***********************************************************************/
161 
162         public double last_update;
163 
164 
165         /***********************************************************************
166 
167             An identifier the can be passed to `getval()`
168 
169         ***********************************************************************/
170 
171         public Identifier identifier;
172     }
173 
174 
175     /***************************************************************************
176 
177         Options that can be passed to Putval
178 
179     ***************************************************************************/
180 
181     public struct PutvalOptions
182     {
183         /***********************************************************************
184 
185             Gives the interval in which the data is being collected.
186 
187         ***********************************************************************/
188 
189         public time_t interval;
190     }
191 
192 
193     /***************************************************************************
194 
195         Constructor
196 
197         Params:
198             socket_path = Path of the local socket of the collectd daemon.
199 
200         Throws:
201             If it can't create the socket or connect to the collectd daemon,
202             an Exception is thrown.
203 
204     ***************************************************************************/
205 
206     public this (istring socket_path)
207     {
208         auto socketaddr = sockaddr_un.create(socket_path);
209 
210         this.socket = new UnixSocket();
211 
212         this.e_errno = new ErrnoException();
213         this.e = new CollectdException(256);
214         this.reader.e = this.e_errno;
215 
216         auto sockRet = this.socket.socket();
217         if (sockRet < 0)
218             throw this.e_errno.useGlobalErrno("socket");
219 
220         if (auto connectRet = this.socket.connect(&socketaddr))
221             throw this.e_errno.useGlobalErrno("connect");
222 
223         // This ought to be enough for any numeric argument
224         this.format_buff = new mstring(256);
225         this.format_buff.length = 0;
226         enableStomping(this.format_buff);
227     }
228 
229 
230     /***************************************************************************
231 
232         Submits one or more values, identified by Identifier to the daemon
233         which will dispatch it to all its write-plugins
234 
235         Params:
236             id      = Uniquely identifies what value is being collected.
237                       Note the `type` must be defined in `types.db`.
238 
239             data    = A struct containing only numeric types. Values can either
240                       be an integer if the data-source is a counter,
241                       or a double if the data-source is of type "gauge".
242                       NaN and infinity are translated to undefined values ('U').
243                       The current UNIX time is submitted along.
244 
245             options = The options list is an optional parameter, where each
246                       option is sent as a key-value-pair.
247                       See `PutvalOptions`'s documentation for a list
248                       of all currently recognized options, however be aware
249                       that an outdated Collectd which doesn't support all
250                       the options will silently ignore them.
251 
252         Throws:
253             `ErrnoException` if writing to the socket produced an error,
254             or `CollectdException` if an error happened while communicating
255             (Collectd returns an error, the internal buffer wasn't empty (which
256             means the caller haven't fully processed the last query),
257             or we get unexpected / inconsistent data), or if more than
258             10 millions records where found
259 
260     ***************************************************************************/
261 
262     public void putval (T) (Identifier id, ref T data,
263                             PutvalOptions options = PutvalOptions.init)
264     {
265         static assert (is (T == struct) || is (T == class),
266                        "Only struct and classes can be sent to Collectd");
267         static assert (T.tupleof.length,
268                        "Cannot send empty aggregate of type "
269                        ~ T.stringof ~ " to Collectd");
270 
271         this.startNewRequest!("putval");
272 
273         this.format("PUTVAL ", id);
274 
275         // Write the options
276         if (options.interval)
277             this.format(` interval="`, options.interval, `"`);
278 
279         // Every line should start with the timestamp
280         this.format(" ", time(null));
281 
282         // Write all the data
283         foreach (idx, ref v; data.tupleof)
284             this.format(":", v);
285 
286         // All lines need to end with a \n
287         this.format("\n");
288         this.write(this.format_buff);
289 
290         this.reader.popFront(this.socket, 0);
291 
292         // Check for success
293         this.e.enforce(this.reader.front()[0 .. PutvalSuccessLineBegin.length]
294                        == PutvalSuccessLineBegin,
295                        this.reader.front());
296         this.reader.popFront();
297         if (!this.reader.empty())
298             throw this.e.set("Unexpected line received from Collectd: ")
299                 .append(this.reader.front());
300     }
301 
302 
303     /***************************************************************************
304 
305         Read a status line as sent by collectd
306 
307         Params:
308             line = The status line read from collectd. It should be in the form
309                     "X Values found", where X is a number greater than 1, or
310                     "1 Value found".
311 
312         Throws:
313             `CollectdException` if the status line is non conformant
314 
315         Returns:
316             On success the number of values found (that is, 'X' or 'Y')
317 
318     ***************************************************************************/
319 
320     private size_t processStatusLine (cstring line)
321     {
322         size_t values = void;
323         auto spIdx = StringSearch!(false).locateChar(line, ' ');
324 
325         auto vfound = line[spIdx .. $];
326         if (vfound != " Values found" && vfound != " Value found")
327             throw this.e.set("Expected 'Value(s) found' in status line, got ")
328                 .append(vfound);
329 
330         auto vstring = line[0 .. spIdx];
331         if (!toInteger(vstring, values))
332             throw this.e.set("Could not convert '").append(vstring)
333                 .append("' to integer");
334 
335         return values;
336     }
337 
338 
339     /***************************************************************************
340 
341         An instance to the socket used to communicate with collectd daemon
342 
343         When reading from the socket, collectd always send *at least* one line,
344         the status line. Lines are always send in full.
345         The socket is a streaming (TCP) socket.
346 
347         The minimal status line one can get is "0 Value found\n", which has a
348         length of 14. If we limit ourselves to a max value of
349         size_t.length, or 18_446_744_073_709_551_615 on 64 bits machines,
350         we can get a status line which size is comprised between 14 and 34.
351 
352     ***************************************************************************/
353 
354     private UnixSocket socket;
355 
356 
357     /***************************************************************************
358 
359         Exception when a non-IO error happen while communicating with Collectd
360 
361     ***************************************************************************/
362 
363     private CollectdException e;
364 
365 
366     /***************************************************************************
367 
368         Exception when an IO error happen
369 
370     ***************************************************************************/
371 
372     private ErrnoException e_errno;
373 
374 
375     /***************************************************************************
376 
377         An instance of the line reader
378 
379     ***************************************************************************/
380 
381     private SocketReader!() reader;
382 
383 
384     /***************************************************************************
385 
386         Internal buffer used to format non-string arguments
387 
388     ***************************************************************************/
389 
390     private mstring format_buff;
391 
392 
393     /***************************************************************************
394 
395         What putval returns on success
396 
397     ***************************************************************************/
398 
399     private static immutable istring PutvalSuccessLineBegin = "0 Success: ";
400 
401 
402     /***************************************************************************
403 
404         Write the content of an identifier to a buffer
405 
406         Params:
407             identifier = Identifier instance to write
408 
409     ***************************************************************************/
410 
411     private void formatIdentifier (ref Const!(Identifier) identifier)
412     {
413         verify(identifier.host.length != 0, "No host for identifier");
414         verify(identifier.plugin.length != 0, "No plugin for identifier");
415         verify(identifier.type.length != 0, "No type for identifier");
416 
417         auto pi = identifier.plugin_instance.length ? "-" : null;
418         auto ti = identifier.type_instance.length ? "-" : null;
419 
420         this.format_buff ~= identifier.host;
421         this.format_buff ~= '/';
422         this.format_buff ~= identifier.plugin;
423         this.format_buff ~= pi;
424         this.format_buff ~= identifier.plugin_instance;
425         this.format_buff ~= '/';
426         this.format_buff ~= identifier.type;
427         this.format_buff ~= ti;
428         this.format_buff ~= identifier.type_instance;
429     }
430 
431 
432     /***************************************************************************
433 
434         Append stringified arguments into `this.format_buff`
435 
436         Params:
437             args = Array of arguments to write to `this.format_buff`.
438                    `Identifier`, string types and numeric values are supported.
439 
440     ***************************************************************************/
441 
442     private void format (T...) (in T args)
443     {
444         scope sink = (Const!(char)[] v)
445                      {
446                          this.format_buff ~= v;
447                      };
448 
449         foreach (arg; args)
450         {
451             static if (is(typeof(arg) : Unqual!(Identifier)))
452                 this.formatIdentifier(arg);
453             else static if (is(typeof(arg) == struct)
454                             || is(typeof(arg) == class)
455                             || is(typeof(arg) == enum))
456                 static assert(0, "Cannot send an aggregate of type "
457                               ~ typeof(arg).stringof ~ " to Collectd");
458             else
459                 sformat(sink, "{}", arg);
460         }
461     }
462 
463 
464     /***************************************************************************
465 
466         Helper to write data to a socket
467 
468         Params:
469             str = String to send on the socket.
470                   Usually a literal, or the formatted buffer.
471 
472         Throws:
473             `CollectdException` if writing to the Collectd socket failed
474 
475     ***************************************************************************/
476 
477     private void write (cstring str)
478     {
479         auto r = this.socket.write(str);
480         if (r != str.length)
481             throw this.e_errno.useGlobalErrno("write");
482     }
483 
484 
485     /***************************************************************************
486 
487         Sanity check to ensure a request is started with a clean slate
488 
489         Also reset the formatting buffer.
490 
491         Params:
492             reqname = Name of the request that is started, for more informative
493                       error message.
494 
495         Throws:
496             `CollectdException` if there is data in the buffer.
497 
498     ***************************************************************************/
499 
500     private void startNewRequest (istring reqname /*= __FUNCTION__*/) ()
501     {
502         this.format_buff.length = 0;
503         enableStomping(this.format_buff);
504 
505         this.e.enforce(this.reader.empty(),
506                        "Called " ~ reqname ~ " with a non-empty buffer");
507     }
508 }
509 
510 
/*******************************************************************************

    Exception thrown when a non-IO error happens while communicating with
    Collectd

    `Collectd` allocates a single instance and reuses it for every error; the
    members it relies on (`set`, `append`, `enforce` and a constructor taking
    a size) come from the mixed-in `ReusableExceptionImplementation`.

*******************************************************************************/

public class CollectdException : Exception
{
    mixin ReusableExceptionImplementation!();
}