/*******************************************************************************

    Abstract base classes for suspendable throttlers.

    Provides a simple mechanism for throttling a set of one or more suspendable
    processes based on some condition (as defined by a derived class).

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.model.ISuspendableThrottler;

import ocean.meta.types.Qualifiers;

/*******************************************************************************

    Abstract base class for suspendable throttlers.

    Provides the following functionality:
        * Maintains a set of ISuspendables which are suspended / resumed
          together.
        * throttledSuspend() and throttledResume() methods, to be called when
          the suspension state should be updated / reassessed.
        * Abstract suspend() and resume() methods which define the conditions
          for suspension and resumption of the set of ISuspendables.
        * A suspended() method to tell whether the ISuspendables are suspended.

*******************************************************************************/

abstract public class ISuspendableThrottler
{
    import ocean.io.model.ISuspendable;

    import ocean.core.array.Mutation : moveToEnd;
    import ocean.core.array.Search : contains;

    /***************************************************************************

        List of suspendables which are to be throttled. Suspendables are added
        to the list with the addSuspendable() method, removed one at a time
        with removeSuspendable(), and can all be dropped at once by clear().

    ***************************************************************************/

    private ISuspendable[] suspendables;


    /***************************************************************************

        Flag set to true when the suspendables are suspended.

    ***************************************************************************/

    private bool suspended_;


    /***************************************************************************

        Adds a suspendable to the list of suspendables which are to be
        throttled if it's not already in there.
        Also ensures that the state of the added suspendable is consistent
        with the state of the throttler: the suspendable is suspended if the
        throttler is currently suspended and resumed otherwise. This
        synchronisation is done even when the suspendable was already in the
        list.

        Params:
            s = suspendable, must not be null

    ***************************************************************************/

    public void addSuspendable ( ISuspendable s )
    {
        // s is dereferenced below; reject null before it is stored in the list
        assert(s !is null, "Cannot add a null suspendable");

        if (!this.suspendables.contains(s))
        {
            this.suspendables ~= s;
        }

        if (this.suspended_ != s.suspended())
        {
            if (this.suspended_)
                s.suspend();
            else
                s.resume();
        }
    }

    unittest
    {
        import ocean.core.Test;

        static class SuspendableThrottler : ISuspendableThrottler
        {
            override protected bool suspend ( )
            {
                return false;
            }

            override protected bool resume ( )
            {
                return false;
            }
        }

        static class Suspendable : ISuspendable
        {
            bool suspended_ = false;

            public void suspend ( )
            {
                this.suspended_ = true;
            }

            public void resume ( )
            {
                this.suspended_ = false;
            }

            public bool suspended ( )
            {
                return this.suspended_;
            }
        }

        auto suspendable_throttler = new SuspendableThrottler;
        auto suspendable = new Suspendable;

        // add a suspended ISuspendable to a non suspended throttler
        suspendable.suspend();
        suspendable_throttler.addSuspendable(suspendable);

        test!("==")(suspendable.suspended(), false,
            "Suspended suspendable not resumed.");

        suspendable_throttler.suspendAll();

        // add a resumed ISuspendable to a suspended throttler
        suspendable.resume();
        suspendable_throttler.addSuspendable(suspendable);

        test!("==")(suspendable.suspended(), true,
            "Resumed suspendable not suspended.");
    }

    /***************************************************************************

        Removes a suspendable from the list of suspendables if it exists.

        The suspension state of the removed suspendable is left untouched.

        Params:
            s = suspendable

    ***************************************************************************/

    public void removeSuspendable ( ISuspendable s )
    {
        // The value returned by moveToEnd() is used as the new list length,
        // which cuts off the entries equal to s that it moved to the tail.
        this.suspendables =
            this.suspendables[0 .. this.suspendables.moveToEnd(s)];

        // The list was shortened in place; allow later appends to reuse the
        // freed tail of the buffer instead of reallocating.
        assumeSafeAppend(this.suspendables);
    }

    unittest
    {
        import ocean.core.Test;

        static class Throttler : ISuspendableThrottler
        {
            override protected bool suspend ( )
            {
                return false;
            }

            override protected bool resume ( )
            {
                return true;
            }
        }

        static class Suspendable : ISuspendable
        {
            public void suspend ( ) { }
            public void resume ( ) { }
            public bool suspended ( )
            {
                return false;
            }
        }

        auto throttler = new Throttler;
        auto suspendable = new Suspendable;

        throttler.addSuspendable(suspendable);
        test!("==")(throttler.suspendables.length, 1);

        // adding the same suspendable again must not create a duplicate
        throttler.addSuspendable(suspendable);
        test!("==")(throttler.suspendables.length, 1);

        throttler.removeSuspendable(suspendable);
        test!("==")(throttler.suspendables.length, 0);

        // the list must still be usable after a removal
        throttler.addSuspendable(suspendable);
        test!("==")(throttler.suspendables.length, 1);
    }

    /***************************************************************************

        Returns:
            true if the suspendables are currently suspended.

    ***************************************************************************/

    public bool suspended ( )
    {
        return this.suspended_;
    }


    /***************************************************************************

        Clears the list of suspendables and resets the throttler to the
        non-suspended state.

        Note that the suspendables themselves are not resumed by this method:
        any which were suspended when clear() is called stay suspended.

    ***************************************************************************/

    public void clear ( )
    {
        this.suspendables.length = 0;

        // keep the buffer for reuse by subsequent addSuspendable() calls
        assumeSafeAppend(this.suspendables);

        this.suspended_ = false;
    }

    /***************************************************************************

        Checks if the suspend limit has been reached and the suspendables need
        to be suspended. Does nothing if they are already suspended; otherwise
        suspends all of them if suspend() returns true.

    ***************************************************************************/

    public void throttledSuspend ( )
    {
        if (!this.suspended_ && this.suspend())
            this.suspendAll();
    }

    /***************************************************************************

        Checks if resume limit has been reached and the suspendables need to be
        resumed. Does nothing if they are not suspended; otherwise resumes all
        of them if resume() returns true.

    ***************************************************************************/

    public void throttledResume ( )
    {
        if (this.suspended_ && this.resume())
            this.resumeAll();
    }

    /***************************************************************************

        Decides whether the suspendables should be suspended. Called by
        throttledSuspend() when not suspended.

        Returns:
            true if the suspendables should be suspended

    ***************************************************************************/

    abstract protected bool suspend ( );


    /***************************************************************************

        Decides whether the suspendables should be resumed. Called by
        throttledResume() when suspended.

        Returns:
            true if the suspendables should be resumed

    ***************************************************************************/

    abstract protected bool resume ( );


    /***************************************************************************

        Resumes all suspendables and sets the suspended_ flag to false.

        Note that the suspended_ flag is set before resuming the suspendables
        in order to avoid a race condition when the resumption of a suspendable
        performs actions which would cause throttledSuspend() or
        throttledResume() to be called again: such a nested call then already
        sees the new state.

    ***************************************************************************/

    private void resumeAll ( )
    {
        this.suspended_ = false;
        foreach ( s; this.suspendables )
        {
            s.resume();
        }
    }


    /***************************************************************************

        Suspends all suspendables and sets the suspended_ flag to true.

        Note that the suspended_ flag is set before suspending the suspendables
        in order to avoid a race condition when the suspending of a suspendable
        performs actions which would cause throttledSuspend() or
        throttledResume() to be called again: such a nested call then already
        sees the new state.

    ***************************************************************************/

    private void suspendAll ( )
    {
        this.suspended_ = true;
        foreach ( s; this.suspendables )
        {
            s.suspend();
        }
    }
}