/*******************************************************************************

    Test suite for the unix socket listener.

    Copyright:
        Copyright (c) 2009-2017 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module integrationtest.unixlistener.main;

import core.thread;
import core.stdc.errno;
import core.stdc.stdio;
import core.stdc.string;
import core.sys.posix.semaphore;
import core.sys.posix.stdlib;
import core.sys.posix.unistd;
import core.sys.posix.sys.mman;
import core.sys.posix.sys.wait;

import ocean.core.Test;
import ocean.core.Enforce;
import ocean.io.select.EpollSelectDispatcher;
import ocean.meta.types.Qualifiers;
import ocean.net.server.unix.UnixListener;
import ocean.stdc.posix.sys.un;
import ocean.sys.socket.UnixSocket;
import ocean.task.Task;
import ocean.task.Scheduler;
import Integer = ocean.text.convert.Integer_tango;
import ocean.text.util.SplitIterator: ChrSplitIterator;
import ocean.text.util.StringC;
import ocean.sys.ErrnoException;

/*******************************************************************************

    Semaphore to synchronize child and parent process. The child process should
    connect to the unix domain socket only after the parent has bound to it.

*******************************************************************************/

sem_t* start_semaphore;

/*******************************************************************************

    Helper function to wait for the start_semaphore.

    Retries sem_wait() for as long as it is interrupted by a signal (EINTR).

    Throws:
        if sem_wait() fails for any reason other than EINTR.

*******************************************************************************/

void waitForSemaphore ()
{
    // Wait for the parent to send signals, then connect and start
    // issuing commands
    int ret;

    do
    {
        ret = sem_wait(start_semaphore);
    }
    while (ret == -1 && errno == EINTR);
    enforce (ret == 0);
}

/*******************************************************************************

    Body of a client process. Connects to the unix domain sockets and sends
    some commands, some of which are interactive.

    Params:
        socket_path = path to connect to.

    Throws:
        if creating or connecting the socket fails, if receiving data fails,
        or if a received prompt differs from the expected one.

*******************************************************************************/

void client_process (cstring socket_path)
{
    mstring read_buffer;
    UnixSocket client;

    // Receives up to 100 bytes from the server into the reused read_buffer
    // and returns the slice that was actually filled.
    cstring readData ()
    {
        read_buffer.length = 100;
        assumeSafeAppend(read_buffer);

        auto buff = cast(void[])read_buffer;

        // receive some data, retrying if interrupted by a signal
        ssize_t read_bytes;

        do
        {
            read_bytes = client.recv(buff, 0);
        }
        while (read_bytes == -1 && errno == EINTR);

        enforce(read_bytes > 0);

        read_buffer.length = read_bytes;
        assumeSafeAppend(read_buffer);
        return read_buffer;
    }

    waitForSemaphore();

    // Connect to a listening socket and keep it open.
    auto local_sock_add = sockaddr_un.create(socket_path);
    client = new UnixSocket();

    auto socket_fd = client.socket();
    enforce(socket_fd >= 0, "socket() call failed!");

    auto connect_result = client.connect(&local_sock_add);
    enforce(connect_result == 0, "connect() call failed.");

    client.write("increment 2\n");
    client.write("increment 1\n");

    client.write("askMyName John Angie\n");
    test!("==")(readData(), "first name? ");
    client.write("John\n");
    test!("==")(readData(), "second name? ");
    client.write("Angie\n");

    // The command line is deliberately sent in two separate writes.
    client.write("askMeAgain John Angie");
    client.write("\n");
    test!("==")(readData(), "second name? ");
    client.write("Angie\n");
    test!("==")(readData(), "first name? ");
    client.write("John\n");

    client.write("echo Joseph\n");
    test!("==")(readData(), "Joseph");

    client.write("shutdown\n");
    client.close();
}

/*******************************************************************************

    Creates UnixListener, sends several commands, and confirms they were
    processed.

    The process forks: the child runs client_process() against the socket,
    the parent runs the listener inside an epoll event loop until the
    "shutdown" command arrives, then verifies the results and reaps the child.

    Returns:
        0 on success, 1 if the shared semaphore could not be set up.

*******************************************************************************/

version (unittest) {} else
int main ( )
{
    // Since this is a one-off test, we will not care about destroying
    // these
    void* shared_mem = mmap(null,
            sem_t.sizeof, PROT_READ | PROT_WRITE,
            MAP_SHARED | MAP_ANON, -1, 0);

    // mmap() signals failure with MAP_FAILED, not with a null pointer
    if (shared_mem == MAP_FAILED)
    {
        return 1;
    }

    start_semaphore = cast(sem_t*) shared_mem;

    if (sem_init(start_semaphore, 1, 0) == -1)
    {
        return 1;
    }

    auto e = new ErrnoException;

    // Create a tmp directory.
    // mkdtemp() needs a writable, NUL-terminated template. `.dup` copies only
    // the array elements, so the terminator is spelled out in the literal
    // and sliced off again for the D-side path.
    mstring dir_template = "/tmp/ocean_socket_testXXXXXX\0".dup;
    char* tmp_dir = mkdtemp(dir_template.ptr);
    enforce(tmp_dir == dir_template.ptr);
    mstring dir_name = dir_template[0 .. $ - 1];
    auto local_address = dir_name ~ "/ocean.socket";

    int child_pid = fork();
    enforce(child_pid >= 0);

    if (child_pid == 0)
    {
        client_process(local_address);
        return 0;
    }
    else
    {
        // Only parent cleans up
        scope (exit)
        {
            auto filename = StringC.toCString(local_address);
            if (access(filename, F_OK) != -1)
            {
                if (unlink(filename) != 0)
                {
                    throw e.useGlobalErrno("unlink");
                }
            }

            if (rmdir(tmp_dir) != 0)
            {
                throw e.useGlobalErrno("rmdir");
            }
        }

        auto epoll = new EpollSelectDispatcher();
        auto unix_socket = new UnixSocket;

        // Value to be incremented via command to server
        // Needs to be 3 after end of tests.
        int expected_value = 0;

        scope handleIncrementCommand = ( cstring args,
                void delegate ( cstring response ) send_response,
                void delegate ( ref mstring response ) wait_reply )
        {
            expected_value += Integer.parse(args);
        };

        // Command shutting down the epoll
        scope handleShutdown = ( cstring args,
                void delegate ( cstring response ) send_response )
        {
            epoll.shutdown();
        };

        // test!() doesn't work in these callbacks, as the exceptions will be
        // swallowed; successful comparisons are counted instead and the
        // total is checked after the event loop.
        int success_count = 0;

        // Interactive callback. This will ask the client for the two names,
        // which should be the same as the two arguments with which the
        // command was invoked.
        scope handleAskMyName = ( cstring args,
                void delegate ( cstring response ) send_response,
                void delegate ( ref mstring response ) wait_reply )
        {
            scope i = new ChrSplitIterator(' ');
            i.reset(args);
            cstring first = i.next();
            cstring second = i.remaining();
            mstring response;
            send_response("first name? ");
            wait_reply(response);
            success_count += first == response ? 1 : 0;
            send_response("second name? ");
            wait_reply(response);
            success_count += second == response ? 1 : 0;
        };

        // Interactive callback for the other command
        // Used to check if we can execute more handlers
        scope handleAskMyNameReverse = ( cstring args,
                void delegate ( cstring response ) send_response,
                void delegate ( ref mstring response ) wait_reply )
        {
            scope i = new ChrSplitIterator(' ');
            i.reset(args);
            cstring first = i.next();
            cstring second = i.remaining();
            mstring response;
            send_response("second name? ");
            wait_reply(response);
            success_count += second == response ? 1 : 0;
            send_response("first name? ");
            wait_reply(response);
            success_count += first == response ? 1 : 0;
        };

        // Simple echo command, non-interactive
        scope handleEcho = delegate ( cstring args,
                void delegate (cstring response) send_response)
        {
            send_response(args);
        };

        auto unix_server = new UnixListener(idup(local_address), epoll,
                ["echo"[]: handleEcho,
                 "shutdown": handleShutdown],
                ["increment"[]: handleIncrementCommand,
                 "askMyName": handleAskMyName,
                 "askMeAgain": handleAskMyNameReverse]
        );

        epoll.register(unix_server);

        // let the child process know it may connect & start; if the post
        // fails the child would stay blocked in sem_wait(), so fail loudly
        // instead of entering the event loop.
        enforce(sem_post(start_semaphore) == 0);

        // Spin the server
        epoll.eventLoop();

        // This will be reached only if "shutdown" command was successful.
        test!("==")(expected_value, 3);
        test!("==")(success_count, 4);

        // Let's reap the zombies

        int ret;
        int status;

        do
        {
            ret = wait(&status);
        }
        while (ret == -1 && errno == EINTR);
        enforce (ret == child_pid);

        // The client's own checks run in the child; make sure it exited
        // normally with status 0 rather than discarding its result.
        enforce (WIFEXITED(status) && WEXITSTATUS(status) == 0);
    }

    return 0;
}