1 /*******************************************************************************
2 
3     Test-suite for stat collection using prometheus.
4 
5     This test uses a TCP socket connection to `localhost:8080`.
6 
7     Copyright: Copyright (c) 2019 dunnhumby Germany GmbH. All rights reserved
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14 *******************************************************************************/
15 
16 module integrationtest.prometheusstats.main;
17 
18 import ocean.meta.types.Qualifiers;
19 
20 import ocean.io.select.EpollSelectDispatcher;
21 import ocean.task.Task;
22 import ocean.task.Scheduler;
23 import ocean.text.convert.Formatter;
24 import ocean.sys.socket.IPSocket;
25 import ocean.sys.ErrnoException;
26 import core.stdc.errno;
27 import core.stdc.stdlib;
28 
29 import ocean.util.prometheus.collector.Collector;
30 import ocean.util.prometheus.collector.CollectorRegistry;
31 import ocean.util.prometheus.server.PrometheusListener;
32 
33 import ocean.io.Stdout;
34 
35 /// A class containing structs representing stats and their labels, a method
36 /// that can be added to the CollectorRegistry, followed by a textual
37 /// representation of the stats mock-collected here.
38 class PrometheusStats
39 {
40     ///
41     struct Statistics
42     {
43         ulong up_time_s;
44         size_t count;
45         float ratio;
46         double fraction;
47         real very_real;
48     }
49 
50     ///
51     struct Labels
52     {
53         hash_t id;
54         cstring job;
55         float perf;
56     }
57 
58     ///
59     void collect ( Collector collector )
60     {
61         collector.collect(
62             Statistics(3600, 347, 3.14, 6.023, 0.43),
63             Labels(1_235_813, "ocean", 3.14159));
64     }
65 
66     ///
67     static istring collection_text =
68         "up_time_s {id=\"1235813\",job=\"ocean\",perf=\"3.14159\"} 3600\n" ~
69         "count {id=\"1235813\",job=\"ocean\",perf=\"3.14159\"} 347\n" ~
70         "ratio {id=\"1235813\",job=\"ocean\",perf=\"3.14159\"} 3.14\n" ~
71         "fraction {id=\"1235813\",job=\"ocean\",perf=\"3.14159\"} 6.023\n" ~
72         "very_real {id=\"1235813\",job=\"ocean\",perf=\"3.14159\"} 0.43\n";
73 }
74 
75 /// HTTP client task. It sends one HTTP GET request to the prometheus endpoint
76 /// and receives a respone that should contain the expected stats, as provided
77 /// in `PrometheusStats.collection_text`.
78 class ClientTask: Task
79 {
80     import Finder = ocean.core.array.Search;
81     import ocean.core.Test: test;
82     import ocean.io.select.protocol.task.TaskSelectTransceiver;
83     import ocean.io.select.protocol.generic.ErrnoIOException: SocketError;
84 
85     public Exception to_report;
86 
87     private TaskSelectTransceiver tst;
88 
89     private IPSocket!() socket;
90     private SocketError socket_err;
91     private IPSocket!().InetAddress srv_address;
92 
93     private mstring response_msg;
94 
95     ///
96     this ( IPSocket!().InetAddress srv_address )
97     {
98         this.socket = new IPSocket!();
99         this.socket_err = new SocketError(socket);
100 
101         this.tst = new TaskSelectTransceiver(socket, socket_err, socket_err);
102 
103         this.srv_address = srv_address;
104     }
105 
106     ///
107     override void run ( )
108     {
109         try
110         {
111             this.socket_err.enforce(
112                 this.socket.tcpSocket(true) >= 0, "", "socket");
113 
114             connect(this.tst, delegate (IPSocket!() socket) {
115                 return !socket.connect(this.srv_address.addr);
116             });
117 
118             this.tst.write(
119                 "GET /metrics HTTP/1.1\r\nHost: oceantest.net\r\n\r\n");
120 
121             this.tst.readConsume(&this.consume);
122             test!("==")(this.response_msg, PrometheusStats.collection_text);
123         }
124         catch (Exception ex)
125         {
126             this.to_report = ex;
127         }
128     }
129 
130     ///
131     size_t consume ( void[] data )
132     {
133         cstring header_end = "\r\n\r\n";
134         auto header_end_idx = Finder.find(cast(char[])data, header_end);
135 
136         if (header_end_idx == data.length)
137         {
138             return data.length + 1;
139         }
140 
141         sformat(this.response_msg, "{}",
142             cast(char[])data[header_end_idx + 4 .. $]);
143         return 0;
144     }
145 }
146 
147 /*******************************************************************************
148 
149     Runs the server, which listens to one request on the prometheus endpoint,
150     and exits.
151 
152     Returns:
153         `EXIT_SUCCESS`
154 
155 *******************************************************************************/
156 
157 version (unittest) {} else
158 int main ( )
159 {
160     initScheduler(SchedulerConfiguration.init);
161 
162     auto stats = new PrometheusStats();
163     auto registry = new CollectorRegistry([&stats.collect]);
164 
165     IPSocket!().InetAddress srv_address;
166 
167     auto listener = new PrometheusListener(srv_address("127.0.0.1", 8080),
168         new IPSocket!(), registry, theScheduler.epoll);
169 
170     auto client = new ClientTask(srv_address);
171     client.terminationHook = delegate {
172         theScheduler.epoll.unregister(listener);
173     };
174 
175     listener.registerEventHandling(theScheduler.epoll);
176     theScheduler.schedule(client);
177     theScheduler.eventLoop();
178 
179     if (client.to_report !is null)
180     {
181         throw client.to_report;
182     }
183 
184     return EXIT_SUCCESS;
185 }