1 /****************************************************************************** 2 3 Wraps a Fiber allowing to pass a message on suspending/resuming and to kill 4 the fiber. 5 6 Allows passing a message from suspend() to resume() and vice versa. 7 Provides a kill() method where kill() resumes suspend() and suspend() throws 8 a KilledException when it was resumed by kill(). 9 10 suspend() and resume() require you to pass a 'token' parameter to them 11 which must be the same for each suspend/resume pair. This prevents that 12 a fiber is resumed from a part of the code that wasn't intended to do so. 13 14 Still, sometimes the correct position in a code could resume a fiber 15 that was waiting for a resume from another instance of the same code 16 (for example, a fiber is being resumed from a wrong class instance). 17 To catch these cases, a runtime-identifier parameter was added, 18 which is just an Object reference. If another object is resuming a fiber 19 an exception is thrown. 20 21 See also the documentation of suspend/resume. 22 23 Debugging: 24 You can use -debug=MessageFiber to print the identifiers that 25 were used in the suspend/resume calls. It uses the FirstNames 26 functions to print pointers as names. 27 28 You can use -debug=MessageFiberDump to enable a function called 29 'dumpFibers' which can be called from within gdb using 30 'call dumpFibers()'. The performance impact should be relatively low. 31 It will output a list on STDERR listing all fibers and some 32 informations about their state. 33 34 Example output: 35 36 Tomsen: State: TERM; Token: GroupRequest; LastSuspend: 1364929361 (157s ago); Addr: 7ff6c9ec8f00; Suspender: Actor0 37 Marine: State: TERM; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9eef100; Suspender: Actor1 38 Robert: State: TERM; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9f94a00; Suspender: Actor1 39 Batman: State: HOLD; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9f94300; Suspender: Actor1 40 Mary: State: TERM; Token: event_fired; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9fc7c00; Suspender: Actor3 41 Methew: State: HOLD; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9fc7500; Suspender: Actor1 42 Superman: State: HOLD; Token: DrizzleData; LastSuspend: 1364929515 (3s ago); Addr: 7ff6cad40800; Suspender: Actor2 43 Methew: State: HOLD; Token: DrizzleData; LastSuspend: 1364929515 (3s ago); Addr: 7ff6cad40600; Suspender: Actor2 44 45 46 Copyright: 47 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 48 All rights reserved. 49 50 License: 51 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 52 Alternatively, this file may be distributed under the terms of the Tango 53 3-Clause BSD License (see LICENSE_BSD.txt for details). 54 55 ******************************************************************************/ 56 57 module ocean.core.MessageFiber; 58 59 import ocean.meta.types.Qualifiers; 60 import ocean.core.Verify; 61 import ocean.core.Array: copy; 62 import ocean.core.SmartUnion; 63 import ocean.io.digest.Fnv1; 64 import core.thread : Fiber; 65 66 debug ( MessageFiber ) 67 { 68 import ocean.io.Stdout; 69 debug = MessageFiberToken; 70 } 71 72 debug ( MessageFiberDump ) 73 { 74 import ocean.time.Clock; 75 import ocean.core.Array; 76 import core.memory; 77 import ocean.io.Stdout; 78 debug = MessageFiberToken; 79 } 80 81 debug ( MessageFiberToken ) 82 { 83 import ocean.io.digest.FirstName; 84 } 85 86 87 /****************************************************************************** 88 89 Dump information about fibers to STDERR 90 91 ******************************************************************************/ 92 93 debug ( MessageFiberDump ) extern(C) void dumpFibers() 94 { 95 istring[] state_str = [ "HOLD" ,"EXEC" ,"TERM" ]; 96 97 void* wpFiber = MessageFiber.last_fiber; 98 99 while ( wpFiber !is null ) 100 { 101 auto fiber = cast(MessageFiber) GC.weakPointerGet(wpFiber); 102 103 wpFiber = null; 104 105 if ( fiber !is null ) 106 { 107 Stderr.formatln("{,12}: State: {}; Token: {,12}; LastSuspend: {} ({}s ago); " 108 ~ "Addr: {}; Suspender: {}", 109 FirstName(fiber), state_str[fiber.fiber.state], 110 fiber.last.str, fiber.time, 111 fiber.time > 0 ? Clock.now().unix().seconds() - fiber.time : 0, 112 cast(void*) fiber, 113 fiber.suspender); 114 115 wpFiber= fiber.prev_fiber; 116 } 117 } 118 } 119 120 /******************************************************************************/ 121 122 class MessageFiber 123 { 124 debug ( MessageFiberDump ) 125 { 126 /********************************************************************** 127 128 Most recent token 129 130 **********************************************************************/ 131 132 private Token last; 133 134 /********************************************************************** 135 136 Last time suspend was called 137 138 **********************************************************************/ 139 140 private size_t time; 141 142 /********************************************************************** 143 144 Name of the suspender, taken from the identifier parameter 145 Empty string if identifier wasn't given. 146 147 **********************************************************************/ 148 149 private istring suspender; 150 151 /********************************************************************** 152 153 List of weakpointers to all fibers 154 155 **********************************************************************/ 156 157 static private void* last_fiber; 158 159 private void* next_fiber, prev_fiber; 160 } 161 162 /************************************************************************** 163 164 Token struct passed to suspend() and resume() methods in order to ensure 165 that the fiber is resumed for the same reason as it was suspended. The 166 token is constructed from a string, via the static opCall() method. 167 168 Usage example: 169 170 --- 171 172 auto token = MessageFiber.Token("something_happened"); 173 174 fiber.suspend(token); 175 176 // Later on, somewhere else 177 fiber.resume(token); 178 179 --- 180 181 **************************************************************************/ 182 183 public struct Token 184 { 185 /*********************************************************************** 186 187 String used to construct the token. In debug builds this is used for 188 nice message output. 189 190 ***********************************************************************/ 191 192 private debug (MessageFiberToken) istring str; 193 194 /*********************************************************************** 195 196 Hash of string used to construct the token. 197 198 ***********************************************************************/ 199 200 private hash_t hash; 201 202 /*********************************************************************** 203 204 Constructs a token. 205 206 Params: 207 s = string 208 209 Returns: 210 token constructed from the passed string 211 212 ***********************************************************************/ 213 214 public static Token opCall ( istring s ) 215 { 216 Token token; 217 token.hash = Fnv1a64(s); 218 debug (MessageFiberToken) token.str = s; 219 return token; 220 } 221 } 222 223 /************************************************************************** 224 225 Message union 226 227 **************************************************************************/ 228 229 private union Message_ 230 { 231 int num; 232 void* ptr; 233 Object obj; 234 Exception exc; 235 } 236 237 public alias SmartUnion!(Message_) Message; 238 239 /************************************************************************** 240 241 Alias for fiber state 242 243 **************************************************************************/ 244 245 public alias Fiber.State State; 246 247 /************************************************************************** 248 249 KilledException; thrown by suspend() when resumed by kill() 250 251 **************************************************************************/ 252 253 static class KilledException : Exception 254 { 255 this ( ) {super("Fiber killed");} 256 257 void set ( istring file, long line ) 258 { 259 super.file = file; 260 super.line = line; 261 } 262 } 263 264 /************************************************************************** 265 266 ResumedException; thrown by suspend() when resumed with the wrong 267 identifier 268 269 **************************************************************************/ 270 271 static class ResumeException : Exception 272 { 273 this ( ) {super("Resumed with invalid identifier!");} 274 275 ResumeException set ( istring file, long line ) 276 { 277 super.file = file; 278 super.line = line; 279 280 return this; 281 } 282 } 283 284 /************************************************************************** 285 286 Fiber instance. (Protected but can be accessed via the public rawFiber() 287 and reset() methods. The fiber is not simply made public, as derived 288 classes may need to add special behaviour upon getting/setting it.) 289 290 **************************************************************************/ 291 292 protected Fiber fiber; 293 294 /************************************************************************** 295 296 Identifier 297 298 **************************************************************************/ 299 300 private ulong identifier; 301 302 /************************************************************************** 303 304 Message passed between suspend() and resume() 305 306 **************************************************************************/ 307 308 private Message msg; 309 310 /************************************************************************** 311 312 KilledException instance 313 314 **************************************************************************/ 315 316 private KilledException e_killed; 317 318 /************************************************************************** 319 320 ResumeException instance 321 322 **************************************************************************/ 323 324 private ResumeException e_resume; 325 326 /************************************************************************** 327 328 "killed" flag, set by kill() and cleared by resume(). 329 330 **************************************************************************/ 331 332 private bool killed = false; 333 334 /************************************************************************** 335 336 Constructor 337 338 Params: 339 fiber = already created core.thread.Fiber 340 341 **************************************************************************/ 342 343 public this ( Fiber fiber ) 344 { 345 this.fiber = fiber; 346 347 this.e_killed = new KilledException; 348 this.e_resume = new ResumeException; 349 this.msg.num = 0; 350 351 debug(MessageFiberDump) addToList(); 352 353 debug (MessageFiber) Stdout.formatln("--FIBER {} CREATED (fiber ptr {}) --", 354 FirstName(this), cast(void*) this.fiber).flush(); 355 } 356 357 /************************************************************************** 358 359 Constructor 360 361 Params: 362 coroutine = fiber coroutine 363 364 **************************************************************************/ 365 366 public this ( scope void delegate ( ) coroutine ) 367 { 368 this(new Fiber(coroutine)); 369 } 370 371 /************************************************************************** 372 373 Constructor 374 375 Params: 376 coroutine = fiber coroutine 377 sz = fiber stack size 378 379 **************************************************************************/ 380 381 public this ( scope void delegate ( ) coroutine, size_t sz ) 382 { 383 this(new Fiber(coroutine, sz)); 384 } 385 386 /************************************************************************** 387 388 Destructor 389 390 Removes the Fiber from the linked list and destroys its weak pointer 391 392 **************************************************************************/ 393 394 debug(MessageFiberDump) ~this ( ) 395 { 396 auto next = cast(MessageFiber) GC.weakPointerGet(next_fiber); 397 auto prev = cast(MessageFiber) GC.weakPointerGet(prev_fiber); 398 void* me; 399 400 if ( next !is null ) 401 { 402 if ( MessageFiber.last_fiber == next.prev_fiber ) 403 { 404 MessageFiber.last_fiber = next_fiber; 405 } 406 407 me = next.prev_fiber; 408 next.prev_fiber = prev_fiber; 409 } 410 411 if ( prev !is null ) 412 { 413 me = prev.next_fiber; 414 prev.next_fiber = next_fiber; 415 } 416 417 GC.weakPointerDestroy(me); 418 } 419 420 /************************************************************************** 421 422 Adds this fiber to the linked list of Fibers 423 424 **************************************************************************/ 425 426 debug(MessageFiberDump) void addToList ( ) 427 { 428 auto me = GC.weakPointerCreate(this); 429 430 if ( last_fiber !is null ) 431 { 432 auto l = cast(MessageFiber) GC.weakPointerGet(MessageFiber.last_fiber); 433 434 assert ( l !is null ); 435 436 l.next_fiber = me; 437 prev_fiber = last_fiber; 438 } 439 440 last_fiber = me; 441 } 442 443 /************************************************************************** 444 445 Starts the fiber coroutine and waits until it is suspended or finishes. 446 447 Params: 448 msg = message to be returned by the next suspend() call. 449 450 Returns: 451 When the fiber is suspended, the message passed to that suspend() 452 call. It has always an active member, by default num but never exc. 453 454 Throws: 455 Exception if the fiber is suspended by suspendThrow(). 456 457 In: 458 The fiber must not be running (but waiting or finished). 459 460 **************************************************************************/ 461 462 public Message start ( Message msg = Message.init ) 463 out (_msg_out) 464 { 465 auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out; 466 467 auto a = msg_out.active; 468 assert (a); 469 assert (a != a.exc); 470 } 471 do 472 { 473 verify( 474 this.fiber.state != this.fiber.State.EXEC, 475 "attempt to start an active fiber" 476 ); 477 478 debug (MessageFiber) 479 { 480 Stdout.formatln( 481 "--FIBER {} (fiber ptr {}) STARTED (from fiber ptr {})", 482 FirstName(this), cast(void*) this.fiber, 483 cast(void*) Fiber.getThis() 484 ).flush(); 485 } 486 487 if (this.fiber.state == this.fiber.State.TERM) 488 { 489 this.fiber.reset(); 490 this.msg.num = 0; 491 } 492 493 Token token; 494 return this.resume(token, null, msg); 495 } 496 497 /************************************************************************** 498 499 Suspends the fiber coroutine and waits until it is resumed or killed. If 500 the active member of msg is msg.exc, exc will be thrown by the resuming 501 start()/resume() call. 502 503 Params: 504 token = token expected to be passed to resume() 505 identifier = reference to the object causing the suspend, use null 506 to not pass anything. The caller to resume must 507 pass the same object reference or else a ResumeException 508 will be thrown inside the fiber 509 msg = message to be returned by the next start()/resume() call. 510 511 Returns: 512 the message passed to the resume() call which made this call resume. 513 Its active member may be exc; for compatibility reasons this method 514 does not throw in this case in contrast to resume(). 515 516 Throws: 517 KilledException if the fiber is killed. 518 519 In: 520 The fiber must be running (not waiting or finished). 521 If the active member of msg is msg.exc, it must not be null. 522 523 **************************************************************************/ 524 525 public Message suspend ( Token token, Object identifier = null, Message msg = Message.init ) 526 out (_msg_out) 527 { 528 auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out; 529 assert(msg_out.active); 530 } 531 do 532 { 533 verify ( 534 this.fiber.state == this.fiber.State.EXEC, 535 "attempt to suspend an inactive fiber" 536 ); 537 538 verify(msg.active != msg.active.exc || msg.exc !is null); 539 540 if (!msg.active) 541 { 542 msg.num = 0; 543 } 544 545 scope (exit) 546 { 547 this.msg = msg; 548 549 debug (MessageFiber) 550 { 551 Stdout.formatln( 552 "--FIBER {} (fiber ptr {}) SUSPENDED (from fiber ptr {}) -- ({}:{})", 553 FirstName(this), cast(void*) this.fiber, 554 cast(void*) Fiber.getThis(), token.str, FirstName(identifier) 555 ).flush(); 556 } 557 558 debug ( MessageFiberDump ) 559 { 560 this.last = token; 561 this.time = Clock.now().unix().seconds(); 562 this.suspender = identifier !is null ? identifier.classinfo.name : ""; 563 } 564 565 this.suspend_(); 566 567 if ( this.identifier != this.createIdentifier(token.hash, identifier) ) 568 { 569 throw this.e_resume.set(__FILE__, __LINE__); 570 } 571 } 572 573 return this.msg; 574 } 575 576 /************************************************************************** 577 578 Suspends the fiber coroutine, makes the resuming start()/resume() call 579 throw e and waits until the fiber is resumed or killed. 580 581 Params: 582 token = token expected to be passed to resume() 583 e = Exception instance to be thrown by the next start()/resume() 584 call. 585 586 Returns: 587 the message passed to the resume() call which made this call resume. 588 Its active member may be exc; for compatibility reasons this method 589 does not throw in this case in contrast to resume(). 590 591 Throws: 592 KilledException if the fiber is killed. 593 594 In: 595 e must not be null and the fiber must be running (not waiting or 596 finished). 597 598 **************************************************************************/ 599 600 public Message suspend ( Token token, Exception e ) 601 { 602 verify(e !is null); 603 return this.suspend(token, null, Message(e)); 604 } 605 606 /************************************************************************** 607 608 Resumes the fiber coroutine and waits until it is suspended or 609 terminates. 610 611 Params: 612 token = token expected to have been passed to suspend() 613 identifier = reference to the object causing the resume, use null 614 to not pass anything. Must be the same reference 615 that was used in the suspend call, or else a 616 ResumeException will be thrown inside the fiber. 617 msg = message to be returned by the next suspend() call. 618 619 Returns: 620 The message passed to the suspend() call which made this call 621 resume. 622 623 Throws: 624 if an Exception instance was passed to the suspend() call which made 625 this call be resumed, that Exception instance. 626 627 In: 628 The fiber must be waiting (not running or finished). 629 630 **************************************************************************/ 631 632 public Message resume ( Token token, Object identifier = null, Message msg = Message.init ) 633 out (_msg_out) 634 { 635 auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out; 636 637 auto a = msg_out.active; 638 assert (a); 639 assert (a != a.exc); 640 } 641 do 642 { 643 verify( 644 this.fiber.state == this.fiber.State.HOLD, 645 "attempt to resume a non-held fiber" 646 ); 647 648 if (!msg.active) 649 { 650 msg.num = 0; 651 } 652 653 this.identifier = this.createIdentifier(token.hash, identifier); 654 655 debug (MessageFiber) 656 { 657 Stdout.formatln( 658 "--FIBER {} (fiber ptr {}) RESUMED (from fiber ptr {}) -- ({}:{})", 659 FirstName(this), cast(void*) this.fiber, cast(void*) Fiber.getThis(), 660 token.str, FirstName(identifier) 661 ).flush(); 662 } 663 664 scope (exit) this.msg = msg; 665 this.fiber.call(); 666 667 if (this.msg.active == this.msg.active.exc) 668 { 669 throw this.msg.exc; 670 } 671 else 672 { 673 return this.msg; 674 } 675 } 676 677 /************************************************************************** 678 679 Kills the fiber coroutine. That is, resumes it and makes resume() throw 680 a KilledException. 681 682 Param: 683 file = source file (passed to the exception) 684 line = source code line (passed to the exception) 685 686 Returns: 687 When the fiber is suspended by suspend() or finishes. 688 689 In: 690 The fiber must be waiting (not running or finished). 691 692 **************************************************************************/ 693 694 public void kill ( istring file = __FILE__, long line = __LINE__ ) 695 { 696 verify( 697 this.fiber.state == this.fiber.State.HOLD, 698 "attempt to kill a non-held fiber" 699 ); 700 verify(!this.killed); 701 702 debug (MessageFiber) 703 { 704 Stdout.formatln( 705 "--FIBER {} (fiber ptr {}) KILLED (from fiber ptr {}) -- ({}:{})", 706 FirstName(this), cast(void*) this.fiber, 707 cast(void*) Fiber.getThis(), file, line 708 ).flush(); 709 } 710 711 this.killed = true; 712 this.e_killed.set(file, line); 713 714 this.fiber.call(Fiber.Rethrow.no); 715 } 716 717 /************************************************************************** 718 719 Returns: 720 true if the fiber is waiting or false otherwise. 721 722 **************************************************************************/ 723 724 public bool waiting ( ) 725 { 726 return this.fiber.state == this.fiber.State.HOLD; 727 } 728 729 /************************************************************************** 730 731 Returns: 732 true if the fiber is running or false otherwise. 733 734 **************************************************************************/ 735 736 public bool running ( ) 737 { 738 return this.fiber.state == this.fiber.State.EXEC; 739 } 740 741 /************************************************************************** 742 743 Returns: 744 true if the fiber is finished or false otherwise. 745 746 **************************************************************************/ 747 748 public bool finished ( ) 749 { 750 return this.fiber.state == this.fiber.State.TERM; 751 } 752 753 754 /************************************************************************** 755 756 Resets the fiber 757 758 **************************************************************************/ 759 760 public void reset ( ) 761 { 762 this.fiber.reset(); 763 } 764 765 /************************************************************************** 766 767 Resets the fiber and change the coroutine 768 769 Params: 770 coroutine = fiber coroutine function 771 772 **************************************************************************/ 773 774 public void reset ( void function() coroutine ) 775 { 776 this.fiber.reset(coroutine); 777 } 778 779 /************************************************************************** 780 781 Resets the fiber and change the coroutine 782 783 Params: 784 coroutine = fiber coroutine delegate 785 786 **************************************************************************/ 787 788 public void reset ( scope void delegate() coroutine ) 789 { 790 this.fiber.reset(coroutine); 791 } 792 793 794 /************************************************************************** 795 796 Returns: 797 fiber state 798 799 **************************************************************************/ 800 801 public State state ( ) 802 { 803 return this.fiber.state; 804 } 805 806 /************************************************************************** 807 808 Suspends the fiber. 809 810 Throws: 811 suspendThrow() exception if pending 812 813 In: 814 The fiber must be running (not waiting or finished). 815 816 **************************************************************************/ 817 818 private void suspend_ ( ) 819 { 820 verify( 821 this.fiber.state == this.fiber.State.EXEC, 822 "attempt to suspend a non-active fiber" 823 ); 824 verify( 825 this.fiber is Fiber.getThis, 826 "attempt to suspend fiber externally" 827 ); 828 829 this.fiber.yield(); 830 831 if (this.killed) 832 { 833 this.killed = false; 834 throw this.e_killed; 835 } 836 } 837 838 /************************************************************************** 839 840 Direct read-only access to fiber that is used internally by 841 MessageFiber. Most useful for debugging or for sanity checks in 842 contracts. 843 844 Returns: 845 Underlying core.thread.Fiber instance 846 847 **************************************************************************/ 848 849 public Fiber getRawFiber ( ) 850 { 851 return this.fiber; 852 } 853 854 /************************************************************************** 855 856 Allows to change underlying core.thread.Fiber instance 857 858 Params: 859 fiber = new fiber instance to use 860 861 **************************************************************************/ 862 863 public void reset ( Fiber fiber ) 864 { 865 this.fiber = fiber; 866 } 867 868 /************************************************************************** 869 870 Generates an identifier from a hash and an object reference. 871 872 Params: 873 hash = hash to generate identifier from 874 obj = object reference to generate identifier from 875 876 Returns: 877 identifier based on provided hash and object reference (the two are 878 XORed together) 879 880 **************************************************************************/ 881 882 static private ulong createIdentifier ( hash_t hash, Object obj ) 883 { 884 return hash ^ cast(ulong)cast(void*)obj; 885 } 886 }