/*******************************************************************************

    Ready-to-use task pool implementation that should be used in cases where the
    application has to spawn a large amount of the same type of task. It is
    possible to mix many different pools as well as an arbitrary amount of
    stand-alone tasks in the same applications - they will all use the same
    global `ocean.task.Scheduler`, including its pool of fibers.

    Usage example:
        See the documented unittest of the `TaskPool` class

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.TaskPool;


import ocean.meta.types.Qualifiers;

import ocean.task.Task;
import ocean.task.IScheduler;

import ocean.core.Enforce;
import ocean.core.Buffer;
import ocean.core.array.Mutation /* : insertShift */;
import ocean.meta.types.Function /* : ParametersOf */;
import ocean.meta.traits.Aggregates /* : hasMember */;
import ocean.meta.AliasSeq;
import ocean.util.container.pool.ObjectPool;

/*******************************************************************************

    Task pool which integrates with ocean.task.Scheduler.

    It is assumed that tasks that are going to be used with this class will
    require certain parameters to be passed to the task when it is started.
    These parameters are defined by the arguments of the `copyArguments` method,
    which the task class must implement. When a task is fetched from the pool to
    be used, its `copyArguments` method will be called, allowing the specified
    parameters to be passed into it.

    If the derived task contains a method matching the signature of
    `void deserialize ( void[] )` then the TaskPool will be created with
    support for starting tasks with a void buffer (via the restore() method).
    This supports serialization and deserialization of task internal state.

    It is crucial for tasks to either deep copy their initial parameters or
    ensure that those can never change during the task's lifetime, otherwise
    very hard to debug fiber races can happen.

    Params:
        TaskT = specific task type managed by this pool

*******************************************************************************/

class TaskPool ( TaskT : Task ) : ObjectPool!(Task)
{
    static assert(
        is (typeof(TaskT.copyArguments)),
        "Task derivative must define copyArguments function to work with " ~
            " a task pool"
    );

    /***************************************************************************

        Convenience alias for knowing task type.

    ***************************************************************************/

    public alias TaskT TaskType;


    /***************************************************************************

        Convenience method that does preparing initial arguments of reusable
        task and starting it via theScheduler in one go.

        If `copyArguments` throws, the task that was taken from the pool is
        recycled before the exception propagates, so a failed start does not
        permanently consume a pool slot.

        Params:
            args = same set of args as defined by `copyArguments` method of
                user-supplied task class, will be forwarded to it.

        Returns:
            'false' if new task can't be started because pool limit is reached
            for now, 'true' otherwise

    ***************************************************************************/

    public bool start ( ParametersOf!(TaskT.copyArguments) args )
    {
        if (this.num_busy() >= this.limit())
            return false;

        auto task = cast(TaskT) this.get(new TaskT);
        assert (task !is null);

        {
            // At this point the task is marked busy but has no termination
            // hook yet, so nothing else would return it to the pool if the
            // argument setup fails.
            scope (failure)
                this.recycle(task);

            task.copyArguments(args);
        }

        this.startImpl(task);
        return true;
    }

    /***************************************************************************

        Common part of start implementation reused by derivatives. Split into
        separate method to ensure that recycling hook won't be omitted.

        Params:
            task = already setup task to run and recycle

    ***************************************************************************/

    protected void startImpl ( Task task )
    {
        task.terminationHook(&this.taskTerminationHook);
        theScheduler.schedule(task);
    }

    /***************************************************************************

        Suspends the current task until all running tasks in the pool have
        finished executing.

        Because of `terminationHook` implementation details, by the time caller
        task gets resumed, there will still be one (last) non-recycled running
        task in the pool, suspended right before actual termination. Once caller
        task gets suspended again for any reason, that last task will be
        recycled too. It is possible to manually call
        `theScheduler.processEvents()` after `awaitRunningTasks()` to force
        recycling of that last task at cost of a small additional delay in
        resuming the caller.

        Note: it is important to ensure that the current task (i.e. the one to
        be suspended) is not itself a task from the pool. If that were allowed,
        the current task would never get resumed, and this function would never
        return.

        Throws:
            `Exception` if the current task is null or if the current task
            belongs to the task pool

    ***************************************************************************/

    public void awaitRunningTasks ()
    {
        if (!this.num_busy())
            return;

        auto current_task = Task.getThis();
        enforce(current_task !is null,
            "Current task is null in TaskPool.awaitRunningTasks");

        scope tasks_iterator = this.new AllItemsIterator;

        // Validate the whole pool before touching any task: if the check
        // failed half way through registration, the hooks already attached
        // would outlive the exception and later try to resume a task that
        // never suspended itself here.
        foreach (task; tasks_iterator)
        {
            enforce(!this.isSame(this.toItem(current_task), this.toItem(task)),
                "Current task cannot be from the pool of tasks to wait upon");
        }

        // Number of busy tasks that have not yet terminated
        int count;

        foreach (task; tasks_iterator)
        {
            if (!this.isBusy(this.toItem(task)))
                continue;

            ++count;

            // The last busy task to terminate wakes the waiting caller
            task.terminationHook({
                --count;
                if (count == 0)
                    theScheduler.delayedResume(current_task);
            });
        }

        if (count > 0)
            current_task.suspend();
    }

    static if (__traits(hasMember, TaskT, "deserialize"))
    {
        /***********************************************************************

            Starts a task in the same manner as `start` but instead calls the
            `deserialize()` method on the derived task with arguments supported.
            This is to support dumping and loading tasks from disk.

            If `deserialize` throws, the task that was taken from the pool is
            recycled before the exception propagates, so a failed restore does
            not permanently consume a pool slot.

            Params:
                args = Arguments matching the function arguments of the
                       'deserialize()' function of the task type.

            Returns:
                'false' if new task can't be started because pool limit is
                reached for now, 'true' otherwise

        ***********************************************************************/

        public bool restore ( ParametersOf!(TaskT.deserialize) args )
        {
            if (this.num_busy() >= this.limit())
                return false;

            auto task = cast(TaskT) this.get(new TaskT);
            assert (task !is null);

            {
                // Same reasoning as in `start`: no termination hook is
                // registered yet, so the slot must be released manually if
                // the state cannot be restored.
                scope (failure)
                    this.recycle(task);

                task.deserialize(args);
            }

            this.startImpl(task);

            return true;
        }
    }

    /***************************************************************************

        Used to recycle pool tasks when they finish

    ***************************************************************************/

    private void taskTerminationHook ( )
    {
        auto task = Task.getThis();
        this.recycle(task);
    }
}

///
unittest
{
    void example ( )
    {
        static class DummyTask : Task
        {
            import ocean.core.Array : copy;

            // The task requires a single string, which is copied from the outside
            // by `copyArguments()`
            private mstring buffer;

            public void copyArguments ( cstring arg )
            {
                this.buffer.copy(arg);
            }

            override public void recycle ( )
            {
                this.buffer.length = 0;
                assumeSafeAppend(this.buffer);
            }

            public override void run ( )
            {
                // do good stuff
            }
        }

        auto pool = new TaskPool!(DummyTask);

        // Start some tasks, passing the required parameters to the pool's `start()`
        // method
        pool.start("abcd");
        pool.start("xyz");

        theScheduler.eventLoop();
    }
}