1 /******************************************************************************
2 
3     Module containing worker thread implementation of AsyncIO.
4 
    In the async IO framework, a fixed number of worker threads take requests
    from the queue and perform them (using blocking calls, which in turn
    block the worker thread). When finished, the worker thread resumes the
    blocked fiber and blocks on the semaphore, waiting for the next request.
9 
10     Copyright:
11         Copyright (c) 2018 dunnhumby Germany GmbH.
12         All rights reserved.
13 
14     License:
15         Boost Software License Version 1.0. See LICENSE.txt for details.
16 
17 ******************************************************************************/
18 
19 module ocean.util.aio.internal.ThreadWorker;
20 
21 import core.memory;
22 import core.stdc.errno;
23 import core.sys.posix.semaphore;
24 import core.sys.posix.signal: SIGRTMIN;
25 import core.sys.posix.pthread;
26 import core.sys.posix.unistd;
27 import core.stdc.stdint;
28 import core.stdc.stdio;
29 import core.thread;
30 
31 import ocean.util.aio.AsyncIO;
32 import ocean.util.aio.internal.JobQueue;
33 
/**************************************************************************

    Thread's entry point.

    Blocks all signals for the calling thread, then, while holding
    `init_context.init_mutex`, optionally creates the per-thread context,
    registers the thread's stack with the GC and decrements
    `init_context.to_create`. After signalling `init_context.init_cond` it
    serves jobs from the queue until the queue hands out no job.

    Params:
        ThreadInitializationContext = type of the initialization context;
            this function uses its `job_queue`, `init_mutex`, `init_cond`,
            `to_create` and `makeContext` members
        init_context_ptr = pointer to the ThreadInitializationContext
            instance

    Returns:
        null once the queue hands out no more jobs. Unrecoverable errors
        do not return from here; they terminate the thread through
        exit_thread.

**************************************************************************/

extern (C) static void* thread_entry_point(ThreadInitializationContext)(void* init_context_ptr)
{
    ThreadInitializationContext* init_context = cast(ThreadInitializationContext*)init_context_ptr;
    JobQueue jobs = init_context.job_queue;
    typeof(ThreadInitializationContext.makeContext()) context;

    // Block all signals delivered to this thread
    sigset_t block_set;

    // Return value is ignored here, as this can fail only
    // because of a programming error, and we can't use assert here
    cast(void) sigfillset(&block_set);
    cast(void) pthread_sigmask(SIG_BLOCK, &block_set, null);

    {
        // The whole initialization runs under the init mutex; it is
        // released when this scope is left.
        thread_lock_mutex(&init_context.init_mutex);
        scope (exit)
            thread_unlock_mutex(&init_context.init_mutex);

        // The context factory is optional
        if (init_context.makeContext !is null)
        {
            context = init_context.makeContext();
        }

        pthread_attr_t attr;
        void* stack_addr;
        size_t stack_size;

        // Look up this thread's stack and register it with the GC as a
        // range to scan
        core.thread.pthread_getattr_np(pthread_self(), &attr);
        pthread_attr_getstack(&attr, &stack_addr, &stack_size);
        pthread_attr_destroy(&attr);

        GC.addRange(stack_addr, stack_size);

        // One thread less to wait for
        init_context.to_create--;
    }

    // Announce the finished initialization (mutex is already released)
    pthread_cond_signal(&init_context.init_cond);

    // Wait for new jobs and execute them
    while (true)
    {
        // Current job
        Job* job;

        // Wait on the semaphore for new jobs, retrying when the wait
        // was interrupted (EINTR); any other failure is fatal
        while (true)
        {
            auto ret = sem_wait(&(jobs.jobs_available));

            if (ret == -1 && .errno != EINTR)
            {
                exit_thread(-1);
            }
            else if (ret == 0)
            {
                break;
            }
        }

        // Get the next job, if any
        job = jobs.takeFirstNonTakenJob(&thread_lock_mutex,
                &thread_unlock_mutex);

        // No more jobs
        if (job is null)
        {
            break;
        }

        // Stays 0 (the default initializer) for unknown commands
        ssize_t ret_value;
        switch (job.cmd)
        {
            case Job.Command.Read:
                ret_value = do_pread(job);
                break;
            case Job.Command.Write:
                ret_value = do_write(job);
                break;
            case Job.Command.Fsync:
                ret_value = do_fsync(job);
                break;
            case Job.Command.Close:
                ret_value = do_close(job);
                break;
            case Job.Command.CallDelegate:
                ret_value = do_call_delegate(context, job);
                break;
            default:
                break;
        }

        job.return_value = ret_value;

        // NOTE(review): errno is also recorded for positive (successful)
        // return values, in which case it may be stale - confirm that the
        // consumers only look at it on failure.
        if (ret_value != 0 && job.ret_val !is null)
        {
            // errno_val is guarded on its own: only ret_val was checked
            // before, so a null errno_val would have been dereferenced.
            if (job.errno_val !is null)
            {
                *job.errno_val = .errno;
            }

            *job.ret_val = ret_value;
        }

        // Signal that you have done the job
        signalJobDone(jobs, job);
    }

    return null;
}
146 
147 
/**************************************************************************

    Wrapper around fsync call.

    The call is repeated for as long as it fails with EINTR.

    Params:
        job = job for which the request is executed; its `fd` is synced.

    Returns:
        0 in case of success, -1 in case of failure

**************************************************************************/

private static ssize_t do_fsync (Job* job)
{
    int status;

    do
    {
        status = .fsync(job.fd);
    }
    while (status != 0 && .errno == EINTR);

    return status;
}
172 
/**************************************************************************

    Wrapper around close call.

    The call is issued exactly once. It is deliberately NOT repeated on
    EINTR: per the Linux close(2) documentation the descriptor is released
    even if the call is interrupted, so a retry could close an unrelated
    descriptor that another thread has opened in the meantime and that got
    the same number. An interrupted close is therefore reported as success.

    Params:
        job = job for which the request is executed; its `fd` is closed.

    Returns:
        0 in case of success (including a close interrupted by a signal),
        -1 in case of failure

**************************************************************************/

private static ssize_t do_close (Job* job)
{
    if (.close(job.fd) == 0)
    {
        return 0;
    }

    // The descriptor is gone after EINTR, treat the close as completed.
    // Any other errno is a genuine failure and is left for the caller.
    return (.errno == EINTR) ? 0 : -1;
}
197 
/**************************************************************************

    Wrapper around pread call.

    Keeps reading into `job.recv_buffer`, starting at file position
    `job.offset`, until the buffer is full, pread reports that there
    is no more data, or an error occurs. Calls failing with EINTR are
    repeated.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes read, or -1 in case of error.

**************************************************************************/

private static ssize_t do_pread (Job* job)
{
    size_t total = 0;

    while (total < job.recv_buffer.length)
    {
        ssize_t received;

        // Repeat the call for as long as it is merely interrupted
        do
        {
            received = .pread(job.fd, job.recv_buffer.ptr + total,
                        job.recv_buffer.length - total, job.offset + total);
        }
        while (received < 0 && .errno == EINTR);

        // A genuine error aborts the whole request
        if (received < 0)
        {
            return received;
        }

        // Zero bytes: no more data, hand back what was read so far
        if (received == 0)
        {
            break;
        }

        total += received;
    }

    return total;
}
246 
/**************************************************************************

    Wrapper around write call.

    Keeps writing the contents of `job.recv_buffer` until everything
    has been written or an error occurs. Calls failing with EINTR are
    repeated.

    Params:
        job = job for which the request is executed.

    Returns:
        number of bytes written, or -1 in case of error.

**************************************************************************/

private static ssize_t do_write (Job* job)
{
    size_t written = 0;

    // TODO: check sync_writes here
    while (written < job.recv_buffer.length)
    {
        ssize_t chunk;

        // Repeat the call for as long as it is merely interrupted
        do
        {
            chunk = .write(job.fd, job.recv_buffer.ptr + written,
                        job.recv_buffer.length - written);
        }
        while (chunk < 0 && .errno == EINTR);

        // A genuine error aborts the whole request
        if (chunk < 0)
        {
            return chunk;
        }

        written += chunk;
    }

    return written;
}
289 
/**************************************************************************

    Wrapper around call of the arbitrary delegate.

    Params:
        context = context handed over to the delegate
        job = job for which the request is executed; its `user_delegate`
            is the delegate being called.

    Returns:
        non-zero if the delegate call finishes successfully, zero
        if the delegate has thrown an Exception

**************************************************************************/

private static ssize_t do_call_delegate (AsyncIO.Context context, Job* job)
{
    // Assume success, downgrade if the delegate throws
    ssize_t outcome = 1;

    try
    {
        job.user_delegate(context);
    }
    catch (Exception)
    {
        outcome = 0;
    }

    return outcome;
}
315 
/*********************************************************************

    Helper method to exit the thread and raise a signal
    in parent AsyncIO

    This method sends SIGRTMIN to the current process to report that
    this thread has performed an invalid operation from which it can't
    recover, and then exits the current thread with a return code.

    Params:
        return_code = value handed to pthread_exit as the exit value of
            the thread

*********************************************************************/

private void exit_thread(int return_code)
{
    // Local import: the module-level import of core.sys.posix.signal
    // is selective and only names SIGRTMIN.
    import core.sys.posix.signal : kill;

    // pthread_kill expects a pthread_t thread handle, so it must not be
    // given a process id. kill() raises the signal for the process
    // identified by getpid() instead. getpid is always successful.
    cast(void) kill(getpid(), SIGRTMIN);

    pthread_exit(cast(void*) cast(intptr_t) return_code);
}
334 
/*********************************************************************

    Locks the mutex, handling errors without allocating and without
    throwing.

    This is called from the pthread, so nothing may be thrown or
    allocated here. If locking fails, exit_thread(-1) is called, which
    signals the failure and ends the current thread with the -1 value.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_lock_mutex (pthread_mutex_t* mutex)
{
    immutable status = pthread_mutex_lock(mutex);

    if (status == 0)
    {
        return;
    }

    exit_thread(-1);
}
358 
/*********************************************************************

    Unlocks the mutex, handling errors without allocating and without
    throwing.

    This is called from the pthread, so nothing may be thrown or
    allocated here. If unlocking fails, exit_thread(-1) is called, which
    signals the failure and ends the current thread with the -1 value.

    Params:
        mutex = mutex to perform the operation on

*********************************************************************/

private void thread_unlock_mutex (pthread_mutex_t* mutex)
{
    immutable status = pthread_mutex_unlock(mutex);

    if (status == 0)
    {
        return;
    }

    exit_thread(-1);
}
382 
/**********************************************************************

    Signals that the request has been executed by marking the job as
    ready in the queue.

    The queue is given the non-throwing mutex lock/unlock helpers of this
    module to use for its locking.

    Params:
        jobs = queue containing jobs to be run
        job = pointer to job containing executed request

**********************************************************************/

private void signalJobDone (JobQueue jobs, Job* job)
{
    auto lock_mutex = &thread_lock_mutex;
    auto unlock_mutex = &thread_unlock_mutex;

    jobs.markJobReady(job, lock_mutex, unlock_mutex);
}