1 /*******************************************************************************
2 
3     Implements mapping between specific task `TypeInfo` and matching
4     `FiberPoolEager` instances intended to serve all tasks of such type.
5 
6     Copyright: Copyright (c) 2017 dunnhumby Germany GmbH. All rights reserved.
7 
8     License:
9         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
10         Alternatively, this file may be distributed under the terms of the Tango
11         3-Clause BSD License (see LICENSE_BSD.txt for details).
12 
13 *******************************************************************************/
14 
15 module ocean.task.internal.SpecializedPools;
16 
17 import ocean.meta.types.Qualifiers;
18 import ocean.task.internal.FiberPoolEager;
19 import ocean.task.IScheduler;
20 import ocean.task.Task;
21 import ocean.core.Verify;
22 import ocean.core.Enforce;
23 import ocean.core.Optional;
24 
25 debug (TaskScheduler)
26     import ocean.io.Stdout;
27 
28 public alias IScheduler.Configuration.PoolDescription PoolDescription;
29 
30 /*******************************************************************************
31 
32     Defines all mappings between task kinds and dedicated worker fiber pools
33     processing those.
34 
35 *******************************************************************************/
36 
public class SpecializedPools
{
    /// Mapping stored as a simple array (expected to contain few elements),
    /// searched linearly by `findPool`.
    private SpecializedPool[] mapping;

    /// Single mapping element: pairs a task name with the pool serving it.
    public static struct SpecializedPool
    {
        /// Task name used as the lookup key (compared against
        /// `task.classinfo.name` in `run`)
        istring task_name;

        /// Worker fiber pool dedicated to tasks with that name
        FiberPoolEager pool;
    }

    /***************************************************************************

        Constructor

        Creates one `FiberPoolEager` per supplied description. Each description
        must have a non-empty task name, a stack size of at least 1024 and a
        task name that has not been registered already; any violation is
        reported via `verify`.

        Params:
            config = array of specialized descriptions coming from the scheduler
                configuration

    ***************************************************************************/

    public this ( PoolDescription[] config )
    {
        foreach (description; config)
        {
            verify(description.task_name.length > 0);
            verify(
                description.stack_size >= 1024,
                "Configured stack size is suspiciously low"
            );
            // Entries appended by earlier iterations are already visible to
            // `findPool`, so duplicates within `config` are caught here.
            verify(
                !this.findPool(description.task_name).isDefined(),
                "ClasInfo present in task/pool mapping twice"
            );

            debug_trace("Registering specialized worker fiber pool for '{}'",
                description.task_name);

            this.mapping ~= SpecializedPool(
                description.task_name,
                new FiberPoolEager(
                    description.stack_size
                )
            );
        }
    }

    /***************************************************************************

        Lookup specific pool data

        Performs a linear search over the mapping and returns the first entry
        whose name equals `task`.

        Params:
            task = task type fully qualified name to look for

        Returns:
            `Optional.undefined` if nothing was found, wrapped pool struct
            otherwise

    ***************************************************************************/

    public Optional!(SpecializedPool) findPool ( cstring task )
    {
        foreach (meta; this.mapping)
        {
            if (meta.task_name == task)
                return optional(meta);
        }

        return Optional!(SpecializedPool).undefined;
    }

    /***************************************************************************

        Runs task in one of dedicated worker fiber pools if it is registered in
        the mapping.

        The pool is looked up by the name of the task's runtime class
        (`task.classinfo.name`).

        Params:
            task = task to run

        Returns: 'true' if task was present in the mapping, 'false' otherwise.

    ***************************************************************************/

    public bool run ( Task task )
    {
        bool found = false;
        auto name = task.classinfo.name;

        this.findPool(name).visit(
            // no dedicated pool registered: leave `found` as `false`
            ( ) { },
            (ref SpecializedPool meta) {
                debug_trace("Processing task <{}> in a dedicated fiber pool",
                    cast(void*) task);
                meta.pool.run(task);
                found = true;
            }
        );

        return found;
    }

    /***************************************************************************

        Kills all worker fibers in all pools

        For every registered pool, walks the pool's `BusyItemsIterator` and
        calls `kill()` on each fiber it yields.

    ***************************************************************************/

    public void kill ( )
    {
        debug_trace("Killing all worker fibers in all specialized pools");

        foreach (meta; this.mapping)
        {
            // `outer.new Inner` creates the nested iterator class bound to
            // this particular pool instance.
            scope iterator = meta.pool.new BusyItemsIterator;
            foreach (ref fiber; iterator)
                fiber.kill();
        }
    }
}
157 
/*******************************************************************************

    Debug trace helper for this module. When compiled with the `TaskScheduler`
    debug identifier, prints the formatted message to `Stdout`, prefixed with
    the module name, and flushes the output. Otherwise does nothing.

    Params:
        format = format string for the message
        args = arguments referenced by the format string

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        auto prefixed_format =
            "[ocean.task.internal.SpecializedPools] " ~ format;
        Stdout.formatln(prefixed_format, args).flush();
    }
}