1 /*******************************************************************************
2 
3     Helper classes to manage the situations where a set of objects implementing
4     ISuspendable should be throttled based on a count of pending items of
5     some kind. For example, one common situation of this type is as follows:
6 
7         1. You are streaming data from one or more ISuspendable sources.
8         2. For each chunk of data received you wish to do some processing which
9            will not finish immediately. (Thus the received data need to be kept
10            around in some way, forming a set of 'pending items'.)
11         3. The ISuspendables which are providing the input data must be
12            throttled (i.e. suspended and resumed) based on the number of pending
13            items.
14 
15     Copyright:
16         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17         All rights reserved.
18 
19     License:
20         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21         Alternatively, this file may be distributed under the terms of the Tango
22         3-Clause BSD License (see LICENSE_BSD.txt for details).
23 
24 *******************************************************************************/
25 
26 module ocean.io.model.SuspendableThrottlerCount;
27 
28 
29 
30 import ocean.meta.types.Qualifiers;
31 import ocean.core.Verify;
32 
33 import ocean.io.model.ISuspendable,
34        ocean.io.model.ISuspendableThrottler,
35        ocean.util.container.AppendBuffer;
36 
37 import ocean.core.Array : contains;
38 
39 debug import ocean.io.Stdout;
40 
41 
42 /*******************************************************************************
43 
44     Simple suspendable throttler which just counts the number of pending items,
45     and throttles the suspendables based on that count. No data other than the
46     pending count is stored.
47 
48 *******************************************************************************/
49 
public class SuspendableThrottlerCount : ISuspendableThrottlerCount
{
    /***************************************************************************

        Current number of pending items. This is the only state the class
        keeps; it is what length() reports.

    ***************************************************************************/

    private size_t count;


    /***************************************************************************

        Constructor. Forwards both thresholds to the base class unchanged.

        Params:
            suspend_point = pending count at which the suspendables are
                suspended
            resume_point = pending count at which the suspendables are resumed

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        super(suspend_point, resume_point);
    }


    /***************************************************************************

        Adds one pending item, then calls the inherited throttledSuspend().

        Verifies beforehand that the counter is not already at its maximum
        value, so the increment cannot wrap around.

    ***************************************************************************/

    public void inc ( )
    {
        verify(this.count != size_t.max);

        this.count += 1;
        super.throttledSuspend();
    }

    /// ditto
    public void opUnary ( string op ) ( ) if (op == "++")
    {
        this.inc();
    }

    // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of
    // `add` when one came from an alias and the other was a "native" method.
    // Giving the no-argument variant a distinct name (`inc`) makes it possible
    // to disambiguate the two manually.
    public alias inc add;


    /***************************************************************************

        Adds several pending items at once, then calls the inherited
        throttledSuspend().

        Verifies beforehand that adding n cannot overflow the counter.

        Params:
            n = number of pending items to add

    ***************************************************************************/

    public void add ( size_t n )
    {
        verify(n <= size_t.max - this.count);

        this.count += n;
        super.throttledSuspend();
    }

    /// ditto
    public void opOpAssign ( string op ) ( size_t n ) if (op == "+")
    {
        this.add(n);
    }


    /***************************************************************************

        Removes one pending item, then calls the inherited throttledResume().

        Verifies beforehand that at least one item is pending, so the
        decrement cannot wrap around.

    ***************************************************************************/

    public void dec ( )
    {
        verify(this.count != 0);

        this.count -= 1;
        super.throttledResume();
    }

    /// ditto
    public void opUnary ( string op ) ( ) if (op == "--")
    {
        this.dec();
    }

    // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of
    // `remove` when one came from an alias and the other was a "native"
    // method. Giving the no-argument variant a distinct name (`dec`) makes it
    // possible to disambiguate the two manually.
    public alias dec remove;


    /***************************************************************************

        Removes several pending items at once, then calls the inherited
        throttledResume().

        Verifies beforehand that at least n items are pending.

        Params:
            n = number of pending items to remove

    ***************************************************************************/

    public void remove ( size_t n )
    {
        verify(n <= this.count);

        this.count -= n;
        super.throttledResume();
    }

    /// ditto
    public void opOpAssign ( string op ) ( size_t n ) if (op == "-")
    {
        this.remove(n);
    }


    /***************************************************************************

        Returns:
            the current number of pending items

    ***************************************************************************/

    override public size_t length ( )
    {
        return this.count;
    }
}
188 
unittest
{
    import ocean.core.Test : test;

    // Subclass used to exercise the operator overloads in isolation: each
    // counting method is overridden so that it only adjusts the pending count
    // and skips the rest of the parent's logic.
    static class OpsTest : SuspendableThrottlerCount
    {
        this ()
        {
            // suspend at 8 pending items, resume at 3
            super(8, 3);
        }

        override void inc ()
        {
            ++this.count;
        }

        override void dec ()
        {
            --this.count;
        }

        override void add (size_t n)
        {
            this.count += n;
        }

        override void remove (size_t n)
        {
            this.count -= n;
        }
    }

    scope throttler = new OpsTest;
    test!"=="(throttler.length, 0);

    // 1 pending: below the suspend point, at or below the resume point
    throttler++;
    test!"=="(throttler.length, 1);
    test(!throttler.suspend());
    test(throttler.resume());

    // 8 pending: suspend point reached
    throttler += 7;
    test!"=="(throttler.length, 8);
    test(throttler.suspend());
    test(!throttler.resume());

    // 7 pending: strictly between the resume and suspend points
    throttler--;
    test!"=="(throttler.length, 7);
    test(!throttler.suspend());
    test(!throttler.resume());

    // 3 pending: resume point reached
    throttler -= 4;
    test!"=="(throttler.length, 3);
    test(!throttler.suspend());
    test(throttler.resume());
}
234 
235 
236 /*******************************************************************************
237 
238     SuspendableThrottlerCount unittest.
239 
240 *******************************************************************************/
241 
unittest
{
    // Concrete tester: implements inc()/dec() through the ++ and -- operators
    // of a SuspendableThrottlerCount instance.
    static class CountThrottlerTest : ISuspendableThrottlerCount_Test
    {
        private SuspendableThrottlerCount counter;

        this ( )
        {
            // The field has to be assigned before super() is invoked: the base
            // constructor calls inc() and dec(), which operate on it.
            this.counter = new SuspendableThrottlerCount(suspend, resume);
            super(this.counter);
        }

        override void inc ( )
        {
            this.counter++;
        }

        override void dec ( )
        {
            this.counter--;
        }
    }

    // All checks are performed by the base class constructor.
    scope tester = new CountThrottlerTest;
}
267 
268 
269 /*******************************************************************************
270 
271     Abstract base class for suspendable throttlers which throttle based on a
272     count of pending items of some kind.
273 
274     Provides the following additional functionality:
275         * An abstract length() method which determines the current count of
276           pending items.
277         * suspend() and resume() methods which suspend or resume the
278           ISuspendables based on the count of pending items and the suspend and
279           resume points defined in the constructor.
280 
281 *******************************************************************************/
282 
abstract public class ISuspendableThrottlerCount : ISuspendableThrottler
{
    /***************************************************************************

        Suspension threshold: suspend() reports true once the number of
        pending items is equal to or greater than this value.

    ***************************************************************************/

    public const size_t suspend_point;


    /***************************************************************************

        Resumption threshold: resume() reports true once the number of pending
        items is equal to or less than this value.

    ***************************************************************************/

    public const size_t resume_point;


    /***************************************************************************

        Constructor. Verifies that the suspend point lies strictly above the
        resume point before storing both.

        Params:
            suspend_point = pending count at which the suspendables are
                suspended
            resume_point = pending count at which the suspendables are resumed

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        verify(resume_point < suspend_point);

        this.suspend_point = suspend_point;
        this.resume_point = resume_point;
    }


    /***************************************************************************

        Returns:
            the current number of pending items, as determined by the
            implementing class

    ***************************************************************************/

    abstract public size_t length ( );


    /***************************************************************************

        Decides whether the suspendables should be suspended, by comparing
        the current pending count with the suspend point. The comparison is
        inclusive: a count equal to the suspend point already qualifies.

        Returns:
            true if length() is greater than or equal to suspend_point

    ***************************************************************************/

    override protected bool suspend ( )
    {
        return this.suspend_point <= this.length();
    }


    /***************************************************************************

        Decides whether the suspendables should be resumed, by comparing the
        current pending count with the resume point. The comparison is
        inclusive: a count equal to the resume point already qualifies.

        Returns:
            true if length() is less than or equal to resume_point

    ***************************************************************************/

    override protected bool resume ( )
    {
        return this.resume_point >= this.length();
    }
}
373 
374 
375 /*******************************************************************************
376 
377     Abstract base class which tests an ISuspendableThrottlerCount instance. The
378     abstract inc() and dec() methods (which increment and decrement the count)
379     must be implemented.
380 
381 *******************************************************************************/
382 
version (unittest)
{
    private import ocean.core.Test;

    abstract class ISuspendableThrottlerCount_Test
    {
        // Thresholds that the throttler under test is expected to have been
        // constructed with.
        static immutable suspend = 10;
        static immutable resume = 2;

        // Throttler instance whose suspended state is checked.
        protected ISuspendableThrottlerCount throttler;

        /***********************************************************************

            Constructor. Stores the throttler, then runs the whole test
            sequence: counts up to the suspend point and back down to the
            resume point via inc() / dec(), checking the throttler's
            suspended state after every step.

            Params:
                throttler = throttler instance to check

        ***********************************************************************/

        this ( ISuspendableThrottlerCount throttler )
        {
            this.throttler = throttler;

            // Every increment short of the suspend point leaves the throttler
            // unsuspended
            foreach ( step; 0 .. suspend - 1 )
            {
                this.inc();
                test(!this.throttler.suspended);
            }

            // The increment which reaches the suspend point suspends
            this.inc();
            test(this.throttler.suspended);

            // Every decrement short of the resume point leaves the throttler
            // suspended
            foreach ( step; 0 .. suspend - resume - 1 )
            {
                this.dec();
                test(this.throttler.suspended);
            }

            // The decrement which reaches the resume point resumes
            this.dec();
            test(!this.throttler.suspended);
        }

        // Adds one pending item to the throttler under test.
        abstract void inc ( );

        // Removes one pending item from the throttler under test.
        abstract void dec ( );
    }
}