1 /******************************************************************************
2
3 Wraps a Fiber allowing to pass a message on suspending/resuming and to kill
4 the fiber.
5
6 Allows passing a message from suspend() to resume() and vice versa.
7 Provides a kill() method where kill() resumes suspend() and suspend() throws
8 a KilledException when it was resumed by kill().
9
10 suspend() and resume() require you to pass a 'token' parameter to them
11 which must be the same for each suspend/resume pair. This prevents that
12 a fiber is resumed from a part of the code that wasn't intended to do so.
13
14 Still, sometimes the correct position in a code could resume a fiber
15 that was waiting for a resume from another instance of the same code
16 (for example, a fiber is being resumed from a wrong class instance).
17 To catch these cases, a runtime-identifier parameter was added,
18 which is just an Object reference. If another object is resuming a fiber
19 an exception is thrown.
20
21 See also the documentation of suspend/resume.
22
23 Debugging:
24 You can use -debug=MessageFiber to print the identifiers that
25 were used in the suspend/resume calls. It uses the FirstNames
26 functions to print pointers as names.
27
28 You can use -debug=MessageFiberDump to enable a function called
29 'dumpFibers' which can be called from within gdb using
30 'call dumpFibers()'. The performance impact should be relatively low.
31 It will output a list on STDERR listing all fibers and some
32 informations about their state.
33
34 Example output:
35
36 Tomsen: State: TERM; Token: GroupRequest; LastSuspend: 1364929361 (157s ago); Addr: 7ff6c9ec8f00; Suspender: Actor0
37 Marine: State: TERM; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9eef100; Suspender: Actor1
38 Robert: State: TERM; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9f94a00; Suspender: Actor1
39 Batman: State: HOLD; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9f94300; Suspender: Actor1
40 Mary: State: TERM; Token: event_fired; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9fc7c00; Suspender: Actor3
41 Methew: State: HOLD; Token: io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9fc7500; Suspender: Actor1
42 Superman: State: HOLD; Token: DrizzleData; LastSuspend: 1364929515 (3s ago); Addr: 7ff6cad40800; Suspender: Actor2
43 Methew: State: HOLD; Token: DrizzleData; LastSuspend: 1364929515 (3s ago); Addr: 7ff6cad40600; Suspender: Actor2
44
45
46 Copyright:
47 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
48 All rights reserved.
49
50 License:
51 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
52 Alternatively, this file may be distributed under the terms of the Tango
53 3-Clause BSD License (see LICENSE_BSD.txt for details).
54
55 ******************************************************************************/
56
57 module ocean.core.MessageFiber;
58
59 import ocean.meta.types.Qualifiers;
60 import ocean.core.Verify;
61 import ocean.core.Array: copy;
62 import ocean.core.SmartUnion;
63 import ocean.io.digest.Fnv1;
64 import core.thread : Fiber;
65
66 debug ( MessageFiber )
67 {
68 import ocean.io.Stdout;
69 debug = MessageFiberToken;
70 }
71
72 debug ( MessageFiberDump )
73 {
74 import ocean.time.Clock;
75 import ocean.core.Array;
76 import core.memory;
77 import ocean.io.Stdout;
78 debug = MessageFiberToken;
79 }
80
81 debug ( MessageFiberToken )
82 {
83 import ocean.io.digest.FirstName;
84 }
85
86
87 /******************************************************************************
88
89 Dump information about fibers to STDERR
90
91 ******************************************************************************/
92
93 debug ( MessageFiberDump ) extern(C) void dumpFibers()
94 {
95 istring[] state_str = [ "HOLD" ,"EXEC" ,"TERM" ];
96
97 void* wpFiber = MessageFiber.last_fiber;
98
99 while ( wpFiber !is null )
100 {
101 auto fiber = cast(MessageFiber) GC.weakPointerGet(wpFiber);
102
103 wpFiber = null;
104
105 if ( fiber !is null )
106 {
107 Stderr.formatln("{,12}: State: {}; Token: {,12}; LastSuspend: {} ({}s ago); "
108 ~ "Addr: {}; Suspender: {}",
109 FirstName(fiber), state_str[fiber.fiber.state],
110 fiber.last.str, fiber.time,
111 fiber.time > 0 ? Clock.now().unix().seconds() - fiber.time : 0,
112 cast(void*) fiber,
113 fiber.suspender);
114
115 wpFiber= fiber.prev_fiber;
116 }
117 }
118 }
119
120 /******************************************************************************/
121
122 class MessageFiber
123 {
124 debug ( MessageFiberDump )
125 {
126 /**********************************************************************
127
128 Most recent token
129
130 **********************************************************************/
131
132 private Token last;
133
134 /**********************************************************************
135
136 Last time suspend was called
137
138 **********************************************************************/
139
140 private size_t time;
141
142 /**********************************************************************
143
144 Name of the suspender, taken from the identifier parameter
145 Empty string if identifier wasn't given.
146
147 **********************************************************************/
148
149 private istring suspender;
150
151 /**********************************************************************
152
153 List of weakpointers to all fibers
154
155 **********************************************************************/
156
157 static private void* last_fiber;
158
159 private void* next_fiber, prev_fiber;
160 }
161
162 /**************************************************************************
163
164 Token struct passed to suspend() and resume() methods in order to ensure
165 that the fiber is resumed for the same reason as it was suspended. The
166 token is constructed from a string, via the static opCall() method.
167
168 Usage example:
169
170 ---
171
172 auto token = MessageFiber.Token("something_happened");
173
174 fiber.suspend(token);
175
176 // Later on, somewhere else
177 fiber.resume(token);
178
179 ---
180
181 **************************************************************************/
182
183 public struct Token
184 {
185 /***********************************************************************
186
187 String used to construct the token. In debug builds this is used for
188 nice message output.
189
190 ***********************************************************************/
191
192 private debug (MessageFiberToken) istring str;
193
194 /***********************************************************************
195
196 Hash of string used to construct the token.
197
198 ***********************************************************************/
199
200 private hash_t hash;
201
202 /***********************************************************************
203
204 Constructs a token.
205
206 Params:
207 s = string
208
209 Returns:
210 token constructed from the passed string
211
212 ***********************************************************************/
213
214 public static Token opCall ( istring s )
215 {
216 Token token;
217 token.hash = Fnv1a64(s);
218 debug (MessageFiberToken) token.str = s;
219 return token;
220 }
221 }
222
223 /**************************************************************************
224
225 Message union
226
227 **************************************************************************/
228
229 private union Message_
230 {
231 int num;
232 void* ptr;
233 Object obj;
234 Exception exc;
235 }
236
237 public alias SmartUnion!(Message_) Message;
238
239 /**************************************************************************
240
241 Alias for fiber state
242
243 **************************************************************************/
244
245 public alias Fiber.State State;
246
247 /**************************************************************************
248
249 KilledException; thrown by suspend() when resumed by kill()
250
251 **************************************************************************/
252
253 static class KilledException : Exception
254 {
255 this ( ) {super("Fiber killed");}
256
257 void set ( istring file, long line )
258 {
259 super.file = file;
260 super.line = line;
261 }
262 }
263
264 /**************************************************************************
265
266 ResumedException; thrown by suspend() when resumed with the wrong
267 identifier
268
269 **************************************************************************/
270
271 static class ResumeException : Exception
272 {
273 this ( ) {super("Resumed with invalid identifier!");}
274
275 ResumeException set ( istring file, long line )
276 {
277 super.file = file;
278 super.line = line;
279
280 return this;
281 }
282 }
283
284 /**************************************************************************
285
286 Fiber instance. (Protected but can be accessed via the public rawFiber()
287 and reset() methods. The fiber is not simply made public, as derived
288 classes may need to add special behaviour upon getting/setting it.)
289
290 **************************************************************************/
291
292 protected Fiber fiber;
293
294 /**************************************************************************
295
296 Identifier
297
298 **************************************************************************/
299
300 private ulong identifier;
301
302 /**************************************************************************
303
304 Message passed between suspend() and resume()
305
306 **************************************************************************/
307
308 private Message msg;
309
310 /**************************************************************************
311
312 KilledException instance
313
314 **************************************************************************/
315
316 private KilledException e_killed;
317
318 /**************************************************************************
319
320 ResumeException instance
321
322 **************************************************************************/
323
324 private ResumeException e_resume;
325
326 /**************************************************************************
327
328 "killed" flag, set by kill() and cleared by resume().
329
330 **************************************************************************/
331
332 private bool killed = false;
333
334 /**************************************************************************
335
336 Constructor
337
338 Params:
339 fiber = already created core.thread.Fiber
340
341 **************************************************************************/
342
343 public this ( Fiber fiber )
344 {
345 this.fiber = fiber;
346
347 this.e_killed = new KilledException;
348 this.e_resume = new ResumeException;
349 this.msg.num = 0;
350
351 debug(MessageFiberDump) addToList();
352
353 debug (MessageFiber) Stdout.formatln("--FIBER {} CREATED (fiber ptr {}) --",
354 FirstName(this), cast(void*) this.fiber).flush();
355 }
356
357 /**************************************************************************
358
359 Constructor
360
361 Params:
362 coroutine = fiber coroutine
363
364 **************************************************************************/
365
366 public this ( scope void delegate ( ) coroutine )
367 {
368 this(new Fiber(coroutine));
369 }
370
371 /**************************************************************************
372
373 Constructor
374
375 Params:
376 coroutine = fiber coroutine
377 sz = fiber stack size
378
379 **************************************************************************/
380
381 public this ( scope void delegate ( ) coroutine, size_t sz )
382 {
383 this(new Fiber(coroutine, sz));
384 }
385
386 /**************************************************************************
387
388 Destructor
389
390 Removes the Fiber from the linked list and destroys its weak pointer
391
392 **************************************************************************/
393
394 debug(MessageFiberDump) ~this ( )
395 {
396 auto next = cast(MessageFiber) GC.weakPointerGet(next_fiber);
397 auto prev = cast(MessageFiber) GC.weakPointerGet(prev_fiber);
398 void* me;
399
400 if ( next !is null )
401 {
402 if ( MessageFiber.last_fiber == next.prev_fiber )
403 {
404 MessageFiber.last_fiber = next_fiber;
405 }
406
407 me = next.prev_fiber;
408 next.prev_fiber = prev_fiber;
409 }
410
411 if ( prev !is null )
412 {
413 me = prev.next_fiber;
414 prev.next_fiber = next_fiber;
415 }
416
417 GC.weakPointerDestroy(me);
418 }
419
420 /**************************************************************************
421
422 Adds this fiber to the linked list of Fibers
423
424 **************************************************************************/
425
426 debug(MessageFiberDump) void addToList ( )
427 {
428 auto me = GC.weakPointerCreate(this);
429
430 if ( last_fiber !is null )
431 {
432 auto l = cast(MessageFiber) GC.weakPointerGet(MessageFiber.last_fiber);
433
434 assert ( l !is null );
435
436 l.next_fiber = me;
437 prev_fiber = last_fiber;
438 }
439
440 last_fiber = me;
441 }
442
443 /**************************************************************************
444
445 Starts the fiber coroutine and waits until it is suspended or finishes.
446
447 Params:
448 msg = message to be returned by the next suspend() call.
449
450 Returns:
451 When the fiber is suspended, the message passed to that suspend()
452 call. It has always an active member, by default num but never exc.
453
454 Throws:
455 Exception if the fiber is suspended by suspendThrow().
456
457 In:
458 The fiber must not be running (but waiting or finished).
459
460 **************************************************************************/
461
462 public Message start ( Message msg = Message.init )
463 out (_msg_out)
464 {
465 auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out;
466
467 auto a = msg_out.active;
468 assert (a);
469 assert (a != a.exc);
470 }
471 do
472 {
473 verify(
474 this.fiber.state != this.fiber.State.EXEC,
475 "attempt to start an active fiber"
476 );
477
478 debug (MessageFiber)
479 {
480 Stdout.formatln(
481 "--FIBER {} (fiber ptr {}) STARTED (from fiber ptr {})",
482 FirstName(this), cast(void*) this.fiber,
483 cast(void*) Fiber.getThis()
484 ).flush();
485 }
486
487 if (this.fiber.state == this.fiber.State.TERM)
488 {
489 this.fiber.reset();
490 this.msg.num = 0;
491 }
492
493 Token token;
494 return this.resume(token, null, msg);
495 }
496
497 /**************************************************************************
498
499 Suspends the fiber coroutine and waits until it is resumed or killed. If
500 the active member of msg is msg.exc, exc will be thrown by the resuming
501 start()/resume() call.
502
503 Params:
504 token = token expected to be passed to resume()
505 identifier = reference to the object causing the suspend, use null
506 to not pass anything. The caller to resume must
507 pass the same object reference or else a ResumeException
508 will be thrown inside the fiber
509 msg = message to be returned by the next start()/resume() call.
510
511 Returns:
512 the message passed to the resume() call which made this call resume.
513 Its active member may be exc; for compatibility reasons this method
514 does not throw in this case in contrast to resume().
515
516 Throws:
517 KilledException if the fiber is killed.
518
519 In:
520 The fiber must be running (not waiting or finished).
521 If the active member of msg is msg.exc, it must not be null.
522
523 **************************************************************************/
524
525 public Message suspend ( Token token, Object identifier = null, Message msg = Message.init )
526 out (_msg_out)
527 {
528 auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out;
529 assert(msg_out.active);
530 }
531 do
532 {
533 verify (
534 this.fiber.state == this.fiber.State.EXEC,
535 "attempt to suspend an inactive fiber"
536 );
537
538 verify(msg.active != msg.active.exc || msg.exc !is null);
539
540 if (!msg.active)
541 {
542 msg.num = 0;
543 }
544
545 scope (exit)
546 {
547 this.msg = msg;
548
549 debug (MessageFiber)
550 {
551 Stdout.formatln(
552 "--FIBER {} (fiber ptr {}) SUSPENDED (from fiber ptr {}) -- ({}:{})",
553 FirstName(this), cast(void*) this.fiber,
554 cast(void*) Fiber.getThis(), token.str, FirstName(identifier)
555 ).flush();
556 }
557
558 debug ( MessageFiberDump )
559 {
560 this.last = token;
561 this.time = Clock.now().unix().seconds();
562 this.suspender = identifier !is null ? identifier.classinfo.name : "";
563 }
564
565 this.suspend_();
566
567 if ( this.identifier != this.createIdentifier(token.hash, identifier) )
568 {
569 throw this.e_resume.set(__FILE__, __LINE__);
570 }
571 }
572
573 return this.msg;
574 }
575
576 /**************************************************************************
577
578 Suspends the fiber coroutine, makes the resuming start()/resume() call
579 throw e and waits until the fiber is resumed or killed.
580
581 Params:
582 token = token expected to be passed to resume()
583 e = Exception instance to be thrown by the next start()/resume()
584 call.
585
586 Returns:
587 the message passed to the resume() call which made this call resume.
588 Its active member may be exc; for compatibility reasons this method
589 does not throw in this case in contrast to resume().
590
591 Throws:
592 KilledException if the fiber is killed.
593
594 In:
595 e must not be null and the fiber must be running (not waiting or
596 finished).
597
598 **************************************************************************/
599
600 public Message suspend ( Token token, Exception e )
601 {
602 verify(e !is null);
603 return this.suspend(token, null, Message(e));
604 }
605
606 /**************************************************************************
607
608 Resumes the fiber coroutine and waits until it is suspended or
609 terminates.
610
611 Params:
612 token = token expected to have been passed to suspend()
613 identifier = reference to the object causing the resume, use null
614 to not pass anything. Must be the same reference
615 that was used in the suspend call, or else a
616 ResumeException will be thrown inside the fiber.
617 msg = message to be returned by the next suspend() call.
618
619 Returns:
620 The message passed to the suspend() call which made this call
621 resume.
622
623 Throws:
624 if an Exception instance was passed to the suspend() call which made
625 this call be resumed, that Exception instance.
626
627 In:
628 The fiber must be waiting (not running or finished).
629
630 **************************************************************************/
631
632 public Message resume ( Token token, Object identifier = null, Message msg = Message.init )
633 out (_msg_out)
634 {
635 auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out;
636
637 auto a = msg_out.active;
638 assert (a);
639 assert (a != a.exc);
640 }
641 do
642 {
643 verify(
644 this.fiber.state == this.fiber.State.HOLD,
645 "attempt to resume a non-held fiber"
646 );
647
648 if (!msg.active)
649 {
650 msg.num = 0;
651 }
652
653 this.identifier = this.createIdentifier(token.hash, identifier);
654
655 debug (MessageFiber)
656 {
657 Stdout.formatln(
658 "--FIBER {} (fiber ptr {}) RESUMED (from fiber ptr {}) -- ({}:{})",
659 FirstName(this), cast(void*) this.fiber, cast(void*) Fiber.getThis(),
660 token.str, FirstName(identifier)
661 ).flush();
662 }
663
664 scope (exit) this.msg = msg;
665 this.fiber.call();
666
667 if (this.msg.active == this.msg.active.exc)
668 {
669 throw this.msg.exc;
670 }
671 else
672 {
673 return this.msg;
674 }
675 }
676
677 /**************************************************************************
678
679 Kills the fiber coroutine. That is, resumes it and makes resume() throw
680 a KilledException.
681
682 Param:
683 file = source file (passed to the exception)
684 line = source code line (passed to the exception)
685
686 Returns:
687 When the fiber is suspended by suspend() or finishes.
688
689 In:
690 The fiber must be waiting (not running or finished).
691
692 **************************************************************************/
693
694 public void kill ( istring file = __FILE__, long line = __LINE__ )
695 {
696 verify(
697 this.fiber.state == this.fiber.State.HOLD,
698 "attempt to kill a non-held fiber"
699 );
700 verify(!this.killed);
701
702 debug (MessageFiber)
703 {
704 Stdout.formatln(
705 "--FIBER {} (fiber ptr {}) KILLED (from fiber ptr {}) -- ({}:{})",
706 FirstName(this), cast(void*) this.fiber,
707 cast(void*) Fiber.getThis(), file, line
708 ).flush();
709 }
710
711 this.killed = true;
712 this.e_killed.set(file, line);
713
714 this.fiber.call(Fiber.Rethrow.no);
715 }
716
717 /**************************************************************************
718
719 Returns:
720 true if the fiber is waiting or false otherwise.
721
722 **************************************************************************/
723
724 public bool waiting ( )
725 {
726 return this.fiber.state == this.fiber.State.HOLD;
727 }
728
729 /**************************************************************************
730
731 Returns:
732 true if the fiber is running or false otherwise.
733
734 **************************************************************************/
735
736 public bool running ( )
737 {
738 return this.fiber.state == this.fiber.State.EXEC;
739 }
740
741 /**************************************************************************
742
743 Returns:
744 true if the fiber is finished or false otherwise.
745
746 **************************************************************************/
747
748 public bool finished ( )
749 {
750 return this.fiber.state == this.fiber.State.TERM;
751 }
752
753
754 /**************************************************************************
755
756 Resets the fiber
757
758 **************************************************************************/
759
760 public void reset ( )
761 {
762 this.fiber.reset();
763 }
764
765 /**************************************************************************
766
767 Resets the fiber and change the coroutine
768
769 Params:
770 coroutine = fiber coroutine function
771
772 **************************************************************************/
773
774 public void reset ( void function() coroutine )
775 {
776 this.fiber.reset(coroutine);
777 }
778
779 /**************************************************************************
780
781 Resets the fiber and change the coroutine
782
783 Params:
784 coroutine = fiber coroutine delegate
785
786 **************************************************************************/
787
788 public void reset ( scope void delegate() coroutine )
789 {
790 this.fiber.reset(coroutine);
791 }
792
793
794 /**************************************************************************
795
796 Returns:
797 fiber state
798
799 **************************************************************************/
800
801 public State state ( )
802 {
803 return this.fiber.state;
804 }
805
806 /**************************************************************************
807
808 Suspends the fiber.
809
810 Throws:
811 suspendThrow() exception if pending
812
813 In:
814 The fiber must be running (not waiting or finished).
815
816 **************************************************************************/
817
818 private void suspend_ ( )
819 {
820 verify(
821 this.fiber.state == this.fiber.State.EXEC,
822 "attempt to suspend a non-active fiber"
823 );
824 verify(
825 this.fiber is Fiber.getThis,
826 "attempt to suspend fiber externally"
827 );
828
829 this.fiber.yield();
830
831 if (this.killed)
832 {
833 this.killed = false;
834 throw this.e_killed;
835 }
836 }
837
838 /**************************************************************************
839
840 Direct read-only access to fiber that is used internally by
841 MessageFiber. Most useful for debugging or for sanity checks in
842 contracts.
843
844 Returns:
845 Underlying core.thread.Fiber instance
846
847 **************************************************************************/
848
849 public Fiber getRawFiber ( )
850 {
851 return this.fiber;
852 }
853
854 /**************************************************************************
855
856 Allows to change underlying core.thread.Fiber instance
857
858 Params:
859 fiber = new fiber instance to use
860
861 **************************************************************************/
862
863 public void reset ( Fiber fiber )
864 {
865 this.fiber = fiber;
866 }
867
868 /**************************************************************************
869
870 Generates an identifier from a hash and an object reference.
871
872 Params:
873 hash = hash to generate identifier from
874 obj = object reference to generate identifier from
875
876 Returns:
877 identifier based on provided hash and object reference (the two are
878 XORed together)
879
880 **************************************************************************/
881
882 static private ulong createIdentifier ( hash_t hash, Object obj )
883 {
884 return hash ^ cast(ulong)cast(void*)obj;
885 }
886 }