/******************************************************************************

    Module containing worker thread implementation of AsyncIO.

    In the async IO framework, a fixed number of worker threads take requests
    from the queue and perform them (using blocking calls, which will in turn
    block the worker thread). When finished, the worker thread marks the job
    as ready (so the blocked fiber can be resumed) and blocks on the semaphore
    waiting for the next request.

    Copyright:
        Copyright (c) 2018 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE.txt for details.

******************************************************************************/

module ocean.util.aio.internal.ThreadWorker;

import core.memory;
import core.stdc.errno;
import core.sys.posix.semaphore;
import core.sys.posix.pthread;
import core.sys.posix.unistd;
import core.stdc.stdint;
import core.stdc.stdio;
import ocean.util.aio.AsyncIO;

import ocean.util.aio.internal.JobQueue;

import core.sys.posix.signal: SIGRTMIN, kill;

extern(C) int pthread_getattr_np(pthread_t thread, pthread_attr_t* attr);

/**************************************************************************

    Thread's entry point.

    Blocks all signals for the calling thread, performs the per-thread
    initialization under `init_context.init_mutex` (context creation,
    registration of the thread's stack with the GC, decrementing
    `to_create`), signals `init_context.init_cond` and then serves jobs
    from the queue until the queue hands out a null job.

    Params:
        ThreadInitializationContext = type of the initialization context.
            It must provide the `job_queue`, `makeContext`, `init_mutex`,
            `init_cond` and `to_create` members used below.
        init_context_ptr = pointer to the ThreadInitializationContext
            instance for this thread

    Returns:
        always null (the thread's exit value on regular termination)

**************************************************************************/

extern (C) static void* thread_entry_point(ThreadInitializationContext)(void* init_context_ptr)
{
    ThreadInitializationContext* init_context = cast(ThreadInitializationContext*)init_context_ptr;
    JobQueue jobs = init_context.job_queue;
    typeof(ThreadInitializationContext.makeContext()) context;

    // Block all signals delivered to this thread
    sigset_t block_set;

    // Return value is ignored here, as this can fail only
    // because of a programming error, and we can't use assert here
    cast(void) sigfillset(&block_set);
    cast(void) pthread_sigmask(SIG_BLOCK, &block_set, null);

    {
        thread_lock_mutex(&init_context.init_mutex);
        scope (exit)
            thread_unlock_mutex(&init_context.init_mutex);

        if (init_context.makeContext !is null)
        {
            context = init_context.makeContext();
        }

        pthread_attr_t attr;
        void* stack_addr;
        size_t stack_size;

        // Register this thread's stack with the GC so it gets scanned.
        // The attribute object is only valid (and only needs destroying)
        // if pthread_getattr_np succeeded, and the stack bounds are only
        // meaningful if pthread_attr_getstack succeeded.
        // NOTE(review): no matching GC.removeRange is visible in this
        // module - confirm the range is removed elsewhere before the
        // thread's stack is released.
        if (pthread_getattr_np(pthread_self(), &attr) == 0)
        {
            bool have_stack =
                pthread_attr_getstack(&attr, &stack_addr, &stack_size) == 0;
            pthread_attr_destroy(&attr);

            if (have_stack)
            {
                GC.addRange(stack_addr, stack_size);
            }
        }

        init_context.to_create--;
    }

    pthread_cond_signal(&init_context.init_cond);

    // Wait for new jobs and execute them
    while (true)
    {
        // Wait on the semaphore for new jobs, restarting the wait
        // if it was merely interrupted
        while (true)
        {
            auto ret = sem_wait(&(jobs.jobs_available));

            if (ret == 0)
            {
                break;
            }

            if (ret == -1 && .errno != EINTR)
            {
                exit_thread(-1);
            }
        }

        // Get the next job, if any
        Job* job = jobs.takeFirstNonTakenJob(&thread_lock_mutex,
                &thread_unlock_mutex);

        // No more jobs
        if (job is null)
        {
            break;
        }

        // Stays 0 for unknown commands
        ssize_t ret_value;

        switch (job.cmd)
        {
            case Job.Command.Read:
                ret_value = do_pread(job);
                break;
            case Job.Command.Write:
                ret_value = do_write(job);
                break;
            case Job.Command.Fsync:
                ret_value = do_fsync(job);
                break;
            case Job.Command.Close:
                ret_value = do_close(job);
                break;
            case Job.Command.CallDelegate:
                ret_value = do_call_delegate(context, job);
                break;
            default:
                break;
        }

        // Capture errno right away, before any other call can clobber it
        auto job_errno = .errno;

        job.return_value = ret_value;

        // Any non-zero result (error or byte count / delegate success)
        // is reported back through the caller-supplied pointers
        if (ret_value != 0 && job.ret_val !is null)
        {
            if (job.errno_val !is null)
            {
                *job.errno_val = job_errno;
            }

            *job.ret_val = ret_value;
        }

        // Signal that you have done the job
        signalJobDone(jobs, job);
    }

    return cast(void*)0;
}


/**************************************************************************

    Wrapper around fsync call. The call is restarted if it is interrupted
    (EINTR).

    Params:
        job = job for which the request is executed.

    Returns:
        0 in case of success, -1 in case of failure

**************************************************************************/

private static ssize_t do_fsync (Job* job)
{
    while (true)
    {
        auto ret = .fsync(job.fd);

        if (ret == 0 || .errno != EINTR)
        {
            return ret;
        }
    }
}

/**************************************************************************

    Wrapper around close call.

    close is deliberately NOT restarted on EINTR: on Linux the file
    descriptor is released even if close fails with EINTR, so calling
    close again could close an unrelated descriptor that another thread
    has opened in the meantime. An interrupted close is therefore
    reported as success.

    Params:
        job = job for which the request is executed.

    Returns:
        0 in case of success, -1 in case of failure

**************************************************************************/

private static ssize_t do_close (Job* job)
{
    ssize_t ret = .close(job.fd);

    if (ret != 0 && .errno == EINTR)
    {
        // The descriptor is already gone; nothing left to retry
        return 0;
    }

    return ret;
}

/**************************************************************************

    Wrapper around pread call. Keeps reading (restarting interrupted
    calls) until job.recv_buffer is full or pread reports end of data.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes read, or -1 in case of error.

**************************************************************************/

private static ssize_t do_pread (Job* job)
{
    size_t count = 0;

    while (count < job.recv_buffer.length)
    {
        ssize_t read;

        // Restart the call for as long as it gets interrupted
        do
        {
            read = .pread(job.fd, job.recv_buffer.ptr + count,
                    job.recv_buffer.length - count, job.offset + count);
        }
        while (read < 0 && .errno == EINTR);

        // Check for the error
        if (read < 0)
        {
            return read;
        }

        if (read == 0)
        {
            // No more data
            break;
        }

        count += read;
    }

    return count;
}

/**************************************************************************

    Wrapper around write call. Keeps writing (restarting interrupted
    calls) until the whole job.recv_buffer has been written.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes written, or -1 in case of error.

**************************************************************************/

private static ssize_t do_write (Job* job)
{
    size_t count = 0;

    // TODO: check sync_writes here
    while (count < job.recv_buffer.length)
    {
        ssize_t ret;

        // Restart the call for as long as it gets interrupted
        do
        {
            ret = .write(job.fd, job.recv_buffer.ptr + count,
                    job.recv_buffer.length - count);
        }
        while (ret < 0 && .errno == EINTR);

        // Check for the error
        if (ret < 0)
        {
            return ret;
        }

        count += ret;
    }

    return count;
}

/**************************************************************************

    Wrapper around call of the arbitrary delegate.

    Params:
        context = per-thread context passed to the delegate
        job = job for which the request is executed.

    Returns:
        non-zero if the delegate call finishes successfully, zero
        if the delegate threw an exception

**************************************************************************/

private static ssize_t do_call_delegate (AsyncIO.Context context, Job* job)
{
    try
    {
        job.user_delegate(context);
        return 1;
    }
    catch (Exception)
    {
        return 0;
    }
}

/*********************************************************************

    Helper method to exit the thread and raise a signal
    in parent AsyncIO

    This method sends SIGRTMIN to the process to report that this
    thread has performed an invalid operation from which it can't
    recover, and then exits the current thread with a return code.

    The signal is sent with kill, as getpid yields a process id, which
    is not a valid thread handle for pthread_kill. Worker threads block
    all signals in thread_entry_point, so the signal is delivered to a
    thread that has not blocked it.

    Params:
        return_code = value to use as the thread's exit value

*********************************************************************/

private void exit_thread(int return_code)
{
    // getpid is always successful; there is nothing useful we could do
    // here if sending the signal failed, so the result is ignored
    cast(void) kill(getpid(), SIGRTMIN);
    pthread_exit(cast(void*)return_code);
}

/*********************************************************************

    Method implementing locking the mutex with non-allocating and non
    throwing error handling.

    Since this method will be called from the pthread, we must not
    throw or allocate anything from here.

    Instead, main thread is being signaled and the current thread
    exits with the -1 value.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_lock_mutex (pthread_mutex_t* mutex)
{
    if (pthread_mutex_lock(mutex) != 0)
    {
        exit_thread(-1);
    }
}

/*********************************************************************

    Method implementing unlocking the mutex with non-allocating and non
    throwing error handling.

    Since this method will be called from the pthread, we must not
    throw or allocate anything from here.

    Instead, main thread is being signaled and the current thread
    exits with the -1 value.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_unlock_mutex (pthread_mutex_t* mutex)
{
    if (pthread_mutex_unlock(mutex) != 0)
    {
        exit_thread(-1);
    }
}

/**********************************************************************

    Signals that the request has been executed by marking the job as
    ready in the queue.

    Params:
        jobs = queue containing jobs to be run
        job = pointer to job containing executed request

**********************************************************************/

private void signalJobDone (JobQueue jobs, Job* job)
{
    jobs.markJobReady(job,
            &thread_lock_mutex, &thread_unlock_mutex);
}