1 /*******************************************************************************
2 
    Queue that provides a notification mechanism for when new items are added
4 
5     Generic interfaces and logic for RequestQueues and related classes.
6 
    Generically speaking, a request handler delegate is registered with
    the queue (ready()). The notifying queue will then call notify() to inform
    it about a newly added item; notify() is expected to call pop() to receive
    that item. It should keep calling pop() until no items are left
    and then re-register at the queue and wait for another call to notify().
12     In other words:
13 
14         1. NotifyingQueue.ready(&notify)
15 
16         2. NotifyingQueue.ready calls notify()
17 
18         3. notify() calls NotifyingQueue.pop();
19 
20             * pop() returned a request: notify() processes data, back to 3.
21 
22             * pop() returned null: continue to 4.
23 
24         4. notify() calls NotifyingQueue.ready(&notify)
25 
    A simpler solution like this was considered:
27 
28         1. NotifyingQueue.ready(&notify)
29 
30         2. NotifyingQueue calls notify(Request)
31 
32         3. notify() processes, back to 1.
33 
34     But was decided against because it would cause a stack overflow for fibers,
35     as a RequestHandler needs to call RequestQueue.ready() and if fibers are
36     involved that call will be issued from within the fiber.
37     If ready() calls notify again another processing of a request in the fiber
38     will happen, causing another call to ready() leading to a recursion.
39 
40     Now we require that the fiber calls pop in a loop.
41 
42     Usage example for a hypothetical client who writes numbers to a socket
43     ---
44     // Workaround: https://github.com/dlang/dub/issues/1761
45     module_ NotifyingQueueExample;
46 
47     import ocean.util.container.queue.NotifyingQueue;
48 
49     void main ( )
50     {
51         auto notifying_queue = new NotifyingByteQueue(1024 * 40);
52 
53         void notee ( )
54         {
55             while (true)
56             {
                auto popped = cast(char[]) notifying_queue.pop();
58 
59                 if ( popped !is null )
60                 {
61                     Stdout.formatln("Popped from the queue: {}", popped);
62                 }
63                 else break;
64             }
65 
66             notifying_queue.ready(&notee);
67         }
68 
69         notifying_queue.ready(&notee);
70 
71         numbers.sendNumber(23);
72         numbers.sendNumber(85);
73         numbers.sendNumber(42);
74     }
75     ---
76 
77     Copyright:
78         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
79         All rights reserved.
80 
81     License:
82         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
83         Alternatively, this file may be distributed under the terms of the Tango
84         3-Clause BSD License (see LICENSE_BSD.txt for details).
85 
86 *******************************************************************************/
87 
88 module ocean.util.container.queue.NotifyingQueue;
89 
90 
91 import ocean.util.container.queue.FlexibleRingQueue;
92 
93 import ocean.util.container.queue.model.IByteQueue;
94 
95 import ocean.util.container.queue.model.IQueueInfo;
96 
97 import ocean.io.model.ISuspendable;
98 
99 import ocean.util.serialize.contiguous.Contiguous;
100 
101 import ocean.util.serialize.contiguous.Serializer;
102 
103 import ocean.util.serialize.contiguous.Deserializer;
104 
105 import ocean.core.Array;
106 
107 import ocean.meta.types.Qualifiers;
108 
109 import ocean.util.container.AppendBuffer;
110 
111 import ocean.io.serialize.StructSerializer;
112 
113 version (unittest)
114 {
115     import ocean.core.Test;
116 }
117 
118 /*******************************************************************************
119 
120     Request Queue implementation and logic.
121 
122     A concrete client will probably prefer to use the templated version
123 
124 *******************************************************************************/
125 
class NotifyingByteQueue : ISuspendable, IQueueInfo
{
    import ocean.core.Verify;

    /***************************************************************************

        Type of the delegate used for notifications

    ***************************************************************************/

    public alias void delegate ( ) NotificationDg;

    /***************************************************************************

        Underlying byte queue that stores the items. All size/length queries
        and the actual push/pop operations are forwarded to it.

    ***************************************************************************/

    private IByteQueue queue;

    /***************************************************************************

        Whether the queue is enabled or not. Set/unset by the suspend() /
        resume() methods. When enabled is false, the queue behaves as if it is
        empty (no waiting notification delegates will be called and pop()
        returns null).

    ***************************************************************************/

    private bool enabled = true;

    /***************************************************************************

        Delegates waiting for notification of data in the queue. notify()
        always takes the most recently appended delegate (via cut()).

    ***************************************************************************/

    private AppendBuffer!(NotificationDg) notifiers;

    /***************************************************************************

        Constructor, creates a FlexibleByteRingQueue as the underlying queue.

        Params:
            max_bytes = size of the queue in bytes

    ***************************************************************************/

    public this ( size_t max_bytes )
    {
        this(new FlexibleByteRingQueue(max_bytes));
    }


    /***************************************************************************

        Constructor

        Params:
            queue = instance of the queue implementation that will be used

    ***************************************************************************/

    public this ( IByteQueue queue )
    {
        this.queue = queue;

        this.notifiers = new AppendBuffer!(NotificationDg);
    }


    /***************************************************************************

        Finds out whether the provided number of bytes will fit in the queue.
        Also considers the need of wrapping.

        Note that this method internally adds on the extra bytes required for
        the item header, so it is *not* necessary for the end-user to first
        calculate the item's push size.

        Params:
            bytes = size of item to check

        Returns:
            true if the bytes fit, else false

    ***************************************************************************/

    public bool willFit ( size_t bytes )
    {
        return this.queue.willFit(bytes);
    }



    /***************************************************************************

        Returns:
            total number of bytes used by queue (used space + free space)

    ***************************************************************************/

    public ulong total_space ( )
    {
        return this.queue.total_space();
    }


    /***************************************************************************

        Returns:
            number of bytes stored in queue

    ***************************************************************************/

    public ulong used_space ( )
    {
        return this.queue.used_space();
    }


    /***************************************************************************

        Returns:
            number of bytes free in queue

    ***************************************************************************/

    public ulong free_space ( )
    {
        return this.queue.free_space();
    }


    /***************************************************************************

        Returns:
            the number of items in the queue

    ***************************************************************************/

    public size_t length ( )
    {
        return this.queue.length();
    }


    /***************************************************************************

        Tells whether the queue is empty.

        Note that this reports the state of the underlying queue and does not
        take the suspended state into account.

        Returns:
            true if the queue is empty

    ***************************************************************************/

    public bool is_empty ( )
    {
        return this.queue.is_empty();
    }


    /***************************************************************************

        Registers a handler as available.

        If the queue is enabled and contains data, the handler is called
        immediately and is *not* stored. Otherwise it is appended to the list
        of waiting handlers and will be called by a later push() or resume().

        In debug builds, registering a handler that is already waiting is
        reported through verify().

        Params:
            notifier = handler that is now available

        Returns:
            false if the handler was called right away without
            even registering
            true if the handler was just added to the queue

    ***************************************************************************/

    public bool ready ( scope NotificationDg notifier )
    {
        debug verify(!this.isRegistered(notifier),
            "RequestQueue.ready: notifier already registered");

        if (!this.is_empty() && this.enabled)
        {
            notifier();
            return false;
        }

        this.notifiers ~= notifier;
        return true;
    }

    /***************************************************************************

        Check whether the provided notifier is already registered.
        This allows the code to avoid calling ready() with the same notifier,
        which may throw or add duplicate notifiers.

        Note: This is an O(n) search, however it should not have a
        performance impact in most cases since the number of registered
        notifiers is typically very low.

        Params:
            notifier = the callback to check for

        Returns:
            true if the notifier is registered

    ***************************************************************************/

    final public bool isRegistered ( scope NotificationDg notifier )
    {
        foreach (wait_notifier; this.notifiers[])
        {
            if (notifier is wait_notifier)
                return true;
        }

        return false;
    }


    /***************************************************************************

        Returns:
            how many notification delegates are waiting for data

    ***************************************************************************/

    final public size_t waiting ( )
    {
        return this.notifiers.length;
    }


    /***************************************************************************

        Push an item into the queue and notify the next waiting notification
        delegate about it.

        Params:
          data = array of data that the item consists of

        Returns:
          true if push was successful
          false if not (no delegate is notified in that case)

   **************************************************************************/

    public bool push ( in void[] data )
    {
        if ( !this.queue.push(data) ) return false;

        this.notify();

        return true;
    }


    /***************************************************************************

        Push an item into the queue and notify the next waiting handler about
        it.

        Space for the item is reserved in the underlying queue first; the
        filler delegate is then invoked to write the item's content in place,
        and only afterwards is a waiting handler notified.

        Params:
            size   = size of the item to push
            filler = delegate that will be called with that item to fill in the
                     actual data

        Returns:
            true if push was successful
            false if not (filler is not called in that case)

    ***************************************************************************/

    public bool push ( size_t size, scope void delegate ( void[] ) filler )
    {
        auto target = this.queue.push(size);

        if (target is null) return false;

        filler(target);

        this.notify();

        return true;
    }


    /***************************************************************************

        Suspends consuming of the queue. Has no effect if the queue is already
        suspended.

    ***************************************************************************/

    public void suspend ( )
    {
        this.enabled = false;
    }


    /***************************************************************************

        Returns true if the queue is suspended, else false

    ***************************************************************************/

    public bool suspended ( )
    {
        return !this.enabled;
    }


    /***************************************************************************

        Resumes consuming of the queue. Has no effect if the queue is not
        suspended.

        notify() is invoked once for every delegate that was waiting at the
        moment of the call. Note that this happens regardless of whether the
        queue currently holds any items, so a notified delegate must cope with
        pop() returning null.

    ***************************************************************************/

    public void resume ( )
    {
        if (this.enabled)
        {
            return;
        }

        this.enabled = true;

        // Take a snapshot of the number of waiting delegates instead of
        // iterating over the buffer itself: notify() removes entries and a
        // notified delegate may re-register through ready(), so the buffer
        // is modified while this loop runs.
        auto pending = this.notifiers.length;

        for (size_t i = 0; i < pending; i++)
        {
            this.notify();
        }
    }


    /***************************************************************************

        Pops an element if the queue is enabled.

        Returns:
            null if the queue is suspended, otherwise whatever the underlying
            queue's pop() returns

    ***************************************************************************/

    public void[] pop ( )
    {
        if ( !this.enabled )
        {
            return null;
        }

        return this.queue.pop();
    }


    /***************************************************************************

        Calls the next waiting notification delegate, if queue is enabled.

        The delegate is removed from the list of waiting delegates before it
        is called, so it has to call ready() again to receive further
        notifications.

    ***************************************************************************/

    private void notify ( )
    {
        if ( this.notifiers.length > 0 && this.enabled )
        {
            auto dg = this.notifiers.cut();

            dg();
        }
    }
}
502 
503 
504 /*******************************************************************************
505 
506     Templated Notifying Queue implementation
507 
508     A concrete client should have an instance of this class and use it
509     to manage the connections and requests
510 
511     Note: the stored type T is automatically de/serialized using the
512     StructSerializer. This performs a deep serialization of sub-structs and
513     array members. Union members are shallowly serialized. Delegate and class
514     members cannot be serialized.
515 
516     Params:
        T = type that the queue should store. If it's a struct it is stored
            using the struct serializer, else it is stored directly. Note
            that by default the memory is not gc-aware, so if you reference
            something only from the stored object, the gc could collect it. If
            you desire different behavior pass your own queue instance to the
            constructor
523 
524 *******************************************************************************/
525 
class NotifyingQueue ( T ) : NotifyingByteQueue
{
    // add the byte-level push() overloads of the base class to the overload set
    alias NotifyingByteQueue.push push;

    /***************************************************************************

        Constructor

        Params:
            max_bytes = size of the queue in bytes

    ***************************************************************************/

    public this ( size_t max_bytes )
    {
        super(max_bytes);
    }


    /***************************************************************************

        Constructor

        Params:
            queue = instance of the queue implementation that will be used

    ***************************************************************************/

    public this ( IByteQueue queue )
    {
        super(queue);
    }


    /***************************************************************************

        Push a new request on the queue.

        Struct types are serialized into the queue with the contiguous
        Serializer. For any other type the raw bytes of the value itself
        (T.sizeof bytes) are copied, i.e. data referenced by the value is not
        copied.

        Params:
            request = The request to push

        Returns:
            true if push was successful
            false if not

    ***************************************************************************/

    bool push ( ref T request )
    {
        static if ( is(T == struct) )
            auto length = Serializer.countRequiredSize(request);
        else
            auto length = T.sizeof;

        // Writes the request into the space reserved in the queue.
        void filler ( void[] target )
        {
            static if ( is(T == struct) )
                Serializer.serialize(request, target);
            else
                target[] = (&request)[0..1];
        }

        return super.push(length, &filler);
    }

    static if ( is(T == struct) )
    {
        /***********************************************************************

            Pops a Request instance from the queue

            Params:
                cont_buffer = contiguous buffer to deserialize to

            Returns:
                pointer to the deserialized struct, completely allocated in the
                given buffer, or null if the base class pop() returned null
                (which it does, among other cases, while the queue is
                suspended)

        ***********************************************************************/

        T* pop ( ref Contiguous!(T) cont_buffer )
        {
            // The base implementation already returns null while suspended,
            // so no separate check of the enabled flag is needed here.
            auto data = super.pop();

            if (data is null)
            {
                return null;
            }

            const(void[]) void_buffer = data;

            Deserializer.deserialize(void_buffer, cont_buffer);

            return cont_buffer.ptr;
        }
    }
    else
    {
        /***********************************************************************

            Pops a Request instance from the queue

            Params:
                buffer = deserialisation buffer to use; the popped bytes are
                         copied into it

            Returns:
                pointer to the popped item, located in the given buffer, or
                null if the base class pop() returned null (which it does,
                among other cases, while the queue is suspended)

        ***********************************************************************/

        T* pop ( ref void[] buffer )
        {
            // The base implementation already returns null while suspended,
            // so no separate check of the enabled flag is needed here.
            auto data = super.pop();

            if (data is null)
            {
                return null;
            }

            buffer.copy(data);

            return cast(T*)buffer.ptr;
        }
    }
}
661 
unittest
{
    // A no-op delegate is sufficient: only its registration state matters.
    void noop ( ) { }

    auto byte_queue = new NotifyingByteQueue(1024);

    // Nothing has been registered yet.
    test(!byte_queue.isRegistered(&noop));

    // After ready() the delegate is expected to be in the waiting list.
    byte_queue.ready(&noop);
    test(byte_queue.isRegistered(&noop));
}
672 
/// NotifyingQueue with a non-struct type
unittest
{
    char[][] items = ["foo".dup, "bar".dup];

    auto queue = new NotifyingQueue!(char[])(1024);

    foreach (ref item; items)
    {
        queue.push(item);
    }

    // Each pop gets its own buffer so that the results stay independent.
    void[] first_buffer;
    auto first = queue.pop(first_buffer);

    test!("==")(*first, "foo");

    void[] second_buffer;
    auto second = queue.pop(second_buffer);

    // The first result must not have been overwritten by the second pop.
    test!("==")(*first, "foo");
    test!("==")(*second, "bar");
}
696 
/// NotifyingQueue with a struct
unittest
{
    struct S { char[] value; }

    auto queue = new NotifyingQueue!(S)(1024);

    S[2] items = [S("foo".dup), S("bar".dup)];

    foreach (ref item; items)
    {
        queue.push(item);
    }

    // Each pop deserializes into its own contiguous buffer.
    Contiguous!(S) first_buffer;
    auto first = queue.pop(first_buffer);

    test!("==")(first.value, "foo");

    Contiguous!(S) second_buffer;
    auto second = queue.pop(second_buffer);

    // The first result must not have been overwritten by the second pop.
    test!("==")(first.value, "foo");
    test!("==")(second.value, "bar");
}
722 
// Make sure the NotifyingQueue template is instantiated & compiled
unittest
{
    struct Dummy
    {
        int a;
        int b;
        char[] c;
    }

    // A no-op delegate is sufficient: only its registration state matters.
    void noop ( ) { }

    auto typed_queue = new NotifyingQueue!(Dummy)(1024);

    // Nothing has been registered yet.
    test(!typed_queue.isRegistered(&noop));

    // After ready() the delegate is expected to be in the waiting list.
    typed_queue.ready(&noop);
    test(typed_queue.isRegistered(&noop));
}
741 
742 
unittest
{
    // Only checks that the template can be instantiated and constructed
    // for the element type char.
    auto char_queue = new NotifyingQueue!(char)(256);
}