1 /******************************************************************************* 2 3 Helper classes to manage the situations where a set of objects implementing 4 ISuspendable should be throttled based on a count of pending items of 5 some kind. For example, one common situation of this type is as follows: 6 7 1. You are streaming data from one or more ISuspendable sources. 8 2. For each chunk of data received you wish to do some processing which 9 will not finish immediately. (Thus the received data need to be kept 10 around in some way, forming a set of 'pending items'.) 11 3. The ISuspendables which are providing the input data must be 12 throttled (i.e. suspended and resumed) based on the number of pending 13 items. 14 15 Copyright: 16 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 17 All rights reserved. 18 19 License: 20 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 21 Alternatively, this file may be distributed under the terms of the Tango 22 3-Clause BSD License (see LICENSE_BSD.txt for details). 23 24 *******************************************************************************/ 25 26 module ocean.io.model.SuspendableThrottlerCount; 27 28 29 30 import ocean.meta.types.Qualifiers; 31 import ocean.core.Verify; 32 33 import ocean.io.model.ISuspendable, 34 ocean.io.model.ISuspendableThrottler, 35 ocean.util.container.AppendBuffer; 36 37 import ocean.core.Array : contains; 38 39 debug import ocean.io.Stdout; 40 41 42 /******************************************************************************* 43 44 Simple suspendable throttler which just counts the number of pending items, 45 and throttles the suspendables based on that count. No data other than the 46 pending count is stored. 47 48 *******************************************************************************/ 49 50 public class SuspendableThrottlerCount : ISuspendableThrottlerCount 51 { 52 /*************************************************************************** 53 54 Number of items pending. 55 56 ***************************************************************************/ 57 58 private size_t count; 59 60 61 /*************************************************************************** 62 63 Constructor. 64 65 Params: 66 suspend_point = point at which the suspendables are suspended 67 resume_point = point at which the suspendables are resumed 68 69 ***************************************************************************/ 70 71 public this ( size_t suspend_point, size_t resume_point ) 72 { 73 super(suspend_point, resume_point); 74 } 75 76 77 /*************************************************************************** 78 79 Increases the count of pending items and throttles the suspendables. 80 81 ***************************************************************************/ 82 83 public void inc ( ) 84 { 85 verify(this.count < this.count.max); 86 87 this.count++; 88 super.throttledSuspend(); 89 } 90 91 /// ditto 92 public void opUnary ( string op ) ( ) if (op == "++") 93 { 94 return this.inc(); 95 } 96 97 // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of 98 // `add` when one came from alias and other was "native" method. Creating 99 // distinct name (`inc`) allowed to disambugate it manually 100 public alias inc add; 101 102 103 /*************************************************************************** 104 105 Increases the count of pending items and throttles the suspendables. 106 107 Params: 108 n = number of pending items to add 109 110 ***************************************************************************/ 111 112 public void add ( size_t n ) 113 { 114 verify(this.count <= this.count.max - n); 115 116 this.count += n; 117 super.throttledSuspend(); 118 } 119 120 /// ditto 121 public void opOpAssign ( string op ) ( size_t n ) if (op == "+") 122 { 123 this.add(n); 124 } 125 126 127 /*************************************************************************** 128 129 Decreases the count of pending items and throttles the suspendables. 130 131 ***************************************************************************/ 132 133 public void dec ( ) 134 { 135 verify(this.count > 0); 136 137 this.count--; 138 super.throttledResume(); 139 } 140 141 /// ditto 142 public void opUnary ( string op ) ( ) if (op == "--") 143 { 144 return this.dec(); 145 } 146 147 // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of 148 // `remove` when one came from alias and other was "native" method. Creating 149 // distinct name (`dec`) allowed to disambugate it manually 150 public alias dec remove; 151 152 153 /*************************************************************************** 154 155 Decreases the count of pending items and throttles the suspendables. 156 157 Params: 158 n = number of pending items to remove 159 160 ***************************************************************************/ 161 162 public void remove ( size_t n ) 163 { 164 verify(this.count >= n); 165 this.count -= n; 166 super.throttledResume(); 167 } 168 169 /// ditto 170 public void opOpAssign ( string op ) ( size_t n ) if (op == "-") 171 { 172 this.remove(n); 173 } 174 175 176 /*************************************************************************** 177 178 Returns: 179 the number of pending items 180 181 ***************************************************************************/ 182 183 override public size_t length ( ) 184 { 185 return this.count; 186 } 187 } 188 189 unittest 190 { 191 import ocean.core.Test : test; 192 193 // Helper class to allow us to test operator overloads 194 // without touching other parts of the class logic 195 static class OpsTest : SuspendableThrottlerCount 196 { 197 this () 198 { 199 size_t suspend_point = 8; 200 size_t resume_point = 3; 201 super(suspend_point, resume_point); 202 } 203 204 override void inc () { this.count++; } 205 override void dec () { this.count--; } 206 207 override void add (size_t n) { this.count += n; } 208 override void remove (size_t n) { this.count -= n; } 209 } 210 211 scope ops_test = new OpsTest; 212 test!"=="(ops_test.length, 0); 213 214 ops_test++; 215 test!"=="(ops_test.length, 1); 216 test(!ops_test.suspend); 217 test(ops_test.resume); 218 219 ops_test += 7; 220 test!"=="(ops_test.length, 8); 221 test(ops_test.suspend); 222 test(!ops_test.resume); 223 224 ops_test--; 225 test!"=="(ops_test.length, 7); 226 test(!ops_test.suspend); 227 test(!ops_test.resume); 228 229 ops_test -= 4; 230 test!"=="(ops_test.length, 3); 231 test(!ops_test.suspend); 232 test(ops_test.resume); 233 } 234 235 236 /******************************************************************************* 237 238 SuspendableThrottlerCount unittest. 239 240 *******************************************************************************/ 241 242 unittest 243 { 244 static class SuspendableThrottlerCount_Test : ISuspendableThrottlerCount_Test 245 { 246 private SuspendableThrottlerCount count; 247 248 this ( ) 249 { 250 this.count = new SuspendableThrottlerCount(this.suspend, this.resume); 251 super(this.count); 252 } 253 254 override void inc ( ) 255 { 256 this.count++; 257 } 258 259 override void dec ( ) 260 { 261 this.count--; 262 } 263 } 264 265 scope test = new SuspendableThrottlerCount_Test; 266 } 267 268 269 /******************************************************************************* 270 271 Abstract base class for suspendable throttlers which throttle based on a 272 count of pending items of some kind. 273 274 Provides the following additional functionality: 275 * An abstract length() method which determines the current count of 276 pending items. 277 * suspend() and resume() methods which suspend or resume the 278 ISuspendables based on the count of pending items and the suspend and 279 resume points defined in the constructor. 280 281 *******************************************************************************/ 282 283 abstract public class ISuspendableThrottlerCount : ISuspendableThrottler 284 { 285 /*************************************************************************** 286 287 When the number of pending items reaches this value or greater, the 288 suspendables will be suspended. 289 290 ***************************************************************************/ 291 292 public const(size_t) suspend_point; 293 294 295 /*************************************************************************** 296 297 When the number of pending items reaches this value or less, the 298 suspendables will be resumed. 299 300 ***************************************************************************/ 301 302 public const(size_t) resume_point; 303 304 305 /*************************************************************************** 306 307 Constructor. 308 309 Params: 310 suspend_point = point at which the suspendables are suspended 311 resume_point = point at which the suspendables are resumed 312 313 ***************************************************************************/ 314 315 public this ( size_t suspend_point, size_t resume_point ) 316 { 317 verify(suspend_point > resume_point); 318 319 this.suspend_point = suspend_point; 320 this.resume_point = resume_point; 321 } 322 323 324 /*************************************************************************** 325 326 Returns: 327 the number of pending items 328 329 ***************************************************************************/ 330 331 abstract public size_t length ( ); 332 333 334 /*************************************************************************** 335 336 Decides whether the suspendables should be suspended. Called by 337 throttle() when not suspended. 338 339 If the number of pending items is greater than the suspend point 340 specified in the constructor, then the suspendables are suspended, 341 stopping the input. 342 343 Returns: 344 true if the suspendables should be suspeneded 345 346 ***************************************************************************/ 347 348 override protected bool suspend ( ) 349 { 350 return this.length >= this.suspend_point; 351 } 352 353 354 /*************************************************************************** 355 356 Decides whether the suspendables should be resumed. Called by 357 throttle() when suspended. 358 359 If the number of pending items is less than the resume point specified 360 in the constructor, then the suspendables are resumed, restarting the 361 input. 362 363 Returns: 364 true if the suspendables should be resumed 365 366 ***************************************************************************/ 367 368 override protected bool resume ( ) 369 { 370 return this.length <= this.resume_point; 371 } 372 } 373 374 375 /******************************************************************************* 376 377 Abstract base class which tests an ISuspendableThrottlerCount instance. The 378 abstract inc() and dec() methods (which increment and decrement the count) 379 must be implemented. 380 381 *******************************************************************************/ 382 383 version (unittest) 384 { 385 private import ocean.core.Test; 386 387 abstract class ISuspendableThrottlerCount_Test 388 { 389 static immutable suspend = 10; 390 static immutable resume = 2; 391 392 protected ISuspendableThrottlerCount throttler; 393 394 this ( ISuspendableThrottlerCount throttler ) 395 { 396 this.throttler = throttler; 397 398 // Fill up throttler to one before suspension 399 for ( int i; i < suspend - 1; i++ ) 400 { 401 this.inc(); 402 test(!this.throttler.suspended); 403 } 404 405 // Next increment should suspend 406 this.inc(); 407 test(this.throttler.suspended); 408 409 // Empty throttler to one before resumption 410 static immutable diff = suspend - resume; 411 for ( int i; i < diff - 1; i++ ) 412 { 413 this.dec(); 414 test(this.throttler.suspended); 415 } 416 417 // Next decrement should resume 418 this.dec(); 419 test(!this.throttler.suspended); 420 } 421 422 abstract void inc ( ); 423 abstract void dec ( ); 424 } 425 }