/******************************************************************************

    Module containing worker thread implementation of AsyncIO.

    In the async IO framework, a fixed number of worker threads take requests
    from the queue and perform them (using a blocking call which will in turn
    block the worker thread). When finished, the worker thread marks the job
    as ready, and blocks on the semaphore waiting for the next request.

    Copyright:
        Copyright (c) 2018 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE.txt for details.

******************************************************************************/

module ocean.util.aio.internal.ThreadWorker;

import core.memory;
import core.stdc.errno;
import core.sys.posix.semaphore;
import core.sys.posix.signal: SIGRTMIN;
import core.sys.posix.pthread;
import core.sys.posix.unistd;
import core.stdc.stdint;
import core.stdc.stdio;
import core.thread;

import ocean.util.aio.AsyncIO;
import ocean.util.aio.internal.JobQueue;

/**************************************************************************

    Thread's entry point.

    Blocks all signals for the calling thread, performs the per-thread
    initialization under `init_context.init_mutex` (optional context
    creation, registering the thread's stack with the GC, decrementing
    `init_context.to_create`), signals `init_context.init_cond` and then
    serves jobs from the queue until the queue hands out a null job.

    Params:
        ThreadInitializationContext = type of the initialization context
            passed through `init_context_ptr`
        init_context_ptr = pointer to the ThreadInitializationContext
            instance describing the queue and the initialization state

    Returns:
        null when the queue reports that there are no more jobs. If waiting
        on the semaphore or a mutex operation fails, the thread is
        terminated through `exit_thread` instead of returning.

**************************************************************************/

extern (C) static void* thread_entry_point(ThreadInitializationContext)(void* init_context_ptr)
{
    ThreadInitializationContext* init_context = cast(ThreadInitializationContext*)init_context_ptr;
    JobQueue jobs = init_context.job_queue;
    typeof(ThreadInitializationContext.makeContext()) context;

    // Block all signals delivered to this thread
    sigset_t block_set;

    // Return value is ignored here, as this can fail only
    // because of a programming error, and we can't use assert here
    cast(void) sigfillset(&block_set);
    cast(void)pthread_sigmask(SIG_BLOCK, &block_set, null);

    {
        thread_lock_mutex(&init_context.init_mutex);
        scope (exit)
            thread_unlock_mutex(&init_context.init_mutex);

        if (init_context.makeContext !is null)
        {
            context = init_context.makeContext();
        }

        pthread_attr_t attr;
        void* stack_addr;
        size_t stack_size;

        // Look up this thread's stack and register it with the GC, so
        // that the memory range is scanned for references
        core.thread.pthread_getattr_np(pthread_self(), &attr);
        pthread_attr_getstack(&attr, &stack_addr, &stack_size);
        pthread_attr_destroy(&attr);

        GC.addRange(stack_addr, stack_size);

        // One thread less to wait for
        init_context.to_create--;
    }

    pthread_cond_signal(&init_context.init_cond);

    // Wait for new jobs and execute them
    while (true)
    {
        // Current job
        Job* job;

        // Wait on the semaphore for new jobs, retrying if the wait
        // was interrupted
        while (true)
        {
            auto ret = sem_wait(&(jobs.jobs_available));

            if (ret == -1 && .errno != EINTR)
            {
                exit_thread(-1);
            }
            else if (ret == 0)
            {
                break;
            }
        }

        // Get the next job, if any
        job = jobs.takeFirstNonTakenJob(&thread_lock_mutex,
                &thread_unlock_mutex);

        // No more jobs
        if (job == null)
        {
            break;
        }

        // Stays 0 for commands not handled below
        ssize_t ret_value;
        switch (job.cmd)
        {
            case Job.Command.Read:
                ret_value = do_pread(job);
                break;
            case Job.Command.Write:
                ret_value = do_write(job);
                break;
            case Job.Command.Fsync:
                ret_value = do_fsync(job);
                break;
            case Job.Command.Close:
                ret_value = do_close(job);
                break;
            case Job.Command.CallDelegate:
                ret_value = do_call_delegate(context, job);
                break;
            default:
                break;
        }

        job.return_value = ret_value;

        if (ret_value != 0 && job.ret_val !is null)
        {
            *job.errno_val = .errno;
            *job.ret_val = ret_value;
        }

        // Signal that you have done the job
        signalJobDone(jobs, job);
    }

    return cast(void*)0;
}


/**************************************************************************

    Wrapper around fsync call. The call is repeated for as long as it
    fails with EINTR.

    Params:
        job = job for which the request is executed.

    Returns:
        0 in case of success, -1 in case of failure

**************************************************************************/

private static ssize_t do_fsync (Job* job)
{
    while (true)
    {
        auto ret = .fsync(job.fd);

        if (ret == 0 || .errno != EINTR)
        {
            return ret;
        }
    }
}

/**************************************************************************

    Wrapper around close call.

    close is called exactly once and is never retried on EINTR. On Linux
    the file descriptor is released even when close is interrupted, so
    calling close again could close an unrelated descriptor that another
    thread has opened in the meantime and that got the same number.
    For that reason an interrupted close is reported as success.

    Params:
        job = job for which the request is executed.

    Returns:
        0 in case of success (including an interrupted close),
        -1 in case of failure

**************************************************************************/

private static ssize_t do_close (Job* job)
{
    ssize_t ret = .close(job.fd);

    // The descriptor is already gone at this point (Linux semantics);
    // retrying would race with descriptor reuse in other threads.
    if (ret != 0 && .errno == EINTR)
    {
        return 0;
    }

    return ret;
}

/**************************************************************************

    Wrapper around pread call.

    Reads into `job.recv_buffer`, starting at `job.offset`, until the
    buffer is full, pread reports that there is no more data, or an error
    happens. Calls interrupted with EINTR are repeated.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes read, or -1 in case of error.

**************************************************************************/

private static ssize_t do_pread (Job* job)
{
    // Now, do the reading!
    size_t count = 0;
    while (count < job.recv_buffer.length)
    {
        ssize_t read;

        while (true)
        {
            read = .pread(job.fd, job.recv_buffer.ptr + count,
                    job.recv_buffer.length - count, job.offset + count);

            if (read >= 0 || .errno != EINTR)
            {
                break;
            }
        }

        // Check for the error
        if (read < 0)
        {
            return read;
        }

        if (read == 0)
        {
            // No more data
            break;
        }

        count += read;
    }

    return count;
}

/**************************************************************************

    Wrapper around write call.

    Writes the contents of `job.recv_buffer` until all of it is written or
    an error happens. Calls interrupted with EINTR are repeated.

    NOTE(review): the data to write is taken from `recv_buffer`; confirm
    against the Job definition that this is the intended field.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes written, or -1 in case of error.

**************************************************************************/

private static ssize_t do_write (Job* job)
{
    size_t count = 0;
    // TODO: check sync_writes here
    while (count < job.recv_buffer.length)
    {
        ssize_t ret;

        while (true)
        {
            ret = .write(job.fd, job.recv_buffer.ptr + count,
                    job.recv_buffer.length - count);

            if (ret >= 0 || .errno != EINTR)
            {
                break;
            }
        }

        // Check for the error
        if (ret < 0)
        {
            return ret;
        }

        count += ret;
    }

    return count;
}

/**************************************************************************

    Wrapper around call of the arbitrary delegate.

    Params:
        context = per-thread context passed to the delegate
        job = job for which the request is executed.

    Returns:
        non-zero if the delegate call finishes successfully, zero
        if the delegate threw an exception

**************************************************************************/

private static ssize_t do_call_delegate (AsyncIO.Context context, Job* job)
{
    try
    {
        job.user_delegate(context);
        return 1;
    }
    catch (Exception)
    {
        return 0;
    }
}

/*********************************************************************

    Helper method to exit the thread and raise a signal
    in parent AsyncIO

    This method will signal the process that this thread has
    performed the invalid operation from which it can't recover
    and it will exit the current thread with a return code.

    SIGRTMIN is sent to the process with kill() and not with
    pthread_kill(): pthread_kill() expects a thread handle (pthread_t),
    and the process id returned by getpid() is not one. The worker
    threads block all signals in their entry point, so the signal is
    left for a thread outside of the worker pool (presumably the one
    owning the AsyncIO instance - confirm against the signal handling
    in AsyncIO).

    Params:
        return_code = value the thread exits with

*********************************************************************/

private void exit_thread(int return_code)
{
    import core.sys.posix.signal : kill;

    // getpid is always successful
    kill(getpid(), SIGRTMIN);
    pthread_exit(cast(void*)return_code);
}

/*********************************************************************

    Method implementing locking the mutex with non-allocating and non
    throwing error handling.

    Since this method will be called from the pthread, we must not
    throw or allocate anything from here.

    Instead, on failure `exit_thread` is called, which raises the signal
    and exits the current thread with the -1 value.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_lock_mutex (pthread_mutex_t* mutex)
{
    if (pthread_mutex_lock(mutex) != 0)
    {
        exit_thread(-1);
    }
}

/*********************************************************************

    Method implementing unlocking the mutex with non-allocating and non
    throwing error handling.

    Since this method will be called from the pthread, we must not
    throw or allocate anything from here.

    Instead, on failure `exit_thread` is called, which raises the signal
    and exits the current thread with the -1 value.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_unlock_mutex (pthread_mutex_t* mutex)
{
    if (pthread_mutex_unlock(mutex) != 0)
    {
        exit_thread(-1);
    }
}

/**********************************************************************

    Signals that the request has been executed, by marking the job as
    ready in the queue. The queue is given the non-throwing mutex
    helpers of this module to do its locking.

    Params:
        jobs = queue containing jobs to be run
        job = pointer to job containing executed request

**********************************************************************/

private void signalJobDone (JobQueue jobs, Job* job)
{
    jobs.markJobReady(job,
            &thread_lock_mutex, &thread_unlock_mutex);
}