1 /******************************************************************************* 2 3 Helper classes to manage the situations where a set of objects implementing 4 ISuspendable should be throttled based on a count of pending items of 5 some kind. For example, one common situation of this type is as follows: 6 7 1. You are streaming data from one or more ISuspendable sources. 8 2. For each chunk of data received you wish to do some processing which 9 will not finish immediately. (Thus the received data need to be kept 10 around in some way, forming a set of 'pending items'.) 11 3. The ISuspendables which are providing the input data must be 12 throttled (i.e. suspended and resumed) based on the number of pending 13 items. 14 15 Copyright: 16 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 17 All rights reserved. 18 19 License: 20 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 21 Alternatively, this file may be distributed under the terms of the Tango 22 3-Clause BSD License (see LICENSE_BSD.txt for details). 23 24 *******************************************************************************/ 25 26 module ocean.io.model.SuspendableThrottlerCount; 27 28 29 30 import ocean.transition; 31 import ocean.core.Verify; 32 33 import ocean.io.model.ISuspendable, 34 ocean.io.model.ISuspendableThrottler, 35 ocean.util.container.AppendBuffer; 36 37 import ocean.core.Array : contains; 38 39 debug import ocean.io.Stdout; 40 41 42 /******************************************************************************* 43 44 Simple suspendable throttler which just counts the number of pending items, 45 and throttles the suspendables based on that count. No data other than the 46 pending count is stored. 47 48 *******************************************************************************/ 49 50 public class SuspendableThrottlerCount : ISuspendableThrottlerCount 51 { 52 /*************************************************************************** 53 54 Number of items pending. 55 56 ***************************************************************************/ 57 58 private size_t count; 59 60 61 /*************************************************************************** 62 63 Constructor. 64 65 Params: 66 suspend_point = point at which the suspendables are suspended 67 resume_point = point at which the suspendables are resumed 68 69 ***************************************************************************/ 70 71 public this ( size_t suspend_point, size_t resume_point ) 72 { 73 super(suspend_point, resume_point); 74 } 75 76 77 /*************************************************************************** 78 79 Increases the count of pending items and throttles the suspendables. 80 81 Aliased as opPostInc. 82 83 ***************************************************************************/ 84 85 public void inc ( ) 86 { 87 verify(this.count < this.count.max); 88 89 this.count++; 90 super.throttledSuspend(); 91 } 92 93 // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of 94 // `add` when one came from alias and other was "native" method. Creating 95 // distinct name (`inc`) allowed to disambugate it manually 96 public alias inc add; 97 public alias inc opPostInc; 98 99 100 /*************************************************************************** 101 102 Increases the count of pending items and throttles the suspendables. 103 104 Params: 105 n = number of pending items to add 106 107 Aliased as opAddAssign. 108 109 ***************************************************************************/ 110 111 public void add ( size_t n ) 112 { 113 verify(this.count <= this.count.max - n); 114 115 this.count += n; 116 super.throttledSuspend(); 117 } 118 119 public alias add opAddAssign; 120 121 122 /*************************************************************************** 123 124 Decreases the count of pending items and throttles the suspendables. 125 126 Aliased as opPostDec. 127 128 ***************************************************************************/ 129 130 public void dec ( ) 131 { 132 verify(this.count > 0); 133 134 this.count--; 135 super.throttledResume(); 136 } 137 138 // WORKAROUND: DMD 2.068 had difficulties resolving multiple overloads of 139 // `remove` when one came from alias and other was "native" method. Creating 140 // distinct name (`dec`) allowed to disambugate it manually 141 public alias dec remove; 142 public alias dec opPostDec; 143 144 145 /*************************************************************************** 146 147 Decreases the count of pending items and throttles the suspendables. 148 149 Params: 150 n = number of pending items to remove 151 152 Aliased as opSubAssign. 153 154 ***************************************************************************/ 155 156 public void remove ( size_t n ) 157 { 158 verify(this.count >= n); 159 this.count -= n; 160 super.throttledResume(); 161 } 162 163 public alias add opSubAssign; 164 165 166 /*************************************************************************** 167 168 Returns: 169 the number of pending items 170 171 ***************************************************************************/ 172 173 override public size_t length ( ) 174 { 175 return this.count; 176 } 177 } 178 179 180 /******************************************************************************* 181 182 SuspendableThrottlerCount unittest. 183 184 *******************************************************************************/ 185 186 unittest 187 { 188 scope class SuspendableThrottlerCount_Test : ISuspendableThrottlerCount_Test 189 { 190 private SuspendableThrottlerCount count; 191 192 this ( ) 193 { 194 this.count = new SuspendableThrottlerCount(this.suspend, this.resume); 195 super(this.count); 196 } 197 198 override void inc ( ) 199 { 200 this.count++; 201 } 202 203 override void dec ( ) 204 { 205 this.count--; 206 } 207 } 208 209 scope test = new SuspendableThrottlerCount_Test; 210 } 211 212 213 /******************************************************************************* 214 215 Abstract base class for suspendable throttlers which throttle based on a 216 count of pending items of some kind. 217 218 Provides the following additional functionality: 219 * An abstract length() method which determines the current count of 220 pending items. 221 * suspend() and resume() methods which suspend or resume the 222 ISuspendables based on the count of pending items and the suspend and 223 resume points defined in the constructor. 224 225 *******************************************************************************/ 226 227 abstract public class ISuspendableThrottlerCount : ISuspendableThrottler 228 { 229 /*************************************************************************** 230 231 When the number of pending items reaches this value or greater, the 232 suspendables will be suspended. 233 234 ***************************************************************************/ 235 236 public Const!(size_t) suspend_point; 237 238 239 /*************************************************************************** 240 241 When the number of pending items reaches this value or less, the 242 suspendables will be resumed. 243 244 ***************************************************************************/ 245 246 public Const!(size_t) resume_point; 247 248 249 /*************************************************************************** 250 251 Constructor. 252 253 Params: 254 suspend_point = point at which the suspendables are suspended 255 resume_point = point at which the suspendables are resumed 256 257 ***************************************************************************/ 258 259 public this ( size_t suspend_point, size_t resume_point ) 260 { 261 verify(suspend_point > resume_point); 262 263 this.suspend_point = suspend_point; 264 this.resume_point = resume_point; 265 } 266 267 268 /*************************************************************************** 269 270 Returns: 271 the number of pending items 272 273 ***************************************************************************/ 274 275 abstract public size_t length ( ); 276 277 278 /*************************************************************************** 279 280 Decides whether the suspendables should be suspended. Called by 281 throttle() when not suspended. 282 283 If the number of pending items is greater than the suspend point 284 specified in the constructor, then the suspendables are suspended, 285 stopping the input. 286 287 Returns: 288 true if the suspendables should be suspeneded 289 290 ***************************************************************************/ 291 292 override protected bool suspend ( ) 293 { 294 return this.length >= this.suspend_point; 295 } 296 297 298 /*************************************************************************** 299 300 Decides whether the suspendables should be resumed. Called by 301 throttle() when suspended. 302 303 If the number of pending items is less than the resume point specified 304 in the constructor, then the suspendables are resumed, restarting the 305 input. 306 307 Returns: 308 true if the suspendables should be resumed 309 310 ***************************************************************************/ 311 312 override protected bool resume ( ) 313 { 314 return this.length <= this.resume_point; 315 } 316 } 317 318 319 /******************************************************************************* 320 321 Abstract base class which tests an ISuspendableThrottlerCount instance. The 322 abstract inc() and dec() methods (which increment and decrement the count) 323 must be implemented. 324 325 *******************************************************************************/ 326 327 version ( UnitTest ) 328 { 329 private import ocean.core.Test; 330 331 abstract scope class ISuspendableThrottlerCount_Test 332 { 333 static immutable suspend = 10; 334 static immutable resume = 2; 335 336 protected ISuspendableThrottlerCount throttler; 337 338 this ( ISuspendableThrottlerCount throttler ) 339 { 340 this.throttler = throttler; 341 342 // Fill up throttler to one before suspension 343 for ( int i; i < suspend - 1; i++ ) 344 { 345 this.inc(); 346 test(!this.throttler.suspended); 347 } 348 349 // Next increment should suspend 350 this.inc(); 351 test(this.throttler.suspended); 352 353 // Empty throttler to one before resumption 354 static immutable diff = suspend - resume; 355 for ( int i; i < diff - 1; i++ ) 356 { 357 this.dec(); 358 test(this.throttler.suspended); 359 } 360 361 // Next decrement should resume 362 this.dec(); 363 test(!this.throttler.suspended); 364 } 365 366 abstract void inc ( ); 367 abstract void dec ( ); 368 } 369 }