/*******************************************************************************

    Implements mapping between specific task class names (as reported by
    `ClassInfo.name`) and matching `FiberPoolEager` instances intended to
    serve all tasks of such type.

    Copyright: Copyright (c) 2017 dunnhumby Germany GmbH. All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.internal.SpecializedPools;

import ocean.meta.types.Qualifiers;
import ocean.task.internal.FiberPoolEager;
import ocean.task.IScheduler;
import ocean.task.Task;
import ocean.core.Verify;
import ocean.core.Enforce;
import ocean.core.Optional;

debug (TaskScheduler)
    import ocean.io.Stdout;

public alias IScheduler.Configuration.PoolDescription PoolDescription;

/*******************************************************************************

    Defines all mappings between task kinds and dedicated worker fiber pools
    processing those.

*******************************************************************************/

public class SpecializedPools
{
    /// mapping as a simple array (expected to contain few elements)
    private SpecializedPool[] mapping;

    /// mapping element: pairs a task class name with the pool serving it
    public static struct SpecializedPool
    {
        /// name the pool is registered under, compared against
        /// `task.classinfo.name` in `run`
        istring task_name;

        /// worker fiber pool dedicated to tasks with that name
        FiberPoolEager pool;
    }

    /***************************************************************************

        Constructor

        Creates one `FiberPoolEager` per description and registers it under
        the description's task name. Each description is sanity checked with
        `verify` first: the task name must be non-empty, the stack size must be
        at least 1024 and the same task name must not be registered twice.

        Params:
            config = array of specialized descriptions coming from the scheduler
                configuration

    ***************************************************************************/

    public this ( PoolDescription[] config )
    {
        foreach (description; config)
        {
            verify(
                description.task_name.length > 0,
                "Task name of a specialized pool must not be empty"
            );
            verify(
                description.stack_size >= 1024,
                "Configured stack size is suspiciously low"
            );
            // Entries registered by earlier iterations are already visible
            // through `findPool`, so this catches duplicates within `config`.
            verify(
                !this.findPool(description.task_name).isDefined(),
                "ClassInfo present in task/pool mapping twice"
            );

            debug_trace("Registering specialized worker fiber pool for '{}'",
                description.task_name);

            this.mapping ~= SpecializedPool(
                description.task_name,
                new FiberPoolEager(
                    description.stack_size
                )
            );
        }
    }

    /***************************************************************************

        Lookup specific pool data

        Performs a linear scan over the mapping and returns the first entry
        whose name equals `task`.

        Params:
            task = task type fully qualified name to look for

        Returns:
            `Optional.undefined` if nothing was found, wrapped pool struct
            otherwise

    ***************************************************************************/

    public Optional!(SpecializedPool) findPool ( cstring task )
    {
        foreach (meta; this.mapping)
        {
            if (meta.task_name == task)
                return optional(meta);
        }

        return Optional!(SpecializedPool).undefined;
    }

    /***************************************************************************

        Runs task in one of dedicated worker fiber pools if it is registered in
        the mapping.

        The lookup key is the name of the task's runtime class
        (`task.classinfo.name`).

        Params:
            task = task to run

        Returns:
            'true' if task was present in the mapping, 'false' otherwise.

    ***************************************************************************/

    public bool run ( Task task )
    {
        bool found = false;
        auto name = task.classinfo.name;

        this.findPool(name).visit(
            // no dedicated pool registered: leave `found` as 'false'
            ( ) { },
            // dedicated pool registered: hand the task over to it
            (ref SpecializedPool meta) {
                debug_trace("Processing task <{}> in a dedicated fiber pool",
                    cast(void*) task);
                meta.pool.run(task);
                found = true;
            }
        );

        return found;
    }

    /***************************************************************************

        Kills all worker fibers in all pools

        For each registered pool, iterates over its busy items and calls
        `kill()` on every one of them.

    ***************************************************************************/

    public void kill ( )
    {
        debug_trace("Killing all worker fibers in all specialized pools");

        foreach (meta; this.mapping)
        {
            // `BusyItemsIterator` is instantiated with the nested class
            // syntax `outer.new Inner`, binding it to this specific pool.
            scope iterator = meta.pool.new BusyItemsIterator;
            foreach (ref fiber; iterator)
                fiber.kill();
        }
    }
}

/*******************************************************************************

    Prints a trace message prefixed with this module's name to `Stdout` and
    flushes it. The body is compiled in only for `debug (TaskScheduler)`
    builds; otherwise the function does nothing.

    Params:
        format = format string of the message
        args = arguments referenced by the format string

*******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        Stdout.formatln( "[ocean.task.internal.SpecializedPools] "
            ~ format, args ).flush();
    }
}