1 /*******************************************************************************
2 
3     Test suite for the unix socket listener.
4 
5     Copyright:
6         Copyright (c) 2009-2017 dunnhumby Germany GmbH.
7         All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14 *******************************************************************************/
15 
16 module integrationtest.unixlistener.main;
17 
18 import ocean.transition;
19 
20 import core.thread;
21 import core.stdc.errno;
22 import core.stdc.stdio;
23 import core.sys.posix.stdlib;
24 import core.sys.posix.semaphore;
25 import core.sys.posix.sys.mman;
26 import core.sys.posix.unistd;
27 import core.sys.posix.sys.wait;
28 
29 import ocean.core.Test;
30 import ocean.core.Enforce;
31 import ocean.core.Time;
32 import ocean.io.select.EpollSelectDispatcher;
33 import core.stdc.errno: ECONNREFUSED;
34 import ocean.stdc.posix.sys.un;
35 import ocean.sys.socket.UnixSocket;
36 import ocean.net.server.unix.UnixListener;
37 import Integer = ocean.text.convert.Integer_tango;
38 import ocean.text.util.SplitIterator: ChrSplitIterator;
39 import ocean.text.util.StringC;
40 import ocean.task.Task;
41 import ocean.task.Scheduler;
42 import ocean.sys.ErrnoException;
43 import core.stdc.string;
44 
45 static if (!is(typeof(mkdtemp)))
46 {
47     extern (C) char* mkdtemp(char*);
48 }
49 
50 /*******************************************************************************
51 
52     Semaphore to synchronize child and parent process. The child process should
53     connect to the unix domain socket, only after the parent one binded to it.
54 
55 *******************************************************************************/
56 
57 sem_t* start_semaphore;
58 
59 /*******************************************************************************
60 
61     Helper function to wait for the start_semaphore.
62 
63 *******************************************************************************/
64 
65 void waitForSemaphore ()
66 {
67     // Wait for the parent to send signals, then connect and start
68     // issuing commands
69     int ret;
70 
71     do
72     {
73         ret = sem_wait(start_semaphore);
74     }
75     while (ret == -1 && errno == EINTR);
76     enforce (ret == 0);
77 }
78 
79 /*******************************************************************************
80 
81     Body of a client process. Connects to the unix domain sockets and sends
82     some commands, some of which are interactive.
83 
84     Params:
85         socket_path = path to connect to.
86 
87 *******************************************************************************/
88 
89 void client_process (cstring socket_path)
90 {
91     mstring read_buffer;
92     UnixSocket client;
93 
94     cstring readData ()
95     {
96         read_buffer.length = 100;
97         enableStomping(read_buffer);
98 
99         auto buff = cast(void[])read_buffer;
100 
101         // receive some data
102         ssize_t read_bytes;
103 
104         do
105         {
106             read_bytes = client.recv(buff, 0);
107         }
108         while (read_bytes == -1 && errno == EINTR);
109 
110         enforce(read_bytes > 0);
111 
112         read_buffer.length = read_bytes;
113         enableStomping(read_buffer);
114         return read_buffer;
115     }
116 
117     waitForSemaphore();
118 
119     // Connect to a listening socket and keep it open.
120     auto local_sock_add = sockaddr_un.create(socket_path);
121     client = new UnixSocket();
122 
123     auto socket_fd = client.socket();
124     enforce(socket_fd >= 0, "socket() call failed!");
125 
126     auto connect_result = client.connect(&local_sock_add);
127     enforce(connect_result == 0, "connect() call failed.");
128 
129     client.write("increment 2\n");
130     client.write("increment 1\n");
131 
132     client.write("askMyName John Angie\n");
133     test!("==")(readData(), "first name? ");
134     client.write("John\n");
135     test!("==")(readData(), "second name? ");
136     client.write("Angie\n");
137 
138     client.write("askMeAgain John Angie");
139     client.write("\n");
140     test!("==")(readData(), "second name? ");
141     client.write("Angie\n");
142     test!("==")(readData(), "first name? ");
143     client.write("John\n");
144 
145     client.write("echo Joseph\n");
146     test!("==")(readData(), "Joseph");
147 
148     client.write("shutdown\n");
149     client.close();
150 }
151 
152 /*******************************************************************************
153 
154     Creates UnixListener, sends several commands, and confirms they were
155     processed.
156 
157 *******************************************************************************/
158 
159 version(UnitTest) {} else
160 int main ( )
161 {
162     // Since this is a one-off test, we will not care about destroying
163     // these
164     start_semaphore = cast(sem_t*)mmap(null,
165             sem_t.sizeof, PROT_READ | PROT_WRITE,
166             MAP_SHARED | MAP_ANON, -1, 0);
167 
168     if (start_semaphore is null)
169     {
170         return 1;
171     }
172 
173     if (sem_init(start_semaphore, 1, 0) == -1)
174     {
175         return 1;
176     }
177 
178     auto e = new ErrnoException;
179 
180     // Create a tmp directory
181     mstring dir_name = "/tmp/ocean_socket_testXXXXXX".dup;
182     char* tmp_dir = mkdtemp(dir_name.ptr);
183     enforce(tmp_dir == dir_name.ptr);
184     auto local_address = dir_name ~ "/ocean.socket";
185 
186     int child_pid = fork();
187     enforce(child_pid >= 0);
188 
189     if (child_pid == 0)
190     {
191         client_process(local_address);
192         return 0;
193     }
194     else
195     {
196         // Only parent cleans up
197         scope (exit)
198         {
199             auto filename = StringC.toCString(local_address);
200             if (access(filename, F_OK) != -1)
201             {
202                 if (unlink(filename) != 0)
203                 {
204                     throw e.useGlobalErrno("unlink");
205                 }
206             }
207 
208             if (rmdir(tmp_dir) != 0)
209             {
210                 throw e.useGlobalErrno("rmdir");
211             }
212         }
213 
214         auto epoll = new EpollSelectDispatcher();
215         auto unix_socket   = new UnixSocket;
216 
217         // Value to be incremented via command to server
218         // Needs to be 3 after end of tests.
219         int expected_value = 0;
220 
221         scope handleIncrementCommand = ( cstring args,
222                 void delegate ( cstring response ) send_response,
223                 void delegate ( ref mstring response ) wait_reply )
224         {
225             expected_value += Integer.parse(args);
226         };
227 
228         // Command shutting down the epoll
229         scope handleShutdown = ( cstring args,
230                 void delegate ( cstring response ) send_response )
231         {
232             epoll.shutdown();
233         };
234 
235         // test doesn't work in this callbacks, as the exceptions will be swallowed
236         int success_count = 0;
237 
238         // Interactive callback. This will ask the client for the two names,
239         // which should be the same as two arguments with which
240         scope handleAskMyName = ( cstring args,
241                 void delegate ( cstring response ) send_response,
242                 void delegate ( ref mstring response ) wait_reply )
243         {
244             scope i = new ChrSplitIterator(' ');
245             i.reset(args);
246             cstring first = i.next();
247             cstring second = i.remaining();
248             mstring response;
249             send_response("first name? "); wait_reply(response);
250             success_count += first == response ? 1 : 0;
251             send_response("second name? "); wait_reply(response);
252             success_count += second == response ? 1 : 0;
253         };
254 
255         // Interactive callback for the other command
256         // Used to check if we can execute more handlers
257         scope handleAskMyNameReverse = ( cstring args,
258                 void delegate ( cstring response ) send_response,
259                 void delegate ( ref mstring response ) wait_reply )
260         {
261             scope i = new ChrSplitIterator(' ');
262             i.reset(args);
263             cstring first = i.next();
264             cstring second = i.remaining();
265             mstring response;
266             send_response("second name? "); wait_reply(response);
267             success_count += second == response ? 1 : 0;
268             send_response("first name? "); wait_reply(response);
269             success_count += first == response ? 1 : 0;
270         };
271 
272         // Simple echo command, non-interactive
273         scope handleEcho = delegate ( cstring args,
274                 void delegate (cstring response) send_response)
275         {
276             send_response(args);
277         };
278 
279         auto unix_server   = new UnixListener(idup(local_address), epoll,
280                 ["echo"[]: handleEcho,
281                  "shutdown": handleShutdown],
282                 ["increment"[]: handleIncrementCommand,
283                  "askMyName": handleAskMyName,
284                  "askMeAgain": handleAskMyNameReverse]
285         );
286 
287         epoll.register(unix_server);
288 
289         // let the child process know it may connect & start
290         sem_post(start_semaphore);
291 
292         // Spin the server
293         epoll.eventLoop();
294 
295         // This will be reached only if "shutdown" command was successful.
296         test!("==")(expected_value, 3);
297         test!("==")(success_count, 4);
298 
299         // Let's reap the zombies
300 
301         int ret;
302         int status;
303 
304         do
305         {
306             ret = wait(&status);
307         }
308         while (ret == -1 && errno == EINTR);
309         enforce (ret == child_pid);
310     }
311 
312     return 0;
313 }