1 /******************************************************************************
2 
3     Module containing worker thread implementation of AsyncIO.
4 
    In the async IO framework, a fixed number of worker threads take requests
    from the queue and perform them (using blocking calls, which in turn
    block the worker thread). When finished, the worker thread resumes the
    blocked fiber and blocks on the semaphore, waiting for the next request.
9 
10     Copyright:
11         Copyright (c) 2018 dunnhumby Germany GmbH.
12         All rights reserved.
13 
14     License:
15         Boost Software License Version 1.0. See LICENSE.txt for details.
16 
17 ******************************************************************************/
18 
19 module ocean.util.aio.internal.ThreadWorker;
20 
21 import core.memory;
22 import core.stdc.errno;
23 import core.sys.posix.semaphore;
24 import core.sys.posix.pthread;
25 import core.sys.posix.unistd;
26 import core.stdc.stdint;
27 import core.stdc.stdio;
28 import ocean.util.aio.AsyncIO;
29 
30 import ocean.util.aio.internal.JobQueue;
31 
32 import core.sys.posix.signal: SIGRTMIN;
33 
// C prototype of pthread_getattr_np, declared by hand here and used by the
// thread entry point below to look up the calling thread's stack.
// NOTE(review): presumably declared manually because the imported POSIX
// bindings do not expose this non-portable ("_np") function - confirm.
extern(C) int pthread_getattr_np(pthread_t thread, pthread_attr_t* attr);
35 
/**************************************************************************

    Thread's entry point.

    The thread first blocks all signals for itself, then performs its
    one-time initialisation while holding `init_context.init_mutex`:
    it creates the optional per-thread context, registers its own stack
    with the GC and decrements `init_context.to_create`. After signalling
    `init_context.init_cond` it serves jobs from `init_context.job_queue`
    until the queue hands out a null job.

    Params:
        ThreadInitializationContext = type of the initialization context;
            it must provide the `job_queue`, `init_mutex`, `init_cond`,
            `to_create` and `makeContext` members used below
        init_context_ptr = pointer to the ThreadInitializationContext
            instance for this thread

    Returns:
        null pointer once the job loop has been left. If waiting on the
        semaphore or a mutex operation fails, the thread is terminated
        through exit_thread instead and never returns from here.

**************************************************************************/

extern (C) static void* thread_entry_point(ThreadInitializationContext)(void* init_context_ptr)
{
    ThreadInitializationContext* init_context = cast(ThreadInitializationContext*)init_context_ptr;
    JobQueue jobs = init_context.job_queue;
    typeof(ThreadInitializationContext.makeContext()) context;

    // Block all signals delivered to this thread
    sigset_t block_set;

    // Return value is ignored here, as this can fail only
    // because programming error, and we can't use assert here
    cast(void) sigfillset(&block_set);
    cast(void) pthread_sigmask(SIG_BLOCK, &block_set, null);

    {
        thread_lock_mutex(&init_context.init_mutex);
        scope (exit)
            thread_unlock_mutex(&init_context.init_mutex);

        if (init_context.makeContext !is null)
        {
            context = init_context.makeContext();
        }

        // Make the GC scan this thread's stack. The range is only
        // registered when both lookups succeeded, so that neither an
        // attribute object that was never filled in nor a stack range
        // that was never obtained is used.
        pthread_attr_t attr;

        if (pthread_getattr_np(pthread_self(), &attr) == 0)
        {
            void* stack_addr;
            size_t stack_size;

            auto have_stack =
                pthread_attr_getstack(&attr, &stack_addr, &stack_size) == 0;

            pthread_attr_destroy(&attr);

            if (have_stack)
            {
                GC.addRange(stack_addr, stack_size);
            }
        }

        init_context.to_create--;
    }

    pthread_cond_signal(&init_context.init_cond);

    // Wait for new jobs and execute them
    while (true)
    {
        // Current job
        Job* job;

        // Wait on the semaphore for new jobs; an interrupted wait (EINTR)
        // is simply repeated, any other failure terminates the thread.
        while (true)
        {
            auto ret = sem_wait(&(jobs.jobs_available));

            if (ret == -1 && .errno != EINTR)
            {
                exit_thread(-1);
            }
            else if (ret == 0)
            {
                break;
            }
        }

        // Get the next job, if any
        job = jobs.takeFirstNonTakenJob(&thread_lock_mutex,
                &thread_unlock_mutex);

        // No more jobs
        if (job is null)
        {
            break;
        }

        // Stays 0 for commands not handled below
        ssize_t ret_value;
        switch (job.cmd)
        {
            case Job.Command.Read:
                ret_value = do_pread(job);
                break;
            case Job.Command.Write:
                ret_value = do_write(job);
                break;
            case Job.Command.Fsync:
                ret_value = do_fsync(job);
                break;
            case Job.Command.Close:
                ret_value = do_close(job);
                break;
            case Job.Command.CallDelegate:
                ret_value = do_call_delegate(context, job);
                break;
            default:
                break;
        }

        job.return_value = ret_value;

        // The optional output locations are only filled for a non-zero
        // result. Each pointer is checked on its own, so a job that
        // provides `ret_val` but no `errno_val` does not cause a null
        // dereference.
        // NOTE(review): `*job.ret_val` is left untouched for a zero
        // result - confirm that callers rely on this.
        if (ret_value != 0 && job.ret_val !is null)
        {
            if (job.errno_val !is null)
            {
                *job.errno_val = .errno;
            }

            *job.ret_val = ret_value;
        }

        // Signal that you have done the job
        signalJobDone(jobs, job);
    }

    return cast(void*)0;
}
148 
149 
/**************************************************************************

    Flushes the job's file descriptor with fsync, repeating the call
    for as long as it fails with EINTR.

    Params:
        job = job for which the request is executed.

    Returns:
        0 in case of success, -1 in case of failure

**************************************************************************/

private static ssize_t do_fsync (Job* job)
{
    int status;

    do
    {
        status = .fsync(job.fd);
    }
    while (status != 0 && .errno == EINTR);

    return status;
}
174 
/**************************************************************************

    Wrapper around close call.

    close is called exactly once and never retried: on Linux the file
    descriptor is released even when close fails with EINTR, so calling
    it again could close an unrelated descriptor that another thread has
    opened in the meantime under the same number. An EINTR result is
    therefore reported as success.

    Params:
        job = job for which the request is executed.

    Returns:
        0 in case of success (including an interrupted close), -1 in case
        of failure

**************************************************************************/

private static ssize_t do_close (Job* job)
{
    ssize_t ret = .close(job.fd);

    // The descriptor is gone after EINTR as well; do not retry.
    if (ret != 0 && .errno == EINTR)
    {
        return 0;
    }

    return ret;
}
199 
/**************************************************************************

    Fills the job's receive buffer with pread calls, starting at
    job.offset and continuing until the buffer is full, pread reports
    that no more data is available, or an error occurs. Calls that fail
    with EINTR are repeated.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes read, or -1 in case of error.

**************************************************************************/

private static ssize_t do_pread (Job* job)
{
    size_t total = 0;

    while (total < job.recv_buffer.length)
    {
        ssize_t chunk;

        // Repeat the call only when it was interrupted
        do
        {
            chunk = .pread(job.fd, job.recv_buffer.ptr + total,
                        job.recv_buffer.length - total, job.offset + total);
        }
        while (chunk < 0 && .errno == EINTR);

        if (chunk < 0)
        {
            // Real error, pass it on
            return chunk;
        }

        if (chunk == 0)
        {
            // No more data
            break;
        }

        total += chunk;
    }

    return total;
}
248 
/**************************************************************************

    Writes the whole content of the job's buffer to the job's file
    descriptor, issuing as many write calls as needed. Calls that fail
    with EINTR are repeated.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes written, or -1 in case of error.

**************************************************************************/

private static ssize_t do_write (Job* job)
{
    size_t written = 0;

    // TODO: check sync_writes here
    while (written < job.recv_buffer.length)
    {
        ssize_t chunk;

        // Repeat the call only when it was interrupted
        do
        {
            chunk = .write(job.fd, job.recv_buffer.ptr + written,
                        job.recv_buffer.length - written);
        }
        while (chunk < 0 && .errno == EINTR);

        if (chunk < 0)
        {
            // Real error, pass it on
            return chunk;
        }

        written += chunk;
    }

    return written;
}
291 
/**************************************************************************

    Invokes the job's user delegate, shielding the caller from any
    Exception it throws.

    Params:
        context = context passed to the delegate
        job = job for which the request is executed.

    Returns:
        non-zero if the delegate call finishes successfully, zero
        if the delegate threw an exception

**************************************************************************/

private static ssize_t do_call_delegate (AsyncIO.Context context, Job* job)
{
    ssize_t outcome = 1;

    try
    {
        job.user_delegate(context);
    }
    catch (Exception)
    {
        outcome = 0;
    }

    return outcome;
}
317 
/*********************************************************************

    Helper method to exit the thread and raise a signal
    in parent AsyncIO

    This method sends SIGRTMIN to the own process to report that this
    thread has performed an invalid operation from which it can't
    recover, and then exits the current thread with a return code.

    The signal is sent with kill to the process id: pthread_kill expects
    a pthread_t thread handle, and a process id is not a valid one.

    Params:
        return_code = value the thread exits with

*********************************************************************/

private void exit_thread(int return_code)
{
    import core.sys.posix.signal : kill;

    // getpid is always successful
    kill(getpid(), SIGRTMIN);
    pthread_exit(cast(void*)return_code);
}
336 
/*********************************************************************

    Locks the given mutex, handling failure without allocating and
    without throwing.

    This method is called from the pthread, so it must not throw or
    allocate anything. If the mutex can't be locked, exit_thread is
    called with -1 instead.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_lock_mutex (pthread_mutex_t* mutex)
{
    immutable status = pthread_mutex_lock(mutex);

    if (status == 0)
    {
        return;
    }

    exit_thread(-1);
}
360 
/*********************************************************************

    Unlocks the given mutex, handling failure without allocating and
    without throwing.

    This method is called from the pthread, so it must not throw or
    allocate anything. If the mutex can't be unlocked, exit_thread is
    called with -1 instead.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_unlock_mutex (pthread_mutex_t* mutex)
{
    immutable status = pthread_mutex_unlock(mutex);

    if (status == 0)
    {
        return;
    }

    exit_thread(-1);
}
384 
/**********************************************************************

    Reports an executed request back to the job queue by marking the
    job as ready, handing the queue the non-throwing mutex helpers of
    this module.

    NOTE(review): any cancelation handling presumably happens inside
    JobQueue.markJobReady - confirm there.

    Params:
        jobs = queue containing jobs to be run
        job = pointer to job containing executed request

**********************************************************************/

private void signalJobDone (JobQueue jobs, Job* job)
{
    auto lock = &thread_lock_mutex;
    auto unlock = &thread_unlock_mutex;

    jobs.markJobReady(job, lock, unlock);
}