/*******************************************************************************

    Task based epoll select dispatcher client.

    Manages the epoll registration of a file descriptor, suspends a task to wait
    for I/O events to occur for that file descriptor, and resumes the task to
    handle the occurred events.

    Copyright:
        Copyright (c) 2017 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.select.protocol.task.TaskSelectClient;

import ocean.core.Verify;
import ocean.io.select.client.model.ISelectClient;

class TaskSelectClient: ISelectClient
{
    import ocean.task.Task: Task;
    import ocean.task.IScheduler: theScheduler;
    import ocean.io.select.protocol.task.TimeoutException;
    import ocean.util.log.Logger;
    import ocean.meta.types.Qualifiers;

    debug (SelectFiber) import ocean.io.Stdout: Stdout;

    /***************************************************************************

        The I/O device.

    ***************************************************************************/

    private ISelectable iodev;

    /***************************************************************************

        The I/O events passed to `epoll.register()/modify()`.
        Also used in logic to avoid unnecessary unregister/register sequences:
        A non-zero value means the I/O device is registered with epoll, zero
        means that it is not.

    ***************************************************************************/

    private Event events_expected;

    /***************************************************************************

        The task that is suspended while waiting for events. `null` while no
        task is suspended in `ioWait()`.

    ***************************************************************************/

    private Task task;

    /***************************************************************************

        The I/O events reported by epoll.

    ***************************************************************************/

    private Event events_reported;

    /***************************************************************************

        `true` if the I/O device timed out before an event occurred.

    ***************************************************************************/

    private bool timeout_reported;

    /***************************************************************************

        Reusable timeout exception, created lazily on the first timeout.

    ***************************************************************************/

    private TimeoutException e_timeout;

    /***************************************************************************

        Obtains the current error status of the I/O device, such as socket
        error.

    ***************************************************************************/

    private int delegate ( ) error_code_dg;

    /***************************************************************************

        Constructor.

        The delegate is stored in this instance and called later from
        `error_code()`, so it must stay valid for the lifetime of this
        instance. For that reason the parameter is intentionally not `scope`.

        Params:
            iodev         = the I/O device
            error_code_dg = delegate to query the I/O device error status

    ***************************************************************************/

    public this ( ISelectable iodev, int delegate ( ) error_code_dg )
    {
        this.iodev = iodev;
        this.error_code_dg = error_code_dg;
    }

    /**************************************************************************

        Suspends the current task until epoll reports any event in
        `events_expected` for the I/O device or the I/O device times out.

        Params:
            events_expected = the events to wait for (`EPOLLHUP` and `EPOLLERR`
                              are always implicitly added)

        Returns:
            the events reported by epoll.

        Throws:
            `EpollException` if registering with epoll failed,
            `TimeoutException` on timeout waiting for I/O events.

     **************************************************************************/

    public Event ioWait ( Event events_expected )
    out
    {
        assert(!this.task);
    }
    do
    {
        verify(events_expected != Event.init);
        verify(this.task is null);

        // Based on the current value of this.events_expected do the following:
        // - If this.events_expected is 0 then the I/O device is not registered
        //   with epoll so register it.
        // - If this.events_expected is non-zero then the I/O device is still
        //   registered from a previous call of this method so
        //   - if this.events_expected differs from events_expected then change
        //     the events of the existing registration,
        //   - otherwise don't touch the existing registration.

        if (this.events_expected != events_expected)
        {
            auto events_before = this.events_expected;
            bool already_registered = !!events_before;

            // The new value has to be in place before calling epoll because
            // it is what `events()` reports for this client.
            this.events_expected = events_expected;

            // If epoll refuses the request then the registration state did
            // not change, so restore the bookkeeping. Otherwise the next call
            // with the same events would skip the registration and suspend
            // the task with nothing to wake it up.
            scope (failure) this.events_expected = events_before;

            auto epoll = theScheduler.epoll;
            if (already_registered)
                epoll.modify(this);
            else
                epoll.register(this);
        }

        try
        {
            // this.task is null at this point (verified above), so this
            // presumably resolves to the static `Task.getThis()`.
            this.task = task.getThis();
            verify(this.task !is null);
            this.task.suspend();
            // handle() or finalize() clears this.task before resuming.
            verify(this.task is null);

            // At this point handle() has resumed the task and is blocked until
            // the task is suspended again (or terminates). In order to avoid
            // unnecessary epoll unregister/register sequences we stay
            // registered with epoll. Now there are two possible scenarios:

            // 1. This method is called again without suspending the task in
            //    the mean time so handle() is still waiting. We will find
            //    events_expected as we left it i.e. not 0, meaning the I/O
            //    device is still registered. Then we set this.task to the
            //    current task and suspend again. This will resume handle(),
            //    which will see this.task is not null and therefore return
            //    true to stay registered with epoll and wait for events.

            // 2. The task is suspended or terminates outside the reach of this
            //    class. handle() will be resumed and see this.task is null so
            //    it will set this.events_expected = 0 and return false to
            //    request being unregistered. If this method is called again,
            //    it will see events_expected == 0, meaning the I/O device is
            //    not registered, and register it.

            if (this.events_reported & events_reported.EPOLLERR)
                // Reset events_expected because with EPOLLERR the I/O device is
                // automatically unregistered.
                this.events_expected = events_expected.init;

            if (this.timeout_reported)
            {
                this.timeout_reported = false;
                if (this.e_timeout is null)
                    this.e_timeout = new TimeoutException;
                throw this.e_timeout;
            }

            return this.events_reported;
        }
        finally
            // Never carry reported events over into the next call.
            this.events_reported = this.events_reported.init;
    }

    /**************************************************************************

        Unregisters the I/O device.

        You need to call this method if you close and then reopen or otherwise
        reassign the I/O device's file descriptor *without* suspending and
        resuming or terminating and restarting the task in between. You may call
        this method at any time, even if the I/O device is not (yet or any more)
        usable, in which case it will not throw.

        Calling `unregister` is needed if the file descriptor is closed and
        reopened because when closing the OS unregisters it from epoll behind
        this instance's back. However,
          - suspending or terminating the task has the same effect as calling
            `unregister` and
          - if the file descriptor is not going to be closed then unregistering
            is harmless, as `ioWait` will re-register it on the next call.

        Returns:
            0 if everything worked as expected or
             - `ENOENT` if the client was already unregistered or
             - `EBADF` if the I/O device file descriptor does not refer to a
               useable device (i.e. it was closed already or not opened yet).
            In any case the I/O device is not registered with epoll.

        Throws:
            `EpollException` on error.

     **************************************************************************/

    public int unregister ( )
    {
        this.events_expected = events_expected.init;
        return theScheduler.epoll.unregister(this);
    }

    /**************************************************************************

        Resumes the task to handle `events`.

        Params:
            events = the events reported by epoll

        Returns:
            true to stay registered with epoll or false to be unregistered.

     **************************************************************************/

    override public bool handle ( Event events )
    {
        // This handler should only be called while ioWait() is blocked in the
        // task.suspend() call so this.task should always refer to the task to
        // be resumed.
        if (Task task = this.task)
        {
            // Clear the reference before resuming: ioWait() sets it again if
            // it is re-entered before the task yields.
            this.task = null;

            debug ( SelectFiber ) Stdout.formatln(typeof(this).stringof ~
                ".handle: fd {} task resumed", this.fd);

            this.events_reported = events;
            task.resume();

            debug ( SelectFiber ) Stdout.formatln(typeof(this).stringof ~
                ".handle: fd {} task yielded", this.fd);

            if (this.task)
            {
                // After the task has been resumed, ioWait() has returned, then
                // it was called again and suspended the task again so stay
                // registered with epoll.
                return true;
            }
            else
            {
                // The task was suspended outside ioWait() or has terminated so
                // it is not waiting for events for this.iodev.
                this.events_expected = this.events_expected.init;
                return false;
            }
        }
        else
        {
            // Should not happen; unregister to stop the event we cannot handle
            // from firing.
            Log.lookup(typeof(this).stringof).error(
                "handle {} fd {} events {:X}: no task to resume, unregistering",
                this.iodev, this.fd, events
            );
            debug ( SelectFiber )
                Stdout.formatln(typeof(this).stringof ~
                    ".handle: fd {} no task to resume, unregistering", this.fd);

            // Returning false requests unregistration, so record that the
            // I/O device is not registered any more. Otherwise the next
            // ioWait() call with the same events would not register again.
            this.events_expected = this.events_expected.init;
            return false;
        }
    }

    /**************************************************************************

        Returns:
            the I/O device file handle.

     **************************************************************************/

    override public Handle fileHandle ( ) { return this.iodev.fileHandle; }

    /**************************************************************************

        Returns:
            the events to register the I/O device for.

     **************************************************************************/

    override public Event events ( ) { return this.events_expected; }

    /**************************************************************************

        Returns:
            current I/O (e.g. socket) error code, if available, or 0 otherwise.

     **************************************************************************/

    override public int error_code ( ) { return this.error_code_dg(); }

    /***************************************************************************

        Called if `EPOLLERR` was reported for the I/O device. Only records the
        reported events; the task is resumed by `finalize()`.

        Future direction: This class will handle `EPOLLERR` through `handle()`
        when error reporting and finalizing are removed from `ISelectClient`.

        Params:
            exception = not the issue here
            events    = the reported events including `EPOLLERR`

    ***************************************************************************/

    override protected void error_ ( Exception exception, Event events )
    {
        this.events_reported = events;
    }

    /**************************************************************************

        Called after this instance was unregistered from epoll because either
          - `handle()` returned `false` (`status.Success`) or
          - the I/O device timed out (`status.Timeout`) or
          - `EPOLLERR` has been reported for the I/O device (`status.Error`).

        Note that for `status` other than `Success` `handle()` is not called.

        Future direction: When error reporting and finalizing are removed from
        `ISelectClient`,
          - timeouts can be handled by overriding `void timeout()`,
          - this class will handle `EPOLLERR` through `handle()`,
          - finalization with `status.Success` doesn't need to be handled.

        Params:
            status = the reason why this instance was unregistered

     **************************************************************************/

    override public void finalize ( FinalizeStatus status )
    {
        // This method is called after the instance was unregistered from
        // epoll, so the registration bookkeeping is reset whether or not a
        // task is currently waiting in ioWait(). Otherwise a timeout or
        // error arriving while the task is suspended elsewhere would leave
        // events_expected non-zero and the next ioWait() call with the same
        // events would not register again.
        this.events_expected = events_expected.init;

        if (Task task = this.task)
        {
            this.task = null;

            /* D2: final */ switch (status)
            {
                case status.Success:
                    break;

                case status.Timeout:
                    // Makes ioWait() throw TimeoutException once resumed.
                    this.timeout_reported = true;
                    task.resume();
                    break;

                case status.Error:
                    // error_() has been called before so ioWait() will simply
                    // return the reported events which include EPOLLERR.
                    task.resume();
                    break;

                default: assert(false);
            }
        }
    }

    /***************************************************************************

        Returns the file descriptor of the I/O device as `int`, for logging.

        Returns:
            the file descriptor of the I/O device.

    ***************************************************************************/

    private int fd ( )
    {
        auto handle = this.iodev.fileHandle;

        // If the handle type is not a plain int, require at compile time that
        // it is a typedef wrapping an int so the cast below is meaningful.
        static if (!is(typeof(handle) == int))
        {
            static assert(is(handle.IsTypedef));
            static assert(is(typeof(handle.value) == int));
        }

        return cast(int)handle;
    }
}