1 /******************************************************************************* 2 3 Defines the fundamental task abstraction. 4 5 `Task`s are responsible for: 6 - defining the actual function to execute as a task 7 - defining suspend/resume semantics on top of the core Fiber semantics 8 9 The `TaskWith` class, derived from `Task`, provides the facility of tasks 10 with customised suspend/resume semantics, as specified by one of more 11 extensions. 12 13 Copyright: 14 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 15 All rights reserved. 16 17 License: 18 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 19 Alternatively, this file may be distributed under the terms of the Tango 20 3-Clause BSD License (see LICENSE_BSD.txt for details). 21 22 *******************************************************************************/ 23 24 module ocean.task.Task; 25 26 27 static import core.thread; 28 29 import ocean.meta.types.Qualifiers; 30 import ocean.core.array.Mutation : moveToEnd, reverse; 31 import ocean.core.Buffer; 32 import ocean.core.Verify; 33 import ocean.task.internal.TaskExtensionMixins; 34 import ocean.task.IScheduler; 35 36 version (unittest) 37 { 38 import ocean.core.Test; 39 } 40 41 debug (TaskScheduler) 42 { 43 import ocean.io.Stdout; 44 } 45 46 /******************************************************************************* 47 48 Fiber sub-class used by the scheduler to run tasks in. In addition to the 49 functionality of the base fiber, it also: 50 1. stores a reference to the task currently being executed 51 2. can be stored in an object pool 52 53 *******************************************************************************/ 54 55 public class WorkerFiber : core.thread.Fiber 56 { 57 /*************************************************************************** 58 59 Allows smooth integration of WorkerFiber with object pool 60 61 ***************************************************************************/ 62 63 public size_t object_pool_index; 64 65 /*************************************************************************** 66 67 If not null, refers to the Task object currently being executed in this 68 fiber 69 70 ***************************************************************************/ 71 72 package Task active_task; 73 74 /*************************************************************************** 75 76 Returns: 77 the task object currently being executed in this fiber or null if 78 there isn't one 79 80 ***************************************************************************/ 81 82 public Task activeTask ( ) 83 { 84 return this.active_task; 85 } 86 87 /*************************************************************************** 88 89 Terminates itself by killing underlying task 90 91 ***************************************************************************/ 92 93 public void kill ( ) 94 { 95 this.active_task.kill(); 96 } 97 98 /*************************************************************************** 99 100 Constructor 101 102 Worker fibers are always created with a dummy empty function as an 103 entry point and are intented to be reset this later to the real task to 104 be executed. 105 106 Params: 107 stack_size = stack size of the new fiber to allocate 108 109 ***************************************************************************/ 110 111 public this ( size_t stack_size ) 112 { 113 super(() {} , stack_size); 114 // Calls itself once to get into the TERM state. The D2 runtime doesn't 115 // allow creating a fiber with no function attached and neither runtime 116 // allows resetting a fiber which is not in the TERM state 117 this.call(); 118 assert (this.state() == core.thread.Fiber.State.TERM); 119 } 120 } 121 122 /******************************************************************************* 123 124 Exception class that indicates that current task must be terminated. It 125 can be used to forcefully kill the task while still properly cleaning 126 current stack frame. 127 128 *******************************************************************************/ 129 130 public class TaskKillException : Exception 131 { 132 public this ( istring file = __FILE__, int line = __LINE__ ) 133 { 134 super("Task was killed", file, line); 135 } 136 } 137 138 /******************************************************************************* 139 140 Minimal usable Task class. 141 142 Serves as a base for all other task classes. Provides the following 143 functionality: 144 * assigning a task to an arbitrary fiber (`assignTo()`) 145 * wraps the abstract `run()` method in a try/catch statement which 146 rethrows any unhandled exceptions 147 148 *******************************************************************************/ 149 150 public abstract class Task 151 { 152 /*************************************************************************** 153 154 Thrown from within the task to force early termination. One instance 155 is used by all tasks as this exception must never be caught. 156 157 ***************************************************************************/ 158 159 private static TaskKillException kill_exception; 160 161 /*************************************************************************** 162 163 Static constructor. Initializes the exception that indicates the task 164 must be terminated. 165 166 ***************************************************************************/ 167 168 public static this ( ) 169 { 170 Task.kill_exception = new TaskKillException; 171 } 172 173 /*************************************************************************** 174 175 Bitmask representing internal task state 176 177 ***************************************************************************/ 178 179 private TaskState state_bitmask; 180 181 /*************************************************************************** 182 183 List of hooks that needs to be fired after Task has been terminated. 184 185 Delegates will be called both if task terminates routinely and if 186 it terminates dues to unhandled exception / gets killed. 187 188 ***************************************************************************/ 189 190 private Buffer!(void delegate()) termination_hooks; 191 192 /*************************************************************************** 193 194 Reserved index field which ensures that any Task derivative can be 195 used with ObjectPool. That comes at minor cost of one unused size_t 196 per Task instance if not needed which is not a problem. 197 198 ***************************************************************************/ 199 200 public size_t object_pool_index; 201 202 /*************************************************************************** 203 204 Fiber this task executes in 205 206 This field is declared as public because qualified package protection 207 is only available in D2. Please don't use it in applications 208 directly. 209 210 ***************************************************************************/ 211 212 /* package(ocean.task) */ 213 public WorkerFiber fiber; 214 215 /*************************************************************************** 216 217 Returns: 218 current task reference if there is one running, null otherwise 219 220 ***************************************************************************/ 221 222 public static Task getThis ( ) 223 { 224 auto worker = cast(WorkerFiber) core.thread.Fiber.getThis(); 225 if (worker !is null) 226 return worker.activeTask(); 227 else 228 return null; 229 } 230 231 /*************************************************************************** 232 233 Constructor. Used only to insert debug trace message. 234 235 ***************************************************************************/ 236 237 public this ( ) 238 { 239 debug_trace("'{}' <{}> has been created", this.classinfo.name, 240 cast(void*) this); 241 } 242 243 /*************************************************************************** 244 245 Assigns the task to a fiber. In most cases you need to use 246 `Scheduler.schedule` instead. 247 248 In simple applications there tends to be 1-to-1 relation between task 249 and fiber it executes in. However in highly concurrent server apps 250 it may be necessary to maintain a separate task queue because of 251 memory consumption reasons (fiber has to allocate a stack for itself 252 which doesn't allow having too many of them). Such functionality 253 is provided by `ocean.task.Scheduler`. 254 255 Params: 256 fiber = the fiber to assign the task to 257 entry_point = optional custom entry point to replace 258 `this.entryPoint` 259 260 ***************************************************************************/ 261 262 public void assignTo ( WorkerFiber fiber, 263 scope void delegate() entry_point = null ) 264 { 265 this.state_bitmask = TaskState.None; 266 267 this.fiber = fiber; 268 this.fiber.active_task = this; 269 if (fiber.state == fiber.state.TERM) 270 { 271 // cast to ignore return value 272 this.fiber.reset(entry_point ? entry_point 273 : cast(void delegate()) &this.entryPoint); 274 } 275 } 276 277 /*************************************************************************** 278 279 Suspends execution of this task. 280 281 ***************************************************************************/ 282 283 public void suspend ( ) 284 { 285 assert (this.fiber !is null); 286 verify (this.fiber is core.thread.Fiber.getThis()); 287 verify (this.fiber.state == this.fiber.state.EXEC); 288 289 debug_trace("<{}> is suspending itself", cast(void*) this); 290 this.fiber.yield(); 291 292 if (this.state_bitmask & TaskState.ToKill) 293 throw Task.kill_exception; 294 } 295 296 /*************************************************************************** 297 298 Resumes execution of this task. If task has not been started yet, 299 starts it. 300 301 ***************************************************************************/ 302 303 public void resume ( ) 304 { 305 assert (this.fiber !is null); 306 assert (this.fiber.state != this.fiber.state.EXEC); 307 308 auto resumer = Task.getThis(); 309 assert (resumer !is this); 310 verify( 311 resumer is null || !resumer.finished(), 312 "Use `theScheduler.resume(task)` to resume a task from termination" 313 ~ " hook of another task" 314 ); 315 316 debug (TaskScheduler) 317 { 318 if (resumer is null) 319 { 320 debug_trace("<{}> has been resumed by main thread or event loop", 321 cast(void*) this); 322 } 323 else 324 { 325 debug_trace("<{}> has been resumed by <{}>", 326 cast(void*) this, cast(void*) resumer); 327 } 328 } 329 330 this.fiber.call(); 331 } 332 333 /*************************************************************************** 334 335 Registers a termination hook that will be executed when the Task is 336 killed. 337 338 Params: 339 hook = delegate to be called after the task terminates 340 341 ***************************************************************************/ 342 343 public void terminationHook (scope void delegate() hook) 344 { 345 this.termination_hooks ~= hook; 346 } 347 348 /*************************************************************************** 349 350 Unregisters a termination hook that would be executed when the Task is 351 killed. 352 353 Params: 354 hook = delegate that would be called when the task terminates 355 356 ***************************************************************************/ 357 358 public void removeTerminationHook (scope void delegate() hook) 359 { 360 this.termination_hooks.length = .moveToEnd(this.termination_hooks[], hook); 361 } 362 363 /*************************************************************************** 364 365 Returns: 366 true if the fiber is suspended 367 368 ***************************************************************************/ 369 370 final public bool suspended ( ) 371 { 372 if (this.fiber is null) 373 return false; 374 return this.fiber.state() == core.thread.Fiber.State.HOLD; 375 } 376 377 /*************************************************************************** 378 379 Returns: 380 true if this task was started and reached end of `run` method 381 382 ***************************************************************************/ 383 384 final public bool finished ( ) 385 { 386 return (this.state_bitmask & TaskState.Finished) > 0; 387 } 388 389 /*************************************************************************** 390 391 Forces abnormal termination for the task by throwing special 392 exception instance. 393 394 ***************************************************************************/ 395 396 public void kill ( istring file = __FILE__, int line = __LINE__ ) 397 { 398 Task.kill_exception.file = file; 399 Task.kill_exception.line = line; 400 401 if (this is Task.getThis()) 402 throw Task.kill_exception; 403 else 404 { 405 this.state_bitmask |= TaskState.ToKill; 406 this.resume(); 407 } 408 } 409 410 /*************************************************************************** 411 412 Method that will be run by scheduler when task finishes. Must be 413 overridden by specific task class to reset reusable resources. 414 415 It is public so that both scheduler can access it and derivatives can 416 override it. No one but scheduler must call this method. 417 418 ***************************************************************************/ 419 420 public void recycle ( ) { } 421 422 /*************************************************************************** 423 424 Method that must be overridden in actual application/library task 425 classes to provide entry point. 426 427 ***************************************************************************/ 428 429 protected abstract void run ( ); 430 431 /*************************************************************************** 432 433 Internal wrapper around `this.run()` which is used as primary fiber 434 entry point and ensures any uncaught exception propagates to the 435 context that has started this task. 436 437 Returns: 438 'true' if the task terminated with unhandled exception, 'false' 439 otherwise 440 441 ***************************************************************************/ 442 443 /* package(ocean.task) */ 444 public final bool entryPoint ( ) 445 { 446 debug_trace("<{}> start of main function", cast(void*) this); 447 448 scope(exit) 449 { 450 this.state_bitmask |= TaskState.Finished; 451 452 if (this.termination_hooks.length) 453 { 454 debug_trace("Calling {} termination_hooks for task <{}>", 455 this.termination_hooks.length, cast(void*) this); 456 457 auto hooks = reverse(this.termination_hooks[]); 458 this.termination_hooks.reset(); 459 460 foreach (hook; hooks) 461 { 462 hook(); 463 assert( 464 this.termination_hooks.length == 0, 465 "Adding new hooks while running existing " ~ 466 "ones is not supported" 467 ); 468 } 469 } 470 471 // allow task to recycle any shared resources it may have 472 // (or recycle task instance itself) 473 // 474 // NB: this must be the final part of the method and relies 475 // on assumption that no other fiber will have any chance 476 // to start executing this task until it actually reaches 477 // end of the method 478 debug_trace("Recycling task <{}>", cast(void*) this); 479 this.recycle(); 480 } 481 482 try 483 { 484 assert (this.fiber is core.thread.Fiber.getThis()); 485 assert (this is Task.getThis()); 486 this.run(); 487 } 488 catch (TaskKillException) 489 { 490 debug_trace("<{}> termination (killed)", cast(void*) this); 491 this.state_bitmask &= ~cast(int) TaskState.ToKill; 492 return false; 493 } 494 catch (Exception e) 495 { 496 debug_trace("<{}> termination (uncaught exception): {} ({}:{})", 497 cast(void*) this, e.message(), e.file, e.line); 498 499 // After yielding to rethrow exception this task has to be resumed 500 // by someone to finish cleanup and keep reusing its worker fiber. 501 // If scheduler is available, it can do it automatically, otherwise 502 // it becomes reponsibility of the caller. 503 504 if (isSchedulerUsed()) 505 theScheduler.delayedResume(this); 506 this.fiber.yieldAndThrow(e); 507 508 // if task was resumed with `kill` from 'pending cleanup' state 509 // just quit the method as if it was originally terminated by kill 510 if (this.state_bitmask & TaskState.ToKill) 511 { 512 debug_trace("<{}> termination (killed)", cast(void*) this); 513 return false; 514 } 515 else 516 return true; 517 } 518 519 debug_trace("<{}> termination (end of main function)", cast(void*) this); 520 return false; 521 } 522 } 523 524 /// 525 unittest 526 { 527 // represents some limited resource used by this task (e.g. memory or a 528 // file handle) 529 class LimitedResourceHandle { } 530 LimitedResourceHandle getResource ( ) { return null; } 531 void releaseResource ( LimitedResourceHandle ) { } 532 533 // example custom task class 534 class MyTask : Task 535 { 536 LimitedResourceHandle resource; 537 538 override public void run ( ) 539 { 540 this.resource = getResource(); 541 } 542 543 override public void recycle ( ) 544 { 545 releaseResource(this.resource); 546 } 547 } 548 549 // Example of running a task by manually spawning a worker fiber. 550 // More commonly, it is instead done via ocean.task.Scheduler. 551 auto task = new MyTask; 552 // a large stack size is important for debug traces to not crash tests: 553 task.assignTo(new WorkerFiber(10240)); 554 task.resume(); 555 } 556 557 unittest 558 { 559 // test killing 560 561 class MyTask : Task 562 { 563 bool clean_finish = false; 564 565 override protected void run ( ) 566 { 567 this.suspend(); 568 this.clean_finish = true; 569 } 570 } 571 572 auto task = new MyTask; 573 task.assignTo(new WorkerFiber(10240)); 574 task.resume(); 575 576 task.kill(); 577 test(!task.clean_finish); 578 test!("==")(task.fiber.state, core.thread.Fiber.State.TERM); 579 test(task.finished()); 580 } 581 582 unittest 583 { 584 // test context sanity 585 586 class TestTask : Task 587 { 588 Task task; 589 WorkerFiber fiber; 590 591 override public void run ( ) 592 { 593 this.fiber = cast(WorkerFiber) WorkerFiber.getThis(); 594 this.suspend(); 595 this.task = Task.getThis(); 596 } 597 } 598 599 test(Task.getThis() is null); // outside of task 600 601 auto task = new TestTask; 602 auto worker = new WorkerFiber(10240); 603 604 task.assignTo(worker); 605 test(worker.activeTask() is task); 606 607 task.resume(); 608 test(task.fiber is worker); 609 test(task.task is null); 610 611 task.resume(); 612 test(task.task is task); 613 } 614 615 unittest 616 { 617 // test exception forwarding semantics 618 619 // If the scheduler was initialized before, it will have one pending task from 620 // the thrown exception. Drop it before proceeding. 621 622 dropScheduler(); 623 624 class ExceptionInternal : Exception 625 { 626 this ( ) 627 { 628 super("internal"); 629 } 630 } 631 632 class TestTask : Task 633 { 634 override public void run ( ) 635 { 636 throw new ExceptionInternal; 637 } 638 } 639 640 auto task = new TestTask; 641 auto worker = new WorkerFiber(10240); 642 643 task.assignTo(worker); 644 testThrown!(ExceptionInternal)(task.resume()); 645 test!("==")(task.fiber.state, core.thread.Fiber.State.HOLD); 646 task.resume(); 647 test!("==")(task.fiber.state, core.thread.Fiber.State.TERM); 648 } 649 650 unittest 651 { 652 // test TaskKillException 653 654 class TestTask : Task 655 { 656 override public void run ( ) 657 { 658 throw new TaskKillException; 659 } 660 } 661 662 auto task = new TestTask; 663 auto worker = new WorkerFiber(10240); 664 665 task.assignTo(worker); 666 task.resume(); 667 test!("==")(task.fiber.state, core.thread.Fiber.State.TERM); 668 } 669 670 /******************************************************************************* 671 672 `Task` descendant which supports extensions that alter the semantics of 673 suspending and resuming the task. An arbitrary number of extensions may be 674 specified (see Params). 675 676 Each extension must be a struct which defines one or more of the following 677 methods: 678 - void onBeforeSuspend ( ) 679 - void onBeforeResume ( ) 680 - void onResumed ( ) // called right after execution gets back to task 681 682 There is no `onSuspended` hook because it would be executed in the context 683 of the caller fiber, right after the current task yields. Such a context 684 tends to be neither well-defined nor useful in practice. 685 686 The relevant extension methods are called before `this.suspend` / 687 `this.resume` in the same order as they are supplied via the template 688 argument list. 689 690 The relevant extension methods are called after `this.suspend` / 691 `this.resume` in the reverse order that they are supplied via the template 692 argument list. 693 694 Params: 695 Extensions = variadic template argument list of extensions to use 696 697 *******************************************************************************/ 698 699 public class TaskWith ( Extensions... ) : Task 700 { 701 mixin (genExtensionAggregate!(Extensions)()); 702 703 /*************************************************************************** 704 705 Constructor 706 707 Allows extensions to get information about host task if they have 708 matching `reset` method defined. 709 710 ***************************************************************************/ 711 712 this ( ) 713 { 714 foreach (ref extension; this.extensions.tupleof) 715 { 716 static if (is(typeof(extension.reset(this)))) 717 extension.reset(this); 718 } 719 } 720 721 /*************************************************************************** 722 723 Suspends this task, calls extension methods before and after 724 suspending (if there are any). 725 726 ***************************************************************************/ 727 728 override public void suspend ( ) 729 { 730 foreach (ref extension; this.extensions.tupleof) 731 { 732 static if (is(typeof(extension.onBeforeSuspend()))) 733 extension.onBeforeSuspend(); 734 } 735 736 super.suspend(); 737 738 foreach_reverse (ref extension; this.extensions.tupleof) 739 { 740 static if (is(typeof(extension.onResumed()))) 741 extension.onResumed(); 742 } 743 } 744 745 /*************************************************************************** 746 747 Resumes this task, calls extension methods before resuming 748 (if there are any). 749 750 ***************************************************************************/ 751 752 override public void resume ( ) 753 { 754 foreach (ref extension; this.extensions.tupleof) 755 { 756 static if (is(typeof(extension.onBeforeResume()))) 757 extension.onBeforeResume(); 758 } 759 760 super.resume(); 761 } 762 763 /*************************************************************************** 764 765 Ensures extensions are reset to initial state when task is assigned 766 to new worker fiber. 767 768 ***************************************************************************/ 769 770 override public void assignTo ( WorkerFiber fiber, 771 scope void delegate() entry_point = null ) 772 { 773 super.assignTo(fiber, entry_point); 774 775 foreach (ref extension; this.extensions.tupleof) 776 { 777 static if (is(typeof(extension.reset(this)))) 778 extension.reset(this); 779 else 780 extension = extension.init; 781 } 782 } 783 } 784 785 /// 786 unittest 787 { 788 // see tests/examples in ocean.task.extensions.* 789 } 790 791 /// Values for task state bitmask 792 private enum TaskState : ubyte 793 { 794 None = 0b0000_0000, 795 796 /// If this flag is set, task will try to kill itself as soon at is 797 /// resumed by throwing TaskKillException. 798 ToKill = 0b0000_0001, 799 /// This flag is set when `run` method of the task finishes 800 Finished = 0b0000_0010, 801 } 802 803 private void debug_trace ( T... ) ( cstring format, T args ) 804 { 805 debug ( TaskScheduler ) 806 { 807 Stdout.formatln( "[ocean.task.Task] " ~ format, args ).flush(); 808 } 809 }