1 /*******************************************************************************
2 
3     AioScheduler that is informed when the aio operations are done and ready
4     to be continued.
5 
6     The scheduler internally has two queues: one that is accessible to the
7     main thread and one that is accessible to all worker threads. At no point
8     in time is a single queue available to both the main thread and the worker
9     threads. As a consequence, the main thread never needs to hold the mutex
10     while accessing its queue, and worker threads only synchronize between
11     themselves while accessing their queue.
12 
13     At the beginning, both queues are empty. When a worker thread has finished
14     processing a request, it puts the completed job into the workers' queue
15     and notifies the main thread that there are requests that it needs to
16     wake up. However, since the main thread may not do this immediately, more
17     than one request could end up in the queue. Once the main thread is ready
18     to wake up the requests, it obtains the mutex and swaps the two queues.
19     Now the main thread holds the queue of all requests ready to be woken up
20     and the workers' queue is empty: the main thread can wake up all these
21     requests without interruption from other threads, and worker threads can
22     schedule new requests to be woken up without interfering with the main
23     thread. Once the main thread has finished processing its queue, and there
24     is at least one request in the workers' queue, it swaps the queues again
25     and the same procedure is repeated.
26 
27     Copyright:
28         Copyright (c) 2018 dunnhumby Germany GmbH.
29         All rights reserved.
30 
31     License:
32         Boost Software License Version 1.0. See LICENSE.txt for details.
33 
34 *******************************************************************************/
35 
36 module ocean.util.aio.internal.AioScheduler;
37 
38 import ocean.meta.types.Qualifiers;
39 import ocean.io.select.client.SelectEvent;
40 
41 /// Ditto
class AioScheduler: ISelectEvent
{
    import ocean.sys.ErrnoException;
    import core.sys.posix.pthread;
    import ocean.util.aio.JobNotification;
    import ocean.util.aio.internal.MutexOps;
    import ocean.util.container.LinkedList;
    import ocean.util.aio.internal.JobQueue: Job;

    /***************************************************************************

        Mutex held while `ready_queue` or `discarded_queue` is modified and
        while the `ready_queue` / `waking_queue` pointers are exchanged.

    ***************************************************************************/

    private pthread_mutex_t queue_mutex;

    /***************************************************************************

        Storage for the two job queues. `ready_queue` and `waking_queue` each
        point at one of these two slots and trade places in `swapQueues`.

    ***************************************************************************/

    private LinkedList!(Job*)[2] queues;

    /***************************************************************************

        Pointer to the queue that currently collects completed jobs.
        `requestReady` appends to it and `discardResults` may remove from it,
        both while holding `queue_mutex`. At the start of every `handle_`
        call it is exchanged with `waking_queue`, so the jobs gathered here
        are woken up during that call.

    ***************************************************************************/

    private LinkedList!(Job*)* ready_queue;

    /***************************************************************************

        Pointer to the queue that `handle_` is draining: the jobs in it are
        finalized, their suspended requests resumed, and the queue is then
        cleared. It is read without holding `queue_mutex`; only the pointer
        exchange in `swapQueues` is done under the lock.

    ***************************************************************************/

    private LinkedList!(Job*)* waking_queue;

    /***************************************************************************

        Jobs whose results are no longer wanted but which had not been
        reported through `requestReady` at the time `discardResults` was
        called. `requestReady` recycles such a job instead of queueing it.

    ***************************************************************************/

    private LinkedList!(Job*) discarded_queue;

    /***************************************************************************

        Constructor. Initialises the mutex and allocates all queues;
        `ready_queue` starts on `queues[0]`, `waking_queue` on `queues[1]`.

        Params:
            exception = exception instance used to report a failing
                `pthread_mutex_init` call

    ***************************************************************************/

    public this (ErrnoException exception)
    {
        exception.enforceRetCode!(pthread_mutex_init).call(
                &this.queue_mutex, null);

        foreach (ref queue; this.queues)
        {
            queue = new LinkedList!(Job*);
        }

        this.discarded_queue = new LinkedList!(Job*);

        this.ready_queue = &this.queues[0];
        this.waking_queue = &this.queues[1];
    }

    /***************************************************************************

        Reports a completed job to the scheduler.

        If the job was previously registered via `discardResults`, it is
        dropped from `discarded_queue` and recycled. Otherwise it is appended
        to `ready_queue` and this event is triggered so that `handle_` gets
        to run. Everything happens while `queue_mutex` is held.

        Params:
            req = job that has completed, that needs to be finalized and whose
                suspended request should be resumed
            lock_mutex = function or delegate to lock the mutex
            unlock_mutex = function or delegate to unlock the mutex

    ***************************************************************************/

    public void requestReady (MutexOp)(Job* req,
            MutexOp lock_mutex, MutexOp unlock_mutex)
    {
        lock_mutex(&this.queue_mutex);
        scope (exit) unlock_mutex(&this.queue_mutex);

        // Nobody is interested in these results any more: just release the job.
        if (this.discarded_queue.remove(req))
        {
            req.recycle();
            return;
        }

        this.ready_queue.append(req);
        this.trigger();
    }

    /***************************************************************************

        Discards the results of the given AIO operation.

        Params:
            req = job whose results are no longer needed

    ***************************************************************************/

    public void discardResults (Job* req)
    {
        lock_mutex(&this.queue_mutex);
        scope (exit) unlock_mutex(&this.queue_mutex);

        // With the lock held there are two possibilities:
        // 1) The job has already been reported and sits in the ready queue;
        //    it is taken out of there and recycled right away.
        // 2) The job has not been reported yet; it is remembered in the
        //    discarded queue so that requestReady() recycles it on arrival.
        //
        // NOTE(review): only `ready_queue` is searched. A job that has already
        // been swapped into `waking_queue` is treated as case 2 -- confirm
        // that callers cannot hit this.
        if (this.ready_queue.remove(req))
        {
            req.recycle();
            return;
        }

        this.discarded_queue.append(req);
    }


    /***************************************************************************

        Called from handle() when the event fires.

        Exchanges the two queues and then, for every job that was collected,
        finalizes the job, wakes its suspended request and recycles the job,
        in that order. The drained queue is cleared afterwards.

        Params:
            n = number of the times this event has been triggered (unused)

        Returns:
            always true

    ***************************************************************************/

    protected override bool handle_ ( ulong n )
    {
        this.swapQueues();

        foreach (completed; *this.waking_queue)
        {
            auto waiter = completed.suspended_job;
            assert (waiter);

            completed.finished();
            waiter.wake();
            completed.recycle();
        }

        this.waking_queue.clear();

        return true;
    }

    /***************************************************************************

        Exchanges the `ready_queue` and `waking_queue` pointers while holding
        `queue_mutex`. The waking queue is expected to be empty at this point.

    ***************************************************************************/

    private void swapQueues ()
    {
        lock_mutex(&this.queue_mutex);
        scope (exit) unlock_mutex(&this.queue_mutex);

        assert (this.waking_queue.size() == 0);

        auto collected = this.ready_queue;
        this.ready_queue = this.waking_queue;
        this.waking_queue = collected;
    }
}