1 /*******************************************************************************
2 
3     Test suite for the unix socket listener.
4 
5     Copyright:
6         Copyright (c) 2009-2017 dunnhumby Germany GmbH.
7         All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
11         Alternatively, this file may be distributed under the terms of the Tango
12         3-Clause BSD License (see LICENSE_BSD.txt for details).
13 
14 *******************************************************************************/
15 
16 module integrationtest.unixlistener.main;
17 
18 import core.thread;
19 import core.stdc.errno;
20 import core.stdc.stdio;
21 import core.stdc..string;
22 import core.sys.posix.semaphore;
23 import core.sys.posix.stdlib;
24 import core.sys.posix.unistd;
25 import core.sys.posix.sys.mman;
26 import core.sys.posix.sys.wait;
27 
28 import ocean.core.Test;
29 import ocean.core.Enforce;
30 import ocean.io.select.EpollSelectDispatcher;
31 import ocean.meta.types.Qualifiers;
32 import ocean.net.server.unix.UnixListener;
33 import ocean.stdc.posix.sys.un;
34 import ocean.sys.socket.UnixSocket;
35 import ocean.task.Task;
36 import ocean.task.Scheduler;
37 import Integer = ocean.text.convert.Integer_tango;
38 import ocean.text.util.SplitIterator: ChrSplitIterator;
39 import ocean.text.util.StringC;
40 import ocean.sys.ErrnoException;
41 
42 /*******************************************************************************
43 
44     Semaphore to synchronize child and parent process. The child process should
45     connect to the unix domain socket, only after the parent one binded to it.
46 
47 *******************************************************************************/
48 
49 sem_t* start_semaphore;
50 
51 /*******************************************************************************
52 
53     Helper function to wait for the start_semaphore.
54 
55 *******************************************************************************/
56 
57 void waitForSemaphore ()
58 {
59     // Wait for the parent to send signals, then connect and start
60     // issuing commands
61     int ret;
62 
63     do
64     {
65         ret = sem_wait(start_semaphore);
66     }
67     while (ret == -1 && errno == EINTR);
68     enforce (ret == 0);
69 }
70 
71 /*******************************************************************************
72 
73     Body of a client process. Connects to the unix domain sockets and sends
74     some commands, some of which are interactive.
75 
76     Params:
77         socket_path = path to connect to.
78 
79 *******************************************************************************/
80 
81 void client_process (cstring socket_path)
82 {
83     mstring read_buffer;
84     UnixSocket client;
85 
86     cstring readData ()
87     {
88         read_buffer.length = 100;
89         assumeSafeAppend(read_buffer);
90 
91         auto buff = cast(void[])read_buffer;
92 
93         // receive some data
94         ssize_t read_bytes;
95 
96         do
97         {
98             read_bytes = client.recv(buff, 0);
99         }
100         while (read_bytes == -1 && errno == EINTR);
101 
102         enforce(read_bytes > 0);
103 
104         read_buffer.length = read_bytes;
105         assumeSafeAppend(read_buffer);
106         return read_buffer;
107     }
108 
109     waitForSemaphore();
110 
111     // Connect to a listening socket and keep it open.
112     auto local_sock_add = sockaddr_un.create(socket_path);
113     client = new UnixSocket();
114 
115     auto socket_fd = client.socket();
116     enforce(socket_fd >= 0, "socket() call failed!");
117 
118     auto connect_result = client.connect(&local_sock_add);
119     enforce(connect_result == 0, "connect() call failed.");
120 
121     client.write("increment 2\n");
122     client.write("increment 1\n");
123 
124     client.write("askMyName John Angie\n");
125     test!("==")(readData(), "first name? ");
126     client.write("John\n");
127     test!("==")(readData(), "second name? ");
128     client.write("Angie\n");
129 
130     client.write("askMeAgain John Angie");
131     client.write("\n");
132     test!("==")(readData(), "second name? ");
133     client.write("Angie\n");
134     test!("==")(readData(), "first name? ");
135     client.write("John\n");
136 
137     client.write("echo Joseph\n");
138     test!("==")(readData(), "Joseph");
139 
140     client.write("shutdown\n");
141     client.close();
142 }
143 
144 /*******************************************************************************
145 
146     Creates UnixListener, sends several commands, and confirms they were
147     processed.
148 
149 *******************************************************************************/
150 
151 version (unittest) {} else
152 int main ( )
153 {
154     // Since this is a one-off test, we will not care about destroying
155     // these
156     start_semaphore = cast(sem_t*)mmap(null,
157             sem_t.sizeof, PROT_READ | PROT_WRITE,
158             MAP_SHARED | MAP_ANON, -1, 0);
159 
160     if (start_semaphore is null)
161     {
162         return 1;
163     }
164 
165     if (sem_init(start_semaphore, 1, 0) == -1)
166     {
167         return 1;
168     }
169 
170     auto e = new ErrnoException;
171 
172     // Create a tmp directory
173     mstring dir_name = "/tmp/ocean_socket_testXXXXXX".dup;
174     char* tmp_dir = mkdtemp(dir_name.ptr);
175     enforce(tmp_dir == dir_name.ptr);
176     auto local_address = dir_name ~ "/ocean.socket";
177 
178     int child_pid = fork();
179     enforce(child_pid >= 0);
180 
181     if (child_pid == 0)
182     {
183         client_process(local_address);
184         return 0;
185     }
186     else
187     {
188         // Only parent cleans up
189         scope (exit)
190         {
191             auto filename = StringC.toCString(local_address);
192             if (access(filename, F_OK) != -1)
193             {
194                 if (unlink(filename) != 0)
195                 {
196                     throw e.useGlobalErrno("unlink");
197                 }
198             }
199 
200             if (rmdir(tmp_dir) != 0)
201             {
202                 throw e.useGlobalErrno("rmdir");
203             }
204         }
205 
206         auto epoll = new EpollSelectDispatcher();
207         auto unix_socket   = new UnixSocket;
208 
209         // Value to be incremented via command to server
210         // Needs to be 3 after end of tests.
211         int expected_value = 0;
212 
213         scope handleIncrementCommand = ( cstring args,
214                 void delegate ( cstring response ) send_response,
215                 void delegate ( ref mstring response ) wait_reply )
216         {
217             expected_value += Integer.parse(args);
218         };
219 
220         // Command shutting down the epoll
221         scope handleShutdown = ( cstring args,
222                 void delegate ( cstring response ) send_response )
223         {
224             epoll.shutdown();
225         };
226 
227         // test doesn't work in this callbacks, as the exceptions will be swallowed
228         int success_count = 0;
229 
230         // Interactive callback. This will ask the client for the two names,
231         // which should be the same as two arguments with which
232         scope handleAskMyName = ( cstring args,
233                 void delegate ( cstring response ) send_response,
234                 void delegate ( ref mstring response ) wait_reply )
235         {
236             scope i = new ChrSplitIterator(' ');
237             i.reset(args);
238             cstring first = i.next();
239             cstring second = i.remaining();
240             mstring response;
241             send_response("first name? "); wait_reply(response);
242             success_count += first == response ? 1 : 0;
243             send_response("second name? "); wait_reply(response);
244             success_count += second == response ? 1 : 0;
245         };
246 
247         // Interactive callback for the other command
248         // Used to check if we can execute more handlers
249         scope handleAskMyNameReverse = ( cstring args,
250                 void delegate ( cstring response ) send_response,
251                 void delegate ( ref mstring response ) wait_reply )
252         {
253             scope i = new ChrSplitIterator(' ');
254             i.reset(args);
255             cstring first = i.next();
256             cstring second = i.remaining();
257             mstring response;
258             send_response("second name? "); wait_reply(response);
259             success_count += second == response ? 1 : 0;
260             send_response("first name? "); wait_reply(response);
261             success_count += first == response ? 1 : 0;
262         };
263 
264         // Simple echo command, non-interactive
265         scope handleEcho = delegate ( cstring args,
266                 void delegate (cstring response) send_response)
267         {
268             send_response(args);
269         };
270 
271         auto unix_server   = new UnixListener(idup(local_address), epoll,
272                 ["echo"[]: handleEcho,
273                  "shutdown": handleShutdown],
274                 ["increment"[]: handleIncrementCommand,
275                  "askMyName": handleAskMyName,
276                  "askMeAgain": handleAskMyNameReverse]
277         );
278 
279         epoll.register(unix_server);
280 
281         // let the child process know it may connect & start
282         sem_post(start_semaphore);
283 
284         // Spin the server
285         epoll.eventLoop();
286 
287         // This will be reached only if "shutdown" command was successful.
288         test!("==")(expected_value, 3);
289         test!("==")(success_count, 4);
290 
291         // Let's reap the zombies
292 
293         int ret;
294         int status;
295 
296         do
297         {
298             ret = wait(&status);
299         }
300         while (ret == -1 && errno == EINTR);
301         enforce (ret == child_pid);
302     }
303 
304     return 0;
305 }