/******************************************************************************

    Module for doing non-blocking reads supported by threads.

    This module contains the AsyncIO definition. The intended usage of AsyncIO
    is to perform normally blocking IO calls (disk requests) in a
    fiber-blocking manner.

    A fiber wanting to perform a request should submit its request to AsyncIO
    using the public interface, passing all the arguments normally used by the
    blocking call and the JobNotification instance on which it will be
    blocked. After issuing the request, the request is put in the queue and
    the fiber blocks immediately, giving other fibers a chance to run.

    In the background, a fixed number of worker threads take requests from
    the queue and perform them (using the blocking call, which will in turn
    block that thread). When finished, the worker thread resumes the blocked
    fiber and blocks on the semaphore waiting for the next request.

    Copyright:
        Copyright (c) 2018 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE.txt for details.

******************************************************************************/

module ocean.util.aio.AsyncIO;

import ocean.meta.types.Qualifiers;
import ocean.core.Verify;

import core.stdc.errno;
import core.sys.posix.semaphore;
import core.sys.posix.pthread;
import core.sys.posix.unistd;
import core.stdc.stdint;
import core.stdc.stdio;
import ocean.core.array.Mutation: copy;
import ocean.sys.ErrnoException;
import ocean.io.select.EpollSelectDispatcher;

import ocean.util.aio.internal.JobQueue;
import ocean.util.aio.internal.ThreadWorker;
import ocean.util.aio.internal.MutexOps;
import ocean.util.aio.internal.AioScheduler;
import ocean.util.aio.JobNotification;

version (unittest)
{
    import ocean.core.Test;
    import ocean.task.Scheduler;
    import ocean.io.device.File;
}

/******************************************************************************

    Class implementing AsyncIO support.

******************************************************************************/

class AsyncIO
{
    /**************************************************************************

        Base class for the thread worker context

    **************************************************************************/

    public static class Context
    {

    }

    /**************************************************************************

        Errno exception instance

        NOTE: must be thrown and caught only from/in the main thread, as it
        is not multithreaded-safe

    **************************************************************************/

    private ErrnoException exception;


    /**************************************************************************

        Job queue

    ***************************************************************************/

    private JobQueue jobs;

    /**************************************************************************

        AioScheduler used to wake the ready jobs.

    **************************************************************************/

    private AioScheduler scheduler;

    /**************************************************************************

        Handles of worker threads.

    **************************************************************************/

    private pthread_t[] threads;

    /**************************************************************************

        Indicator if the AsyncIO is destroyed

    **************************************************************************/

    private bool destroyed;

    /**************************************************************************

        Struct providing the initialization data for the thread.

        A pointer to the single instance stored in this AsyncIO is passed to
        every worker thread's entry point. `to_create` starts at the number
        of worker threads; the constructor waits on `init_cond` (guarded by
        `init_mutex`) until it is no longer positive.

    **************************************************************************/

    public struct ThreadInitializationContext
    {
        JobQueue job_queue;
        AsyncIO.Context delegate() makeContext;
        pthread_mutex_t init_mutex;
        pthread_cond_t init_cond;
        int to_create;
    }

    /**************************************************************************

        Ditto

    **************************************************************************/

    private ThreadInitializationContext thread_init_context;

    /**************************************************************************

        Constructor.

        Params:
            epoll = epoll select dispatcher instance
            number_of_threads = number of worker threads to allocate
            makeContext = delegate to create a context within a thread
            thread_stack_size = stack size to request for each worker thread

        Throws:
            ErrnoException if initialising the synchronisation primitives,
            configuring the thread attributes or creating a thread fails

    **************************************************************************/

    public this (EpollSelectDispatcher epoll, int number_of_threads,
            scope AsyncIO.Context delegate() makeContext = null,
            long thread_stack_size = 256 * 1024)
    {

        this.exception = new ErrnoException;

        this.scheduler = new AioScheduler(this.exception);
        this.jobs = new JobQueue(this.exception, this.scheduler);
        this.nonblocking = new typeof(this.nonblocking);
        this.blocking = new typeof(this.blocking);

        // create worker threads
        this.threads.length = number_of_threads;
        this.thread_init_context.to_create = number_of_threads;

        this.thread_init_context.job_queue = this.jobs;
        this.thread_init_context.makeContext = makeContext;
        exception.enforceRetCode!(pthread_mutex_init).call(
                &this.thread_init_context.init_mutex, null);
        exception.enforceRetCode!(pthread_cond_init).call(
                &this.thread_init_context.init_cond, null);

        // POSIX requires the attribute object to be initialised before it
        // is passed to pthread_attr_setstacksize or pthread_create, and
        // destroyed once it is no longer needed.
        pthread_attr_t attr;
        this.exception.enforceRetCode!(pthread_attr_init).call(&attr);
        scope (exit) pthread_attr_destroy(&attr);

        this.exception.enforceRetCode!(pthread_attr_setstacksize).call(
                &attr, thread_stack_size);

        foreach (ref tid; this.threads)
        {
            // Create a thread passing the shared initialization context
            // as the parameter to the thread's entry point
            this.exception.enforceRetCode!(pthread_create).call(&tid,
                    &attr,
                    &thread_entry_point!(ThreadInitializationContext),
                    cast(void*)&this.thread_init_context);
        }

        // Wait for all threads to be created. The counter is re-checked
        // under the mutex after every wake-up.
        pthread_mutex_lock(&this.thread_init_context.init_mutex);
        while (this.thread_init_context.to_create > 0)
        {
            pthread_cond_wait(&this.thread_init_context.init_cond,
                    &this.thread_init_context.init_mutex);
        }

        pthread_mutex_unlock(&this.thread_init_context.init_mutex);

        epoll.register(this.scheduler);
    }

    /**************************************************************************

        Issues a pread request, blocking the fiber connected to the provided
        suspended_job until the request finishes.

        This will read buf.length number of bytes from fd to buf, starting
        from offset.

        Params:
            buf = buffer to fill
            fd = file descriptor to read from
            offset = offset in the file to read from
            suspended_job = JobNotification instance to
                block the fiber on

        Returns:
            number of the bytes read

        Throws:
            ErrnoException with appropriate errno set in case of failure

    **************************************************************************/

    public size_t pread (void[] buf, int fd, size_t offset,
            JobNotification suspended_job)
    {
        ssize_t ret_val;
        int errno_val;
        auto job = this.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        // The read goes into the job's own buffer; finalizeRead copies the
        // result into the user's buffer afterwards.
        job.recv_buffer.length = buf.length;
        assumeSafeAppend(job.recv_buffer);
        job.fd = fd;
        job.suspended_job = suspended_job;
        job.offset = offset;
        job.cmd = Job.Command.Read;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        job.user_buffer = buf;
        job.finalize_results = &finalizeRead;

        // Let the threads waiting on the semaphore know that they
        // can start doing single read
        post_semaphore(&this.jobs.jobs_available);

        // Block the fiber
        suspended_job.wait(job, &this.scheduler.discardResults);

        // At this point, fiber is resumed,
        // check the return value and throw if needed
        if (ret_val == -1)
        {
            throw this.exception.set(errno_val,
                    "pread");
        }

        // Same sanity check as in write/callDelegate
        verify(ret_val >= 0);
        return cast(size_t)ret_val;
    }

    /***************************************************************************

        Appends a buffer to the file.

        Buffer must be alive during the lifetime of the request (until the
        notification fires)

        Params:
            buf = buffer to write
            fd = file descriptor to write to
            notification = JobNotification instance to block the caller on

        Returns:
            number of bytes written

        Throws:
            ErrnoException with appropriate errno set in case of failure

    **************************************************************************/

    public size_t write (void[] buf, int fd, JobNotification notification)
    {
        ssize_t ret_val;
        int errno_val;
        auto job = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

        // NOTE(review): this replaces the slot's recv_buffer slice with the
        // caller's buffer. If job slots are reused without being reset, a
        // later pread on the same slot would resize that slice - confirm
        // against JobQueue.reserveJobSlot.
        job.recv_buffer = buf;
        job.fd = fd;
        job.suspended_job = notification;
        job.cmd = Job.Command.Write;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        // No post-processing is needed for a write; clear the finalizer
        // explicitly (as fsync/close do) so that one left over from an
        // earlier request on this slot can never run.
        job.finalize_results = null;

        post_semaphore(&this.jobs.jobs_available);
        notification.wait(job, &this.scheduler.discardResults);

        if (ret_val == -1)
        {
            throw this.exception.set(errno_val, "write");
        }

        verify(ret_val >= 0);
        return cast(size_t)ret_val;
    }

    /***************************************************************************

        Calls a user provided delegate. The delegate is called from within a
        separate thread and it should not do anything non-thread safe (for
        example, using GC must be avoided) from the runtime's perspective.
        Delegate receives a reference to the per-thread context which it can
        use.

        Params:
            user_delegate = delegate to call
            notification = notification used to resume/suspend the caller

        Throws:
            ErrnoException if the job reports -1 as its return value

    **************************************************************************/

    public void callDelegate (scope void delegate(AsyncIO.Context) user_delegate,
            JobNotification notification)
    {
        ssize_t ret_val;
        int errno_val;
        auto job = this.jobs.reserveJobSlot(&lock_mutex, &unlock_mutex);

        job.user_delegate = user_delegate;
        job.suspended_job = notification;
        job.cmd = Job.Command.CallDelegate;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        // See write(): never run a finalizer left over on this slot.
        job.finalize_results = null;

        post_semaphore(&this.jobs.jobs_available);
        notification.wait(job, &this.scheduler.discardResults);

        if (ret_val == -1)
        {
            throw this.exception.set(errno_val, "delegate call");
        }

        verify(ret_val >= 0);
    }

    /***************************************************************************

        Finalizes the read request - publishes the return value through
        job.ret_val (if set) and copies the contents of the receive buffer
        to the user provided buffer.

        Params:
            job = job to finalize.

    ***************************************************************************/

    private static void finalizeRead (Job* job)
    {
        if (job.ret_val !is null)
        {
            *job.ret_val = job.return_value;
        }

        // Only a successful read produced data to hand over
        if (job.return_value >= 0)
        {
            auto dest = (job.user_buffer.ptr)[0..job.return_value];
            copy(dest, job.recv_buffer[0..job.return_value]);
        }
    }

    /***************************************************************************

        Set of non-blocking methods. The user is responsible to suspend the
        fiber, and AsyncIO will not resume it. Instead, the callback will be
        called where the user can do whatever is required.

    ***************************************************************************/

    public final class Nonblocking
    {
        /**************************************************************************

            Issues a pread request, filling the buffer as much as possible,
            expecting the user to suspend the caller manually.

            This will read buf.length number of bytes from fd to buf, starting
            from offset.

            Params:
                buf = buffer to fill
                fd = file descriptor to read from
                offset = offset in the file to read from
                suspended_job = job notification the scheduled job is
                    registered with

            Returns:
                Job that's scheduled

            Throws:
                ErrnoException with appropriate errno set in case of failure

        **************************************************************************/

        public Job* pread (void[] buf,
                int fd, size_t offset,
                JobNotification suspended_job)
        {
            auto job = this.outer.jobs.reserveJobSlot(&lock_mutex,
                    &unlock_mutex);

            job.recv_buffer.length = buf.length;
            assumeSafeAppend(job.recv_buffer);

            // NOTE(review): job.ret_val / job.errno_val are not set here.
            // Presumably reserveJobSlot hands out a reset slot; otherwise
            // they could still point at a finished blocking call - confirm.
            job.fd = fd;
            job.offset = offset;
            job.cmd = Job.Command.Read;
            job.user_buffer = buf;
            job.finalize_results = &finalizeRead;
            job.suspended_job = suspended_job;
            suspended_job.register(job, &this.outer.scheduler.discardResults);

            // Let the threads waiting on the semaphore know that they
            // can start doing single read
            post_semaphore(&this.outer.jobs.jobs_available);

            return job;
        }
    }

    /// Ditto
    public Nonblocking nonblocking;

    /// Task wrapper: the same blocking calls, using a TaskJobNotification
    /// created per call. Must be called from within a running Task.
    public final class TaskBlocking
    {
        import ocean.util.aio.TaskJobNotification;
        import ocean.task.Task;

        /// Forwards to AsyncIO.write with a fresh TaskJobNotification
        public size_t write (void[] buf, int fd)
        {
            assert (Task.getThis() !is null);
            scope JobNotification notification = new TaskJobNotification;
            return this.outer.write(buf, fd, notification);
        }

        /// Forwards to AsyncIO.pread with a fresh TaskJobNotification
        public size_t pread (void[] buf, int fd, size_t offset)
        {
            assert (Task.getThis() !is null);
            scope JobNotification notification = new TaskJobNotification;
            return this.outer.pread(buf, fd, offset, notification);
        }

        /// Forwards to AsyncIO.callDelegate with a fresh TaskJobNotification
        public void callDelegate (scope void delegate(AsyncIO.Context) user_delegate)
        {
            assert (Task.getThis() !is null);
            scope JobNotification notification = new TaskJobNotification;
            this.outer.callDelegate(user_delegate, notification);
        }
    }

    /// ditto
    public TaskBlocking blocking;

    /**************************************************************************

        Issues a fsync request, blocking the fiber connected to the provided
        suspended_job until the request finishes.

        Synchronize a file's in-core state with storage device.

        Params:
            fd = file descriptor to perform fsync on
            suspended_job = JobNotification instance to
                block the fiber on

        Throws:
            ErrnoException with appropriate errno set in the case of failure

    **************************************************************************/

    public void fsync (int fd,
            JobNotification suspended_job)
    {
        // ssize_t, as in pread/write/callDelegate, which store its address
        // in the same job.ret_val field
        ssize_t ret_val;
        int errno_val;

        auto job = this.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        job.fd = fd;
        job.suspended_job = suspended_job;
        job.cmd = Job.Command.Fsync;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        job.finalize_results = null;

        // Let the threads waiting on the semaphore know that they
        // can perform fsync
        post_semaphore(&this.jobs.jobs_available);

        // Block the fiber
        suspended_job.wait(job, &this.scheduler.discardResults);

        // At this point, fiber is resumed,
        // check the return value and throw if needed
        if (ret_val == -1)
        {
            throw this.exception.set(errno_val,
                    "fsync");
        }
    }

    /**************************************************************************

        Issues a close request, blocking the fiber connected to the provided
        suspendable request handler until the request finishes.

        Params:
            fd = file descriptor to close
            suspended_job = JobNotification instance to
                block the caller on

        Throws:
            ErrnoException with appropriate errno set in the case of failure

    **************************************************************************/

    public void close (int fd,
            JobNotification suspended_job)
    {
        // ssize_t, as in pread/write/callDelegate, which store its address
        // in the same job.ret_val field
        ssize_t ret_val;
        int errno_val;

        auto job = this.jobs.reserveJobSlot(&lock_mutex,
                &unlock_mutex);

        job.fd = fd;
        job.suspended_job = suspended_job;
        job.cmd = Job.Command.Close;
        job.ret_val = &ret_val;
        job.errno_val = &errno_val;
        job.finalize_results = null;

        post_semaphore(&this.jobs.jobs_available);

        // Block the fiber
        suspended_job.wait(job, &this.scheduler.discardResults);

        // At this point, fiber is resumed,
        // check the return value and throw if needed
        if (ret_val == -1)
        {
            throw this.exception.set(errno_val,
                    "close");
        }
    }

    /*********************************************************************

        Destroys entire AsyncIO object.
        It's unusable after this point.

        NOTE: this blocks the calling thread

        Throws:
            ErrnoException if one of the underlying system calls
            failed

    *********************************************************************/

    public void destroy ()
    {
        assert(!this.destroyed);

        // Stop all workers
        // and wait for all threads to exit
        this.join();

        this.jobs.destroy(this.exception);

        // All workers have been joined, so nothing can still be using the
        // initialization primitives created in the constructor.
        this.exception.enforceRetCode!(pthread_cond_destroy).call(
                &this.thread_init_context.init_cond);
        this.exception.enforceRetCode!(pthread_mutex_destroy).call(
                &this.thread_init_context.init_mutex);

        this.destroyed = true;
    }

    /**************************************************************************

        Indicate worker threads not to take any more jobs.

        Throws:
            ErrnoException if one of the underlying system calls
            failed

    **************************************************************************/

    private void stopAll ()
    {
        this.jobs.stop(&lock_mutex,
                &unlock_mutex);

        // Let all potential threads blocked on semaphore
        // move forward and exit (one post per worker thread)
        foreach (_; this.threads)
        {
            post_semaphore(&this.jobs.jobs_available);
        }
    }

    /**************************************************************************

        Waits for all threads to finish and checks the exit codes.

        Throws:
            ErrnoException if one of the underlying system calls
            failed

    **************************************************************************/

    private void join ()
    {
        // We need to tell threads actually to stop working
        this.stopAll();

        foreach (tid; this.threads)
        {
            // Note: no need for mutex guarding this
            // as this is just an array of ids which
            // will not change during the program's lifetime
            void* ret_from_thread;

            // pthread_join returns the error number directly
            int ret = pthread_join(tid, &ret_from_thread);

            switch (ret)
            {
                case 0:
                    break;
                case EDEADLK:
                    assert(false, "Deadlock was detected");
                case EINVAL:
                    assert(false, "Join performed on non-joinable thread" ~
                            " or another thread is already waiting on it");
                case ESRCH:
                    assert(false, "No thread with this tid can be found");
                default:
                    throw this.exception.set(ret, "pthread_join");
            }

            // Check the return value from the thread routine
            auto thread_result = cast(intptr_t)ret_from_thread;

            if (thread_result != 0)
            {
                throw this.exception.set(cast(int)thread_result,
                        "thread_method");
            }
        }
    }

    /*********************************************************************

        Helper function for posting the semaphore value
        and checking for the return value

        Params:
            sem = pointer to the semaphore handle

        Throws:
            ErrnoException if sem_post fails with anything but EINVAL

    *********************************************************************/

    private void post_semaphore (sem_t* sem)
    {
        if (sem_post(sem) == 0)
        {
            return;
        }

        // sem_post reports failure by returning -1 and storing the error
        // code in errno, so the code has to be read from there.
        int code = errno;

        switch (code)
        {
            case EINVAL:
                assert(false, "The semaphore is not valid");
            default:
                throw this.exception.set(code, "sem_post");
        }
    }
}

/// Example showing the task-blocking API
unittest
{
    // Per-thread context. Can be anything, but it needs to inherit
    // from AsyncIO.Context
    class AioContext: AsyncIO.Context
    {
        int i;
    }

    // Creates the per thread context. This is executed inside
    // each worker thread. Useful to initialize C libraries (e.g.
    // curl_easy_init)
    // This method is synchronized, so everything here is thread safe
    // One must only pay attention not to call methods that need
    // thread-local state
    AsyncIO.Context makeContext()
    {
        auto ctx = new AioContext;

        // set some things
        ctx.i = 0;

        return ctx;
    }

    // Callback called from another thread to set the counter
    void setCounter (AsyncIO.Context ctx)
    {
        // cast the per-thread context
        auto myctx = cast(AioContext)ctx;
        myctx.i++;
    }

    void example()
    {
        auto async_io = new AsyncIO(theScheduler.epoll, 10, &makeContext);

        // open a new file
        auto f = new File("var/output.txt", File.ReadWriteAppending);

        // write a file in another thread
        char[] buf = "Hello darkness, my old friend.".dup;
        async_io.blocking.write(buf, f.fileHandle());

        // read the file from another thread
        buf[] = '\0';
        async_io.blocking.pread(buf, f.fileHandle(), 0);

        test!("==")(buf[], "Hello darkness, my old friend.");
        test!("==")(f.length, buf.length);

        // call the delegate from another thread
        async_io.blocking.callDelegate(&setCounter);
    }
}