1 /*******************************************************************************
2 
3     Contains a listener for scrape requests by Prometheus.
4 
5     Please refer to the unittest in this module for an example. A more elaborate
6     example can be found in `integrationtest.prometheusstats.main`.
7 
8     Copyright:
9         Copyright (c) 2019 dunnhumby Germany GmbH.
10         All rights reserved
11 
12     License:
13         Boost Software License Version 1.0. See LICENSE.txt for details.
14 
15 *******************************************************************************/
16 
17 module ocean.util.prometheus.server.PrometheusListener;
18 
19 import ocean.io.select.EpollSelectDispatcher;
20 import ocean.net.server.SelectListener;
21 import ocean.util.prometheus.collector.CollectorRegistry;
22 import ocean.util.prometheus.server.PrometheusHandler;
23 
24 /*******************************************************************************
25 
26     A listener for Prometheus' stat scrape.
27 
28     Once instantiated with the set of callbacks that collect stats for given
29     stats and labels, the callbacks will be called for every incoming prometheus
30     request, and the accumulated stats will be appended to the response message.
31 
32     Derives from `SelectListener` with four generic parameters, the last three
33     of which are used to instantiate the handler used by this listener.
34 
35 *******************************************************************************/
36 
37 public class PrometheusListener :
38     SelectListener!(PrometheusHandler, CollectorRegistry, EpollSelectDispatcher,
39         size_t)
40 {
41     import core.sys.posix.sys.socket : sockaddr, time_t;
42     import ocean.io.select.client.TimerEvent : TimerEvent;
43     import ocean.net.http.HttpConnectionHandler : HttpConnectionHandler;
44     import ocean.sys.socket.model.ISocket : ISocket;
45     import ocean.text.convert.Formatter : sformat;
46     import ocean.util.log.Logger : Logger, Log;
47 
48     import ocean.meta.types.Qualifiers;
49 
50     /// A static logger for logging information about connections.
51     private static Logger log;
52     static this ( )
53     {
54         PrometheusListener.log =
55             Log.lookup("ocean.util.prometheus.server.PrometheusListener");
56     }
57 
58     /// A buffer used for logging information about connections.
59     private mstring connection_log_buf;
60 
61     /***************************************************************************
62 
63         Constructor
64 
65         Creates the server socket and registers it for incoming connections.
66 
67         Params:
68             address            = The address of the socket.
69             socket             = The server socket.
70             collector_registry = The CollectorRegistry instance, containing
71                                  references to the collection callbacks.
72             epoll              = The EpollSelectDispatcher instance to use in
73                                  response handler(s).
74             stack_size         = The fiber stack size to use. Defaults to
75                                  `HttpConnectionHandler.default_stack_size`.
76 
77     ***************************************************************************/
78 
79     public this ( sockaddr* address, ISocket socket,
80         CollectorRegistry collector_registry, EpollSelectDispatcher epoll,
81         size_t stack_size = HttpConnectionHandler.default_stack_size )
82     {
83         super(address, socket, collector_registry, epoll, stack_size);
84     }
85 
86     /***************************************************************************
87 
88         Registers the listener with a given epoll, so that it can be activated
89         in the latter's event loop.
90 
91         Params:
92             epoll = The epoll to register the server with.
93 
94     ***************************************************************************/
95 
96     public void registerEventHandling ( EpollSelectDispatcher epoll )
97     {
98         epoll.register(this);
99     }
100 
101     /***************************************************************************
102 
103         Logs the information about connections to the log file.
104 
105         Overriden from the base instance to specify the logger's name to be
106         this module's fully-qualified name.
107 
108     ***************************************************************************/
109 
110     override public void connectionLog ( )
111     {
112         auto conns = this.poolInfo;
113 
114         PrometheusListener.log.info("Connection pool: {} busy, {} idle",
115             conns.num_busy, conns.num_idle);
116 
117         foreach ( i, conn; conns )
118         {
119             this.connection_log_buf.length = 0;
120             sformat(this.connection_log_buf, "{}: ", i);
121 
122             conn.formatInfo(this.connection_log_buf);
123 
124             PrometheusListener.log.info(this.connection_log_buf);
125         }
126     }
127 }
128 
129 version (unittest)
130 {
131     import core.sys.posix.sys.socket;
132     import ocean.sys.socket.IPSocket;
133     import ocean.util.prometheus.collector.Collector;
134 }
135 
136 /// Test and demonstrate instantiation of a `PrometheusListener` object
137 unittest
138 {
139     class ExampleStats
140     {
141         import core.sys.posix.unistd;
142         import ocean.sys.Stats : CpuMemoryStats;
143 
144         CpuMemoryStats system_stats;
145 
146         struct Labels
147         {
148             pid_t pid;
149         }
150 
151         this ( ) { this.system_stats = new CpuMemoryStats(); }
152 
153         void collect ( Collector collector )
154         {
155             // CpuMemoryStats.collect() returns a struct whose fields we want
156             // to collect via Prometheus.
157             collector.collect(this.system_stats.collect(), Labels( getpid() ));
158         }
159 
160         void collectp ( Collector collector )
161         {
162             collector.collect(this.system_stats.collect(), Labels( getppid() ));
163         }
164     }
165 
166     auto epoll = new EpollSelectDispatcher();
167 
168     auto example = new ExampleStats();
169     auto registry = new CollectorRegistry(
170         [&example.collect, &example.collectp]);
171 
172     IPSocket!().InetAddress srv_address;
173     sockaddr* socket_addrress = srv_address("127.0.0.1", 8080);
174     auto listener = new PrometheusListener(socket_addrress, new IPSocket!(),
175         registry, epoll);
176 
177     // The following line registers the server with the dispatcher, which in
178     // effect makes it operational, subject to the latter's scheduling.
179     // (Commented out here to not start a server instance in a unit test.)
180 
181     // listener.registerEventHandling(epoll);
182 }