1 /****************************************************************************** 2 3 Module containing implementation of the request queue for AsyncIO. 4 5 Copyright: 6 Copyright (c) 2018 dunnhumby Germany GmbH. 7 All rights reserved. 8 9 License: 10 Boost Software License Version 1.0. See LICENSE.txt for details. 11 12 ******************************************************************************/ 13 14 module ocean.util.aio.internal.JobQueue; 15 16 import ocean.meta.types.Qualifiers; 17 18 import core.stdc.errno; 19 import core.stdc.stdint; 20 import core.sys.posix.unistd; 21 import core.sys.posix.semaphore; 22 import core.sys.posix.pthread; 23 24 import ocean.sys.ErrnoException; 25 26 import ocean.util.aio.AsyncIO; 27 import ocean.util.aio.JobNotification; 28 import ocean.util.aio.internal.AioScheduler; 29 30 /********************************************************************** 31 32 Single job definition 33 34 **********************************************************************/ 35 36 public static struct Job 37 { 38 import ocean.core.array.Mutation: copy; 39 import ocean.util.aio.internal.MutexOps; 40 41 /****************************************************************** 42 43 Command to be executed for the current request. 44 45 *******************************************************************/ 46 47 public enum Command 48 { 49 Read, 50 Write, 51 Fsync, 52 Close, 53 CallDelegate, 54 } 55 56 /****************************************************************** 57 58 Command to run for this request 59 60 ******************************************************************/ 61 62 public Command cmd; 63 64 /**************************************************************** 65 66 File descriptor of the file to perform 67 the request on 68 69 ****************************************************************/ 70 71 public int fd; 72 73 /**************************************************************** 74 75 Offset from the file to perform the request 76 77 ****************************************************************/ 78 79 public size_t offset; 80 81 /**************************************************************** 82 83 User supplied delegate to call. 84 85 ****************************************************************/ 86 87 public void delegate (AsyncIO.Context) user_delegate; 88 89 /**************************************************************** 90 91 The field to store the return value of the system call. 92 93 ****************************************************************/ 94 95 public ssize_t return_value; 96 97 /**************************************************************** 98 99 Pointer to variable which should receive the return value of 100 the system call. 101 102 ****************************************************************/ 103 104 public ssize_t* ret_val; 105 106 /**************************************************************** 107 108 Pinter to variable which should receive the errno value 109 after the system call 110 111 ****************************************************************/ 112 113 public int* errno_val; 114 115 /**************************************************************** 116 117 JobNotification used to wake the job. 118 119 ****************************************************************/ 120 121 public JobNotification suspended_job; 122 123 /**************************************************************** 124 125 Indicates if the job is being handled within one 126 of the threads 127 128 ****************************************************************/ 129 130 private bool is_taken; 131 132 /**************************************************************** 133 134 Indicates if this slot in the queue is empty 135 and can be taken 136 137 ****************************************************************/ 138 139 private bool is_slot_free; 140 141 /**************************************************************** 142 143 Buffer to be filled by the worker thread. When the job is 144 finalised, the contents are copied into user_buffer. 145 146 ****************************************************************/ 147 148 public void[] recv_buffer; 149 150 /*************************************************************** 151 152 User buffer to copy the results to. NOTE: the reason to have 153 recv_buffer and user_buffer separate is because it might happen 154 that user discards the job and the buffer, while thread is 155 writing into it, in which case the contents of recv_buffer 156 are discarded. 157 158 ****************************************************************/ 159 160 public void[] user_buffer; 161 162 /*************************************************************************** 163 164 Prepares the results, called upon the completion of the request, 165 if the jobs has not been cancelled while waiting for the completion. 166 167 ***************************************************************************/ 168 169 public void function(Job* job) finalize_results; 170 171 /*************************************************************************** 172 173 User-provided method to call when the job has been completed. 174 175 ***************************************************************************/ 176 177 public void delegate(ssize_t)[] finish_callback_dgs; 178 179 /*************************************************************************** 180 181 Job queue where this jobs is currently resident. Used for recycling. 182 183 ***************************************************************************/ 184 185 private JobQueue owner_queue; 186 187 /*************************************************************************** 188 189 Performs any actions at the end of the AIO operation. 190 191 ***************************************************************************/ 192 193 public void finished () 194 { 195 if (this.finalize_results) 196 { 197 this.finalize_results(&this); 198 } 199 200 if (this.finish_callback_dgs.length) 201 { 202 foreach (dg; this.finish_callback_dgs) 203 { 204 dg(this.return_value); 205 } 206 } 207 } 208 209 /*************************************************************************** 210 211 Registers the callback to call after finishing the job. 212 213 Params: 214 dg = delegate to call when the job is finished. 215 216 ***************************************************************************/ 217 218 public Job* registerCallback (scope void delegate(ssize_t) dg) return 219 { 220 this.finish_callback_dgs ~= dg; 221 return &this; 222 } 223 224 /*************************************************************************** 225 226 Recycles the job and marks it free to use by the AIO for the next 227 operation. 228 229 ***************************************************************************/ 230 231 public void recycle () 232 { 233 this.owner_queue.recycleJob(&this, &lock_mutex, &unlock_mutex); 234 } 235 } 236 237 /************************************************************************** 238 239 Pending jobs queue. 240 241 **************************************************************************/ 242 243 public static class JobQueue 244 { 245 import ocean.util.container.LinkedList; 246 247 248 /******************************************************************** 249 250 List containing requests. 251 Note that, since the worker threads are taking 252 pointers to jobs, moving container must not be used 253 as that would invalidate pointers to the exiting jobs 254 held by other threads 255 256 *********************************************************************/ 257 258 private LinkedList!(Job*) jobs; 259 260 261 /******************************************************************** 262 263 Mutex protecting job queue. 264 265 ********************************************************************/ 266 267 private pthread_mutex_t jobs_mutex; 268 269 270 /********************************************************************* 271 272 Indicator if workers should 273 stop doing more work 274 275 *********************************************************************/ 276 277 private bool cancel_further_jobs; 278 279 280 /********************************************************************* 281 282 AioScheduler used to wake the ready jobs. 283 284 *********************************************************************/ 285 286 private AioScheduler scheduler; 287 288 /********************************************************************* 289 290 Constructor 291 292 Params: 293 exception = ErrnoException instance to throw in case 294 initialization failed 295 scheduler = AioScheduler to schedule waking the jobs on 296 297 *********************************************************************/ 298 299 public this (ErrnoException exception, AioScheduler scheduler) 300 { 301 this.jobs = new typeof(this.jobs); 302 this.scheduler = scheduler; 303 304 exception.enforceRetCode!(pthread_mutex_init).call( 305 &this.jobs_mutex, null); 306 307 exception.enforceRetCode!(sem_init).call 308 (&this.jobs_available, 0, 0); // No jobs are ready initially 309 } 310 311 312 /********************************************************************* 313 314 Takes the first jobs in the queue that's not being 315 served by any other thread. 316 317 Params: 318 MutexOp = function or delegate mutex accepting the pointer to the 319 mutex. Used so this method works both with delegate and 320 function. 321 lock_mutex = method to be called to lock a mutex and perform 322 error checking 323 unlock_mutex = method to be called to unlock a mutex and perform 324 error checking 325 326 *********************************************************************/ 327 328 public Job* takeFirstNonTakenJob(MutexOp)(MutexOp lock_mutex, 329 MutexOp unlock_mutex) 330 { 331 lock_mutex(&this.jobs_mutex); 332 scope (exit) 333 { 334 unlock_mutex(&this.jobs_mutex); 335 } 336 337 if (this.cancel_further_jobs) 338 { 339 return null; 340 } 341 342 foreach (ref job; this.jobs) 343 { 344 if (job.is_slot_free == false && job.is_taken == false) 345 { 346 job.is_taken = true; 347 return job; 348 } 349 } 350 351 return null; 352 } 353 354 /********************************************************************* 355 356 Reserves a job slot in the queue. It either reuses 357 existing slot, or allocates a new one if all 358 existing slots are occupied 359 360 Params: 361 MutexOp = function or delegate mutex accepting the pointer to the 362 mutex. Used so this method works both with delegate and 363 function. 364 lock_mutex = method to be called to lock a mutex and perform 365 error checking 366 unlock_mutex = method to be called to unlock a mutex and perform 367 error checking 368 369 Returns: 370 pointer to the job slot in the queue 371 372 *********************************************************************/ 373 374 public Job* reserveJobSlot(MutexOp)(MutexOp lock_mutex, 375 MutexOp unlock_mutex) 376 { 377 lock_mutex(&this.jobs_mutex); 378 scope (exit) 379 { 380 unlock_mutex(&this.jobs_mutex); 381 } 382 383 Job* free_job = null; 384 385 foreach (ref job; this.jobs) 386 { 387 if (job.is_slot_free && !job.is_taken) 388 { 389 free_job = job; 390 break; 391 } 392 } 393 394 if (!free_job) 395 { 396 // adds at the beginning 397 auto new_job = new Job(); 398 this.jobs.add(new_job); 399 free_job = this.jobs.get(0); 400 } 401 402 free_job.is_taken = false; 403 free_job.is_slot_free = false; 404 free_job.owner_queue = this; 405 free_job.finish_callback_dgs.length = 0; 406 assumeSafeAppend(free_job.finish_callback_dgs); 407 free_job.ret_val = null; 408 free_job.errno_val = null; 409 410 return free_job; 411 } 412 413 /********************************************************************* 414 415 Marks the job as ready and schedules the routine to be waken 416 up by the scheduler. 417 418 Params: 419 job = job that has completed 420 lock_mutex = function or delegate to lock the mutex 421 unlock_mutex = function or delegate to unlock the mutex 422 423 *********************************************************************/ 424 425 public void markJobReady(MutexOp) ( Job* job, 426 MutexOp lock_mutex, MutexOp unlock_mutex ) 427 { 428 this.scheduler.requestReady(job, 429 lock_mutex, unlock_mutex); 430 } 431 432 /********************************************************************* 433 434 Recycles the job slot and checks if there are any more jobs to 435 be served. 436 437 Params: 438 MutexOp = function or delegate mutex accepting the pointer to the 439 mutex. Used so this method works both with delegate and 440 function. 441 job = job to recycle 442 lock_mutex = method to be called to lock a mutex and perform 443 error checking 444 unlock_mutex = method to be called to unlock a mutex and perform 445 error checking 446 447 Returns: 448 true if there will be more jobs, false otherwise 449 450 451 *********************************************************************/ 452 453 public bool recycleJob(MutexOp) (Job* job, MutexOp lock_mutex, 454 MutexOp unlock_mutex) 455 { 456 if (this.cancel_further_jobs) 457 return false; 458 459 lock_mutex(&this.jobs_mutex); 460 scope(exit) 461 { 462 unlock_mutex(&this.jobs_mutex); 463 } 464 465 job.is_taken = false; 466 job.is_slot_free = true; 467 468 return !this.cancel_further_jobs; 469 } 470 471 /********************************************************************* 472 473 Tells the queue to stop serving more jobs to workers. 474 475 Params: 476 MutexOp = function or delegate mutex accepting the pointer to the 477 mutex. Used so this method works both with delegate and 478 function. 479 lock_mutex = method to be called to lock a mutex and perform 480 error checking 481 unlock_mutex = method to be called to unlock a mutex and perform 482 error checking 483 484 485 *********************************************************************/ 486 487 public void stop(MutexOp) (MutexOp lock_mutex, 488 MutexOp unlock_mutex) 489 { 490 lock_mutex(&this.jobs_mutex); 491 scope(exit) 492 { 493 unlock_mutex(&this.jobs_mutex); 494 } 495 496 this.cancel_further_jobs = true; 497 } 498 499 /********************************************************************* 500 501 Destructor 502 503 Params: 504 exception = ErrnoException instance to throw in case destruction 505 failed 506 507 *********************************************************************/ 508 509 public void destroy (ErrnoException exception) 510 { 511 // Can only fail if the mutex is still held somewhere. Since users 512 // should already be joined on all threads, that should not be 513 // possible. 514 auto ret = pthread_mutex_destroy(&this.jobs_mutex); 515 516 switch (ret) 517 { 518 case 0: 519 break; 520 default: 521 throw exception.set(ret, "pthread_mutex_destroy"); 522 case EBUSY: 523 assert(false, "Mutex still held"); 524 case EINVAL: 525 assert(false, "Mutex reference is invalid"); 526 } 527 528 ret = sem_destroy(&this.jobs_available); 529 530 switch (ret) 531 { 532 case 0: 533 break; 534 default: 535 throw exception.set(ret, "sem_destroy"); 536 case EINVAL: 537 assert(false, "Semaphore is not valid."); 538 } 539 } 540 541 /******************************************************************** 542 543 Semaphore indicating number of jobs in the request queue. 544 545 ********************************************************************/ 546 547 public sem_t jobs_available; 548 }