1 /*******************************************************************************
2 
3     An utility class to interact with collectd-unixsock plugin
4 
5     This class is a simple wrapper around Collectd's functionalities,
6     providing parsing and communication means.
7 
8     Most users will not want to use this module directly and should prefer
9     the high-level stats API provided in `ocean.util.log.Stats`.
10 
11     See_Also:
12         https://collectd.org/documentation/manpages/collectd-unixsock.5.shtml
13 
14     Copyright:
15         Copyright (c) 2015-2016 dunnhumby Germany GmbH.
16         All rights reserved.
17 
18     License:
19         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
20         Alternatively, this file may be distributed under the terms of the Tango
21         3-Clause BSD License (see LICENSE_BSD.txt for details).
22 
23 *******************************************************************************/
24 
25 module ocean.net.collectd.Collectd;
26 
27 
28 /*******************************************************************************
29 
30     Usage example
31 
32 *******************************************************************************/
33 
34 unittest
35 {
36     void sendCollectdData ()
37     {
38         // Every call to collectd (but `listval`) needs to use an `Identifier`.
39         // See Collectd's documentation for more information. Here we create an
40         // app-global identifier.
41         Identifier id =
42         {
43             host:               "example.com",
44             plugin:             "http_server",
45             type:               "requests",       // how much traffic it handles
46             plugin_instance:    "1",              // the instance number
47             type_instance:      "worker-1"
48         };
49 
50         // Note that if you have a Collectd-provided identifier, you can
51         // read it using `Identifier.create`
52         // Here we use the convenience overload that throws on error, however
53         // there is one version which returns a message if the parsing failed.
54         auto id2 = Identifier.create("sociomantic.com/http_server-1/requests-worker-1");
55 
56         // Construct a Collectd instance that connect() to the socket.
57         // If the connect() fails, an `ErrnoIOException` is thrown.
58         // The parameter is the path of the Collectd socket
59         auto collectd = new Collectd("/var/run/collectd.socket");
60 
61         // From this point on you can use the instance to talk to the socket.
62         // Once a function that returns a set of data is called (e.g. `listval`),
63         // no other function should be called until the result is fully
64         // processed, as this class internally uses a rotating buffer to
65         // minimize memory allocations.
66         // If a new request is started while the previous one isn't
67         // fully processed, a `CollectdException` will be thrown.
68 
69         // When writing a value, you need a structure that match a definition
70         // in your `types.db` file.
71         //
72         // The documentation of `types.db` can be found here:
73         // https://collectd.org/documentation/manpages/types.db.5.shtml
74         //
75         // The name of the struct doesn't matter, only what's in `id`.
76         // To simplify the example, we use a struct that is defined by default
77         // in `types.db`.
78         // Note: the definition is `bytes value:GAUGE:0:U`
79         static struct Charge { double value; }
80         Charge charge = Charge(42.0);
81 
82         // Write an entry to collectd.
83         collectd.putval(id, charge);
84         // Will send `PUTVAL current_unix_timestamp:42` on the wire
85     }
86 }
87 
88 
89 import ocean.meta.types.Qualifiers;
90 import ocean.core.Enforce;
91 import ocean.core.Verify;
92 import ocean.core.Exception;
93 import ocean.net.collectd.SocketReader;
94 public import ocean.net.collectd.Identifier;
95 import ocean.stdc.posix.sys.un;
96 import ocean.sys.ErrnoException;
97 import ocean.sys.socket.UnixSocket;
98 import ocean.text.Util;
99 import Float = ocean.text.convert.Float;
100 import ocean.text.convert.Formatter;
101 import ocean.text.convert.Integer;
102 import Conv = ocean.util.Convert;
103 import ocean.text.util.StringSearch; // locateChar
104 
105 import core.stdc.time; // time
106 import core.sys.posix.sys.socket;  // SOCK_DGRAM
107 import core.sys.posix.sys.types; // time_t
108 
109 version (unittest)
110 {
111     import ocean.core.Test;
112     import ocean.io.Stdout : Stdout;
113 }
114 
115 
116 /*******************************************************************************
117 
118     Collectd wrapper class
119 
120     Encapsulate communication with the Collectd socket, as well as parsing
121     of its messages.
122 
123     Note:
124         You must be careful when mixing calls. Returned data are transient
125         (sits in an internal buffer), and might get invalidated on the next
126         call to a member function.
127 
128         For example, don't do:
129 
130         ````
131         Collectd inst = ...;
132         foreach (v; inst.listval())
133         {
134             inst.getval!(Counter)(v);
135         }
136         ````
137         Because this might invalidate the data returned by `listval()` on the
138         first call to `getval()`
139 
140     Note:
141         PUTNOTIF is not implemented
142 
143 *******************************************************************************/
144 
145 public final class Collectd
146 {
147     /***************************************************************************
148 
149         Values returned by 'listval'
150 
151     ***************************************************************************/
152 
153     public static struct Value
154     {
155         /***********************************************************************
156 
157             The timestamp - as a floating point value - of the last update
158 
159         ***********************************************************************/
160 
161         public double last_update;
162 
163 
164         /***********************************************************************
165 
166             An identifier the can be passed to `getval()`
167 
168         ***********************************************************************/
169 
170         public Identifier identifier;
171     }
172 
173 
174     /***************************************************************************
175 
176         Options that can be passed to Putval
177 
178     ***************************************************************************/
179 
180     public struct PutvalOptions
181     {
182         /***********************************************************************
183 
184             Gives the interval in which the data is being collected.
185 
186         ***********************************************************************/
187 
188         public time_t interval;
189     }
190 
191 
192     /***************************************************************************
193 
194         Constructor
195 
196         Params:
197             socket_path = Path of the local socket of the collectd daemon.
198 
199         Throws:
200             If it can't create the socket or connect to the collectd daemon,
201             an Exception is thrown.
202 
203     ***************************************************************************/
204 
205     public this (istring socket_path)
206     {
207         auto socketaddr = sockaddr_un.create(socket_path);
208 
209         this.socket = new UnixSocket();
210 
211         this.e_errno = new ErrnoException();
212         this.e = new CollectdException(256);
213         this.reader.e = this.e_errno;
214 
215         auto sockRet = this.socket.socket();
216         if (sockRet < 0)
217             throw this.e_errno.useGlobalErrno("socket");
218 
219         if (auto connectRet = this.socket.connect(&socketaddr))
220             throw this.e_errno.useGlobalErrno("connect");
221 
222         // This ought to be enough for any numeric argument
223         this.format_buff = new mstring(256);
224         this.format_buff.length = 0;
225         assumeSafeAppend(this.format_buff);
226     }
227 
228 
229     /***************************************************************************
230 
231         Submits one or more values, identified by Identifier to the daemon
232         which will dispatch it to all its write-plugins
233 
234         Params:
235             id      = Uniquely identifies what value is being collected.
236                       Note the `type` must be defined in `types.db`.
237 
238             data    = A struct containing only numeric types. Values can either
239                       be an integer if the data-source is a counter,
240                       or a double if the data-source is of type "gauge".
241                       NaN and infinity are translated to undefined values ('U').
242                       The current UNIX time is submitted along.
243 
244             options = The options list is an optional parameter, where each
245                       option is sent as a key-value-pair.
246                       See `PutvalOptions`'s documentation for a list
247                       of all currently recognized options, however be aware
248                       that an outdated Collectd which doesn't support all
249                       the options will silently ignore them.
250 
251         Throws:
252             `ErrnoException` if writing to the socket produced an error,
253             or `CollectdException` if an error happened while communicating
254             (Collectd returns an error, the internal buffer wasn't empty (which
255             means the caller haven't fully processed the last query),
256             or we get unexpected / inconsistent data), or if more than
257             10 millions records where found
258 
259     ***************************************************************************/
260 
261     public void putval (T) (Identifier id, ref T data,
262                             PutvalOptions options = PutvalOptions.init)
263     {
264         static assert (is (T == struct) || is (T == class),
265                        "Only struct and classes can be sent to Collectd");
266         static assert (T.tupleof.length,
267                        "Cannot send empty aggregate of type "
268                        ~ T.stringof ~ " to Collectd");
269 
270         this.startNewRequest!("putval");
271 
272         this.format("PUTVAL ", id);
273 
274         // Write the options
275         if (options.interval)
276             this.format(` interval="`, options.interval, `"`);
277 
278         // Every line should start with the timestamp
279         this.format(" ", time(null));
280 
281         // Write all the data
282         foreach (idx, ref v; data.tupleof)
283             this.format(":", v);
284 
285         // All lines need to end with a \n
286         this.format("\n");
287         this.write(this.format_buff);
288 
289         this.reader.popFront(this.socket, 0);
290 
291         // Check for success
292         this.e.enforce(this.reader.front()[0 .. PutvalSuccessLineBegin.length]
293                        == PutvalSuccessLineBegin,
294                        this.reader.front());
295         this.reader.popFront();
296         if (!this.reader.empty())
297             throw this.e.set("Unexpected line received from Collectd: ")
298                 .append(this.reader.front());
299     }
300 
301 
302     /***************************************************************************
303 
304         Read a status line as sent by collectd
305 
306         Params:
307             line = The status line read from collectd. It should be in the form
308                     "X Values found", where X is a number greater than 1, or
309                     "1 Value found".
310 
311         Throws:
312             `CollectdException` if the status line is non conformant
313 
314         Returns:
315             On success the number of values found (that is, 'X' or 'Y')
316 
317     ***************************************************************************/
318 
319     private size_t processStatusLine (cstring line)
320     {
321         size_t values = void;
322         auto spIdx = StringSearch!(false).locateChar(line, ' ');
323 
324         auto vfound = line[spIdx .. $];
325         if (vfound != " Values found" && vfound != " Value found")
326             throw this.e.set("Expected 'Value(s) found' in status line, got ")
327                 .append(vfound);
328 
329         auto vstring = line[0 .. spIdx];
330         if (!toInteger(vstring, values))
331             throw this.e.set("Could not convert '").append(vstring)
332                 .append("' to integer");
333 
334         return values;
335     }
336 
337 
338     /***************************************************************************
339 
340         An instance to the socket used to communicate with collectd daemon
341 
342         When reading from the socket, collectd always send *at least* one line,
343         the status line. Lines are always send in full.
344         The socket is a streaming (TCP) socket.
345 
346         The minimal status line one can get is "0 Value found\n", which has a
347         length of 14. If we limit ourselves to a max value of
348         size_t.length, or 18_446_744_073_709_551_615 on 64 bits machines,
349         we can get a status line which size is comprised between 14 and 34.
350 
351     ***************************************************************************/
352 
353     private UnixSocket socket;
354 
355 
356     /***************************************************************************
357 
358         Exception when a non-IO error happen while communicating with Collectd
359 
360     ***************************************************************************/
361 
362     private CollectdException e;
363 
364 
365     /***************************************************************************
366 
367         Exception when an IO error happen
368 
369     ***************************************************************************/
370 
371     private ErrnoException e_errno;
372 
373 
374     /***************************************************************************
375 
376         An instance of the line reader
377 
378     ***************************************************************************/
379 
380     private SocketReader!() reader;
381 
382 
383     /***************************************************************************
384 
385         Internal buffer used to format non-string arguments
386 
387     ***************************************************************************/
388 
389     private mstring format_buff;
390 
391 
392     /***************************************************************************
393 
394         What putval returns on success
395 
396     ***************************************************************************/
397 
398     private static immutable istring PutvalSuccessLineBegin = "0 Success: ";
399 
400 
401     /***************************************************************************
402 
403         Write the content of an identifier to a buffer
404 
405         Params:
406             identifier = Identifier instance to write
407 
408     ***************************************************************************/
409 
410     private void formatIdentifier (ref const(Identifier) identifier)
411     {
412         verify(identifier.host.length != 0, "No host for identifier");
413         verify(identifier.plugin.length != 0, "No plugin for identifier");
414         verify(identifier.type.length != 0, "No type for identifier");
415 
416         auto pi = identifier.plugin_instance.length ? "-" : null;
417         auto ti = identifier.type_instance.length ? "-" : null;
418 
419         this.format_buff ~= identifier.host;
420         this.format_buff ~= '/';
421         this.format_buff ~= identifier.plugin;
422         this.format_buff ~= pi;
423         this.format_buff ~= identifier.plugin_instance;
424         this.format_buff ~= '/';
425         this.format_buff ~= identifier.type;
426         this.format_buff ~= ti;
427         this.format_buff ~= identifier.type_instance;
428     }
429 
430 
431     /***************************************************************************
432 
433         Append stringified arguments into `this.format_buff`
434 
435         Params:
436             args = Array of arguments to write to `this.format_buff`.
437                    `Identifier`, string types and numeric values are supported.
438 
439     ***************************************************************************/
440 
441     private void format (T...) (in T args)
442     {
443         scope sink = (const(char)[] v)
444                      {
445                          this.format_buff ~= v;
446                      };
447 
448         foreach (arg; args)
449         {
450             static if (is(typeof(arg) : Unqual!(Identifier)))
451                 this.formatIdentifier(arg);
452             else static if (is(typeof(arg) == struct)
453                             || is(typeof(arg) == class)
454                             || is(typeof(arg) == enum))
455                 static assert(0, "Cannot send an aggregate of type "
456                               ~ typeof(arg).stringof ~ " to Collectd");
457             else
458                 sformat(sink, "{}", arg);
459         }
460     }
461 
462 
463     /***************************************************************************
464 
465         Helper to write data to a socket
466 
467         Params:
468             str = String to send on the socket.
469                   Usually a literal, or the formatted buffer.
470 
471         Throws:
472             `CollectdException` if writing to the Collectd socket failed
473 
474     ***************************************************************************/
475 
476     private void write (cstring str)
477     {
478         auto r = this.socket.write(str);
479         if (r != str.length)
480             throw this.e_errno.useGlobalErrno("write");
481     }
482 
483 
484     /***************************************************************************
485 
486         Sanity check to ensure a request is started with a clean slate
487 
488         Also reset the formatting buffer.
489 
490         Params:
491             reqname = Name of the request that is started, for more informative
492                       error message.
493 
494         Throws:
495             `CollectdException` if there is data in the buffer.
496 
497     ***************************************************************************/
498 
499     private void startNewRequest (istring reqname /*= __FUNCTION__*/) ()
500     {
501         this.format_buff.length = 0;
502         assumeSafeAppend(this.format_buff);
503 
504         this.e.enforce(this.reader.empty(),
505                        "Called " ~ reqname ~ " with a non-empty buffer");
506     }
507 }
508 
509 
510 /*******************************************************************************
511 
512     Exception to be thrown when an error happens in Collectd
513 
514 *******************************************************************************/
515 
516 public class CollectdException : Exception
517 {
518     mixin ReusableExceptionImplementation!();
519 }