/*******************************************************************************

    Extends reusable fiber pool with task queue. Used in task scheduler
    as "default" way to schedule tasks.

    Copyright:
        Copyright (c) 2017 dunnhumby Germany GmbH. All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.internal.FiberPoolWithQueue;

import core.thread : Fiber;

import ocean.meta.types.Qualifiers;
import ocean.core.Enforce;
import ocean.task.IScheduler;
import ocean.task.internal.FiberPool;
import ocean.task.Task;
import ocean.util.container.queue.FixedRingQueue;

debug (TaskScheduler)
    import ocean.io.Stdout;

/*******************************************************************************

    Fiber pool that additionally owns a fixed-size queue of tasks.

    A task can either be started at once in a worker fiber taken from the pool
    (`runOrQueue`) or be stored in the queue (`queue`). A worker fiber which
    has finished its task keeps popping and running queued tasks and is only
    recycled once the queue is empty (see `workerFiberMethod`).

*******************************************************************************/

public class FiberPoolWithQueue : FiberPool
{
    /// Queue of tasks awaiting an idle worker fiber to be executed in
    /* package(ocean.task) */
    public FixedRingQueue!(Task) queued_tasks;

    /// Thrown when all fibers are busy, task queue is full and no custom
    /// delegate to handle queue overflow is supplied.
    /* package(ocean.task) */
    public TaskQueueFullException queue_full_e;

    /***************************************************************************

        Called each time a task is attempted to be queued but the size limit
        is reached. Both the task and the queue will be supplied as arguments
        (in that order).

        If left `null`, `queue_full_e` is thrown instead.

    ***************************************************************************/

    public TaskQueueFullCB task_queue_full_cb;

    /***************************************************************************

        Constructor

        Params:
            queue_limit = max size of task queue, recommended to be at least
                as large as pool limit
            stack_size = fiber stack size to use in this pool
            limit = limit to pool size. If set to 0, there is no app limit and
                pool growth will be limited only by OS resources

    ***************************************************************************/

    public this ( size_t queue_limit, size_t stack_size, size_t limit )
    {
        super(stack_size, limit);
        this.queued_tasks = new FixedRingQueue!(Task)(queue_limit);
        // allocated once up-front and reused for every overflow
        this.queue_full_e = new TaskQueueFullException;
    }

    /***************************************************************************

        Method used to queue the task for later execution.

        Will always put the task into the queue, even if there are idle worker
        fibers. This method is mostly useful when implementing advanced library
        facilities to ensure that no immediate execution takes place.

        Will result in starting the task in the next event loop cycle at the
        earliest.

        Params:
            task = derivative from `ocean.task.Task` defining some application
                task to execute

        Throws:
            TaskQueueFullException if task queue is at full capacity AND
            if no custom `task_queue_full_cb` is set.

    ***************************************************************************/

    public void queue ( Task task )
    {
        if (!this.queued_tasks.push(task))
        {
            debug_trace("trying to queue a task while task queue is full");

            // the task was NOT stored; either let the application decide what
            // to do with it or report the overflow via exception
            if (this.task_queue_full_cb !is null)
                this.task_queue_full_cb(task, this.queued_tasks);
            else
                enforce(this.queue_full_e, false);
        }
        else
        {
            debug_trace(
                "task '{}' queued for delayed execution",
                cast(void*) task
            );
        }
    }

    /***************************************************************************

        Method used to execute a task.

        If there are idle worker fibers, the task will be executed immediately
        and this method will only return when that task first calls `suspend`.

        If all workers are busy, the task will be added to the queue and this
        method will return immediately.

        Params:
            task = derivative from `ocean.task.Task` defining some application
                task to execute

        Throws:
            TaskQueueFullException if task queue is at full capacity AND
            if no custom `task_queue_full_cb` is set.

    ***************************************************************************/

    public void runOrQueue ( Task task )
    {
        // pool is exhausted, no fiber can be obtained -> fall back to the queue
        if (this.num_busy() >= this.limit())
        {
            this.queue(task);
            return;
        }

        auto fiber = this.get();
        debug_trace("running task <{}> via worker fiber <{}>",
            cast(void*) task, cast(void*) fiber);
        // `Task.entryPoint` is supposed to be entry method for worker fiber
        // but it does not allow to reuse worker fiber immediately for new tasks
        // waiting in the queue. Because of that a custom method is used as
        // entry point instead which will internally call `Task.entryPoint`
        // directly:
        task.assignTo(fiber, &this.workerFiberMethod);
        task.resume();
    }

    /***************************************************************************

        Set in runOrQueue() as a "real" fiber entry method when a task is
        assigned to a worker fiber.

        Takes care of:
        - recycling both task and worker fiber after main task method finishes
        - reusing current worker fiber to run new scheduled tasks if there are
          any

    ***************************************************************************/

    private void workerFiberMethod ( )
    {
        // this method runs inside the worker fiber, so the currently running
        // fiber is the worker the task was assigned to
        auto fiber = cast(WorkerFiber) Fiber.getThis();
        enforce(fiber !is null);
        auto task = fiber.activeTask();
        enforce(task !is null);

        // Runs whatever `task` currently refers to; `task` is re-assigned by
        // the loop below before each subsequent call.
        void runTask ( )
        {
            bool had_exception = task.entryPoint();

            // in case task was resumed after unhandled exception, delay further
            // execution for one cycle to avoid situation where exception handler
            // calls `Task.continueAfterThrow()` and that throws again
            if (had_exception)
                theScheduler.processEvents();

            // makes impossible to use the task by an accident in the period
            // between finishing it here and getting it started anew after
            // recycling
            task.fiber = null;
        }

        runTask();

        // `pop` overwrites `task` with the next queued task on success
        while (this.queued_tasks.pop(task))
        {
            // there are some scheduled tasks in the queue. it is best for
            // latency and performance to start one of those immediately in
            // current fiber instead of going through recycle+get again
            debug_trace("Reusing worker fiber <{}> to run scheduled task <{}>",
                cast(void*) fiber, cast(void*) task);

            task.assignTo(fiber);
            runTask();
        }

        // there are no scheduled tasks right now, can simply recycle
        // worker fiber for future usage
        debug_trace("Recycling worker fiber <{}>", cast(void*) fiber);
        this.recycle(fiber);
    }
}

/******************************************************************************

    Signature of the queue overflow handler: receives the task that could not
    be queued and the (full) task queue.

    See `FiberPoolWithQueue.task_queue_full_cb`

******************************************************************************/

public alias void delegate(Task, FixedRingQueue!(Task)) TaskQueueFullCB;

/******************************************************************************

    Prints a formatted trace line to `Stdout`, prefixed with this module's
    name. Only does anything when compiled with `-debug=TaskScheduler`;
    otherwise the body is empty.

    Params:
        format = format string for the trace message
        args = arguments referenced by `format`

******************************************************************************/

private void debug_trace ( T... ) ( cstring format, T args )
{
    debug ( TaskScheduler )
    {
        Stdout.formatln( "[ocean.task.internal.FiberPoolWithQueue] "
            ~ format, args ).flush();
    }
}