1 /*******************************************************************************
2 3 Helper classes to manage the situations where a set of objects implementing
4 ISuspendable should be throttled based on a count of pending items of
5 some kind. For example, one common situation of this type is as follows:
6 7 1. You are streaming data from one or more ISuspendable sources.
8 2. For each chunk of data received you wish to do some processing which
9 will not finish immediately. (Thus the received data need to be kept
10 around in some way, forming a set of 'pending items'.)
11 3. The ISuspendables which are providing the input data must be
12 throttled (i.e. suspended and resumed) based on the number of pending
13 items.
14 15 Copyright:
16 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17 All rights reserved.
18 19 License:
20 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21 Alternatively, this file may be distributed under the terms of the Tango
22 3-Clause BSD License (see LICENSE_BSD.txt for details).
23 24 *******************************************************************************/25 26 moduleocean.io.model.SuspendableThrottlerCount;
27 28 29 30 importocean.meta.types.Qualifiers;
31 importocean.core.Verify;
32 33 importocean.io.model.ISuspendable,
34 ocean.io.model.ISuspendableThrottler,
35 ocean.util.container.AppendBuffer;
36 37 importocean.core.Array : contains;
38 39 debugimportocean.io.Stdout;
40 41 42 /*******************************************************************************
43 44 Simple suspendable throttler which just counts the number of pending items,
45 and throttles the suspendables based on that count. No data other than the
46 pending count is stored.
47 48 *******************************************************************************/49 50 publicclassSuspendableThrottlerCount : ISuspendableThrottlerCount51 {
52 /***************************************************************************
53 54 Number of items pending.
55 56 ***************************************************************************/57 58 privatesize_tcount;
59 60 61 /***************************************************************************
62 63 Constructor.
64 65 Params:
66 suspend_point = point at which the suspendables are suspended
67 resume_point = point at which the suspendables are resumed
68 69 ***************************************************************************/70 71 publicthis ( size_tsuspend_point, size_tresume_point )
72 {
73 super(suspend_point, resume_point);
74 }
75 76 77 /***************************************************************************
78 79 Increases the count of pending items and throttles the suspendables.
80 81 ***************************************************************************/82 83 publicvoidinc ( )
84 {
85 verify(this.count < this.count.max);
86 87 this.count++;
88 super.throttledSuspend();
89 }
90 91 /// ditto92 publicvoidopUnary ( stringop ) ( ) if (op == "++")
93 {
94 returnthis.inc();
95 }
96 97 // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of98 // `add` when one came from alias and other was "native" method. Creating99 // distinct name (`inc`) allowed to disambugate it manually100 publicaliasincadd;
101 102 103 /***************************************************************************
104 105 Increases the count of pending items and throttles the suspendables.
106 107 Params:
108 n = number of pending items to add
109 110 ***************************************************************************/111 112 publicvoidadd ( size_tn )
113 {
114 verify(this.count <= this.count.max - n);
115 116 this.count += n;
117 super.throttledSuspend();
118 }
119 120 /// ditto121 publicvoidopOpAssign ( stringop ) ( size_tn ) if (op == "+")
122 {
123 this.add(n);
124 }
125 126 127 /***************************************************************************
128 129 Decreases the count of pending items and throttles the suspendables.
130 131 ***************************************************************************/132 133 publicvoiddec ( )
134 {
135 verify(this.count > 0);
136 137 this.count--;
138 super.throttledResume();
139 }
140 141 /// ditto142 publicvoidopUnary ( stringop ) ( ) if (op == "--")
143 {
144 returnthis.dec();
145 }
146 147 // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of148 // `remove` when one came from alias and other was "native" method. Creating149 // distinct name (`dec`) allowed to disambugate it manually150 publicaliasdecremove;
151 152 153 /***************************************************************************
154 155 Decreases the count of pending items and throttles the suspendables.
156 157 Params:
158 n = number of pending items to remove
159 160 ***************************************************************************/161 162 publicvoidremove ( size_tn )
163 {
164 verify(this.count >= n);
165 this.count -= n;
166 super.throttledResume();
167 }
168 169 /// ditto170 publicvoidopOpAssign ( stringop ) ( size_tn ) if (op == "-")
171 {
172 this.remove(n);
173 }
174 175 176 /***************************************************************************
177 178 Returns:
179 the number of pending items
180 181 ***************************************************************************/182 183 overridepublicsize_tlength ( )
184 {
185 returnthis.count;
186 }
187 }
188 189 unittest190 {
191 importocean.core.Test : test;
192 193 // Helper class to allow us to test operator overloads194 // without touching other parts of the class logic195 staticclassOpsTest : SuspendableThrottlerCount196 {
197 this ()
198 {
199 size_tsuspend_point = 8;
200 size_tresume_point = 3;
201 super(suspend_point, resume_point);
202 }
203 204 overridevoidinc () { this.count++; }
205 overridevoiddec () { this.count--; }
206 207 overridevoidadd (size_tn) { this.count += n; }
208 overridevoidremove (size_tn) { this.count -= n; }
209 }
210 211 scopeops_test = newOpsTest;
212 test!"=="(ops_test.length, 0);
213 214 ops_test++;
215 test!"=="(ops_test.length, 1);
216 test(!ops_test.suspend);
217 test(ops_test.resume);
218 219 ops_test += 7;
220 test!"=="(ops_test.length, 8);
221 test(ops_test.suspend);
222 test(!ops_test.resume);
223 224 ops_test--;
225 test!"=="(ops_test.length, 7);
226 test(!ops_test.suspend);
227 test(!ops_test.resume);
228 229 ops_test -= 4;
230 test!"=="(ops_test.length, 3);
231 test(!ops_test.suspend);
232 test(ops_test.resume);
233 }
234 235 236 /*******************************************************************************
237 238 SuspendableThrottlerCount unittest.
239 240 *******************************************************************************/241 242 unittest243 {
244 staticclassSuspendableThrottlerCount_Test : ISuspendableThrottlerCount_Test245 {
246 privateSuspendableThrottlerCountcount;
247 248 this ( )
249 {
250 this.count = newSuspendableThrottlerCount(this.suspend, this.resume);
251 super(this.count);
252 }
253 254 overridevoidinc ( )
255 {
256 this.count++;
257 }
258 259 overridevoiddec ( )
260 {
261 this.count--;
262 }
263 }
264 265 scopetest = newSuspendableThrottlerCount_Test;
266 }
267 268 269 /*******************************************************************************
270 271 Abstract base class for suspendable throttlers which throttle based on a
272 count of pending items of some kind.
273 274 Provides the following additional functionality:
275 * An abstract length() method which determines the current count of
276 pending items.
277 * suspend() and resume() methods which suspend or resume the
278 ISuspendables based on the count of pending items and the suspend and
279 resume points defined in the constructor.
280 281 *******************************************************************************/282 283 abstractpublicclassISuspendableThrottlerCount : ISuspendableThrottler284 {
285 /***************************************************************************
286 287 When the number of pending items reaches this value or greater, the
288 suspendables will be suspended.
289 290 ***************************************************************************/291 292 publicconst(size_t) suspend_point;
293 294 295 /***************************************************************************
296 297 When the number of pending items reaches this value or less, the
298 suspendables will be resumed.
299 300 ***************************************************************************/301 302 publicconst(size_t) resume_point;
303 304 305 /***************************************************************************
306 307 Constructor.
308 309 Params:
310 suspend_point = point at which the suspendables are suspended
311 resume_point = point at which the suspendables are resumed
312 313 ***************************************************************************/314 315 publicthis ( size_tsuspend_point, size_tresume_point )
316 {
317 verify(suspend_point > resume_point);
318 319 this.suspend_point = suspend_point;
320 this.resume_point = resume_point;
321 }
322 323 324 /***************************************************************************
325 326 Returns:
327 the number of pending items
328 329 ***************************************************************************/330 331 abstractpublicsize_tlength ( );
332 333 334 /***************************************************************************
335 336 Decides whether the suspendables should be suspended. Called by
337 throttle() when not suspended.
338 339 If the number of pending items is greater than the suspend point
340 specified in the constructor, then the suspendables are suspended,
341 stopping the input.
342 343 Returns:
344 true if the suspendables should be suspeneded
345 346 ***************************************************************************/347 348 overrideprotectedboolsuspend ( )
349 {
350 returnthis.length >= this.suspend_point;
351 }
352 353 354 /***************************************************************************
355 356 Decides whether the suspendables should be resumed. Called by
357 throttle() when suspended.
358 359 If the number of pending items is less than the resume point specified
360 in the constructor, then the suspendables are resumed, restarting the
361 input.
362 363 Returns:
364 true if the suspendables should be resumed
365 366 ***************************************************************************/367 368 overrideprotectedboolresume ( )
369 {
370 returnthis.length <= this.resume_point;
371 }
372 }
373 374 375 /*******************************************************************************
376 377 Abstract base class which tests an ISuspendableThrottlerCount instance. The
378 abstract inc() and dec() methods (which increment and decrement the count)
379 must be implemented.
380 381 *******************************************************************************/382 383 version (unittest)
384 {
385 privateimportocean.core.Test;
386 387 abstractclassISuspendableThrottlerCount_Test388 {
389 staticimmutablesuspend = 10;
390 staticimmutableresume = 2;
391 392 protectedISuspendableThrottlerCountthrottler;
393 394 this ( ISuspendableThrottlerCountthrottler )
395 {
396 this.throttler = throttler;
397 398 // Fill up throttler to one before suspension399 for ( inti; i < suspend - 1; i++ )
400 {
401 this.inc();
402 test(!this.throttler.suspended);
403 }
404 405 // Next increment should suspend406 this.inc();
407 test(this.throttler.suspended);
408 409 // Empty throttler to one before resumption410 staticimmutablediff = suspend - resume;
411 for ( inti; i < diff - 1; i++ )
412 {
413 this.dec();
414 test(this.throttler.suspended);
415 }
416 417 // Next decrement should resume418 this.dec();
419 test(!this.throttler.suspended);
420 }
421 422 abstractvoidinc ( );
423 abstractvoiddec ( );
424 }
425 }