1 /******************************************************************************
2 3 Module for doing non-blocking reads supported by threads.

    This module contains the AsyncIO definition. The intended usage of AsyncIO
    is to perform normally blocking IO calls (disk requests) in a
    fiber-blocking manner.

    A fiber wanting to perform a request should submit its request to AsyncIO
    using the public interface, passing all the arguments normally used by the
    blocking call and the JobNotification instance on which it will be
    blocked. After issuing the request, the request will be put in the queue
    and the fiber will block immediately, giving other fibers a chance to run.

    In the background, a fixed number of worker threads take requests from
    the queue and perform them (using the blocking call, which will in turn
    block that thread). When finished, the worker thread will resume the
    blocked fiber, and block on the semaphore waiting for the next request.
19 20 Copyright:
21 Copyright (c) 2018 dunnhumby Germany GmbH.
22 All rights reserved.
23 24 License:
25 Boost Software License Version 1.0. See LICENSE.txt for details.
26 27 ******************************************************************************/28 29 moduleocean.util.aio.AsyncIO;
30 31 importocean.meta.types.Qualifiers;
32 importocean.core.Verify;
33 34 importcore.stdc.errno;
35 importcore.sys.posix.semaphore;
36 importcore.sys.posix.pthread;
37 importcore.sys.posix.unistd;
38 importcore.stdc.stdint;
39 importcore.stdc.stdio;
40 importocean.core.array.Mutation: copy;
41 importocean.sys.ErrnoException;
42 importocean.io.select.EpollSelectDispatcher;
43 44 importocean.util.aio.internal.JobQueue;
45 importocean.util.aio.internal.ThreadWorker;
46 importocean.util.aio.internal.MutexOps;
47 importocean.util.aio.internal.AioScheduler;
48 importocean.util.aio.JobNotification;
49 50 version (unittest)
51 {
52 importocean.core.Test;
53 importocean.task.Scheduler;
54 importocean.io.device.File;
55 }
56 57 /******************************************************************************
58 59 Class implementing AsyncIO support.
60 61 ******************************************************************************/62 63 classAsyncIO64 {
/**************************************************************************

    Base class for the per-thread worker context.

    The class itself is empty. Users derive from it to carry whatever state
    their worker threads need; instances are produced by the `makeContext`
    delegate given to the AsyncIO constructor and handed to the delegates
    run through `callDelegate`.

**************************************************************************/

public static class Context
{
}

/**************************************************************************

    Errno exception instance, reused for every error reported by this
    object.

    NOTE: must be thrown and caught only from/in the main thread, as it is
    not safe for use from multiple threads.

**************************************************************************/

private ErrnoException exception;

/**************************************************************************

    Queue holding the jobs submitted to the worker threads.

**************************************************************************/

private JobQueue jobs;

/**************************************************************************

    AioScheduler used to wake the ready jobs.

**************************************************************************/

private AioScheduler scheduler;

/**************************************************************************

    Handles of the worker threads (one entry per thread created by the
    constructor).

**************************************************************************/

private pthread_t[] threads;

/**************************************************************************

    Set to true once `destroy` has completed.

**************************************************************************/

private bool destroyed;

/**************************************************************************

    Initialization data handed to every worker thread on creation.

    A pointer to a single instance of this struct is passed as the argument
    of each thread's entry point.

**************************************************************************/

public struct ThreadInitializationContext
{
    /// Job queue the worker threads operate on.
    JobQueue job_queue;

    /// Delegate creating the per-thread context (may be null).
    AsyncIO.Context delegate() makeContext;

    /// Mutex guarding `to_create` during the start-up handshake.
    pthread_mutex_t init_mutex;

    /// Condition variable the constructor waits on until `to_create`
    /// drops to zero.
    pthread_cond_t init_cond;

    /// Number of threads that still have to finish their initialization.
    /// Starts at the requested thread count.
    /// NOTE(review): presumably decremented by the thread entry point,
    /// which is not part of this file -- confirm.
    int to_create;
}

/**************************************************************************

    Initialization context instance shared with all worker threads.

**************************************************************************/

private ThreadInitializationContext thread_init_context;

/**************************************************************************

    Constructor.

    Creates the job queue, the scheduler and the worker threads, waits
    until the start-up counter `to_create` has dropped to zero and finally
    registers the scheduler with the epoll select dispatcher.

    Params:
        epoll = epoll select dispatcher instance
        number_of_threads = number of worker threads to allocate
        makeContext = delegate to create a context within a thread
        thread_stack_size = stack size requested for each worker thread

    Throws:
        ErrnoException if initializing the mutex, the condition variable
        or the thread attributes fails, or if a thread cannot be created

**************************************************************************/

public this (EpollSelectDispatcher epoll, int number_of_threads,
        scope AsyncIO.Context delegate() makeContext = null,
        long thread_stack_size = 256 * 1024)
{
    this.exception = new ErrnoException;

    this.scheduler = new AioScheduler(this.exception);
    this.jobs = new JobQueue(this.exception, this.scheduler);
    this.nonblocking = new typeof(this.nonblocking);
    this.blocking = new typeof(this.blocking);

    // create worker threads
    this.threads.length = number_of_threads;
    this.thread_init_context.to_create = number_of_threads;

    this.thread_init_context.job_queue = this.jobs;
    this.thread_init_context.makeContext = makeContext;
    this.exception.enforceRetCode!(pthread_mutex_init).call(
            &this.thread_init_context.init_mutex, null);
    this.exception.enforceRetCode!(pthread_cond_init).call(
            &this.thread_init_context.init_cond, null);

    // The attribute object must be initialized before any other
    // pthread_attr_* function or pthread_create may use it, and it has to
    // be destroyed again once the threads have been created.
    pthread_attr_t attr;
    this.exception.enforceRetCode!(pthread_attr_init).call(&attr);
    scope (exit) pthread_attr_destroy(&attr);

    // The return value is deliberately not enforced (as before): if the
    // requested size is rejected, the attribute keeps its default stack
    // size and thread creation proceeds.
    pthread_attr_setstacksize(&attr, thread_stack_size);

    foreach (ref tid; this.threads)
    {
        // Create a thread passing the shared initialization context as
        // the parameter to the thread's entry point
        this.exception.enforceRetCode!(pthread_create).call(&tid,
                &attr,
                &thread_entry_point!(ThreadInitializationContext),
                cast(void*)&this.thread_init_context);
    }

    // wait for all threads to be created
    pthread_mutex_lock(&this.thread_init_context.init_mutex);

    while (this.thread_init_context.to_create > 0)
    {
        pthread_cond_wait(&this.thread_init_context.init_cond,
                &this.thread_init_context.init_mutex);
    }

    pthread_mutex_unlock(&this.thread_init_context.init_mutex);

    epoll.register(this.scheduler);
}

/**************************************************************************

    Submits a pread request and blocks the fiber connected to the provided
    suspended_job until the request has finished.

    Up to buf.length bytes are read from fd, starting at offset, and end up
    in buf.

    Params:
        buf = buffer to fill
        fd = file descriptor to read from
        offset = offset in the file to read from
        suspended_job = JobNotification instance to block the fiber on

    Returns:
        number of the bytes read

    Throws:
        ErrnoException with appropriate errno set in case of failure

**************************************************************************/

public size_t pread (void[] buf, int fd, size_t offset,
        JobNotification suspended_job)
{
    // Filled in before the fiber is resumed
    ssize_t bytes_read;
    int error_code;

    auto request = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

    // What to do and where
    request.cmd = Job.Command.Read;
    request.fd = fd;
    request.offset = offset;

    // The data is read into the job's own buffer and transferred to the
    // user's buffer by finalizeRead
    request.recv_buffer.length = buf.length;
    assumeSafeAppend(request.recv_buffer);
    request.user_buffer = buf;
    request.finalize_results = &finalizeRead;

    // Where to report the outcome and whom to wake up
    request.ret_val = &bytes_read;
    request.errno_val = &error_code;
    request.suspended_job = suspended_job;

    // Wake up one of the threads waiting on the semaphore so that it
    // performs this single read
    post_semaphore(&this.jobs.jobs_available);

    // Suspend the fiber until the request is done
    suspended_job.wait(request, &this.scheduler.discardResults);

    // The fiber has been resumed: report a failure, if any
    if (bytes_read == -1)
    {
        throw this.exception.set(error_code, "pread");
    }

    assert(bytes_read >= 0);
    return cast(size_t) bytes_read;
}

/***************************************************************************

    Appends a buffer to the file.

    Buffer must be alive during the lifetime of the request (until the
    notification fires): the job refers to the caller's memory directly,
    no copy is made.

    Params:
        buf = data to write
        fd = file descriptor to write to
        notification = JobNotification instance used to suspend/resume
            the caller

    Returns:
        number of bytes written

    Throws:
        ErrnoException with appropriate errno set in case of failure

**************************************************************************/

public size_t write (void[] buf, int fd, JobNotification notification)
{
    ssize_t ret_val;
    int errno_val;
    auto job = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

    job.recv_buffer = buf;
    job.fd = fd;
    job.suspended_job = notification;
    job.cmd = Job.Command.Write;
    job.ret_val = &ret_val;
    job.errno_val = &errno_val;

    // A write has no results to post-process. Clear the finalizer
    // explicitly, as fsync() and close() do, so that a finalizer left
    // behind on this slot by an earlier pread can never run for this
    // request.
    job.finalize_results = null;

    // Let one of the worker threads pick the request up
    post_semaphore(&this.jobs.jobs_available);

    // Suspend the caller until the request has finished
    notification.wait(job, &this.scheduler.discardResults);

    if (ret_val == -1)
    {
        throw this.exception.set(errno_val, "write");
    }

    verify(ret_val >= 0);
    return cast(size_t)ret_val;
}

/***************************************************************************

    Calls a user provided delegate. The delegate is called from within a
    separate thread and it should not do anything non-thread safe (for
    example, using GC must be avoided) from the runtime's perspective.
    Delegate receives a reference to the per-thread context which it can
    use.

    Params:
        user_delegate = delegate to call
        notification = notification used to resume/suspend the caller

    Throws:
        ErrnoException if the job reports -1 as its return value

**************************************************************************/

public void callDelegate (scope void delegate(AsyncIO.Context) user_delegate,
        JobNotification notification)
{
    ssize_t ret_val;
    int errno_val;
    auto job = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

    job.user_delegate = user_delegate;
    job.suspended_job = notification;
    job.cmd = Job.Command.CallDelegate;
    job.ret_val = &ret_val;
    job.errno_val = &errno_val;

    // A delegate call has no results to post-process. Clear the
    // finalizer explicitly, as fsync() and close() do, so that a
    // finalizer left behind on this slot by an earlier pread can never
    // run for this request.
    job.finalize_results = null;

    // Let one of the worker threads pick the request up
    post_semaphore(&this.jobs.jobs_available);

    // Suspend the caller until the delegate has been run
    notification.wait(job, &this.scheduler.discardResults);

    if (ret_val == -1)
    {
        throw this.exception.set(errno_val, "delegate call");
    }

    verify(ret_val >= 0);
}

/***************************************************************************

    Completes a read request: publishes the job's return value through
    `job.ret_val` (if set) and, when the read did not fail, copies the
    bytes read from the job's receive buffer into the user provided buffer.

    Params:
        job = job to finalize.

***************************************************************************/

private static void finalizeRead (Job* job)
{
    auto count = job.return_value;

    // The pointer is optional; only report back if someone is listening
    if (job.ret_val !is null)
    {
        *job.ret_val = count;
    }

    // Negative value: the read failed, there is nothing to copy
    if (count < 0)
    {
        return;
    }

    auto dest = (job.user_buffer.ptr)[0 .. count];
    copy(dest, job.recv_buffer[0 .. count]);
}

/***************************************************************************

    Set of non-blocking methods. The user is responsible for suspending the
    fiber, and AsyncIO will not resume it. Instead, the provided
    JobNotification is registered with the job, and the user can do
    whatever is required once it fires.

***************************************************************************/

public final class Nonblocking
{
    /**************************************************************************

        Submits a pread request without suspending the caller; the caller
        is expected to suspend itself manually.

        Up to buf.length bytes are read from fd, starting at offset, and
        end up in buf.

        Params:
            buf = buffer to fill
            fd = file descriptor to read from
            offset = offset in the file to read from
            suspended_job = JobNotification registered for this job

        Returns:
            Job that's scheduled

        Throws:
            ErrnoException if posting the semaphore fails

    **************************************************************************/

    public Job* pread (void[] buf, int fd, size_t offset,
            JobNotification suspended_job)
    {
        auto request = this.outer.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        // What to do and where
        request.cmd = Job.Command.Read;
        request.fd = fd;
        request.offset = offset;

        // The data is read into the job's own buffer and transferred to
        // the user's buffer by finalizeRead
        request.recv_buffer.length = buf.length;
        assumeSafeAppend(request.recv_buffer);
        request.user_buffer = buf;
        request.finalize_results = &finalizeRead;

        // NOTE(review): unlike the blocking variant, ret_val and
        // errno_val of the job are not assigned here -- confirm that
        // reserved slots are handed out with these pointers cleared.
        request.suspended_job = suspended_job;
        suspended_job.register(request,
                &this.outer.scheduler.discardResults);

        // Wake up one of the threads waiting on the semaphore so that it
        // performs this single read
        post_semaphore(&this.outer.jobs.jobs_available);

        return request;
    }
}

/// Instance giving access to the non-blocking methods
public Nonblocking nonblocking;

/// Task wrapper: the same operations as the outer class, using a
/// TaskJobNotification created on the spot. Every method must be called
/// from within a running Task.
public final class TaskBlocking
{
    import ocean.util.aio.TaskJobNotification;
    import ocean.task.Task;

    /// Task-blocking variant of AsyncIO.write
    public size_t write (void[] buf, int fd)
    {
        assert(Task.getThis() !is null);

        scope JobNotification waiter = new TaskJobNotification;
        return this.outer.write(buf, fd, waiter);
    }

    /// Task-blocking variant of AsyncIO.pread
    public size_t pread (void[] buf, int fd, size_t offset)
    {
        assert(Task.getThis() !is null);

        scope JobNotification waiter = new TaskJobNotification;
        return this.outer.pread(buf, fd, offset, waiter);
    }

    /// Task-blocking variant of AsyncIO.callDelegate
    public void callDelegate (scope void delegate(AsyncIO.Context) user_delegate)
    {
        assert(Task.getThis() !is null);

        scope JobNotification waiter = new TaskJobNotification;
        this.outer.callDelegate(user_delegate, waiter);
    }
}

/// Instance giving access to the task-blocking methods
public TaskBlocking blocking;

/**************************************************************************

    Submits an fsync request and blocks the fiber connected to the provided
    suspended_job until the request has finished.

    Synchronizes a file's in-core state with the storage device.

    Params:
        fd = file descriptor to perform fsync on
        suspended_job = JobNotification instance to block the fiber on

    Throws:
        ErrnoException with appropriate errno set in the case of failure

**************************************************************************/

public void fsync (int fd, JobNotification suspended_job)
{
    // Filled in before the fiber is resumed
    long result;
    int error_code;

    auto request = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

    request.cmd = Job.Command.Fsync;
    request.fd = fd;
    request.finalize_results = null;
    request.ret_val = &result;
    request.errno_val = &error_code;
    request.suspended_job = suspended_job;

    // Let the threads waiting on the semaphore know that they can
    // perform the fsync
    post_semaphore(&this.jobs.jobs_available);

    // Suspend the fiber until the request is done
    suspended_job.wait(request, &this.scheduler.discardResults);

    // The fiber has been resumed: report a failure, if any
    if (result == -1)
    {
        throw this.exception.set(error_code, "fsync");
    }
}

/**************************************************************************

    Submits a close request and blocks the fiber connected to the provided
    suspended_job until the request has finished.

    Params:
        fd = file descriptor to close
        suspended_job = JobNotification instance to block the caller on

    Throws:
        ErrnoException with appropriate errno set in the case of failure

**************************************************************************/

public void close (int fd, JobNotification suspended_job)
{
    // Filled in before the fiber is resumed
    long result;
    int error_code;

    auto request = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

    request.cmd = Job.Command.Close;
    request.fd = fd;
    request.finalize_results = null;
    request.ret_val = &result;
    request.errno_val = &error_code;
    request.suspended_job = suspended_job;

    // Let one of the worker threads pick the request up
    post_semaphore(&this.jobs.jobs_available);

    // Suspend the fiber until the request is done
    suspended_job.wait(request, &this.scheduler.discardResults);

    // The fiber has been resumed: report a failure, if any
    if (result == -1)
    {
        throw this.exception.set(error_code, "close");
    }
}

/*********************************************************************

    Tears the whole AsyncIO object down: stops and joins the worker
    threads, then destroys the job queue. The object must not be used
    afterwards, and this method must not be called twice.

    NOTE: this blocks the calling thread

    Throws:
        ErrnoException if one of the underlying system calls
        failed

*********************************************************************/

public void destroy ()
{
    assert(!this.destroyed);

    // join() first tells the workers to stop and then waits until every
    // one of them has exited
    this.join();

    // No thread is using the queue any more at this point
    this.jobs.destroy(this.exception);

    this.destroyed = true;
}

/**************************************************************************

    Tells the worker threads not to take any more jobs.

    Throws:
        ErrnoException if one of the underlying system calls
        failed

**************************************************************************/

private void stopAll ()
{
    this.jobs.stop(&lock_mutex, &unlock_mutex);

    // Post the semaphore once per worker thread, so that every thread
    // possibly blocked on it can move forward and exit
    foreach (i; 0 .. this.threads.length)
    {
        post_semaphore(&this.jobs.jobs_available);
    }
}

/**************************************************************************

    Stops the worker threads, waits for each of them to finish and checks
    both the result of joining and the value returned by the thread.

    Throws:
        ErrnoException if one of the underlying system calls
        failed

**************************************************************************/

private void join ()
{
    // The threads have to be told to stop working first
    this.stopAll();

    // Note: no mutex is needed to guard the array, it only holds thread
    // ids which do not change during the program's lifetime
    foreach (tid; this.threads)
    {
        void* exit_value;
        int rc = pthread_join(tid, &exit_value);

        switch (rc)
        {
            case 0:
                break;
            case EDEADLK:
                assert(false, "Deadlock was detected");
            case EINVAL:
                assert(false, "Join performed on non-joinable thread" ~
                        " or another thread is already waiting on it");
            case ESRCH:
                assert(false, "No thread with this tid can be found");
            default:
                throw this.exception.set(rc, "pthread_join");
        }

        // A non-zero value returned from the thread routine is reported
        // as an errno code
        if (cast(intptr_t)exit_value != 0)
        {
            throw this.exception.set(cast(int)exit_value,
                    "thread_method");
        }
    }
}

/*********************************************************************

    Helper function posting (incrementing) the semaphore and checking
    the outcome.

    sem_post returns 0 on success and -1 on failure; the reason for a
    failure is reported through errno, not through the return value.

    Params:
        sem = pointer to the semaphore handle

    Throws:
        ErrnoException carrying the errno set by sem_post if posting
        fails for any reason other than an invalid semaphore (which is
        treated as a programming error)

*********************************************************************/

private void post_semaphore (sem_t* sem)
{
    if (sem_post(sem) == 0)
    {
        return;
    }

    // Read errno right away, before any other call can overwrite it
    int error_code = .errno;

    switch (error_code)
    {
        case EINVAL:
            assert(false, "The semaphore is not valid");
        default:
            throw this.exception.set(error_code, "sem_post");
    }
}
674 }

/// Example showing the task-blocking API
unittest
{
    /// Per-thread context. Can be anything, but it needs to inherit
    /// from AsyncIO.Context
    class AioContext: AsyncIO.Context
    {
        int i;
    }

    // Creates the per thread context. This is executed inside
    // each worker thread. Useful to initialize C libraries (e.g.
    // curl_easy_init)
    // This method is synchronized, so everything here is thread safe
    // One must only pay attention not to call methods that need
    // thread-local state
    AsyncIO.Context makeContext()
    {
        auto ctx = new AioContext;

        // set some things
        ctx.i = 0;

        return ctx;
    }

    /// Callback called from another thread to set the counter
    void setCounter (AsyncIO.Context ctx)
    {
        // cast the per-thread context
        auto myctx = cast(AioContext)ctx;
        myctx.i++;
    }

    // Must be run from within a task: the `blocking` API suspends the
    // calling task while the worker threads do the work.
    void example()
    {
        auto async_io = new AsyncIO(theScheduler.epoll, 10, &makeContext);

        // stop and join the worker threads when done
        scope (exit) async_io.destroy();

        // open a new file
        auto f = new File("var/output.txt", File.ReadWriteAppending);

        // runs before async_io.destroy(), as scope guards are executed
        // in reverse order
        scope (exit) f.close();

        // write a file in another thread
        char[] buf = "Hello darkness, my old friend.".dup;
        async_io.blocking.write(buf, f.fileHandle());

        // read the file from another thread
        buf[] = '\0';
        async_io.blocking.pread(buf, f.fileHandle(), 0);

        test!("==")(buf[], "Hello darkness, my old friend.");
        test!("==")(f.length, buf.length);

        // call the delegate from another thread
        async_io.blocking.callDelegate(&setCounter);
    }
}