1 /******************************************************************************* 2 3 Test suite for the unix socket listener. 4 5 Copyright: 6 Copyright (c) 2009-2017 dunnhumby Germany GmbH. 7 All rights reserved. 8 9 License: 10 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 11 Alternatively, this file may be distributed under the terms of the Tango 12 3-Clause BSD License (see LICENSE_BSD.txt for details). 13 14 *******************************************************************************/ 15 16 module integrationtest.unixlistener.main; 17 18 import ocean.transition; 19 20 import core.thread; 21 import core.stdc.errno; 22 import core.stdc.stdio; 23 import core.sys.posix.stdlib; 24 import core.sys.posix.semaphore; 25 import core.sys.posix.sys.mman; 26 import core.sys.posix.unistd; 27 import core.sys.posix.sys.wait; 28 29 import ocean.core.Test; 30 import ocean.core.Enforce; 31 import ocean.core.Time; 32 import ocean.io.select.EpollSelectDispatcher; 33 import core.stdc.errno: ECONNREFUSED; 34 import ocean.stdc.posix.sys.un; 35 import ocean.sys.socket.UnixSocket; 36 import ocean.net.server.unix.UnixListener; 37 import Integer = ocean.text.convert.Integer_tango; 38 import ocean.text.util.SplitIterator: ChrSplitIterator; 39 import ocean.text.util.StringC; 40 import ocean.task.Task; 41 import ocean.task.Scheduler; 42 import ocean.sys.ErrnoException; 43 import core.stdc.string; 44 45 static if (!is(typeof(mkdtemp))) 46 { 47 extern (C) char* mkdtemp(char*); 48 } 49 50 /******************************************************************************* 51 52 Semaphore to synchronize child and parent process. The child process should 53 connect to the unix domain socket, only after the parent one has bound to it. 
*******************************************************************************/

sem_t* start_semaphore;

/*******************************************************************************

    Blocks until start_semaphore can be decremented, restarting the wait
    whenever it is interrupted by a signal.

    Throws:
        if sem_wait() fails for any reason other than EINTR.

*******************************************************************************/

void waitForSemaphore ()
{
    // The parent posts the semaphore; only after that may the client
    // connect and start issuing commands.
    for (;;)
    {
        auto rc = sem_wait(start_semaphore);

        if (rc == -1 && errno == EINTR)
        {
            // interrupted by a signal: wait again
            continue;
        }

        enforce (rc == 0);
        return;
    }
}

/*******************************************************************************

    Client side of the test. Waits for the start semaphore, connects to the
    unix domain socket and runs a fixed sequence of commands, checking the
    prompt/reply of every interactive one. The last command sent is
    "shutdown".

    Params:
        socket_path = path of the unix domain socket to connect to.

*******************************************************************************/

void client_process (cstring socket_path)
{
    UnixSocket client;
    mstring read_buffer;

    // Performs a single recv() on the client socket (retried on EINTR) and
    // returns the bytes received as a slice of read_buffer.
    cstring readData ()
    {
        // size the buffer for up to 100 bytes
        read_buffer.length = 100;
        enableStomping(read_buffer);

        void[] window = cast(void[]) read_buffer;
        ssize_t received;

        for (;;)
        {
            received = client.recv(window, 0);

            if (!(received == -1 && errno == EINTR))
            {
                break;
            }
        }

        // an error or a zero-length read both count as failure
        enforce(received > 0);

        // shrink the buffer to what was actually received
        read_buffer.length = received;
        enableStomping(read_buffer);
        return read_buffer;
    }

    waitForSemaphore();

    // Open a connection to the listening socket; it stays open for the
    // whole command sequence.
    client = new UnixSocket();
    auto server_address = sockaddr_un.create(socket_path);

    enforce(client.socket() >= 0, "socket() call failed!");
    enforce(client.connect(&server_address) == 0, "connect() call failed.");

    // Two increments, no reply expected
    client.write("increment 2\n");
    client.write("increment 1\n");

    // Interactive command: prompted for the first name, then the second
    client.write("askMyName John Angie\n");
    test!("==")(readData(), "first name? ");
    client.write("John\n");
    test!("==")(readData(), "second name? ");
    client.write("Angie\n");

    // Same dialogue in reverse order; the command line is sent in two
    // writes, the terminating newline separately
    client.write("askMeAgain John Angie");
    client.write("\n");
    test!("==")(readData(), "second name? ");
    client.write("Angie\n");
    test!("==")(readData(), "first name? ");
    client.write("John\n");

    // Non-interactive command with a reply
    client.write("echo Joseph\n");
    test!("==")(readData(), "Joseph");

    // Ask the server to stop, then hang up
    client.write("shutdown\n");
    client.close();
}

/*******************************************************************************

    Creates UnixListener, sends several commands, and confirms they were
    processed.
*******************************************************************************/

// Process layout: the process forks; the child runs client_process() against
// the socket path, the parent hosts the UnixListener in an epoll event loop
// until the "shutdown" command arrives, verifies the counters updated by the
// command handlers, removes the socket file and temporary directory, and
// reaps the child.
//
// Returns 0 on success and 1 if the shared semaphore cannot be set up; every
// other failed check throws.
version(UnitTest) {} else
int main ( )
{
    // Since this is a one-off test, we will not care about destroying
    // these
    void* shared_mem = mmap(null, sem_t.sizeof, PROT_READ | PROT_WRITE,
        MAP_SHARED | MAP_ANON, -1, 0);

    // mmap() reports failure with MAP_FAILED, not with a null pointer
    if (shared_mem is MAP_FAILED || shared_mem is null)
    {
        return 1;
    }

    start_semaphore = cast(sem_t*) shared_mem;

    // second argument 1: the semaphore is shared between processes;
    // initial value 0, so the child blocks until the parent posts
    if (sem_init(start_semaphore, 1, 0) == -1)
    {
        return 1;
    }

    auto e = new ErrnoException;

    // Create a tmp directory. mkdtemp() expects a writable, NUL-terminated
    // template, and .dup of a string literal does not copy the literal's
    // implicit terminator, so it is made part of the array explicitly.
    mstring dir_template = "/tmp/ocean_socket_testXXXXXX\0".dup;
    char* tmp_dir = mkdtemp(dir_template.ptr);
    enforce(tmp_dir == dir_template.ptr);

    // Directory name without the trailing NUL, for building the socket path
    auto dir_name = dir_template[0 .. $ - 1];
    auto local_address = dir_name ~ "/ocean.socket";

    int child_pid = fork();
    enforce(child_pid >= 0);

    if (child_pid == 0)
    {
        client_process(local_address);
        return 0;
    }
    else
    {
        // Only parent cleans up
        scope (exit)
        {
            auto filename = StringC.toCString(local_address);
            if (access(filename, F_OK) != -1)
            {
                if (unlink(filename) != 0)
                {
                    throw e.useGlobalErrno("unlink");
                }
            }

            if (rmdir(tmp_dir) != 0)
            {
                throw e.useGlobalErrno("rmdir");
            }
        }

        auto epoll = new EpollSelectDispatcher();
        auto unix_socket = new UnixSocket;

        // Value to be incremented via command to server
        // Needs to be 3 after end of tests.
        int expected_value = 0;

        scope handleIncrementCommand = ( cstring args,
                void delegate ( cstring response ) send_response,
                void delegate ( ref mstring response ) wait_reply )
        {
            expected_value += Integer.parse(args);
        };

        // Command shutting down the epoll
        scope handleShutdown = ( cstring args,
                void delegate ( cstring response ) send_response )
        {
            epoll.shutdown();
        };

        // test doesn't work in these callbacks, as the exceptions will be
        // swallowed; successful comparisons are counted instead and the
        // total is checked after the event loop has finished.
        int success_count = 0;

        // Interactive callback. This will ask the client for the two names,
        // which should be the same as the two arguments with which the
        // command was invoked.
        scope handleAskMyName = ( cstring args,
                void delegate ( cstring response ) send_response,
                void delegate ( ref mstring response ) wait_reply )
        {
            scope i = new ChrSplitIterator(' ');
            i.reset(args);
            cstring first = i.next();
            cstring second = i.remaining();
            mstring response;
            send_response("first name? "); wait_reply(response);
            success_count += first == response ? 1 : 0;
            send_response("second name? "); wait_reply(response);
            success_count += second == response ? 1 : 0;
        };

        // Interactive callback for the other command
        // Used to check if we can execute more handlers
        scope handleAskMyNameReverse = ( cstring args,
                void delegate ( cstring response ) send_response,
                void delegate ( ref mstring response ) wait_reply )
        {
            scope i = new ChrSplitIterator(' ');
            i.reset(args);
            cstring first = i.next();
            cstring second = i.remaining();
            mstring response;
            send_response("second name? "); wait_reply(response);
            success_count += second == response ? 1 : 0;
            send_response("first name? "); wait_reply(response);
            success_count += first == response ? 1 : 0;
        };

        // Simple echo command, non-interactive
        scope handleEcho = delegate ( cstring args,
                void delegate (cstring response) send_response)
        {
            send_response(args);
        };

        // First map: handlers that can only send a response; second map:
        // handlers that can also wait for a reply from the client.
        auto unix_server = new UnixListener(idup(local_address), epoll,
                ["echo"[]: handleEcho,
                 "shutdown": handleShutdown],
                ["increment"[]: handleIncrementCommand,
                 "askMyName": handleAskMyName,
                 "askMeAgain": handleAskMyNameReverse]
        );

        epoll.register(unix_server);

        // let the child process know it may connect & start; if the post
        // fails the child would block forever, so fail loudly instead
        enforce(sem_post(start_semaphore) == 0, "sem_post() call failed.");

        // Spin the server
        epoll.eventLoop();

        // This will be reached only if "shutdown" command was successful.
        // 3 == 2 + 1 from the two increment commands; 4 == two matching
        // names in each of the two interactive commands.
        test!("==")(expected_value, 3);
        test!("==")(success_count, 4);

        // Let's reap the zombies

        int ret;
        int status;

        do
        {
            ret = wait(&status);
        }
        while (ret == -1 && errno == EINTR);
        enforce (ret == child_pid);
    }

    return 0;
}