1 /*******************************************************************************
2 3 AioScheduler that is informed when the aio operations are done and ready
4 to be continued.
5 6 Scheduler internally has two queues: queue that's accessible to main thread
7 and the queue that's accessible to all worker threads. At no point in time
8 single queue is available to both main thread and worker threads. As a
9 consequence of this, main thread never needs to acquire mutex while
10 accessing its queue, and worker threads are only synchronizing between
11 themselves while accessing the queue.
12 13 At the beginning, both queues are empty. When worker thread is finished
14 processing the request, JobNotification is put into the workers'
15 queue and notifies the main thread that there are request that it needs to
16 wake up. However, since the main thread may not immediately do this, more
17 than one request could end up in the queue. Once main thread is ready to
18 wake up the request, it will obtain the mutex and swap these two queues.
19 Now, main thread contains queue of all request ready to be woken up, and
20 the workers' queue is empty, and main thread now can wake up all the
21 request without interruptions from other threads, and worker threads can
22 schedule new request to be woken up without interfering with the main
23 thread. Once main thread finish processing its queue, and there is at least
24 one request in the workers' queue, it will swap the queues again, and the
25 same procedure will be performed again.
26 27 Copyright:
28 Copyright (c) 2018 dunnhumby Germany GmbH.
29 All rights reserved.
30 31 License:
32 Boost Software License Version 1.0. See LICENSE.txt for details.
33 34 *******************************************************************************/35 36 moduleocean.util.aio.internal.AioScheduler;
37 38 importocean.meta.types.Qualifiers;
39 importocean.io.select.client.SelectEvent;
40 41 /// Ditto42 classAioScheduler: ISelectEvent43 {
44 importocean.sys.ErrnoException;
45 importcore.sys.posix.pthread;
46 importocean.util.aio.JobNotification;
47 importocean.util.aio.internal.MutexOps;
48 importocean.util.container.LinkedList;
49 importocean.util.aio.internal.JobQueue: Job;
50 51 /***************************************************************************
52 53 Mutex protecting request queue
54 55 ***************************************************************************/56 57 privatepthread_mutex_tqueue_mutex;
58 59 /***************************************************************************
60 61 See ready_queue's documentation.
62 63 ***************************************************************************/64 65 privateLinkedList!(Job*)[2] queues;
66 67 /***************************************************************************
68 69 Queue of requests being ready. NOTE: this is just a pointer to a queue
70 of requests that are finished processing and they will be woken up
71 in the next AioScheduler cycle. On every AioScheduler cycle, pointers
72 to `ready_queue` and `waking_queue` are swapped. During a single cycle,
73 only the worker threads are accessing ready_queue (inserting references
74 to the jobs that are finish) and only the main thread access the
75 waking_queue (popping the requests from it and resuming the fibers
76 waiting for the IO to complete).
77 78 ***************************************************************************/79 80 privateLinkedList!(Job*)* ready_queue;
81 82 /***************************************************************************
83 84 Queue of requests that are in process of waking up. For more details,
85 see comment for ready_queue, above.
86 87 ***************************************************************************/88 89 privateLinkedList!(Job*)* waking_queue;
90 91 /***************************************************************************
92 93 Queue of the request whose results should be discarded.
94 95 ***************************************************************************/96 97 privateLinkedList!(Job*) discarded_queue;
98 99 /***************************************************************************
100 101 Constructor.
102 103 Params:
104 exception = exception to throw in case of error
105 106 ***************************************************************************/107 108 publicthis (ErrnoExceptionexception)
109 {
110 exception.enforceRetCode!(pthread_mutex_init).call(
111 &this.queue_mutex, null);
112 113 this.queues[0] = newLinkedList!(Job*);
114 this.queues[1] = newLinkedList!(Job*);
115 this.discarded_queue = newLinkedList!(Job*);
116 this.ready_queue = &this.queues[0];
117 this.waking_queue = &this.queues[1];
118 }
119 120 /***************************************************************************
121 122 Mark the request ready to be woken up by scheduler
123 124 Params:
125 job = job that has completed, that needs to be finalized and whose
126 suspended request should be resumed
127 lock_mutex = function or delegate to lock the mutex
128 unlock_mutex = function or delegate to unlock the mutex
129 130 ***************************************************************************/131 132 publicvoidrequestReady (MutexOp)(Job* req,
133 MutexOplock_mutex, MutexOpunlock_mutex)
134 {
135 lock_mutex(&this.queue_mutex);
136 scope (exit)
137 {
138 unlock_mutex(&this.queue_mutex);
139 }
140 141 // Check if the results of this request was marked as not needed142 if (!this.discarded_queue.remove(req))
143 {
144 this.ready_queue.append(req);
145 this.trigger();
146 }
147 else148 {
149 req.recycle();
150 }
151 }
152 153 /***************************************************************************
154 155 Discards the results of the given AIO operation.
156 157 Params:
158 req = JobNotification instance that was waiting for results
159 160 ***************************************************************************/161 162 publicvoiddiscardResults (Job* req)
163 {
164 lock_mutex(&this.queue_mutex);
165 scope (exit)
166 {
167 unlock_mutex(&this.queue_mutex);
168 }
169 170 // Since we're guarded by lock above, two scenarios might happen:171 // 1) The given ThreadWorker already submitted the results for this172 // operation, which we will simply remove from the ready queue.173 // 2) No worker thread was assigned for this request, or the operation174 // was not yet completed. In this case tell the AioScheduler to discard175 // these results as soon as they arrive176 if (!this.ready_queue.remove(req))
177 {
178 this.discarded_queue.append(req);
179 }
180 else181 {
182 req.recycle();
183 }
184 }
185 186 187 /***************************************************************************
188 189 Called from handle() when the event fires.
190 191 Params:
192 n = number of the times this event has been triggered.
193 194 ***************************************************************************/195 196 protectedoverrideboolhandle_ ( ulongn )
197 {
198 this.swapQueues();
199 200 foreach (job; *this.waking_queue)
201 {
202 autoreq = job.suspended_job;
203 assert (req);
204 205 job.finished();
206 req.wake();
207 job.recycle();
208 }
209 210 this.waking_queue.clear();
211 212 returntrue;
213 }
214 215 /***************************************************************************
216 217 Swaps the waking and ready queue.
218 219 ***************************************************************************/220 221 privatevoidswapQueues ()
222 {
223 lock_mutex(&this.queue_mutex);
224 scope (exit)
225 {
226 unlock_mutex(&this.queue_mutex);
227 }
228 229 assert (this.waking_queue.size() == 0);
230 231 autotmp = this.waking_queue;
232 this.waking_queue = this.ready_queue;
233 this.ready_queue = tmp;
234 }
235 }