1 /*******************************************************************************
2 
3     Queue that provides a notification mechanism for when new items were added
4 
5     Generic interfaces and logic for RequestQueues and related classes.
6 
    Generally speaking, a request handler delegate is registered with
    the queue (ready()). The notifying queue will then call notify() to inform
    it that a new item was added; notify() is expected to call pop() to receive
    that item. It should keep calling pop() until no items are left,
    then re-register with the queue and wait for another call to notify().
12     In other words:
13 
14         1. NotifyingQueue.ready(&notify)
15 
16         2. NotifyingQueue.ready calls notify()
17 
18         3. notify() calls NotifyingQueue.pop();
19 
20             * pop() returned a request: notify() processes data, back to 3.
21 
22             * pop() returned null: continue to 4.
23 
24         4. notify() calls NotifyingQueue.ready(&notify)
25 
    A simpler solution like this was considered:
27 
28         1. NotifyingQueue.ready(&notify)
29 
30         2. NotifyingQueue calls notify(Request)
31 
32         3. notify() processes, back to 1.
33 
    But it was decided against because it would cause a stack overflow for
    fibers: a RequestHandler needs to call RequestQueue.ready(), and if fibers
    are involved that call will be issued from within the fiber.
    If ready() calls notify() again, another request is processed in the fiber,
    causing another call to ready() and thus leading to a recursion.
39 
40     Now we require that the fiber calls pop in a loop.
41 
42     Usage example for a hypothetical client who writes numbers to a socket
43     ---
44         module NotifyingQueueExample;
45 
46         import ocean.util.container.queue.NotifyingQueue;
47 
48         void main ( )
49         {
50             auto notifying_queue = new NotifyingByteQueue(1024 * 40);
51 
52             void notee ( )
53             {
54                 while (true)
55                 {
                    auto popped = cast(char[]) notifying_queue.pop();
57 
58                     if ( popped !is null )
59                     {
60                         Stdout.formatln("Popped from the queue: {}", popped);
61                     }
62                     else break;
63                 }
64 
65                 notifying_queue.ready(&notee);
66             }
67 
68             notifying_queue.ready(&notee);
69 
70             numbers.sendNumber(23);
71             numbers.sendNumber(85);
72             numbers.sendNumber(42);
73         }
74     ---
75 
76     Copyright:
77         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
78         All rights reserved.
79 
80     License:
81         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
82         Alternatively, this file may be distributed under the terms of the Tango
83         3-Clause BSD License (see LICENSE_BSD.txt for details).
84 
85 *******************************************************************************/
86 
87 module ocean.util.container.queue.NotifyingQueue;
88 
89 
90 import ocean.util.container.queue.FlexibleRingQueue;
91 
92 import ocean.util.container.queue.model.IByteQueue;
93 
94 import ocean.util.container.queue.model.IQueueInfo;
95 
96 import ocean.io.model.ISuspendable;
97 
98 import ocean.util.serialize.contiguous.Contiguous;
99 
100 import ocean.util.serialize.contiguous.Serializer;
101 
102 import ocean.util.serialize.contiguous.Deserializer;
103 
104 import ocean.core.Array;
105 
106 import ocean.transition;
107 
108 import ocean.util.container.AppendBuffer;
109 
110 import ocean.io.serialize.StructSerializer;
111 
112 version ( UnitTest )
113 {
114     import ocean.core.Test;
115 }
116 
/*******************************************************************************

    Byte queue that notifies waiting delegates when data becomes available.

    Wraps an IByteQueue and maintains a list of notification delegates. After
    every successful push one waiting delegate is taken off the list and
    called; that delegate is expected to pop() items and then register itself
    again through ready().

    A concrete client will probably prefer to use the templated version,
    NotifyingQueue.

*******************************************************************************/

class NotifyingByteQueue : ISuspendable, IQueueInfo
{
    import ocean.core.Verify;

    /***************************************************************************

        Signature of the delegates that are called to announce available data

    ***************************************************************************/

    public alias void delegate ( ) NotificationDg;

    /***************************************************************************

        The wrapped queue which actually stores the items

    ***************************************************************************/

    private IByteQueue queue;

    /***************************************************************************

        Enabled flag, cleared by suspend() and set by resume(). While it is
        false the queue acts as if it were empty: pop() returns null and no
        waiting notification delegate is called.

    ***************************************************************************/

    private bool enabled = true;

    /***************************************************************************

        Notification delegates which are currently waiting for data

    ***************************************************************************/

    private AppendBuffer!(NotificationDg) notifiers;

    /***************************************************************************

        Constructor, creates a FlexibleByteRingQueue as the wrapped queue.

        Params:
            max_bytes = size of the queue in bytes

    ***************************************************************************/

    public this ( size_t max_bytes )
    {
        this(new FlexibleByteRingQueue(max_bytes));
    }


    /***************************************************************************

        Constructor, wraps an existing queue instance.

        Params:
            queue = instance of the queue implementation that will be used

    ***************************************************************************/

    public this ( IByteQueue queue )
    {
        this.notifiers = new AppendBuffer!(NotificationDg);

        this.queue = queue;
    }


    /***************************************************************************

        Asks the wrapped queue whether an item of the given size would fit.

        The answer is forwarded unchanged from the wrapped queue's willFit().
        NOTE(review): the notes that accompanied this method state that the
        wrapped queue accounts for wrap-around and for the per-item header
        itself, so callers pass the plain item size; this depends on the
        IByteQueue implementation in use.

        Params:
            bytes = size of item to check

        Returns:
            true if the item fits, else false

    ***************************************************************************/

    public bool willFit ( size_t bytes )
    {
        return this.queue.willFit(bytes);
    }



    /***************************************************************************

        Returns:
            total_space() of the wrapped queue, i.e. the overall number of
            bytes it occupies (used space + free space)

    ***************************************************************************/

    public ulong total_space ( )
    {
        return this.queue.total_space();
    }


    /***************************************************************************

        Returns:
            used_space() of the wrapped queue, i.e. the number of bytes
            currently stored

    ***************************************************************************/

    public ulong used_space ( )
    {
        return this.queue.used_space();
    }


    /***************************************************************************

        Returns:
            free_space() of the wrapped queue, i.e. the number of bytes still
            available

    ***************************************************************************/

    public ulong free_space ( )
    {
        return this.queue.free_space();
    }


    /***************************************************************************

        Returns:
            length() of the wrapped queue, i.e. the number of stored items

    ***************************************************************************/

    public size_t length ( )
    {
        return this.queue.length();
    }


    /***************************************************************************

        Tells whether the wrapped queue is empty. The enabled flag is not taken
        into account here.

        Returns:
            true if the wrapped queue is empty

    ***************************************************************************/

    public bool is_empty ( )
    {
        return this.queue.is_empty();
    }


    /***************************************************************************

        Registers a handler as available.

        If the queue holds data and is enabled, the handler is called
        immediately and is not stored. Otherwise it is appended to the list of
        waiting notifiers. In debug builds, registering a notifier which is
        already waiting fails verification.

        NOTE(review): the parameter is declared `scope`, yet the delegate is
        stored in the list of waiting notifiers when it is not called right
        away. Callers must keep the delegate's context alive until it is
        called -- confirm that this is intended.

        Params:
            notifier = handler that is now available

        Returns:
            false if the handler was called right away without being
            registered, true if the handler was added to the waiting list

    ***************************************************************************/

    public bool ready ( scope NotificationDg notifier )
    {
        debug foreach ( registered; this.notifiers[] )
        {
            verify (registered !is notifier,
                    "RequestQueue.ready: notifier already registered");
        }

        // Nothing to hand out right now: park the notifier until data arrives
        if (this.is_empty() || !this.enabled)
        {
            this.notifiers ~= notifier;
            return true;
        }

        notifier();
        return false;
    }

    /***************************************************************************

        Checks whether the provided notifier is currently waiting in this
        queue. This allows calling code to avoid passing the same notifier to
        ready() twice, which may throw or add a duplicate.

        Note: this is a linear search over the waiting notifiers.

        Params:
            notifier = the callback to check for

        Returns:
            true if the notifier is registered

    ***************************************************************************/

    final public bool isRegistered ( scope NotificationDg notifier )
    {
        foreach (candidate; this.notifiers[])
        {
            if (candidate is notifier)
            {
                return true;
            }
        }

        return false;
    }


    /***************************************************************************

        Returns:
            the number of notification delegates waiting for data

    ***************************************************************************/

    final public size_t waiting ( )
    {
        return this.notifiers.length;
    }


    /***************************************************************************

        Pushes an item to the wrapped queue and, on success, notifies the next
        waiting notification delegate.

        Params:
          data = array of data that the item consists of

        Returns:
          true if the push was successful
          false if not (no notification is done in that case)

   **************************************************************************/

    public bool push ( in void[] data )
    {
        if ( this.queue.push(data) )
        {
            this.notify();

            return true;
        }

        return false;
    }


    /***************************************************************************

        Reserves space for an item in the wrapped queue, lets the caller fill
        it in and then notifies the next waiting notification delegate.

        Params:
            size   = size of the item to push
            filler = delegate that is called with the reserved space and is
                     expected to write the actual data into it

        Returns:
            true if the push was successful
            false if the wrapped queue could not provide the space; filler is
            not called and no notification is done in that case

    ***************************************************************************/

    public bool push ( size_t size, scope void delegate ( void[] ) filler )
    {
        auto slot = this.queue.push(size);

        if (slot !is null)
        {
            filler(slot);

            this.notify();

            return true;
        }

        return false;
    }


    /***************************************************************************

        Suspends consuming of the queue. Calling it on an already suspended
        queue has no effect.

    ***************************************************************************/

    public void suspend ( )
    {
        this.enabled = false;
    }


    /***************************************************************************

        Returns:
            true if the queue is suspended, else false

    ***************************************************************************/

    public bool suspended ( )
    {
        return !this.enabled;
    }


    /***************************************************************************

        Resumes consuming of the queue. Does nothing if the queue is not
        suspended; otherwise re-enables it and calls notify() once for every
        notifier that was waiting at the time of the call.

    ***************************************************************************/

    public void resume ( )
    {
        if (this.enabled)
        {
            return;
        }

        this.enabled = true;

        // The number of rounds is fixed before the first notification, as
        // notified delegates may register themselves again while this runs.
        foreach (round; 0 .. this.notifiers.length)
        {
            this.notify();
        }
    }


    /***************************************************************************

        Pops an item from the wrapped queue if the queue is enabled.

        Returns:
            the result of the wrapped queue's pop(), or null while the queue
            is suspended

    ***************************************************************************/

    public void[] pop ( )
    {
        return this.enabled ? this.queue.pop() : null;
    }


    /***************************************************************************

        Removes the next waiting notification delegate from the list and calls
        it, provided that there is one and that the queue is enabled.

    ***************************************************************************/

    private void notify ( )
    {
        if ( this.notifiers.length == 0 || !this.enabled )
        {
            return;
        }

        auto next = this.notifiers.cut();

        next();
    }
}
501 
502 
/*******************************************************************************

    Templated Notifying Queue implementation

    A concrete client should have an instance of this class and use it
    to manage the connections and requests

    Note: if the stored type T is a struct, it is de/serialized with the
    contiguous Serializer / Deserializer (ocean.util.serialize.contiguous).
    The notes that accompanied this class describe the serialization as deep
    for sub-structs and array members and shallow for union members, and state
    that delegate and class members cannot be serialized.
    NOTE(review): those statements originally referred to StructSerializer;
    confirm that they hold for the contiguous serializer.

    Params:
        T = type that the queue should store. If it is a struct, it is stored
            using the serializer; otherwise the T.sizeof bytes of the value
            are stored directly (for array types that is only the slice
            itself, not the data it refers to). Note that by default the
            queue memory is not gc-aware, so if something is referenced only
            from a stored object, the gc could collect it. If you desire
            different behavior, pass your own queue instance to the
            constructor.

*******************************************************************************/

class NotifyingQueue ( T ) : NotifyingByteQueue
{
    // add to overload set
    alias NotifyingByteQueue.push push;

    /***************************************************************************

        Constructor

        Params:
            max_bytes = size of the queue in bytes

    ***************************************************************************/

    public this ( size_t max_bytes )
    {
        super(max_bytes);
    }


    /***************************************************************************

        Constructor

        Params:
            queue = instance of the queue implementation that will be used

    ***************************************************************************/

    public this ( IByteQueue queue )
    {
        super(queue);
    }


    /***************************************************************************

        Push a new request on the queue

        Struct types are serialized directly into the space reserved in the
        queue. Any other type is stored as a raw copy of the value's T.sizeof
        bytes.

        Params:
            request = The request to push

        Returns:
            true if push was successful
            false if not

    ***************************************************************************/

    bool push ( ref T request )
    {
        static if ( is(T == struct) )
            auto length = Serializer.countRequiredSize(request);
        else
            auto length = request.sizeof;

        // Called by the base class with the reserved space of `length` bytes
        void filler ( void[] target )
        {
            static if ( is(T == struct) )
                Serializer.serialize(request, target);
            else
                target[] = (&request)[0..1];
        }

        return super.push(length, &filler);
    }

    static if ( is(T == struct) )
    {
        /***********************************************************************

            Pops a Request instance from the queue

            Params:
                cont_buffer = contiguous buffer to deserialize to

            Returns:
                pointer to the deserialized struct, completely allocated in the
                given buffer, or null if the queue is suspended or the base
                class pop() returned no data

        ***********************************************************************/

        T* pop ( ref Contiguous!(T) cont_buffer )
        {
            if ( !this.enabled ) return null;

            auto data = super.pop();

            if (data is null)
            {
                return null;
            }

            Const!(void[]) void_buffer = data;

            Deserializer.deserialize(void_buffer, cont_buffer);

            return cont_buffer.ptr;
        }
    }
    else
    {
        /***********************************************************************

            Pops a Request instance from the queue

            The popped bytes are copied into the given buffer and the returned
            pointer refers to that buffer, so the item stays valid only for as
            long as the buffer is not reused.

            Params:
                buffer = deserialisation buffer to use

            Returns:
                pointer to the popped item, located at the start of the given
                buffer, or null if the queue is suspended or the base class
                pop() returned no data

        ***********************************************************************/

        T* pop ( ref void[] buffer )
        {
            if ( !this.enabled ) return null;

            auto data = super.pop();

            if (data is null)
            {
                return null;
            }

            buffer.copy(data);

            return cast(T*)buffer.ptr;
        }
    }
}
660 
// A notifier is reported as registered only after it was passed to ready()
// (the queue is empty here, so ready() stores it instead of calling it)
unittest
{
    void notifier ( ) { }

    auto byte_queue = new NotifyingByteQueue(1024);

    test(!byte_queue.isRegistered(&notifier));

    byte_queue.ready(&notifier);

    test(byte_queue.isRegistered(&notifier));
}
671 
/// NotifyingQueue with a non-struct type
unittest
{
    char[][] items = ["foo".dup, "bar".dup];

    auto str_queue = new NotifyingQueue!(char[])(1024);

    str_queue.push(items[0]);
    str_queue.push(items[1]);

    // Each pop gets its own buffer, so the results must not alias each other
    void[] first_buffer;
    auto first = str_queue.pop(first_buffer);

    test!("==")(*first, "foo");

    void[] second_buffer;
    auto second = str_queue.pop(second_buffer);

    test!("==")(*first, "foo");  // ensure there was no overwrite
    test!("==")(*second, "bar");
}
695 
/// NotifyingQueue with a struct
unittest
{
    struct S { char[] value; }

    S[2] items = [S("foo".dup), S("bar".dup)];

    auto struct_queue = new NotifyingQueue!(S)(1024);

    struct_queue.push(items[0]);
    struct_queue.push(items[1]);

    // Each pop deserializes into its own contiguous buffer
    Contiguous!(S) first_buffer;
    auto first = struct_queue.pop(first_buffer);

    test!("==")(first.value, "foo");

    Contiguous!(S) second_buffer;
    auto second = struct_queue.pop(second_buffer);

    test!("==")(first.value, "foo");  // ensure there was no overwrite
    test!("==")(second.value, "bar");
}
721 
// Make sure the NotifyingQueue template is instantiated & compiled
unittest
{
    struct Dummy
    {
        int a;
        int b;
        char[] c;
    }

    void notifier ( ) { }

    auto dummy_queue = new NotifyingQueue!(Dummy)(1024);

    test(!dummy_queue.isRegistered(&notifier));

    dummy_queue.ready(&notifier);

    test(dummy_queue.isRegistered(&notifier));
}
740 
741 
// Make sure the template also compiles for a plain (non-struct) value type
unittest
{
    auto char_queue = new NotifyingQueue!(char)(256);
}
746