1 /*******************************************************************************
2 
3     Helper classes to manage the situations where a set of objects implementing
4     ISuspendable should be throttled based on a count of pending items of
5     some kind. For example, one common situation of this type is as follows:
6 
7         1. You are streaming data from one or more ISuspendable sources.
8         2. For each chunk of data received you wish to do some processing which
9            will not finish immediately. (Thus the received data need to be kept
10            around in some way, forming a set of 'pending items'.)
11         3. The ISuspendables which are providing the input data must be
12            throttled (i.e. suspended and resumed) based on the number of pending
13            items.
14 
15     Copyright:
16         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17         All rights reserved.
18 
19     License:
20         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21         Alternatively, this file may be distributed under the terms of the Tango
22         3-Clause BSD License (see LICENSE_BSD.txt for details).
23 
24 *******************************************************************************/
25 
26 module ocean.io.model.SuspendableThrottlerCount;
27 
28 
29 
30 import ocean.transition;
31 import ocean.core.Verify;
32 
33 import ocean.io.model.ISuspendable,
34        ocean.io.model.ISuspendableThrottler,
35        ocean.util.container.AppendBuffer;
36 
37 import ocean.core.Array : contains;
38 
39 debug import ocean.io.Stdout;
40 
41 
42 /*******************************************************************************
43 
44     Simple suspendable throttler which just counts the number of pending items,
45     and throttles the suspendables based on that count. No data other than the
46     pending count is stored.
47 
48 *******************************************************************************/
49 
public class SuspendableThrottlerCount : ISuspendableThrottlerCount
{
    /***************************************************************************

        Number of items pending.

    ***************************************************************************/

    private size_t count;


    /***************************************************************************

        Constructor. Forwards both throttling points to the base class.

        Params:
            suspend_point = point at which the suspendables are suspended
            resume_point = point at which the suspendables are resumed

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        super(suspend_point, resume_point);
    }


    /***************************************************************************

        Increases the count of pending items by one and then calls the base
        class' throttledSuspend().

        The count must not already be at size_t.max (checked with verify), as
        the increment would otherwise wrap around to 0.

        Aliased as add (no-argument overload) and opPostInc.

    ***************************************************************************/

    public void inc ( )
    {
        verify(this.count < this.count.max);

        this.count++;
        super.throttledSuspend();
    }

    // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of
    // `add` when one came from an alias and the other was a "native" method.
    // Creating a distinct name (`inc`) allows disambiguating it manually.
    public alias inc add;
    public alias inc opPostInc;


    /***************************************************************************

        Increases the count of pending items by n and then calls the base
        class' throttledSuspend().

        The sum of the current count and n must not exceed size_t.max (checked
        with verify, phrased as a subtraction so that the check itself cannot
        overflow).

        Params:
            n = number of pending items to add

        Aliased as opAddAssign.

    ***************************************************************************/

    public void add ( size_t n )
    {
        verify(this.count <= this.count.max - n);

        this.count += n;
        super.throttledSuspend();
    }

    public alias add opAddAssign;


    /***************************************************************************

        Decreases the count of pending items by one and then calls the base
        class' throttledResume().

        The count must be greater than 0 (checked with verify), as the
        decrement would otherwise wrap around to size_t.max.

        Aliased as remove (no-argument overload) and opPostDec.

    ***************************************************************************/

    public void dec ( )
    {
        verify(this.count > 0);

        this.count--;
        super.throttledResume();
    }

    // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of
    // `remove` when one came from an alias and the other was a "native"
    // method. Creating a distinct name (`dec`) allows disambiguating it
    // manually.
    public alias dec remove;
    public alias dec opPostDec;


    /***************************************************************************

        Decreases the count of pending items by n and then calls the base
        class' throttledResume().

        n must not be greater than the current count (checked with verify), as
        the subtraction would otherwise wrap around.

        Params:
            n = number of pending items to remove

        Aliased as opSubAssign.

    ***************************************************************************/

    public void remove ( size_t n )
    {
        verify(this.count >= n);

        this.count -= n;
        super.throttledResume();
    }

    // `-=` must map to remove(): aliasing it to add() would make a subtraction
    // silently increase the pending count.
    public alias remove opSubAssign;


    /***************************************************************************

        Returns:
            the number of pending items

    ***************************************************************************/

    override public size_t length ( )
    {
        return this.count;
    }
}
178 
179 
180 /*******************************************************************************
181 
182     SuspendableThrottlerCount unittest.
183 
184 *******************************************************************************/
185 
unittest
{
    // Concrete harness: plugs a SuspendableThrottlerCount into the generic
    // checks which the ISuspendableThrottlerCount_Test constructor performs.
    scope class SuspendableThrottlerCount_Test : ISuspendableThrottlerCount_Test
    {
        // The throttler under test, kept with its concrete type so that the
        // increment / decrement operators can be applied to it.
        private SuspendableThrottlerCount counter;

        this ( )
        {
            auto instance =
                new SuspendableThrottlerCount(this.suspend, this.resume);

            // Must be set before calling super(), as the base constructor
            // calls inc() / dec() straight away.
            this.counter = instance;
            super(instance);
        }

        // Adds a single pending item, via the post-increment operator.
        override void inc ( )
        {
            this.counter++;
        }

        // Removes a single pending item, via the post-decrement operator.
        override void dec ( )
        {
            this.counter--;
        }
    }

    // Constructing the harness runs the whole test sequence.
    scope harness = new SuspendableThrottlerCount_Test;
}
211 
212 
213 /*******************************************************************************
214 
215     Abstract base class for suspendable throttlers which throttle based on a
216     count of pending items of some kind.
217 
218     Provides the following additional functionality:
219         * An abstract length() method which determines the current count of
220           pending items.
        * suspend() and resume() methods which decide whether the
          ISuspendables should be suspended or resumed, based on the count of
          pending items and the suspend and resume points defined in the
          constructor.
224 
225 *******************************************************************************/
226 
abstract public class ISuspendableThrottlerCount : ISuspendableThrottler
{
    /***************************************************************************

        Suspension threshold: the suspendables are to be suspended once the
        number of pending items is equal to or greater than this value.

    ***************************************************************************/

    public Const!(size_t) suspend_point;


    /***************************************************************************

        Resumption threshold: the suspendables are to be resumed once the
        number of pending items is equal to or less than this value.

    ***************************************************************************/

    public Const!(size_t) resume_point;


    /***************************************************************************

        Constructor. Stores the two thresholds.

        The resume point must be strictly below the suspend point (checked
        with verify).

        Params:
            suspend_point = point at which the suspendables are suspended
            resume_point = point at which the suspendables are resumed

    ***************************************************************************/

    public this ( size_t suspend_point, size_t resume_point )
    {
        verify(resume_point < suspend_point);

        this.resume_point = resume_point;
        this.suspend_point = suspend_point;
    }


    /***************************************************************************

        To be implemented by the deriving class.

        Returns:
            the number of pending items

    ***************************************************************************/

    abstract public size_t length ( );


    /***************************************************************************

        Decides whether the suspendables should be suspended. Called by
        throttle() when not suspended.

        Suspension is requested as soon as the number of pending items has
        reached (i.e. is equal to or greater than) the suspend point specified
        in the constructor.

        Returns:
            true if the suspendables should be suspended

    ***************************************************************************/

    override protected bool suspend ( )
    {
        auto pending = this.length();

        return pending >= this.suspend_point;
    }


    /***************************************************************************

        Decides whether the suspendables should be resumed. Called by
        throttle() when suspended.

        Resumption is requested as soon as the number of pending items has
        dropped to (i.e. is equal to or less than) the resume point specified
        in the constructor.

        Returns:
            true if the suspendables should be resumed

    ***************************************************************************/

    override protected bool resume ( )
    {
        auto pending = this.length();

        return pending <= this.resume_point;
    }
}
317 
318 
319 /*******************************************************************************
320 
321     Abstract base class which tests an ISuspendableThrottlerCount instance. The
322     abstract inc() and dec() methods (which increment and decrement the count)
323     must be implemented.
324 
325 *******************************************************************************/
326 
327 version ( UnitTest )
328 {
329     private import ocean.core.Test;
330 
331     abstract scope class ISuspendableThrottlerCount_Test
332     {
333         static immutable suspend = 10;
334         static immutable resume = 2;
335 
336         protected ISuspendableThrottlerCount throttler;
337 
338         this ( ISuspendableThrottlerCount throttler )
339         {
340             this.throttler = throttler;
341 
342             // Fill up throttler to one before suspension
343             for ( int i; i < suspend - 1; i++ )
344             {
345                 this.inc();
346                 test(!this.throttler.suspended);
347             }
348 
349             // Next increment should suspend
350             this.inc();
351             test(this.throttler.suspended);
352 
353             // Empty throttler to one before resumption
354             static immutable diff = suspend - resume;
355             for ( int i; i < diff - 1; i++ )
356             {
357                 this.dec();
358                 test(this.throttler.suspended);
359             }
360 
361             // Next decrement should resume
362             this.dec();
363             test(!this.throttler.suspended);
364         }
365 
366         abstract void inc ( );
367         abstract void dec ( );
368     }
369 }