/*******************************************************************************

    AioScheduler that is informed when the aio operations are done and ready
    to be continued.

    The scheduler internally has two queues: a queue that's accessible to the
    main thread and a queue that's accessible to all worker threads. At no
    point in time is a single queue available to both the main thread and the
    worker threads. As a consequence of this, the main thread never needs to
    acquire the mutex while accessing its queue, and the worker threads only
    synchronize between themselves while accessing their queue.

    At the beginning, both queues are empty. When a worker thread has finished
    processing a request, the finished job is put into the workers' queue and
    the main thread is notified that there are requests that it needs to wake
    up. However, since the main thread may not do this immediately, more than
    one request could end up in the queue. Once the main thread is ready to
    wake up the requests, it obtains the mutex and swaps the two queues. Now
    the main thread holds the queue of all requests ready to be woken up and
    the workers' queue is empty: the main thread can wake up all the requests
    without interruptions from other threads, and the worker threads can
    schedule new requests to be woken up without interfering with the main
    thread. Once the main thread has finished processing its queue, and there
    is at least one request in the workers' queue, it swaps the queues again
    and the same procedure is repeated.

    Copyright:
        Copyright (c) 2018 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE.txt for details.

*******************************************************************************/

module ocean.util.aio.internal.AioScheduler;

import ocean.meta.types.Qualifiers;
import ocean.io.select.client.SelectEvent;

/// Ditto
class AioScheduler: ISelectEvent
{
    import ocean.sys.ErrnoException;
    import core.sys.posix.pthread;
    import ocean.util.aio.JobNotification;
    import ocean.util.aio.internal.MutexOps;
    import ocean.util.container.LinkedList;
    import ocean.util.aio.internal.JobQueue: Job;

    /***************************************************************************

        Mutex guarding `ready_queue` and `discarded_queue`, and the swap of
        the `ready_queue` / `waking_queue` pointers.

    ***************************************************************************/

    private pthread_mutex_t queue_mutex;

    /***************************************************************************

        Storage for the two job queues that `ready_queue` and `waking_queue`
        point to. See ready_queue's documentation.

    ***************************************************************************/

    private LinkedList!(Job*)[2] queues;

    /***************************************************************************

        Queue of the requests that are ready. NOTE: this is just a pointer to
        a queue of requests that have finished processing and that will be
        woken up in the next AioScheduler cycle. On every AioScheduler cycle,
        the `ready_queue` and `waking_queue` pointers are swapped. During a
        single cycle, only the worker threads insert into `ready_queue`
        (references to the jobs that are finished) and only the main thread
        accesses `waking_queue` (waking the requests in it and resuming the
        fibers waiting for the IO to complete).

    ***************************************************************************/

    private LinkedList!(Job*)* ready_queue;

    /***************************************************************************

        Queue of the requests that are in the process of being woken up. For
        more details, see the comment for ready_queue, above.

    ***************************************************************************/

    private LinkedList!(Job*)* waking_queue;

    /***************************************************************************

        Queue of the requests whose results should be discarded.

    ***************************************************************************/

    private LinkedList!(Job*) discarded_queue;

    /***************************************************************************

        Constructor. Initialises the queue mutex, allocates the queues and
        points `ready_queue` / `waking_queue` at the two slots of `queues`.

        Params:
            exception = exception to throw in case of error

    ***************************************************************************/

    public this (ErrnoException exception)
    {
        // Done first, so that nothing is allocated if the mutex can't be
        // initialised.
        exception.enforceRetCode!(pthread_mutex_init).call(
                &this.queue_mutex, null);

        foreach (ref queue; this.queues)
        {
            queue = new LinkedList!(Job*);
        }

        this.discarded_queue = new LinkedList!(Job*);

        this.ready_queue = &this.queues[0];
        this.waking_queue = &this.queues[1];
    }

    /***************************************************************************

        Marks the request as ready to be woken up by the scheduler.

        If the results of the request were previously marked as discarded,
        the job is recycled instead and the event is not triggered.

        Params:
            req = job that has completed, that needs to be finalized and whose
                suspended request should be resumed
            lock_mutex = function or delegate to lock the mutex
            unlock_mutex = function or delegate to unlock the mutex

    ***************************************************************************/

    public void requestReady (MutexOp)(Job* req,
            MutexOp lock_mutex, MutexOp unlock_mutex)
    {
        lock_mutex(&this.queue_mutex);
        scope (exit)
        {
            unlock_mutex(&this.queue_mutex);
        }

        // Nobody is waiting for these results any more: drop the job.
        if (this.discarded_queue.remove(req))
        {
            req.recycle();
            return;
        }

        // Hand the job over to be woken up and signal the event.
        this.ready_queue.append(req);
        this.trigger();
    }

    /***************************************************************************

        Discards the results of the given AIO operation.

        Params:
            req = job whose results are no longer needed

    ***************************************************************************/

    public void discardResults (Job* req)
    {
        lock_mutex(&this.queue_mutex);
        scope (exit)
        {
            unlock_mutex(&this.queue_mutex);
        }

        // Since we're guarded by the lock above, two scenarios might happen:
        // 1) The worker thread has already submitted the results for this
        //    operation: the job is simply removed from the ready queue and
        //    recycled.
        // 2) No worker thread was assigned to this request, or the operation
        //    has not completed yet: the job is remembered in the discarded
        //    queue, so that requestReady() drops the results as soon as they
        //    arrive.
        //
        // NOTE(review): a job that has already been swapped into the waking
        // queue is not found in the ready queue and therefore also ends up
        // in the discarded queue - confirm that callers cannot discard a job
        // at that stage.
        if (this.ready_queue.remove(req))
        {
            req.recycle();
            return;
        }

        this.discarded_queue.append(req);
    }

    /***************************************************************************

        Called from handle() when the event fires. Swaps the queues, then
        finishes, wakes and recycles every job in the waking queue, and
        finally empties the waking queue.

        Params:
            n = number of the times this event has been triggered (unused).

        Returns:
            always true (presumably to keep the event registered - see
            ISelectEvent).

    ***************************************************************************/

    protected override bool handle_ ( ulong n )
    {
        // After the swap, the waking queue holds everything the worker
        // threads have reported as ready so far.
        this.swapQueues();

        foreach (finished_job; *this.waking_queue)
        {
            // Fetched before finished() is called on the job.
            auto waiter = finished_job.suspended_job;
            assert (waiter);

            finished_job.finished();
            waiter.wake();
            finished_job.recycle();
        }

        this.waking_queue.clear();

        return true;
    }

    /***************************************************************************

        Swaps the waking and the ready queue, under the queue mutex. The
        waking queue is expected to be empty when this is called.

    ***************************************************************************/

    private void swapQueues ()
    {
        lock_mutex(&this.queue_mutex);
        scope (exit)
        {
            unlock_mutex(&this.queue_mutex);
        }

        assert (this.waking_queue.size() == 0);

        auto filled_queue = this.ready_queue;
        this.ready_queue = this.waking_queue;
        this.waking_queue = filled_queue;
    }
}