1 /******************************************************************************
2 
3     Module for doing non-blocking reads supported by threads.
4 
    This module contains the AsyncIO definition. The intended usage of AsyncIO
    is to perform normally blocking IO calls (disk requests) in a
    fiber-blocking manner.
8 
    A fiber wanting to perform a request should submit it to AsyncIO using
    the public interface, passing all the arguments normally used by the
    blocking call and the JobNotification instance on which it will be
    blocked. After the request is issued, it is put in the queue and the
    fiber blocks immediately, giving other fibers a chance to run.
14 
    In the background, a fixed number of worker threads take requests from
    the queue and perform them (using the blocking call, which in turn blocks
    the worker thread). When finished, the worker thread resumes the blocked
    fiber and blocks on the semaphore, waiting for the next request.
19 
20     Copyright:
21         Copyright (c) 2018 dunnhumby Germany GmbH.
22         All rights reserved.
23 
24     License:
25         Boost Software License Version 1.0. See LICENSE.txt for details.
26 
27 ******************************************************************************/
28 
29 module ocean.util.aio.AsyncIO;
30 
31 import ocean.meta.types.Qualifiers;
32 import ocean.core.Verify;
33 
34 import core.stdc.errno;
35 import core.sys.posix.semaphore;
36 import core.sys.posix.pthread;
37 import core.sys.posix.unistd;
38 import core.stdc.stdint;
39 import core.stdc.stdio;
40 import ocean.core.array.Mutation: copy;
41 import ocean.sys.ErrnoException;
42 import ocean.io.select.EpollSelectDispatcher;
43 
44 import ocean.util.aio.internal.JobQueue;
45 import ocean.util.aio.internal.ThreadWorker;
46 import ocean.util.aio.internal.MutexOps;
47 import ocean.util.aio.internal.AioScheduler;
48 import ocean.util.aio.JobNotification;
49 
50 version (unittest)
51 {
52     import ocean.core.Test;
53     import ocean.task.Scheduler;
54     import ocean.io.device.File;
55 }
56 
/******************************************************************************

    Class implementing AsyncIO support: requests are queued and executed by
    a pool of worker threads while the requesting fiber/task is suspended.

******************************************************************************/

class AsyncIO
{
    /**************************************************************************

        Base class for the per-thread worker context. Users derive from it
        to keep whatever state each worker thread needs (see the
        `makeContext` constructor argument and `callDelegate`).

    **************************************************************************/

    public static class Context
    {

    }

    /**************************************************************************

        Errno exception instance

        NOTE: must be thrown and caught only from/in the main thread, as it
              is not thread-safe

    **************************************************************************/

    private ErrnoException exception;


    /**************************************************************************

        Job queue

    **************************************************************************/

    private JobQueue jobs;

    /**************************************************************************

        AioScheduler used to wake the ready jobs.

    **************************************************************************/

    private AioScheduler scheduler;

    /**************************************************************************

        Handles of worker threads.

    **************************************************************************/

    private pthread_t[] threads;

    /**************************************************************************

        Indicator if the AsyncIO is destroyed

    **************************************************************************/

    private bool destroyed;

    /**************************************************************************

        Struct providing the initialization data for the thread.

    **************************************************************************/

    public struct ThreadInitializationContext
    {
        /// Job queue shared with the worker threads
        JobQueue job_queue;

        /// Delegate creating the per-thread context (may be null)
        AsyncIO.Context delegate() makeContext;

        /// Mutex guarding `to_create`
        pthread_mutex_t init_mutex;

        /// Condition variable the constructor waits on until `to_create`
        /// drops to zero
        pthread_cond_t init_cond;

        /// Number of worker threads that have not finished initializing yet
        int to_create;
    }

    /**************************************************************************

        Ditto

    **************************************************************************/

    private ThreadInitializationContext thread_init_context;

    /**************************************************************************

        Constructor. Spawns the worker threads, waits until all of them have
        finished their initialization and registers the AioScheduler with
        the provided epoll instance.

        Params:
            epoll = epoll select dispatcher instance
            number_of_threads = number of worker threads to allocate (must be
                positive)
            makeContext = delegate to create a context within a thread
            thread_stack_size = stack size (in bytes) of each worker thread

        Throws:
            ErrnoException if initializing the synchronization primitives,
            the thread attributes or creating a thread fails

    **************************************************************************/

    public this (EpollSelectDispatcher epoll, int number_of_threads,
            scope AsyncIO.Context delegate() makeContext = null,
            long thread_stack_size = 256 * 1024)
    {
        verify(number_of_threads > 0,
                "AsyncIO requires at least one worker thread");

        this.exception = new ErrnoException;

        this.scheduler = new AioScheduler(this.exception);
        this.jobs = new JobQueue(this.exception, this.scheduler);
        this.nonblocking = new typeof(this.nonblocking);
        this.blocking = new typeof(this.blocking);

        // create worker threads
        this.threads.length = number_of_threads;
        this.thread_init_context.to_create = number_of_threads;

        this.thread_init_context.job_queue = this.jobs;
        // NOTE(review): makeContext is a `scope` delegate but it is stored
        // here; this is only safe as long as the worker threads stop using it
        // before this constructor returns - confirm in thread_entry_point.
        this.thread_init_context.makeContext = makeContext;
        this.exception.enforceRetCode!(pthread_mutex_init).call(
                &this.thread_init_context.init_mutex, null);
        this.exception.enforceRetCode!(pthread_cond_init).call(
                &this.thread_init_context.init_cond, null);

        // A pthread_attr_t must be initialized before use and destroyed
        // afterwards; pthread_create copies the attributes, so releasing
        // them at the end of the constructor is fine.
        pthread_attr_t attr;
        this.exception.enforceRetCode!(pthread_attr_init).call(&attr);
        scope (exit) pthread_attr_destroy(&attr);

        this.exception.enforceRetCode!(pthread_attr_setstacksize).call(
                &attr, cast(size_t) thread_stack_size);

        foreach (ref tid; this.threads)
        {
            // Create a thread passing the shared initialization context
            // as a parameter to the thread's entry point
            this.exception.enforceRetCode!(pthread_create).call(&tid,
                &attr,
                &thread_entry_point!(ThreadInitializationContext),
                cast(void*)&this.thread_init_context);
        }

        // wait until all threads have finished their initialization
        pthread_mutex_lock(&this.thread_init_context.init_mutex);
        while (this.thread_init_context.to_create > 0)
        {
            pthread_cond_wait(&this.thread_init_context.init_cond,
                    &this.thread_init_context.init_mutex);
        }
        pthread_mutex_unlock(&this.thread_init_context.init_mutex);

        epoll.register(this.scheduler);
    }

    /**************************************************************************

        Issues a pread request, blocking the fiber connected to the provided
        suspended_job until the request finishes.

        This will read buf.length number of bytes from fd to buf, starting
        from offset.

        Params:
            buf = buffer to fill
            fd = file descriptor to read from
            offset = offset in the file to read from
            suspended_job = JobNotification instance to
                block the fiber on

        Returns:
            number of the bytes read

        Throws:
            ErrnoException with appropriate errno set in case of failure

    **************************************************************************/

    public size_t pread (void[] buf, int fd, size_t offset,
            JobNotification suspended_job)
    {
        ssize_t ret_val;
        int errno_val;
        auto job = this.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        // The worker reads into the job's own buffer; finalizeRead copies
        // the result into the user's buffer afterwards.
        job.recv_buffer.length = buf.length;
        assumeSafeAppend(job.recv_buffer);
        job.fd = fd;
        job.suspended_job = suspended_job;
        job.offset = offset;
        job.cmd = Job.Command.Read;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        job.user_buffer = buf;
        job.finalize_results = &finalizeRead;

        // Let the threads waiting on the semaphore know that they
        // can start doing single read
        post_semaphore(&this.jobs.jobs_available);

        // Block the fiber
        suspended_job.wait(job, &this.scheduler.discardResults);

        // At this point, fiber is resumed,
        // check the return value and throw if needed
        if (ret_val == -1)
        {
            throw this.exception.set(errno_val,
                    "pread");
        }

        verify(ret_val >= 0);
        return cast(size_t)ret_val;
    }

    /***************************************************************************

        Issues a write request for the whole buffer, blocking the caller
        connected to the provided notification until the request finishes.

        Buffer must be alive during the lifetime of the request (until the
        notification fires)

        Params:
            buf = data to write
            fd = file descriptor to write to
            notification = JobNotification instance to block the caller on

        Returns:
            number of bytes written

        Throws:
            ErrnoException with appropriate errno set in case of failure

    **************************************************************************/

    public size_t write (void[] buf, int fd, JobNotification notification)
    {
        ssize_t ret_val;
        int errno_val;
        auto job = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

        // NOTE(review): recv_buffer now aliases the caller's buffer. If job
        // slots are reused without resetting recv_buffer, a later pread on
        // this slot resizes/writes into it - confirm JobQueue resets it.
        job.recv_buffer = buf;
        job.fd = fd;
        job.suspended_job = notification;
        job.cmd = Job.Command.Write;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        // No post-processing needed (same as fsync/close); never leave a
        // finalizer from a previous use of this slot in place.
        job.finalize_results = null;

        post_semaphore(&this.jobs.jobs_available);
        notification.wait(job, &this.scheduler.discardResults);

        if (ret_val == -1)
        {
            throw this.exception.set(errno_val, "write");
        }

        verify(ret_val >= 0);
        return cast(size_t)ret_val;
    }

    /***************************************************************************

        Calls an user provided delegate. The delegate is called from within a
        separate thread and it should not do anything non-thread safe (for
        example, using GC must be avoided) from the runtime's perspective.
        Delegate receives a reference to the per-thread context which it can
        use.

        Params:
            user_delegate = delegate to call
            notification = notification used to resume/suspend the caller

        Throws:
            ErrnoException with appropriate errno set in case of failure

    **************************************************************************/

    public void callDelegate (scope void delegate(AsyncIO.Context) user_delegate,
            JobNotification notification)
    {
        ssize_t ret_val;
        int errno_val;
        auto job = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

        job.user_delegate = user_delegate;
        job.suspended_job = notification;
        job.cmd = Job.Command.CallDelegate;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        // No post-processing needed (same as fsync/close); never leave a
        // finalizer from a previous use of this slot in place.
        job.finalize_results = null;

        post_semaphore(&this.jobs.jobs_available);
        notification.wait(job, &this.scheduler.discardResults);

        if (ret_val == -1)
        {
            throw this.exception.set(errno_val, "delegate call");
        }

        verify(ret_val >= 0);
    }

    /***************************************************************************

        Finalizes the read request - publishes the return value (if the
        caller asked for it) and copies the bytes read from the receive
        buffer to the user provided buffer.

        Params:
            job = job to finalize.

    ***************************************************************************/

    private static void finalizeRead (Job* job)
    {
        if (job.ret_val !is null)
        {
            *job.ret_val = job.return_value;
        }

        if (job.return_value >= 0)
        {
            auto len = cast(size_t) job.return_value;
            // bounds-checked slice: the read can never exceed the user
            // buffer, since recv_buffer was sized to it
            auto dest = job.user_buffer[0 .. len];
            copy(dest, job.recv_buffer[0 .. len]);
        }
    }

    /***************************************************************************

        Set of non-blocking methods. The user is responsible to suspend the fiber,
        and AsyncIO will not resume it. Instead, the callback will be called
        where the user can do whatever is required.

    ***************************************************************************/

    public final class Nonblocking
    {
        /**************************************************************************

            Issues a pread request, filling the buffer as much as possible,
            expecting the user to suspend the caller manually.

            This will read buf.length number of bytes from fd to buf, starting
            from offset.

            Params:
                buf = buffer to fill
                fd = file descriptor to read from
                offset = offset in the file to read from
                suspended_job = suspended job to notify upon finishing the
                    IO operation

            Returns:
                Job that's scheduled

            Throws:
                ErrnoException with appropriate errno set in case of failure

        **************************************************************************/

        public Job* pread (void[] buf,
                int fd, size_t offset,
                JobNotification suspended_job)
        {
            auto job = this.outer.jobs.reserveJobSlot(&lock_mutex,
                    &unlock_mutex);

            job.recv_buffer.length = buf.length;
            assumeSafeAppend(job.recv_buffer);

            job.fd = fd;
            job.offset = offset;
            job.cmd = Job.Command.Read;
            job.user_buffer = buf;
            job.finalize_results = &finalizeRead;
            job.suspended_job = suspended_job;
            suspended_job.register(job, &this.outer.scheduler.discardResults);

            // Let the threads waiting on the semaphore know that they
            // can start doing single read
            post_semaphore(&this.outer.jobs.jobs_available);

            return job;
        }
    }

    /// Ditto
    public Nonblocking nonblocking;

    /***************************************************************************

        Task wrapper: convenience methods which must be called from within a
        Task; each creates a TaskJobNotification for the current task and
        forwards to the corresponding AsyncIO method.

    ***************************************************************************/

    public final class TaskBlocking
    {
        import ocean.util.aio.TaskJobNotification;
        import ocean.task.Task;

        /// See AsyncIO.write
        public size_t write (void[] buf, int fd)
        {
            assert (Task.getThis() !is null);
            scope JobNotification notification = new TaskJobNotification;
            return this.outer.write(buf, fd, notification);
        }

        /// See AsyncIO.pread
        public size_t pread (void[] buf, int fd, size_t offset)
        {
            assert (Task.getThis() !is null);
            scope JobNotification notification = new TaskJobNotification;
            return this.outer.pread(buf, fd, offset, notification);
        }

        /// See AsyncIO.callDelegate
        public void callDelegate (scope void delegate(AsyncIO.Context) user_delegate)
        {
            assert (Task.getThis() !is null);
            scope JobNotification notification = new TaskJobNotification;
            this.outer.callDelegate(user_delegate, notification);
        }
    }

    /// ditto
    public TaskBlocking blocking;

    /**************************************************************************

        Issues a fsync request, blocking the fiber connected to the provided
        suspended_job until the request finishes.

        Synchronize a file's in-core state with storage device.

        Params:
            fd = file descriptor to perform fsync on
            suspended_job = JobNotification instance to
                block the fiber on

        Throws:
            ErrnoException with appropriate errno set in the case of failure

    **************************************************************************/

    public void fsync (int fd,
            JobNotification suspended_job)
    {
        ssize_t ret_val;
        int errno_val;

        auto job = this.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        job.fd = fd;
        job.suspended_job = suspended_job;
        job.cmd = Job.Command.Fsync;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        job.finalize_results = null;

        // Let the threads waiting on the semaphore know that they
        // can perform fsync
        post_semaphore(&this.jobs.jobs_available);

        // Block the fiber
        suspended_job.wait(job, &this.scheduler.discardResults);

        // At this point, fiber is resumed,
        // check the return value and throw if needed
        if (ret_val == -1)
        {
            throw this.exception.set(errno_val,
                    "fsync");
        }
    }

    /**************************************************************************

        Issues a close request, blocking the fiber connected to the provided
        suspendable request handler until the request finishes.

        Params:
            fd = file descriptor to close
            suspended_job = JobNotification instance to
                block the caller on

        Throws:
            ErrnoException with appropriate errno set in the case of failure

    **************************************************************************/

    public void close (int fd,
            JobNotification suspended_job)
    {
        ssize_t ret_val;
        int errno_val;

        auto job = this.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        job.fd = fd;
        job.suspended_job = suspended_job;
        job.cmd = Job.Command.Close;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        job.finalize_results = null;

        post_semaphore(&this.jobs.jobs_available);

        // Block the fiber
        suspended_job.wait(job, &this.scheduler.discardResults);

        // At this point, fiber is resumed,
        // check the return value and throw if needed
        if (ret_val == -1)
        {
            throw this.exception.set(errno_val,
                    "close");
        }
    }

    /*********************************************************************

        Destroys entire AsyncIO object.
        It's unusable after this point.

        NOTE: this blocks the calling thread

        Throws:
            ErrnoException if one of the underlying system calls
            failed

    *********************************************************************/

    public void destroy ()
    {
        assert(!this.destroyed);

        // Stop all workers
        // and wait for all threads to exit
        this.join();

        this.jobs.destroy(this.exception);

        // All worker threads have been joined, so nothing can be using the
        // initialization mutex/condition variable any more.
        this.exception.enforceRetCode!(pthread_cond_destroy).call(
                &this.thread_init_context.init_cond);
        this.exception.enforceRetCode!(pthread_mutex_destroy).call(
                &this.thread_init_context.init_mutex);

        this.destroyed = true;
    }

    /**************************************************************************

        Indicate worker threads not to take any more jobs.

        Throws:
            ErrnoException if one of the underlying system calls
            failed

    **************************************************************************/

    private void stopAll ()
    {
        this.jobs.stop(&lock_mutex,
                &unlock_mutex);

        // Let all potential threads blocked on semaphore
        // move forward and exit (one post per worker thread)
        foreach (_; this.threads)
        {
            post_semaphore(&this.jobs.jobs_available);
        }
    }

    /**************************************************************************

        Waits for all threads to finish and checks the exit codes.

        Throws:
            ErrnoException if one of the underlying system calls
            failed, or if a thread routine returned a non-zero value

    **************************************************************************/

    private void join ()
    {
        // We need to tell threads actually to stop working
        this.stopAll();

        // Note: no need for mutex guarding this
        // as this is just an array of ids which
        // will not change during the program's lifetime
        foreach (tid; this.threads)
        {
            void* ret_from_thread;
            int ret = pthread_join(tid, &ret_from_thread);

            switch (ret)
            {
                case 0:
                    break;
                default:
                    throw this.exception.set(ret, "pthread_join");
                case EDEADLK:
                    assert(false, "Deadlock was detected");
                case EINVAL:
                    assert(false, "Join performed on non-joinable thread" ~
                            " or another thread is already waiting on it");
                case ESRCH:
                    assert(false, "No thread with this tid can be found");
            }

            // Check the return value from the thread routine; a non-zero
            // value is reported as an errno code
            auto thread_ret = cast(intptr_t) ret_from_thread;
            if (thread_ret != 0)
            {
                throw this.exception.set(cast(int) thread_ret,
                        "thread_method");
            }
        }
    }

    /*********************************************************************

        Helper function for posting the semaphore value
        and checking for the return value

        Params:
            sem = pointer to the semaphore handle

        Throws:
            ErrnoException with the errno reported by sem_post on failure

    *********************************************************************/

    private void post_semaphore (sem_t* sem)
    {
        // sem_post returns 0 on success and -1 on failure, reporting the
        // actual error code through errno (not through the return value)
        if (sem_post(sem) == 0)
        {
            return;
        }

        auto err = errno;

        if (err == EINVAL)
        {
            assert(false, "The semaphore is not valid");
        }

        throw this.exception.set(err, "sem_post");
    }
}
675 
/// Example showing the task-blocking API
unittest
{
    // NOTE: `example` below is only compiled, never run - it has to be
    // called from within a Task running on theScheduler.

    // Per-thread context. Can be anything, but it needs to inherit
    // from AsyncIO.Context
    class AioContext: AsyncIO.Context
    {
        int i;
    }

    // Creates the per-thread context. This is executed inside each worker
    // thread while the AsyncIO instance is being constructed. Useful to
    // initialize C libraries (e.g. curl_easy_init).
    // NOTE: worker threads are expected to call this one at a time
    // (presumably under the initialization mutex - see ThreadWorker). Still,
    // avoid calling methods that need thread-local state.
    AsyncIO.Context makeContext()
    {
        auto ctx = new AioContext;

        // set some things
        ctx.i = 0;

        return ctx;
    }

    // Callback called from another thread to increment the counter
    void setCounter (AsyncIO.Context ctx)
    {
        // cast the per-thread context
        auto myctx = cast(AioContext)ctx;
        myctx.i++;
    }

    void example()
    {
        auto async_io = new AsyncIO(theScheduler.epoll, 10, &makeContext);
        // stop and join the worker threads when done (blocks until they exit)
        scope (exit) async_io.destroy();

        // open a new file
        auto f = new File("var/output.txt", File.ReadWriteAppending);
        scope (exit) f.close();

        // write a file in another thread
        char[] buf = "Hello darkness, my old friend.".dup;
        async_io.blocking.write(buf, f.fileHandle());

        // read the file from another thread
        buf[] = '\0';
        async_io.blocking.pread(buf, f.fileHandle(), 0);

        test!("==")(buf[], "Hello darkness, my old friend.");
        test!("==")(f.length, buf.length);

        // call the delegate from another thread
        async_io.blocking.callDelegate(&setCounter);
    }
}