/*******************************************************************************

    A utility class to interact with the collectd-unixsock plugin

    This class is a simple wrapper around Collectd's functionalities,
    providing parsing and communication means.

    Most users will not want to use this module directly and should prefer
    the high-level stats API provided in `ocean.util.log.Stats`.

    See_Also:
        https://collectd.org/documentation/manpages/collectd-unixsock.5.shtml

    Copyright:
        Copyright (c) 2015-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.net.collectd.Collectd;


/*******************************************************************************

    Usage example

*******************************************************************************/

unittest
{
    void sendCollectdData ()
    {
        // Every call to collectd (but `listval`) needs to use an `Identifier`.
        // See Collectd's documentation for more information. Here we create an
        // app-global identifier.
        Identifier id =
        {
            host:               "example.com",
            plugin:             "http_server",
            type:               "requests",     // how much traffic it handles
            plugin_instance:    "1",            // the instance number
            type_instance:      "worker-1"
        };

        // Note that if you have a Collectd-provided identifier, you can
        // read it using `Identifier.create`.
        // Here we use the convenience overload that throws on error, however
        // there is one version which returns a message if the parsing failed.
        auto id2 = Identifier.create("sociomantic.com/http_server-1/requests-worker-1");

        // Construct a Collectd instance that connect()s to the socket.
        // If the connect() fails, an `ErrnoException` is thrown.
        // The parameter is the path of the Collectd socket.
        auto collectd = new Collectd("/var/run/collectd.socket");

        // From this point on you can use the instance to talk to the socket.
        // Once a function that returns a set of data is called (e.g. `listval`),
        // no other function should be called until the result is fully
        // processed, as this class internally uses a rotating buffer to
        // minimize memory allocations.
        // If a new request is started while the previous one isn't
        // fully processed, a `CollectdException` will be thrown.

        // When writing a value, you need a structure that matches a definition
        // in your `types.db` file.
        //
        // The documentation of `types.db` can be found here:
        // https://collectd.org/documentation/manpages/types.db.5.shtml
        //
        // The name of the struct doesn't matter, only what's in `id`.
        // To simplify the example, we use a struct with a single gauge field.
        // Note: a single-gauge definition in `types.db` looks like
        // `bytes value:GAUGE:0:U`; the struct must provide one field per data
        // source of the type named in `id`.
        static struct Charge { double value; }
        Charge charge = Charge(42.0);

        // Write an entry to collectd.
        collectd.putval(id, charge);
        // Will send a line of the form
        // `PUTVAL example.com/http_server-1/requests-worker-1 <unix_timestamp>:<value>`
        // (terminated by a newline) on the wire.
    }
}


import ocean.meta.types.Qualifiers;
import ocean.core.Enforce;
import ocean.core.Verify;
import ocean.core.Exception;
import ocean.net.collectd.SocketReader;
public import ocean.net.collectd.Identifier;
import ocean.stdc.posix.sys.un;
import ocean.sys.ErrnoException;
import ocean.sys.socket.UnixSocket;
import ocean.text.Util;
import Float = ocean.text.convert.Float;
import ocean.text.convert.Formatter;
import ocean.text.convert.Integer;
import Conv = ocean.util.Convert;
import ocean.text.util.StringSearch; // locateChar

import core.stdc.time; // time
import core.sys.posix.sys.socket; // SOCK_DGRAM
import core.sys.posix.sys.types; // time_t

version (unittest)
{
    import ocean.core.Test;
    import ocean.io.Stdout : Stdout;
}


/*******************************************************************************

    Collectd wrapper class

    Encapsulates communication with the Collectd socket, as well as parsing
    of its messages.

    Note:
        You must be careful when mixing calls. Returned data are transient
        (they sit in an internal buffer), and might get invalidated on the next
        call to a member function.

        For example, don't do:

        ````
        Collectd inst = ...;
        foreach (v; inst.listval())
        {
            inst.getval!(Counter)(v);
        }
        ````
        Because this might invalidate the data returned by `listval()` on the
        first call to `getval()`

    Note:
        PUTNOTIF is not implemented

*******************************************************************************/

public final class Collectd
{
    /***************************************************************************

        Values returned by 'listval'

    ***************************************************************************/

    public static struct Value
    {
        /***********************************************************************

            The timestamp - as a floating point value - of the last update

        ***********************************************************************/

        public double last_update;


        /***********************************************************************

            An identifier that can be passed to `getval()`

        ***********************************************************************/

        public Identifier identifier;
    }


    /***************************************************************************

        Options that can be passed to Putval

    ***************************************************************************/

    public struct PutvalOptions
    {
        /***********************************************************************

            Gives the interval in which the data is being collected.
            A value of 0 (the default) means the option is not sent.

        ***********************************************************************/

        public time_t interval;
    }


    /***************************************************************************

        Constructor

        Creates the socket, connects it to the daemon and allocates the
        reusable exceptions and the formatting buffer.

        Params:
            socket_path = Path of the local socket of the collectd daemon.

        Throws:
            `ErrnoException` if it can't create the socket or connect to the
            collectd daemon.

    ***************************************************************************/

    public this (istring socket_path)
    {
        auto socketaddr = sockaddr_un.create(socket_path);

        this.socket = new UnixSocket();

        this.e_errno = new ErrnoException();
        this.e = new CollectdException(256);
        // The reader reports its IO errors through the same exception instance
        this.reader.e = this.e_errno;

        if (this.socket.socket() < 0)
            throw this.e_errno.useGlobalErrno("socket");

        // connect() returns non-zero on failure
        if (this.socket.connect(&socketaddr))
            throw this.e_errno.useGlobalErrno("connect");

        // This ought to be enough for any numeric argument.
        // Allocate the capacity up front, then reset the length so that
        // subsequent appends reuse the allocation.
        this.format_buff = new mstring(256);
        this.format_buff.length = 0;
        assumeSafeAppend(this.format_buff);
    }


    /***************************************************************************

        Submits one or more values, identified by Identifier to the daemon
        which will dispatch it to all its write-plugins

        Params:
            id = Uniquely identifies what value is being collected.
                 Note the `type` must be defined in `types.db`.

            data = A struct containing only numeric types. Values can either
                   be an integer if the data-source is a counter,
                   or a double if the data-source is of type "gauge".
                   The current UNIX time is submitted along.
                   NOTE(review): values are written using the formatter's
                   default `{}` representation; no explicit translation of
                   NaN / infinity to Collectd's undefined value ('U') is
                   visible in this class - confirm before relying on it.

            options = The options list is an optional parameter, where each
                      option is sent as a key-value-pair.
                      See `PutvalOptions`'s documentation for a list
                      of all currently recognized options, however be aware
                      that an outdated Collectd which doesn't support all
                      the options will silently ignore them.

        Throws:
            `ErrnoException` if writing to the socket produced an error,
            or `CollectdException` if an error happened while communicating
            (Collectd returns an error, the internal buffer wasn't empty (which
            means the caller hasn't fully processed the last query),
            or we get unexpected / inconsistent data).

    ***************************************************************************/

    public void putval (T) (Identifier id, ref T data,
                            PutvalOptions options = PutvalOptions.init)
    {
        static assert (is (T == struct) || is (T == class),
                       "Only struct and classes can be sent to Collectd");
        static assert (T.tupleof.length,
                       "Cannot send empty aggregate of type "
                       ~ T.stringof ~ " to Collectd");

        this.startNewRequest!("putval");

        this.format("PUTVAL ", id);

        // Write the options
        if (options.interval)
            this.format(` interval="`, options.interval, `"`);

        // Every line should start with the timestamp
        this.format(" ", time(null));

        // Write all the data
        foreach (idx, ref v; data.tupleof)
            this.format(":", v);

        // All lines need to end with a \n
        this.format("\n");
        this.write(this.format_buff);

        this.reader.popFront(this.socket, 0);

        // Check for success. The length is checked first: slicing a reply
        // shorter than the expected prefix would be a range violation
        // instead of the documented `CollectdException`.
        auto status = this.reader.front();
        this.e.enforce(
            status.length >= PutvalSuccessLineBegin.length
            && status[0 .. PutvalSuccessLineBegin.length]
                == PutvalSuccessLineBegin,
            status);

        // A successful PUTVAL is expected to produce the status line only
        this.reader.popFront();
        if (!this.reader.empty())
            throw this.e.set("Unexpected line received from Collectd: ")
                .append(this.reader.front());
    }


    /***************************************************************************

        Read a status line as sent by collectd

        Params:
            line = The status line read from collectd. It should be in the form
                   "X Values found" or "X Value found", where X is a number.

        Throws:
            `CollectdException` if the status line is non conformant

        Returns:
            On success the number of values found (that is, 'X')

    ***************************************************************************/

    private size_t processStatusLine (cstring line)
    {
        size_t values;
        // If there is no space, `spIdx` is `line.length` and the suffix
        // below is empty, which is rejected by the comparison.
        auto spIdx = StringSearch!(false).locateChar(line, ' ');

        auto vfound = line[spIdx .. $];
        if (vfound != " Values found" && vfound != " Value found")
            throw this.e.set("Expected 'Value(s) found' in status line, got ")
                .append(vfound);

        auto vstring = line[0 .. spIdx];
        if (!toInteger(vstring, values))
            throw this.e.set("Could not convert '").append(vstring)
                .append("' to integer");

        return values;
    }


    /***************************************************************************

        An instance to the socket used to communicate with collectd daemon

        When reading from the socket, collectd always sends *at least* one
        line, the status line. Lines are always sent in full.
        The socket is a stream-oriented Unix domain socket.

        The minimal status line one can get is "0 Value found\n", which has a
        length of 14. If we limit ourselves to a max value of
        size_t.max, or 18_446_744_073_709_551_615 on 64 bits machines,
        we can get a status line whose size is between 14 and 34.

    ***************************************************************************/

    private UnixSocket socket;


    /***************************************************************************

        Exception when a non-IO error happens while communicating with Collectd

    ***************************************************************************/

    private CollectdException e;


    /***************************************************************************

        Exception when an IO error happens

    ***************************************************************************/

    private ErrnoException e_errno;


    /***************************************************************************

        An instance of the line reader

    ***************************************************************************/

    private SocketReader!() reader;


    /***************************************************************************

        Internal buffer used to format the request before it is written to
        the socket

    ***************************************************************************/

    private mstring format_buff;


    /***************************************************************************

        What putval returns on success

    ***************************************************************************/

    private static immutable istring PutvalSuccessLineBegin = "0 Success: ";


    /***************************************************************************

        Append the content of an identifier to `this.format_buff`, in the
        form `host/plugin[-plugin_instance]/type[-type_instance]`

        Params:
            identifier = Identifier instance to write

    ***************************************************************************/

    private void formatIdentifier (ref const(Identifier) identifier)
    {
        verify(identifier.host.length != 0, "No host for identifier");
        verify(identifier.plugin.length != 0, "No plugin for identifier");
        verify(identifier.type.length != 0, "No type for identifier");

        // The '-' separator is only written when the instance part is set
        auto pi = identifier.plugin_instance.length ? "-" : null;
        auto ti = identifier.type_instance.length ? "-" : null;

        this.format_buff ~= identifier.host;
        this.format_buff ~= '/';
        this.format_buff ~= identifier.plugin;
        this.format_buff ~= pi;
        this.format_buff ~= identifier.plugin_instance;
        this.format_buff ~= '/';
        this.format_buff ~= identifier.type;
        this.format_buff ~= ti;
        this.format_buff ~= identifier.type_instance;
    }


    /***************************************************************************

        Append stringified arguments into `this.format_buff`

        Params:
            args = Array of arguments to write to `this.format_buff`.
                   `Identifier`, string types and numeric values are supported.
                   Any other struct, class or enum is rejected at compile time.

    ***************************************************************************/

    private void format (T...) (in T args)
    {
        scope sink = (const(char)[] v)
                     {
                         this.format_buff ~= v;
                     };

        foreach (arg; args)
        {
            static if (is(typeof(arg) : Unqual!(Identifier)))
                this.formatIdentifier(arg);
            else static if (is(typeof(arg) == struct)
                            || is(typeof(arg) == class)
                            || is(typeof(arg) == enum))
                static assert(0, "Cannot send an aggregate of type "
                              ~ typeof(arg).stringof ~ " to Collectd");
            else
                sformat(sink, "{}", arg);
        }
    }


    /***************************************************************************

        Helper to write data to a socket

        Keeps writing until the whole string has been sent, so that a short
        write is not mistaken for an error.

        Params:
            str = String to send on the socket.
                  Usually a literal, or the formatted buffer.

        Throws:
            `ErrnoException` if writing to the Collectd socket failed

    ***************************************************************************/

    private void write (cstring str)
    {
        while (str.length)
        {
            auto r = this.socket.write(str);
            // A non-positive result is an error (or no progress, which would
            // loop forever); the upper bound guards the slice below.
            if (r <= 0 || r > str.length)
                throw this.e_errno.useGlobalErrno("write");
            str = str[r .. $];
        }
    }


    /***************************************************************************

        Sanity check to ensure a request is started with a clean slate

        Also reset the formatting buffer.

        Params:
            reqname = Name of the request that is started, for more informative
                      error message.

        Throws:
            `CollectdException` if there is data in the buffer.

    ***************************************************************************/

    private void startNewRequest (istring reqname /*= __FUNCTION__*/) ()
    {
        // Reset the buffer but keep its allocation for the next request
        this.format_buff.length = 0;
        assumeSafeAppend(this.format_buff);

        this.e.enforce(this.reader.empty(),
                       "Called " ~ reqname ~ " with a non-empty buffer");
    }
}


/*******************************************************************************

    Exception to be thrown when an error happens in Collectd

*******************************************************************************/

public class CollectdException : Exception
{
    mixin ReusableExceptionImplementation!();
}