/*******************************************************************************

    A utility class to interact with the collectd-unixsock plugin

    This class is a simple wrapper around Collectd's functionalities,
    providing parsing and communication means.

    Most users will not want to use this module directly and should prefer
    the high-level stats API provided in `ocean.util.log.Stats`.

    See_Also:
        https://collectd.org/documentation/manpages/collectd-unixsock.5.shtml

    Copyright:
        Copyright (c) 2015-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.net.collectd.Collectd;


/*******************************************************************************

    Usage example

*******************************************************************************/

unittest
{
    void sendCollectdData ()
    {
        // Every call to collectd (but `listval`) needs to use an `Identifier`.
        // See Collectd's documentation for more information. Here we create an
        // app-global identifier.
        Identifier id =
        {
            host: "example.com",
            plugin: "http_server",
            type: "requests", // how much traffic it handles
            plugin_instance: "1", // the instance number
            type_instance: "worker-1"
        };

        // Note that if you have a Collectd-provided identifier, you can
        // read it using `Identifier.create`
        // Here we use the convenience overload that throws on error, however
        // there is one version which returns a message if the parsing failed.
        auto id2 = Identifier.create("sociomantic.com/http_server-1/requests-worker-1");

        // Construct a Collectd instance that connect()s to the socket.
        // If the connect() fails, an `ErrnoException` is thrown.
        // The parameter is the path of the Collectd socket
        auto collectd = new Collectd("/var/run/collectd.socket");

        // From this point on you can use the instance to talk to the socket.
        // Once a function that returns a set of data is called (e.g. `listval`),
        // no other function should be called until the result is fully
        // processed, as this class internally uses a rotating buffer to
        // minimize memory allocations.
        // If a new request is started while the previous one isn't
        // fully processed, a `CollectdException` will be thrown.

        // When writing a value, you need a structure that matches a definition
        // in your `types.db` file.
        //
        // The documentation of `types.db` can be found here:
        // https://collectd.org/documentation/manpages/types.db.5.shtml
        //
        // The name of the struct doesn't matter, only what's in `id`.
        // To simplify the example, we use a struct that is defined by default
        // in `types.db`.
        // Note: the definition is `bytes value:GAUGE:0:U`
        static struct Charge { double value; }
        Charge charge = Charge(42.0);

        // Write an entry to collectd.
        collectd.putval(id, charge);
        // Will send a line of the form
        // `PUTVAL example.com/http_server-1/requests-worker-1 <current_unix_timestamp>:<value>`
        // (terminated by a newline) on the wire
    }
}



import ocean.transition;
import ocean.core.Enforce;
import ocean.core.Verify;
import ocean.core.Exception;
import ocean.stdc.posix.sys.un;
import core.stdc.time; // time
import ocean.stdc.posix.sys.types; // time_t
import ocean.sys.ErrnoException;
import core.sys.posix.sys.socket; // SOCK_DGRAM
import ocean.sys.socket.UnixSocket;
import ocean.text.Util;
import Float = ocean.text.convert.Float;
import ocean.text.convert.Formatter;
import ocean.text.convert.Integer;
import Conv = ocean.util.Convert;
import ocean.text.util.StringSearch; // locateChar

import ocean.net.collectd.SocketReader;
public import ocean.net.collectd.Identifier;

version (UnitTest)
{
    import ocean.core.Test;
    import ocean.io.Stdout : Stdout;
}


/*******************************************************************************

    Collectd wrapper class

    Encapsulate communication with the Collectd socket, as well as parsing
    of its messages.

    Note:
        You must be careful when mixing calls. Returned data are transient
        (sits in an internal buffer), and might get invalidated on the next
        call to a member function.

        For example, don't do:

        ````
        Collectd inst = ...;
        foreach (v; inst.listval())
        {
            inst.getval!(Counter)(v);
        }
        ````
        Because this might invalidate the data returned by `listval()` on the
        first call to `getval()`

    Note:
        PUTNOTIF is not implemented

*******************************************************************************/

public final class Collectd
{
    /***************************************************************************

        Values returned by 'listval'

    ***************************************************************************/

    public static struct Value
    {
        /***********************************************************************

            The timestamp - as a floating point value - of the last update

        ***********************************************************************/

        public double last_update;


        /***********************************************************************

            An identifier that can be passed to `getval()`

        ***********************************************************************/

        public Identifier identifier;
    }


    /***************************************************************************

        Options that can be passed to Putval

    ***************************************************************************/

    public struct PutvalOptions
    {
        /***********************************************************************

            Gives the interval in which the data is being collected.

            A value of 0 (the default) means the option is not sent at all.

        ***********************************************************************/

        public time_t interval;
    }


    /***************************************************************************

        Constructor

        Creates the socket, connects it to the collectd daemon and allocates
        the reusable exceptions and the formatting buffer.

        Params:
            socket_path = Path of the local socket of the collectd daemon.

        Throws:
            If it can't create the socket or connect to the collectd daemon,
            an `ErrnoException` (carrying the global `errno`) is thrown.

    ***************************************************************************/

    public this (istring socket_path)
    {
        auto socketaddr = sockaddr_un.create(socket_path);

        this.socket = new UnixSocket();

        this.e_errno = new ErrnoException();
        this.e = new CollectdException(256);
        // The reader reports its own I/O failures through the same exception
        this.reader.e = this.e_errno;

        auto sockRet = this.socket.socket();
        if (sockRet < 0)
            throw this.e_errno.useGlobalErrno("socket");

        // A non-zero return value from connect() signals a failure
        if (auto connectRet = this.socket.connect(&socketaddr))
            throw this.e_errno.useGlobalErrno("connect");

        // This ought to be enough for any numeric argument
        // Allocate the capacity up-front, then reset the length so the
        // buffer can be appended to without reallocating.
        this.format_buff = new mstring(256);
        this.format_buff.length = 0;
        enableStomping(this.format_buff);
    }


    /***************************************************************************

        Submits one or more values, identified by Identifier to the daemon
        which will dispatch it to all its write-plugins

        Params:
            id = Uniquely identifies what value is being collected.
                 Note the `type` must be defined in `types.db`.

            data = A struct containing only numeric types. Values can either
                   be an integer if the data-source is a counter,
                   or a double if the data-source is of type "gauge".
                   NaN and infinity are translated to undefined values ('U').
                   NOTE(review): no explicit NaN / infinity translation is
                   visible in this class, fields are written using
                   `sformat(sink, "{}", field)` - confirm where (or whether)
                   the translation happens.
                   The current UNIX time is submitted along.

            options = The options list is an optional parameter, where each
                      option is sent as a key-value-pair.
                      See `PutvalOptions`'s documentation for a list
                      of all currently recognized options, however be aware
                      that an outdated Collectd which doesn't support all
                      the options will silently ignore them.

        Throws:
            `ErrnoException` if writing to the socket produced an error,
            or `CollectdException` if an error happened while communicating
            (Collectd returns an error, the internal buffer wasn't empty (which
            means the caller hasn't fully processed the last query),
            or we get unexpected / inconsistent data).

    ***************************************************************************/

    public void putval (T) (Identifier id, ref T data,
                            PutvalOptions options = PutvalOptions.init)
    {
        static assert (is (T == struct) || is (T == class),
                       "Only struct and classes can be sent to Collectd");
        static assert (T.tupleof.length,
                       "Cannot send empty aggregate of type "
                       ~ T.stringof ~ " to Collectd");

        this.startNewRequest!("putval");

        this.format("PUTVAL ", id);

        // Write the options
        if (options.interval)
            this.format(` interval="`, options.interval, `"`);

        // Every line should start with the timestamp
        this.format(" ", time(null));

        // Write all the data
        foreach (idx, ref v; data.tupleof)
            this.format(":", v);

        // All lines need to end with a \n
        this.format("\n");
        this.write(this.format_buff);

        this.reader.popFront(this.socket, 0);

        // Check for success.
        // The length is tested before slicing: a reply shorter than the
        // expected prefix (e.g. a terse error status) must be reported as a
        // `CollectdException`, not trigger an out-of-bounds slice.
        auto status = this.reader.front();
        this.e.enforce((status.length >= PutvalSuccessLineBegin.length)
                       && (status[0 .. PutvalSuccessLineBegin.length]
                           == PutvalSuccessLineBegin),
                       status);
        this.reader.popFront();
        if (!this.reader.empty())
            throw this.e.set("Unexpected line received from Collectd: ")
                .append(this.reader.front());
    }


    /***************************************************************************

        Read a status line as sent by collectd

        Params:
            line = The status line read from collectd. It should be in the form
                   "X Values found", where X is a number, or "1 Value found".

        Throws:
            `CollectdException` if the status line is non conformant

        Returns:
            On success the number of values found (that is, 'X', or 1)

    ***************************************************************************/

    private size_t processStatusLine (cstring line)
    {
        size_t values = void;
        auto spIdx = StringSearch!(false).locateChar(line, ' ');

        // Everything from the first space on must be the fixed suffix
        auto vfound = line[spIdx .. $];
        if (vfound != " Values found" && vfound != " Value found")
            throw this.e.set("Expected 'Value(s) found' in status line, got ")
                .append(vfound);

        // Everything before the first space must be the count
        auto vstring = line[0 .. spIdx];
        if (!toInteger(vstring, values))
            throw this.e.set("Could not convert '").append(vstring)
                .append("' to integer");

        return values;
    }


    /***************************************************************************

        An instance to the socket used to communicate with collectd daemon

        When reading from the socket, collectd always sends *at least* one
        line, the status line. Lines are always sent in full.
        The socket is a local (Unix domain) socket, created with the default
        arguments of `UnixSocket.socket()` (presumably a stream socket -
        TODO confirm).

        The minimal status line one can get is "0 Value found\n", which has a
        length of 14. If we limit ourselves to a max value of
        size_t.max, or 18_446_744_073_709_551_615 on 64 bits machines,
        we can get a status line which size is comprised between 14 and 34.

    ***************************************************************************/

    private UnixSocket socket;


    /***************************************************************************

        Exception when a non-IO error happens while communicating with Collectd

    ***************************************************************************/

    private CollectdException e;


    /***************************************************************************

        Exception when an IO error happens

    ***************************************************************************/

    private ErrnoException e_errno;


    /***************************************************************************

        An instance of the line reader

    ***************************************************************************/

    private SocketReader!() reader;


    /***************************************************************************

        Internal buffer used to format the request sent to Collectd

    ***************************************************************************/

    private mstring format_buff;


    /***************************************************************************

        What putval returns on success

    ***************************************************************************/

    private static immutable istring PutvalSuccessLineBegin = "0 Success: ";


    /***************************************************************************

        Append the content of an identifier to `this.format_buff`

        The output has the form `host/plugin[-plugin_instance]/type[-type_instance]`,
        the dash being written only when the matching instance is non-empty.

        Params:
            identifier = Identifier instance to write

    ***************************************************************************/

    private void formatIdentifier (ref Const!(Identifier) identifier)
    {
        verify(identifier.host.length != 0, "No host for identifier");
        verify(identifier.plugin.length != 0, "No plugin for identifier");
        verify(identifier.type.length != 0, "No type for identifier");

        // Separators are only emitted when there is an instance to separate
        auto pi = identifier.plugin_instance.length ? "-" : null;
        auto ti = identifier.type_instance.length ? "-" : null;

        this.format_buff ~= identifier.host;
        this.format_buff ~= '/';
        this.format_buff ~= identifier.plugin;
        this.format_buff ~= pi;
        this.format_buff ~= identifier.plugin_instance;
        this.format_buff ~= '/';
        this.format_buff ~= identifier.type;
        this.format_buff ~= ti;
        this.format_buff ~= identifier.type_instance;
    }


    /***************************************************************************

        Append stringified arguments into `this.format_buff`

        Params:
            args = Array of arguments to write to `this.format_buff`.
                   `Identifier`, string types and numeric values are supported.
                   Any other struct, class or enum is rejected at compile
                   time.

    ***************************************************************************/

    private void format (T...) (in T args)
    {
        scope sink = (Const!(char)[] v)
                     {
                         this.format_buff ~= v;
                     };

        foreach (arg; args)
        {
            static if (is(typeof(arg) : Unqual!(Identifier)))
                this.formatIdentifier(arg);
            else static if (is(typeof(arg) == struct)
                            || is(typeof(arg) == class)
                            || is(typeof(arg) == enum))
                static assert(0, "Cannot send an aggregate of type "
                              ~ typeof(arg).stringof ~ " to Collectd");
            else
                sformat(sink, "{}", arg);
        }
    }


    /***************************************************************************

        Helper to write data to a socket

        Params:
            str = String to send on the socket.
                  Usually a literal, or the formatted buffer.

        Throws:
            `ErrnoException` if the socket did not accept the whole string

    ***************************************************************************/

    private void write (cstring str)
    {
        auto r = this.socket.write(str);
        // Anything but a complete write is treated as a failure
        if (r != str.length)
            throw this.e_errno.useGlobalErrno("write");
    }


    /***************************************************************************

        Sanity check to ensure a request is started with a clean slate

        Also reset the formatting buffer (this is done before the check).

        Params:
            reqname = Name of the request that is started, for more informative
                      error message.

        Throws:
            `CollectdException` if there is data in the buffer.

    ***************************************************************************/

    private void startNewRequest (istring reqname /*= __FUNCTION__*/) ()
    {
        this.format_buff.length = 0;
        enableStomping(this.format_buff);

        this.e.enforce(this.reader.empty(),
                       "Called " ~ reqname ~ " with a non-empty buffer");
    }
}


/*******************************************************************************

    Exception to be thrown when an error happens in Collectd

*******************************************************************************/

public class CollectdException : Exception
{
    mixin ReusableExceptionImplementation!();
}