1 /*******************************************************************************
2 
3     Ready-to-use task pool implementation that should be used in cases where the
4     application has to spawn a large amount of the same type of task. It is
5     possible to mix many different pools as well as an arbitrary amount of
6     stand-alone tasks in the same applications - they will all use the same
7     global `ocean.task.Scheduler`, including its pool of fibers.
8 
9     Usage example:
10         See the documented unittest of the `TaskPool` class
11 
12     Copyright:
13         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
14         All rights reserved.
15 
16     License:
17         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
18         Alternatively, this file may be distributed under the terms of the Tango
19         3-Clause BSD License (see LICENSE_BSD.txt for details).
20 
21 *******************************************************************************/
22 
23 module ocean.task.TaskPool;
24 
25 
26 import ocean.meta.types.Qualifiers;
27 
28 import ocean.task.Task;
29 import ocean.task.IScheduler;
30 
31 import ocean.core.Enforce;
32 import ocean.core.Buffer;
33 import ocean.core.array.Mutation; /* : insertShift */;
34 import ocean.meta.types.Function /* : ParametersOf */;
35 import ocean.meta.traits.Aggregates /* : hasMember */;
36 import ocean.meta.AliasSeq;
37 import ocean.util.container.pool.ObjectPool;
38 
39 /*******************************************************************************
40 
41     Task pool which integrates with ocean.task.Scheduler.
42 
43     It is assumed that tasks that are going to be used with this class will
44     require certain parameters to be passed to the task when it is started.
45     These parameters are defined by the arguments of the `copyArguments` method,
46     which the task class must implement. When a task is fetched from the pool to
47     be used, its `copyArguments` method will be called, allowing the specified
48     parameters to be passed into it.
49 
    If the derived task contains a method matching the signature of
    `void deserialize ( void[] )` then the TaskPool will be created with
    support for starting tasks with a void buffer (via the restore() method).
53     This supports serialization and deserialization of task internal state.
54 
55     It is crucial for tasks to either deep copy their initial parameters or
56     ensure that those can never change during the task's lifetime, otherwise
57     very hard to debug fiber races can happen.
58 
59     Params:
60         TaskT = specific task type managed by this pool
61 
62 *******************************************************************************/
63 
class TaskPool ( TaskT : Task ) : ObjectPool!(Task)
{
    static assert(
        is (typeof(TaskT.copyArguments)),
        "Task derivative must define copyArguments function to work with " ~
            "a task pool"
    );

    /***************************************************************************

        Convenience alias for knowing task type.

    ***************************************************************************/

    public alias TaskT TaskType;


    /***************************************************************************

        Convenience method that does preparing initial arguments of reusable
        task and starting it via theScheduler in one go.

        If `copyArguments` throws, the task that was fetched from the pool is
        recycled before the exception propagates to the caller, so that the
        pool slot is not left marked as busy by a task that was never started.

        Params:
            args = same set of args as defined by `copyArguments` method of
                user-supplied task class, will be forwarded to it.

        Returns:
            'false' if new task can't be started because pool limit is reached
            for now, 'true' otherwise

    ***************************************************************************/

    public bool start ( ParametersOf!(TaskT.copyArguments) args )
    {
        if (this.num_busy() >= this.limit())
            return false;

        // `new TaskT` is the fallback instance offered to the pool's `get`
        auto task = cast(TaskT) this.get(new TaskT);
        assert (task !is null);

        {
            // At this point the task has not been handed to the scheduler and
            // has no termination hook, so nothing else would ever return it
            // to the pool if copying the arguments fails.
            scope (failure)
                this.recycle(task);

            task.copyArguments(args);
        }

        this.startImpl(task);
        return true;
    }

    /***************************************************************************

        Common part of start implementation reused by derivatives. Split into
        separate method to ensure that recycling hook won't be omitted.

        Registers the pool's termination hook on the task and then hands the
        task over to `theScheduler`.

        NOTE(review): if `theScheduler.schedule` throws, the task stays marked
        as busy in the pool; whether the task may already have run at that
        point can't be told from here, so no recycling is attempted.

        Params:
            task = already setup task to run and recycle

    ***************************************************************************/

    protected void startImpl ( Task task )
    {
        task.terminationHook(&this.taskTerminationHook);
        theScheduler.schedule(task);
    }

    /***************************************************************************

        Suspends the current task until all running tasks in the pool have
        finished executing. Returns immediately, without any checks, if the
        pool has no busy tasks.

        Because of `terminationHook` implementation details, by the time caller
        task gets resumed, there will still be one (last) non-recycled running
        task in the pool, suspended right before actual termination. Once caller
        task gets suspended again for any reason, that last task will be
        recycled too. It is possible to manually call
        `theScheduler.processEvents()` after `awaitRunningTasks()` to force
        recycling of that last task at cost of a small additional delay in
        resuming the caller.

        Note: it is important to ensure that the current task (i.e. the one to
        be suspended) is not itself a task from the pool. If that were allowed,
        the current task would never get resumed, and this function would never
        return.

        Throws:
            `Exception` if the current task is null or if the current task
            belongs to the task pool (only checked when the pool has busy
            tasks)

    ***************************************************************************/

    public void awaitRunningTasks ()
    {
        if (!this.num_busy())
            return;

        auto current_task = Task.getThis();
        enforce(current_task !is null,
            "Current task is null in TaskPool.awaitRunningTasks");

        // First pass: validate the caller against every pool item *before*
        // registering any termination hook. Doing both in a single loop would
        // leave already registered hooks behind if the check threw half-way,
        // and those hooks would later resume a task that never suspended.
        {
            scope check_iterator = this.new AllItemsIterator;

            foreach (task; check_iterator)
            {
                enforce(
                    !this.isSame(this.toItem(current_task), this.toItem(task)),
                    "Current task cannot be from the pool of tasks to wait upon");
            }
        }

        scope tasks_iterator = this.new AllItemsIterator;

        // Number of busy tasks that have not terminated yet; shared with the
        // termination hooks registered below.
        int count;

        // Second pass: hook into termination of every busy task
        foreach (task; tasks_iterator)
        {
            if (!this.isBusy(this.toItem(task)))
                continue;

            ++count;

            task.terminationHook({
                --count;
                // the last task to finish wakes up the waiting caller
                if (count == 0)
                    theScheduler.delayedResume(current_task);
            });
        }

        if (count > 0)
            current_task.suspend();
    }

    static if (__traits(hasMember, TaskT, "deserialize"))
    {
        /***********************************************************************

            Starts a task in the same manner as `start` but instead calls the
            `deserialize()` method on the derived task with arguments supported.
            This is to support dumping and loading tasks from disk.

            If `deserialize` throws, the task that was fetched from the pool is
            recycled before the exception propagates to the caller.

            Params:
                args = Arguments matching the function arguments of the
                       'deserialize()' function of the task type.

            Returns:
                'false' if new task can't be started because pool limit is reached
                for now, 'true' otherwise

        ***********************************************************************/

        public bool restore  ( ParametersOf!(TaskT.deserialize) args )
        {
            if (this.num_busy() >= this.limit())
                return false;

            auto task = cast(TaskT) this.get(new TaskT);
            assert (task !is null);

            {
                // The task is not scheduled yet, so nothing else would return
                // it to the pool if restoring its state fails.
                scope (failure)
                    this.recycle(task);

                task.deserialize(args);
            }

            this.startImpl(task);

            return true;
        }
    }

    /***************************************************************************

        Used to recycle pool tasks when they finish. Registered by `startImpl`
        as a termination hook; recycles whatever task `Task.getThis()` returns
        at the time the hook is called.

    ***************************************************************************/

    private void taskTerminationHook ( )
    {
        auto task = Task.getThis();
        this.recycle(task);
    }
}
228 
///
unittest
{
    // Wrapped in a function that is never called: the example only has to
    // compile.
    void example ( )
    {
        static class DummyTask : Task
        {
            import ocean.core.Array : copy;

            // Task-owned copy of the single string argument; filled in by
            // `copyArguments()` each time the task is started from the pool
            private mstring text;

            public override void run ( )
            {
                // do good stuff
            }

            public void copyArguments ( cstring arg )
            {
                this.text.copy(arg);
            }

            public override void recycle ( )
            {
                // drop the content but allow the memory to be appended to again
                this.text.length = 0;
                assumeSafeAppend(this.text);
            }
        }

        auto task_pool = new TaskPool!(DummyTask);

        // Arguments given to `start()` are forwarded to the task's
        // `copyArguments()`
        task_pool.start("abcd");
        task_pool.start("xyz");

        theScheduler.eventLoop();
    }
}