1 /******************************************************************************
2 
3     Wraps a Fiber allowing to pass a message on suspending/resuming and to kill
4     the fiber.
5 
6     Allows passing a message from suspend() to resume() and vice versa.
7     Provides a kill() method where kill() resumes suspend() and suspend() throws
8     a KilledException when it was resumed by kill().
9 
10     suspend() and resume() require you to pass a 'token' parameter to them
11     which must be the same for each suspend/resume pair. This prevents that
12     a fiber is resumed from a part of the code that wasn't intended to do so.
13 
14     Still, sometimes the correct position in a code could resume a fiber
15     that was waiting for a resume from another instance of the same code
16     (for example, a fiber is being resumed from a wrong class instance).
17     To catch these cases, a runtime-identifier parameter was added,
18     which is just an Object reference. If another object is resuming a fiber
19     an exception is thrown.
20 
21     See also the documentation of suspend/resume.
22 
23     Debugging:
24     You can use -debug=MessageFiber to print the identifiers that
25     were used in the suspend/resume calls. It uses the FirstNames
26     functions to print pointers as names.
27 
28     You can use -debug=MessageFiberDump to enable a function called
29     'dumpFibers' which can be called from within gdb using
30     'call dumpFibers()'. The performance impact should be relatively low.
31     It will output a list on STDERR listing all fibers and some
32     informations about their state.
33 
34     Example output:
35 
36       Tomsen:   State: TERM; Token: GroupRequest; LastSuspend: 1364929361 (157s ago); Addr: 7ff6c9ec8f00; Suspender: Actor0
37       Marine:   State: TERM; Token:     io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9eef100; Suspender: Actor1
38       Robert:   State: TERM; Token:     io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9f94a00; Suspender: Actor1
39       Batman:   State: HOLD; Token:     io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9f94300; Suspender: Actor1
40       Mary:     State: TERM; Token:  event_fired; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9fc7c00; Suspender: Actor3
41       Methew:   State: HOLD; Token:     io_ready; LastSuspend: 1364929357 (161s ago); Addr: 7ff6c9fc7500; Suspender: Actor1
42       Superman: State: HOLD; Token:  DrizzleData; LastSuspend: 1364929515 (3s ago); Addr: 7ff6cad40800; Suspender: Actor2
43       Methew:   State: HOLD; Token:  DrizzleData; LastSuspend: 1364929515 (3s ago); Addr: 7ff6cad40600; Suspender: Actor2
44 
45 
46     Copyright:
47         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
48         All rights reserved.
49 
50     License:
51         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
52         Alternatively, this file may be distributed under the terms of the Tango
53         3-Clause BSD License (see LICENSE_BSD.txt for details).
54 
55  ******************************************************************************/
56 
57 module ocean.core.MessageFiber;
58 
59 import ocean.meta.types.Qualifiers;
60 import ocean.core.Verify;
61 import ocean.core.Array: copy;
62 import ocean.core.SmartUnion;
63 import ocean.io.digest.Fnv1;
64 import core.thread : Fiber;
65 
66 debug ( MessageFiber )
67 {
68     import ocean.io.Stdout;
69     debug = MessageFiberToken;
70 }
71 
72 debug ( MessageFiberDump )
73 {
74     import ocean.time.Clock;
75     import ocean.core.Array;
76     import core.memory;
77     import ocean.io.Stdout;
78     debug = MessageFiberToken;
79 }
80 
81 debug ( MessageFiberToken )
82 {
83     import ocean.io.digest.FirstName;
84 }
85 
86 
87 /******************************************************************************
88 
89     Dump information about fibers to STDERR
90 
91  ******************************************************************************/
92 
93 debug ( MessageFiberDump ) extern(C) void dumpFibers()
94 {
95     istring[] state_str = [ "HOLD" ,"EXEC" ,"TERM" ];
96 
97     void* wpFiber = MessageFiber.last_fiber;
98 
99     while ( wpFiber !is null )
100     {
101         auto fiber = cast(MessageFiber) GC.weakPointerGet(wpFiber);
102 
103         wpFiber = null;
104 
105         if ( fiber !is null )
106         {
107             Stderr.formatln("{,12}: State: {}; Token: {,12}; LastSuspend: {} ({}s ago); "
108                 ~ "Addr: {}; Suspender: {}",
109                 FirstName(fiber), state_str[fiber.fiber.state],
110                 fiber.last.str, fiber.time,
111                 fiber.time > 0 ? Clock.now().unix().seconds() - fiber.time : 0,
112                 cast(void*) fiber,
113                 fiber.suspender);
114 
115             wpFiber= fiber.prev_fiber;
116         }
117     }
118 }
119 
120 /******************************************************************************/
121 
122 class MessageFiber
123 {
124     debug ( MessageFiberDump )
125     {
126         /**********************************************************************
127 
128             Most recent token
129 
130         **********************************************************************/
131 
132         private Token last;
133 
134         /**********************************************************************
135 
136             Last time suspend was called
137 
138         **********************************************************************/
139 
140         private size_t time;
141 
142         /**********************************************************************
143 
144             Name of the suspender, taken from the identifier parameter
145             Empty string if identifier wasn't given.
146 
147         **********************************************************************/
148 
149         private istring suspender;
150 
151         /**********************************************************************
152 
153             List of weakpointers to all fibers
154 
155         **********************************************************************/
156 
157         static private void* last_fiber;
158 
159         private void* next_fiber, prev_fiber;
160     }
161 
162     /**************************************************************************
163 
164         Token struct passed to suspend() and resume() methods in order to ensure
165         that the fiber is resumed for the same reason as it was suspended. The
166         token is constructed from a string, via the static opCall() method.
167 
168         Usage example:
169 
170         ---
171 
172             auto token = MessageFiber.Token("something_happened");
173 
174             fiber.suspend(token);
175 
176             // Later on, somewhere else
177             fiber.resume(token);
178 
179         ---
180 
181      **************************************************************************/
182 
183     public struct Token
184     {
185         /***********************************************************************
186 
187             String used to construct the token. In debug builds this is used for
188             nice message output.
189 
190         ***********************************************************************/
191 
192         private debug (MessageFiberToken) istring str;
193 
194         /***********************************************************************
195 
196             Hash of string used to construct the token.
197 
198         ***********************************************************************/
199 
200         private hash_t hash;
201 
202         /***********************************************************************
203 
204             Constructs a token.
205 
206             Params:
207                 s = string
208 
209             Returns:
210                 token constructed from the passed string
211 
212         ***********************************************************************/
213 
214         public static Token opCall ( istring s )
215         {
216             Token token;
217             token.hash = Fnv1a64(s);
218             debug (MessageFiberToken) token.str = s;
219             return token;
220         }
221     }
222 
223     /**************************************************************************
224 
225         Message union
226 
227      **************************************************************************/
228 
229     private union Message_
230     {
231         int       num;
232         void*     ptr;
233         Object    obj;
234         Exception exc;
235     }
236 
237     public alias SmartUnion!(Message_) Message;
238 
239     /**************************************************************************
240 
241         Alias for fiber state
242 
243      **************************************************************************/
244 
245     public alias Fiber.State State;
246 
247     /**************************************************************************
248 
249         KilledException; thrown by suspend() when resumed by kill()
250 
251      **************************************************************************/
252 
253     static class KilledException : Exception
254     {
255         this ( )  {super("Fiber killed");}
256 
257         void set ( istring file, long line )
258         {
259             super.file = file;
260             super.line = line;
261         }
262     }
263 
264     /**************************************************************************
265 
266         ResumedException; thrown by suspend() when resumed with the wrong
267         identifier
268 
269      **************************************************************************/
270 
271     static class ResumeException : Exception
272     {
273         this ( )  {super("Resumed with invalid identifier!");}
274 
275         ResumeException set ( istring file, long line )
276         {
277             super.file = file;
278             super.line = line;
279 
280             return this;
281         }
282     }
283 
284     /**************************************************************************
285 
286         Fiber instance. (Protected but can be accessed via the public rawFiber()
287         and reset() methods. The fiber is not simply made public, as derived
288         classes may need to add special behaviour upon getting/setting it.)
289 
290      **************************************************************************/
291 
292     protected Fiber     fiber;
293 
294     /**************************************************************************
295 
296         Identifier
297 
298      **************************************************************************/
299 
300     private ulong           identifier;
301 
302     /**************************************************************************
303 
304         Message passed between suspend() and resume()
305 
306      **************************************************************************/
307 
308     private Message         msg;
309 
310     /**************************************************************************
311 
312         KilledException instance
313 
314      **************************************************************************/
315 
316     private KilledException e_killed;
317 
318     /**************************************************************************
319 
320         ResumeException instance
321 
322      **************************************************************************/
323 
324     private ResumeException e_resume;
325 
326     /**************************************************************************
327 
328         "killed" flag, set by kill() and cleared by resume().
329 
330      **************************************************************************/
331 
332     private bool killed = false;
333 
334     /**************************************************************************
335 
336         Constructor
337 
338         Params:
339             fiber = already created core.thread.Fiber
340 
341      **************************************************************************/
342 
343     public this ( Fiber fiber )
344     {
345         this.fiber = fiber;
346 
347         this.e_killed = new KilledException;
348         this.e_resume = new ResumeException;
349         this.msg.num = 0;
350 
351         debug(MessageFiberDump) addToList();
352 
353         debug (MessageFiber) Stdout.formatln("--FIBER {} CREATED (fiber ptr {}) --",
354             FirstName(this), cast(void*) this.fiber).flush();
355     }
356 
357     /**************************************************************************
358 
359         Constructor
360 
361         Params:
362             coroutine = fiber coroutine
363 
364      **************************************************************************/
365 
366     public this ( scope void delegate ( ) coroutine )
367     {
368         this(new Fiber(coroutine));
369     }
370 
371     /**************************************************************************
372 
373         Constructor
374 
375         Params:
376             coroutine = fiber coroutine
377             sz        = fiber stack size
378 
379      **************************************************************************/
380 
381     public this ( scope void delegate ( ) coroutine, size_t sz )
382     {
383         this(new Fiber(coroutine, sz));
384     }
385 
386     /**************************************************************************
387 
388         Destructor
389 
390         Removes the Fiber from the linked list and destroys its weak pointer
391 
392      **************************************************************************/
393 
394     debug(MessageFiberDump) ~this ( )
395     {
396         auto next = cast(MessageFiber) GC.weakPointerGet(next_fiber);
397         auto prev = cast(MessageFiber) GC.weakPointerGet(prev_fiber);
398         void* me;
399 
400         if ( next !is null )
401         {
402             if ( MessageFiber.last_fiber == next.prev_fiber )
403             {
404                 MessageFiber.last_fiber = next_fiber;
405             }
406 
407             me = next.prev_fiber;
408             next.prev_fiber = prev_fiber;
409         }
410 
411         if ( prev !is null )
412         {
413             me = prev.next_fiber;
414             prev.next_fiber = next_fiber;
415         }
416 
417         GC.weakPointerDestroy(me);
418     }
419 
420     /**************************************************************************
421 
422         Adds this fiber to the linked list of Fibers
423 
424      **************************************************************************/
425 
426     debug(MessageFiberDump) void addToList ( )
427     {
428         auto me = GC.weakPointerCreate(this);
429 
430         if ( last_fiber !is null )
431         {
432             auto l = cast(MessageFiber) GC.weakPointerGet(MessageFiber.last_fiber);
433 
434             assert ( l !is null );
435 
436             l.next_fiber = me;
437             prev_fiber = last_fiber;
438         }
439 
440         last_fiber = me;
441     }
442 
443     /**************************************************************************
444 
445         Starts the fiber coroutine and waits until it is suspended or finishes.
446 
447         Params:
448             msg = message to be returned by the next suspend() call.
449 
450         Returns:
451             When the fiber is suspended, the message passed to that suspend()
452             call. It has always an active member, by default num but never exc.
453 
454         Throws:
455             Exception if the fiber is suspended by suspendThrow().
456 
457         In:
458             The fiber must not be running (but waiting or finished).
459 
460      **************************************************************************/
461 
462     public Message start ( Message msg = Message.init )
463     out (_msg_out)
464     {
465         auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out;
466 
467         auto a = msg_out.active;
468         assert (a);
469         assert (a != a.exc);
470     }
471     do
472     {
473         verify(
474             this.fiber.state != this.fiber.State.EXEC,
475             "attempt to start an active fiber"
476         );
477 
478         debug (MessageFiber)
479         {
480             Stdout.formatln(
481                 "--FIBER {} (fiber ptr {}) STARTED (from fiber ptr {})",
482                 FirstName(this), cast(void*) this.fiber,
483                 cast(void*) Fiber.getThis()
484             ).flush();
485         }
486 
487         if (this.fiber.state == this.fiber.State.TERM)
488         {
489             this.fiber.reset();
490             this.msg.num = 0;
491         }
492 
493         Token token;
494         return this.resume(token, null, msg);
495     }
496 
497     /**************************************************************************
498 
499         Suspends the fiber coroutine and waits until it is resumed or killed. If
500         the active member of msg is msg.exc, exc will be thrown by the resuming
501         start()/resume() call.
502 
503         Params:
504             token = token expected to be passed to resume()
505             identifier = reference to the object causing the suspend, use null
506                          to not pass anything. The caller to resume must
507                          pass the same object reference or else a ResumeException
508                          will be thrown inside the fiber
509             msg = message to be returned by the next start()/resume() call.
510 
511         Returns:
512             the message passed to the resume() call which made this call resume.
513             Its active member may be exc; for compatibility reasons this method
514             does not throw in this case in contrast to resume().
515 
516         Throws:
517             KilledException if the fiber is killed.
518 
519         In:
520             The fiber must be running (not waiting or finished).
521             If the active member of msg is msg.exc, it must not be null.
522 
523      **************************************************************************/
524 
525     public Message suspend ( Token token, Object identifier = null, Message msg = Message.init )
526     out (_msg_out)
527     {
528         auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out;
529         assert(msg_out.active);
530     }
531     do
532     {
533         verify (
534             this.fiber.state == this.fiber.State.EXEC,
535             "attempt to suspend an inactive fiber"
536         );
537 
538         verify(msg.active != msg.active.exc || msg.exc !is null);
539 
540         if (!msg.active)
541         {
542             msg.num = 0;
543         }
544 
545         scope (exit)
546         {
547             this.msg = msg;
548 
549             debug (MessageFiber)
550             {
551                 Stdout.formatln(
552                     "--FIBER {} (fiber ptr {}) SUSPENDED (from fiber ptr {}) -- ({}:{})",
553                     FirstName(this), cast(void*) this.fiber,
554                     cast(void*) Fiber.getThis(), token.str, FirstName(identifier)
555                 ).flush();
556             }
557 
558             debug ( MessageFiberDump )
559             {
560                 this.last = token;
561                 this.time = Clock.now().unix().seconds();
562                 this.suspender = identifier !is null ? identifier.classinfo.name : "";
563             }
564 
565             this.suspend_();
566 
567             if ( this.identifier != this.createIdentifier(token.hash, identifier) )
568             {
569                 throw this.e_resume.set(__FILE__, __LINE__);
570             }
571         }
572 
573         return this.msg;
574     }
575 
576     /**************************************************************************
577 
578         Suspends the fiber coroutine, makes the resuming start()/resume() call
579         throw e and waits until the fiber is resumed or killed.
580 
581         Params:
582             token = token expected to be passed to resume()
583             e = Exception instance to be thrown by the next start()/resume()
584             call.
585 
586         Returns:
587             the message passed to the resume() call which made this call resume.
588             Its active member may be exc; for compatibility reasons this method
589             does not throw in this case in contrast to resume().
590 
591         Throws:
592             KilledException if the fiber is killed.
593 
594         In:
595             e must not be null and the fiber must be running (not waiting or
596             finished).
597 
598      **************************************************************************/
599 
600     public Message suspend ( Token token, Exception e )
601     {
602         verify(e !is null);
603         return this.suspend(token, null, Message(e));
604     }
605 
606     /**************************************************************************
607 
608         Resumes the fiber coroutine and waits until it is suspended or
609         terminates.
610 
611         Params:
612             token = token expected to have been passed to suspend()
613             identifier = reference to the object causing the resume, use null
614                          to not pass anything. Must be the same reference
615                          that was used in the suspend call, or else a
616                          ResumeException will be thrown inside the fiber.
617             msg = message to be returned by the next suspend() call.
618 
619         Returns:
620             The message passed to the suspend() call which made this call
621             resume.
622 
623         Throws:
624             if an Exception instance was passed to the suspend() call which made
625             this call be resumed, that Exception instance.
626 
627         In:
628             The fiber must be waiting (not running or finished).
629 
630      **************************************************************************/
631 
632     public Message resume ( Token token, Object identifier = null, Message msg = Message.init )
633     out (_msg_out)
634     {
635         auto msg_out = cast(Unqual!(typeof(_msg_out))) _msg_out;
636 
637         auto a = msg_out.active;
638         assert (a);
639         assert (a != a.exc);
640     }
641     do
642     {
643         verify(
644             this.fiber.state == this.fiber.State.HOLD,
645             "attempt to resume a non-held fiber"
646         );
647 
648         if (!msg.active)
649         {
650             msg.num = 0;
651         }
652 
653         this.identifier = this.createIdentifier(token.hash, identifier);
654 
655         debug (MessageFiber)
656         {
657             Stdout.formatln(
658                 "--FIBER {} (fiber ptr {}) RESUMED (from fiber ptr {}) -- ({}:{})",
659                 FirstName(this), cast(void*) this.fiber, cast(void*) Fiber.getThis(),
660                 token.str, FirstName(identifier)
661             ).flush();
662         }
663 
664         scope (exit) this.msg = msg;
665         this.fiber.call();
666 
667         if (this.msg.active == this.msg.active.exc)
668         {
669             throw this.msg.exc;
670         }
671         else
672         {
673             return this.msg;
674         }
675     }
676 
677     /**************************************************************************
678 
679         Kills the fiber coroutine. That is, resumes it and makes resume() throw
680         a KilledException.
681 
682         Param:
683             file = source file (passed to the exception)
684             line = source code line (passed to the exception)
685 
686         Returns:
687             When the fiber is suspended by suspend() or finishes.
688 
689         In:
690             The fiber must be waiting (not running or finished).
691 
692      **************************************************************************/
693 
694     public void kill ( istring file = __FILE__, long line = __LINE__ )
695     {
696         verify(
697             this.fiber.state == this.fiber.State.HOLD,
698             "attempt to kill a non-held fiber"
699         );
700         verify(!this.killed);
701 
702         debug (MessageFiber)
703         {
704             Stdout.formatln(
705                 "--FIBER {} (fiber ptr {}) KILLED (from fiber ptr {}) -- ({}:{})",
706                 FirstName(this), cast(void*) this.fiber,
707                 cast(void*) Fiber.getThis(), file, line
708             ).flush();
709         }
710 
711         this.killed = true;
712         this.e_killed.set(file, line);
713 
714         this.fiber.call(Fiber.Rethrow.no);
715     }
716 
717     /**************************************************************************
718 
719         Returns:
720             true if the fiber is waiting or false otherwise.
721 
722      **************************************************************************/
723 
724     public bool waiting ( )
725     {
726         return this.fiber.state == this.fiber.State.HOLD;
727     }
728 
729     /**************************************************************************
730 
731         Returns:
732             true if the fiber is running or false otherwise.
733 
734      **************************************************************************/
735 
736     public bool running ( )
737     {
738         return this.fiber.state == this.fiber.State.EXEC;
739     }
740 
741     /**************************************************************************
742 
743         Returns:
744             true if the fiber is finished or false otherwise.
745 
746      **************************************************************************/
747 
748     public bool finished ( )
749     {
750         return this.fiber.state == this.fiber.State.TERM;
751     }
752 
753 
754     /**************************************************************************
755 
756         Resets the fiber
757 
758      **************************************************************************/
759 
760     public void reset ( )
761     {
762         this.fiber.reset();
763     }
764 
765     /**************************************************************************
766 
767         Resets the fiber and change the coroutine
768 
769         Params:
770             coroutine = fiber coroutine function
771 
772      **************************************************************************/
773 
774     public void reset ( void function() coroutine )
775     {
776         this.fiber.reset(coroutine);
777     }
778 
779     /**************************************************************************
780 
781         Resets the fiber and change the coroutine
782 
783         Params:
784             coroutine = fiber coroutine delegate
785 
786      **************************************************************************/
787 
788     public void reset ( scope void delegate() coroutine )
789     {
790         this.fiber.reset(coroutine);
791     }
792 
793 
794     /**************************************************************************
795 
796         Returns:
797             fiber state
798 
799      **************************************************************************/
800 
801     public State state ( )
802     {
803         return this.fiber.state;
804     }
805 
806     /**************************************************************************
807 
808         Suspends the fiber.
809 
810         Throws:
811             suspendThrow() exception if pending
812 
813         In:
814             The fiber must be running (not waiting or finished).
815 
816      **************************************************************************/
817 
818     private void suspend_ ( )
819     {
820         verify(
821             this.fiber.state == this.fiber.State.EXEC,
822             "attempt to suspend a non-active fiber"
823         );
824         verify(
825             this.fiber is Fiber.getThis,
826             "attempt to suspend fiber externally"
827         );
828 
829         this.fiber.yield();
830 
831         if (this.killed)
832         {
833             this.killed = false;
834             throw this.e_killed;
835         }
836     }
837 
838     /**************************************************************************
839 
840         Direct read-only access to fiber that is used internally by
841         MessageFiber. Most useful for debugging or for sanity checks in
842         contracts.
843 
844         Returns:
845             Underlying core.thread.Fiber instance
846 
847     **************************************************************************/
848 
849     public Fiber getRawFiber ( )
850     {
851         return this.fiber;
852     }
853 
854     /**************************************************************************
855 
856         Allows to change underlying core.thread.Fiber instance
857 
858         Params:
859             fiber = new fiber instance to use
860 
861     **************************************************************************/
862 
863     public void reset ( Fiber fiber )
864     {
865         this.fiber = fiber;
866     }
867 
868     /**************************************************************************
869 
870         Generates an identifier from a hash and an object reference.
871 
872         Params:
873             hash = hash to generate identifier from
874             obj = object reference to generate identifier from
875 
876         Returns:
877             identifier based on provided hash and object reference (the two are
878             XORed together)
879 
880      **************************************************************************/
881 
882     static private ulong createIdentifier ( hash_t hash, Object obj )
883     {
884         return hash ^ cast(ulong)cast(void*)obj;
885     }
886 }