1 /******************************************************************************
2 
3     Module containing implementation of the request queue for AsyncIO.
4 
5     Copyright:
6         Copyright (c) 2018 dunnhumby Germany GmbH.
7         All rights reserved.
8 
9     License:
10         Boost Software License Version 1.0. See LICENSE.txt for details.
11 
12 ******************************************************************************/
13 
14 module ocean.util.aio.internal.JobQueue;
15 
16 import ocean.meta.types.Qualifiers;
17 
18 import core.stdc.errno;
19 import core.stdc.stdint;
20 import core.sys.posix.unistd;
21 import core.sys.posix.semaphore;
22 import core.sys.posix.pthread;
23 
24 import ocean.sys.ErrnoException;
25 
26 import ocean.util.aio.AsyncIO;
27 import ocean.util.aio.JobNotification;
28 import ocean.util.aio.internal.AioScheduler;
29 
30 /**********************************************************************
31 
32     Single job definition
33 
34 **********************************************************************/
35 
36 public static struct Job
37 {
38     import ocean.core.array.Mutation: copy;
39     import ocean.util.aio.internal.MutexOps;
40 
41     /******************************************************************
42 
43         Command to be executed for the current request.
44 
45     *******************************************************************/
46 
47     public enum Command
48     {
49         Read,
50         Write,
51         Fsync,
52         Close,
53         CallDelegate,
54     }
55 
56     /******************************************************************
57 
58         Command to run for this request
59 
60     ******************************************************************/
61 
62     public Command cmd;
63 
64     /****************************************************************
65 
66         File descriptor of the file to perform
67         the request on
68 
69     ****************************************************************/
70 
71     public int fd;
72 
73     /****************************************************************
74 
75         Offset from the file to perform the request
76 
77     ****************************************************************/
78 
79     public size_t offset;
80 
81     /****************************************************************
82 
83         User supplied delegate to call.
84 
85     ****************************************************************/
86 
87     public void delegate (AsyncIO.Context) user_delegate;
88 
89     /****************************************************************
90 
91         The field to store the return value of the system call.
92 
93     ****************************************************************/
94 
95     public ssize_t return_value;
96 
97     /****************************************************************
98 
99         Pointer to variable which should receive the return value of
100         the system call.
101 
102     ****************************************************************/
103 
104     public ssize_t* ret_val;
105 
106     /****************************************************************
107 
108         Pinter to variable which should receive the errno value
109         after the system call
110 
111     ****************************************************************/
112 
113     public int* errno_val;
114 
115     /****************************************************************
116 
117         JobNotification used to wake the job.
118 
119     ****************************************************************/
120 
121     public JobNotification suspended_job;
122 
123     /****************************************************************
124 
125         Indicates if the job is being handled within one
126         of the threads
127 
128     ****************************************************************/
129 
130     private bool is_taken;
131 
132     /****************************************************************
133 
134         Indicates if this slot in the queue is empty
135         and can be taken
136 
137     ****************************************************************/
138 
139     private bool is_slot_free;
140 
141     /****************************************************************
142 
143         Buffer to be filled by the worker thread. When the job is
144         finalised, the contents are copied into user_buffer.
145 
146     ****************************************************************/
147 
148     public void[] recv_buffer;
149 
150     /***************************************************************
151 
152         User buffer to copy the results to. NOTE: the reason to have
153         recv_buffer and user_buffer separate is because it might happen
154         that user discards the job and the buffer, while thread is
155         writing into it, in which case the contents of recv_buffer
156         are discarded.
157 
158     ****************************************************************/
159 
160     public void[] user_buffer;
161 
162     /***************************************************************************
163 
164         Prepares the results, called upon the completion of the request,
165         if the jobs has not been cancelled while waiting for the completion.
166 
167     ***************************************************************************/
168 
169     public void function(Job* job) finalize_results;
170 
171     /***************************************************************************
172 
173         User-provided method to call when the job has been completed.
174 
175     ***************************************************************************/
176 
177     public void delegate(ssize_t)[]  finish_callback_dgs;
178 
179     /***************************************************************************
180 
181         Job queue where this jobs is currently resident. Used for recycling.
182 
183     ***************************************************************************/
184 
185     private JobQueue owner_queue;
186 
187     /***************************************************************************
188 
189         Performs any actions at the end of the AIO operation.
190 
191     ***************************************************************************/
192 
193     public void finished ()
194     {
195         if (this.finalize_results)
196         {
197             this.finalize_results(&this);
198         }
199 
200         if (this.finish_callback_dgs.length)
201         {
202             foreach (dg; this.finish_callback_dgs)
203             {
204                 dg(this.return_value);
205             }
206         }
207     }
208 
209     /***************************************************************************
210 
211         Registers the callback to call after finishing the job.
212 
213         Params:
214             dg = delegate to call when the job is finished.
215 
216     ***************************************************************************/
217 
218     public Job* registerCallback (scope void delegate(ssize_t) dg) return
219     {
220         this.finish_callback_dgs ~= dg;
221         return &this;
222     }
223 
224     /***************************************************************************
225 
226         Recycles the job and marks it free to use by the AIO for the next
227         operation.
228 
229     ***************************************************************************/
230 
231     public void recycle ()
232     {
233         this.owner_queue.recycleJob(&this, &lock_mutex, &unlock_mutex);
234     }
235 }
236 
237 /**************************************************************************
238 
239     Pending jobs queue.
240 
241 **************************************************************************/
242 
243 public static class JobQueue
244 {
245     import ocean.util.container.LinkedList;
246 
247 
248     /********************************************************************
249 
250         List containing requests.
251         Note that, since the worker threads are taking
252         pointers to jobs, moving container must not be used
253         as that would invalidate pointers to the exiting jobs
254         held by other threads
255 
256     *********************************************************************/
257 
258     private LinkedList!(Job*) jobs;
259 
260 
261     /********************************************************************
262 
263         Mutex protecting job queue.
264 
265     ********************************************************************/
266 
267     private pthread_mutex_t jobs_mutex;
268 
269 
270     /*********************************************************************
271 
272         Indicator if workers should
273         stop doing more work
274 
275     *********************************************************************/
276 
277     private bool cancel_further_jobs;
278 
279 
280     /*********************************************************************
281 
282         AioScheduler used to wake the ready jobs.
283 
284     *********************************************************************/
285 
286     private AioScheduler scheduler;
287 
288     /*********************************************************************
289 
290         Constructor
291 
292         Params:
293             exception = ErrnoException instance to throw in case
294                         initialization failed
295             scheduler = AioScheduler to schedule waking the jobs on
296 
297     *********************************************************************/
298 
299     public this (ErrnoException exception, AioScheduler scheduler)
300     {
301         this.jobs = new typeof(this.jobs);
302         this.scheduler = scheduler;
303 
304         exception.enforceRetCode!(pthread_mutex_init).call(
305                 &this.jobs_mutex, null);
306 
307         exception.enforceRetCode!(sem_init).call
308             (&this.jobs_available, 0, 0); // No jobs are ready initially
309     }
310 
311 
312     /*********************************************************************
313 
314         Takes the first jobs in the queue that's not being
315         served by any other thread.
316 
317         Params:
318             MutexOp = function or delegate mutex accepting the pointer to the
319                 mutex. Used so this method works both with delegate and
320                 function.
321             lock_mutex = method to be called to lock a mutex and perform
322                          error checking
323             unlock_mutex = method to be called to unlock a mutex and perform
324                          error checking
325 
326     *********************************************************************/
327 
328     public Job* takeFirstNonTakenJob(MutexOp)(MutexOp lock_mutex,
329             MutexOp unlock_mutex)
330     {
331         lock_mutex(&this.jobs_mutex);
332         scope (exit)
333         {
334             unlock_mutex(&this.jobs_mutex);
335         }
336 
337         if (this.cancel_further_jobs)
338         {
339             return null;
340         }
341 
342         foreach (ref job; this.jobs)
343         {
344             if (job.is_slot_free == false && job.is_taken == false)
345             {
346                 job.is_taken = true;
347                 return job;
348             }
349         }
350 
351         return null;
352     }
353 
354     /*********************************************************************
355 
356         Reserves a job slot in the queue. It either reuses
357         existing slot, or allocates a new one if all
358         existing slots are occupied
359 
360         Params:
361             MutexOp = function or delegate mutex accepting the pointer to the
362                 mutex. Used so this method works both with delegate and
363                 function.
364             lock_mutex = method to be called to lock a mutex and perform
365                          error checking
366             unlock_mutex = method to be called to unlock a mutex and perform
367                          error checking
368 
369         Returns:
370             pointer to the job slot in the queue
371 
372     *********************************************************************/
373 
374     public Job* reserveJobSlot(MutexOp)(MutexOp lock_mutex,
375             MutexOp unlock_mutex)
376     {
377         lock_mutex(&this.jobs_mutex);
378         scope (exit)
379         {
380             unlock_mutex(&this.jobs_mutex);
381         }
382 
383         Job* free_job = null;
384 
385         foreach (ref job; this.jobs)
386         {
387             if (job.is_slot_free && !job.is_taken)
388             {
389                 free_job = job;
390                 break;
391             }
392         }
393 
394         if (!free_job)
395         {
396             // adds at the beginning
397             auto new_job = new Job();
398             this.jobs.add(new_job);
399             free_job = this.jobs.get(0);
400         }
401 
402         free_job.is_taken = false;
403         free_job.is_slot_free = false;
404         free_job.owner_queue = this;
405         free_job.finish_callback_dgs.length = 0;
406         assumeSafeAppend(free_job.finish_callback_dgs);
407         free_job.ret_val = null;
408         free_job.errno_val = null;
409 
410         return free_job;
411     }
412 
413     /*********************************************************************
414 
415         Marks the job as ready and schedules the routine to be waken
416         up by the scheduler.
417 
418         Params:
419             job = job that has completed
420             lock_mutex = function or delegate to lock the mutex
421             unlock_mutex = function or delegate to unlock the mutex
422 
423     *********************************************************************/
424 
425     public void markJobReady(MutexOp) ( Job* job,
426             MutexOp lock_mutex, MutexOp unlock_mutex )
427     {
428         this.scheduler.requestReady(job,
429                 lock_mutex, unlock_mutex);
430     }
431 
432     /*********************************************************************
433 
434         Recycles the job slot and checks if there are any more jobs to
435         be served.
436 
437         Params:
438             MutexOp = function or delegate mutex accepting the pointer to the
439                 mutex. Used so this method works both with delegate and
440                 function.
441             job = job to recycle
442             lock_mutex = method to be called to lock a mutex and perform
443                          error checking
444             unlock_mutex = method to be called to unlock a mutex and perform
445                          error checking
446 
447         Returns:
448             true if there will be more jobs, false otherwise
449 
450 
451     *********************************************************************/
452 
453     public bool recycleJob(MutexOp) (Job* job, MutexOp lock_mutex,
454             MutexOp unlock_mutex)
455     {
456         if (this.cancel_further_jobs)
457             return false;
458 
459         lock_mutex(&this.jobs_mutex);
460         scope(exit)
461         {
462             unlock_mutex(&this.jobs_mutex);
463         }
464 
465         job.is_taken = false;
466         job.is_slot_free = true;
467 
468         return !this.cancel_further_jobs;
469     }
470 
471     /*********************************************************************
472 
473         Tells the queue to stop serving more jobs to workers.
474 
475         Params:
476             MutexOp = function or delegate mutex accepting the pointer to the
477                 mutex. Used so this method works both with delegate and
478                 function.
479             lock_mutex = method to be called to lock a mutex and perform
480                          error checking
481             unlock_mutex = method to be called to unlock a mutex and perform
482                          error checking
483 
484 
485     *********************************************************************/
486 
487     public void stop(MutexOp) (MutexOp lock_mutex,
488             MutexOp unlock_mutex)
489     {
490         lock_mutex(&this.jobs_mutex);
491         scope(exit)
492         {
493             unlock_mutex(&this.jobs_mutex);
494         }
495 
496         this.cancel_further_jobs = true;
497     }
498 
499     /*********************************************************************
500 
501         Destructor
502 
503         Params:
504             exception = ErrnoException instance to throw in case destruction
505             failed
506 
507     *********************************************************************/
508 
509     public void destroy (ErrnoException exception)
510     {
511         // Can only fail if the mutex is still held somewhere.  Since users
512         // should already be joined on all threads, that should not be
513         // possible.
514         auto ret = pthread_mutex_destroy(&this.jobs_mutex);
515 
516         switch (ret)
517         {
518             case 0:
519                 break;
520             default:
521                 throw exception.set(ret, "pthread_mutex_destroy");
522             case EBUSY:
523                 assert(false, "Mutex still held");
524             case EINVAL:
525                 assert(false, "Mutex reference is invalid");
526         }
527 
528         ret = sem_destroy(&this.jobs_available);
529 
530         switch (ret)
531         {
532             case 0:
533                 break;
534             default:
535                 throw exception.set(ret, "sem_destroy");
536             case EINVAL:
537                 assert(false, "Semaphore is not valid.");
538         }
539     }
540 
541     /********************************************************************
542 
543         Semaphore indicating number of jobs in the request queue.
544 
545     ********************************************************************/
546 
547     public sem_t jobs_available;
548 }