1 /******************************************************************************* 2 3 Abstract base classes for suspendable throttlers. 4 5 Provides a simple mechanism for throttling a set of one or more suspendable 6 processes based on some condition (as defined by a derived class). 7 8 Copyright: 9 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 10 All rights reserved. 11 12 License: 13 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 14 Alternatively, this file may be distributed under the terms of the Tango 15 3-Clause BSD License (see LICENSE_BSD.txt for details). 16 17 *******************************************************************************/ 18 19 module ocean.io.model.ISuspendableThrottler; 20 21 import ocean.meta.types.Qualifiers; 22 23 24 /******************************************************************************* 25 26 Abstract base class for suspendable throttlers. 27 28 Provides the following functionality: 29 * Maintains a set of ISuspendables which are suspended / resumed 30 together. 31 * A throttle() method, to be called when the suspension state should be 32 updated / reassessed. 33 * Abstract suspend() and resume() methods which define the conditions 34 for suspension and resumption of the set of ISuspendables. 35 * A suspended() method to tell whether the ISuspendables are suspended. 36 37 *******************************************************************************/ 38 39 abstract public class ISuspendableThrottler 40 { 41 import ocean.io.model.ISuspendable; 42 43 import ocean.core.array.Mutation : moveToEnd; 44 import ocean.core.array.Search : contains; 45 46 /*************************************************************************** 47 48 List of suspendables which are to be throttled. Suspendables are added 49 to the list with the addSuspendable() method, and can be cleared by clear(). 50 51 ***************************************************************************/ 52 53 private ISuspendable[] suspendables; 54 55 56 /*************************************************************************** 57 58 Flag set to true when the suspendables are suspended. 59 60 ***************************************************************************/ 61 62 private bool suspended_; 63 64 65 /*************************************************************************** 66 67 Adds a suspendable to the list of suspendables which are to be 68 throttled if it's not already in there. 69 Also ensures that the state of the added suspendable is consistent 70 with the state of the throttler. 71 72 Params: 73 s = suspendable 74 75 ***************************************************************************/ 76 77 public void addSuspendable ( ISuspendable s ) 78 { 79 if (!this.suspendables.contains(s)) 80 { 81 this.suspendables ~= s; 82 } 83 84 if (this.suspended_ != s.suspended()) 85 { 86 if (this.suspended_) 87 s.suspend(); 88 else 89 s.resume(); 90 } 91 } 92 93 unittest 94 { 95 import ocean.core.Test; 96 97 static class SuspendableThrottler : ISuspendableThrottler 98 { 99 override protected bool suspend ( ) 100 { 101 return false; 102 } 103 104 override protected bool resume ( ) 105 { 106 return false; 107 } 108 } 109 110 static class Suspendable : ISuspendable 111 { 112 bool suspended_ = false; 113 114 public void suspend ( ) 115 { 116 this.suspended_ = true; 117 } 118 119 public void resume ( ) 120 { 121 this.suspended_ = false; 122 } 123 124 public bool suspended ( ) 125 { 126 return this.suspended_; 127 } 128 } 129 130 auto suspendable_throttler = new SuspendableThrottler; 131 auto suspendable = new Suspendable; 132 133 // add a suspended ISuspendable to a non suspended throttler 134 suspendable.suspend(); 135 suspendable_throttler.addSuspendable(suspendable); 136 137 test!("==")(suspendable.suspended(), false, 138 "Suspended suspendable not resumed."); 139 140 suspendable_throttler.suspendAll(); 141 142 // add a resumed ISuspendable to a suspended throttler 143 suspendable.resume(); 144 suspendable_throttler.addSuspendable(suspendable); 145 146 test!("==")(suspendable.suspended(), true, 147 "Resumed suspendable not suspended."); 148 } 149 150 /*************************************************************************** 151 152 Removes a suspendable from the list of suspendables if it exists. 153 154 Params: 155 s = suspendable 156 157 ***************************************************************************/ 158 159 public void removeSuspendable ( ISuspendable s ) 160 { 161 this.suspendables = 162 this.suspendables[0 .. this.suspendables.moveToEnd(s)]; 163 assumeSafeAppend(suspendables); 164 } 165 166 unittest 167 { 168 class Throttler : ISuspendableThrottler 169 { 170 override protected bool suspend ( ) 171 { 172 return false; 173 } 174 175 override protected bool resume ( ) 176 { 177 return true; 178 } 179 } 180 181 class Suspendable : ISuspendable 182 { 183 public void suspend ( ) { } 184 public void resume ( ) { } 185 public bool suspended ( ) 186 { 187 return false; 188 } 189 190 } 191 192 auto throttler = new Throttler; 193 auto suspendable = new Suspendable; 194 195 throttler.addSuspendable(suspendable); 196 throttler.removeSuspendable(suspendable); 197 throttler.addSuspendable(suspendable); 198 } 199 200 /*************************************************************************** 201 202 Returns: 203 true if the suspendables are currently suspended. 204 205 ***************************************************************************/ 206 207 public bool suspended ( ) 208 { 209 return this.suspended_; 210 } 211 212 213 /*************************************************************************** 214 215 Clears the list of suspendables. 216 217 ***************************************************************************/ 218 219 public void clear ( ) 220 { 221 this.suspendables.length = 0; 222 assumeSafeAppend(this.suspendables); 223 this.suspended_ = false; 224 } 225 226 /*************************************************************************** 227 228 Checks if the suspend limit has been reached and the suspendables need 229 to be suspended. 230 231 ***************************************************************************/ 232 233 public void throttledSuspend ( ) 234 { 235 if (!this.suspended_ && this.suspend()) 236 this.suspendAll(); 237 } 238 239 /*************************************************************************** 240 241 Checks if resume limit has been reached and the suspendables need to be 242 resumed. 243 244 ***************************************************************************/ 245 246 public void throttledResume ( ) 247 { 248 if (this.suspended_ && this.resume()) 249 this.resumeAll(); 250 } 251 252 /*************************************************************************** 253 254 Decides whether the suspendables should be suspended. Called by 255 throttledSuspend() when not suspended. 256 257 Returns: 258 true if the suspendables should be suspeneded 259 260 ***************************************************************************/ 261 262 abstract protected bool suspend ( ); 263 264 265 /*************************************************************************** 266 267 Decides whether the suspendables should be resumed. Called by 268 throttledResume() when suspended. 269 270 Returns: 271 true if the suspendables should be resumed 272 273 ***************************************************************************/ 274 275 abstract protected bool resume ( ); 276 277 278 /*************************************************************************** 279 280 Resumes all suspendables and sets the suspended_ flag to false. 281 282 Note that the suspended_ flag is set before resuming the suspendables 283 in order to avoid a race condition when the resumption of a suspendable 284 performs actions which would cause the throttle() method to be 285 called again. 286 287 ***************************************************************************/ 288 289 private void resumeAll ( ) 290 { 291 this.suspended_ = false; 292 foreach ( s; this.suspendables ) 293 { 294 s.resume(); 295 } 296 } 297 298 299 /*************************************************************************** 300 301 Suspends all suspendables and sets the suspended_ flag to true. 302 303 Note that the suspended_ flag is set before suspending the suspendables 304 in order to avoid a race condition when the suspending of a suspendable 305 performs actions which would cause the throttle() method to be 306 called again. 307 308 ***************************************************************************/ 309 310 private void suspendAll ( ) 311 { 312 this.suspended_ = true; 313 foreach ( s; this.suspendables ) 314 { 315 s.suspend(); 316 } 317 } 318 }