/******************************************************************************

    Fiber/coroutine based non-blocking I/O select client base class

    Base class for a non-blocking I/O select client using a fiber/coroutine to
    suspend operation while waiting for the I/O event and resume on that event.

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

******************************************************************************/

module ocean.io.select.protocol.fiber.model.IFiberSelectProtocol;


import ocean.core.MessageFiber;

import ocean.core.Verify;

import ocean.io.select.client.model.IFiberSelectClient;

import ocean.io.select.fiber.SelectFiber;

import ocean.io.select.protocol.generic.ErrnoIOException: IOError, IOWarning;

import ocean.meta.types.Qualifiers;

debug ( SelectFiber ) import ocean.io.Stdout : Stderr;


/******************************************************************************

    Abstract base class for fiber based select clients.

    Subclasses implement transmit(), which does the actual I/O.
    transmitLoop() calls transmit() repeatedly, registering this instance with
    the fiber and suspending the fiber between calls; handle() stores the
    reported events and resumes the fiber.

******************************************************************************/

abstract class IFiberSelectProtocol : IFiberSelectClient
{
    /***************************************************************************

        Token used when suspending / resuming fiber.

    ***************************************************************************/

    static private MessageFiber.Token IOReady;

    /***************************************************************************

        Static ctor. Initialises fiber token.

    ***************************************************************************/

    static this ( )
    {
        IOReady = MessageFiber.Token("io_ready");
    }

    /**************************************************************************

        Local aliases

     **************************************************************************/

    protected alias .SelectFiber SelectFiber;

    public alias .IOWarning IOWarning;
    public alias .IOError   IOError;

    /**************************************************************************

        I/O device

     **************************************************************************/

    protected ISelectable conduit;

    /**************************************************************************

        Events to register the I/O device for.

     **************************************************************************/

    protected Event events_;

    /**************************************************************************

        IOWarning exception instance

     **************************************************************************/

    protected IOWarning warning_e;

    /**************************************************************************

        IOError exception instance

     **************************************************************************/

    protected IOError error_e;

    /**************************************************************************

        Events reported to handle()

     **************************************************************************/

    private Event events_reported;

    /**************************************************************************

        Constructor

        Creates new IOWarning and IOError instances for the conduit and
        forwards to the constructor below.

        Params:
            conduit = I/O device
            events  = the epoll events to register the device for
            fiber   = fiber to use to suspend and resume operation

     **************************************************************************/

    protected this ( ISelectable conduit, Event events, SelectFiber fiber )
    {
        this(conduit, events, fiber, new IOWarning(conduit), new IOError(conduit));
    }

    /**************************************************************************

        Constructor

        Note: If distinguishing between warnings and errors is not desired or
              required, pass the same object for warning_e and error_e.

        conduit, warning_e and error_e must not be null.

        Params:
            conduit   = I/O device
            events    = the epoll events to register the device for
            fiber     = fiber to use to suspend and resume operation
            warning_e = Exception instance to throw for warnings
            error_e   = Exception instance to throw on errors and to query
                        device specific error codes if possible

     **************************************************************************/

    protected this ( ISelectable conduit, Event events, SelectFiber fiber,
                     IOWarning warning_e, IOError error_e )
    {
        verify(conduit !is null);
        verify(warning_e !is null);
        verify(error_e !is null);

        super(fiber);
        this.conduit   = conduit;
        this.events_   = events;
        this.warning_e = warning_e;
        this.error_e   = error_e;
    }

    /**************************************************************************

        Constructor

        Uses the conduit, fiber and exceptions from the other instance. This is
        useful when instances of several subclasses share the same conduit and
        fiber.

        Params:
            other  = other instance of this class
            events = the epoll events to register the device for

     **************************************************************************/

    protected this ( typeof (this) other, Event events )
    {
        this(other.conduit, events, other.fiber, other.warning_e, other.error_e);
    }

    /**************************************************************************

        Returns:
            the I/O device file handle.

     **************************************************************************/

    public override Handle fileHandle ( )
    {
        return this.conduit.fileHandle();
    }

    /**************************************************************************

        Returns:
            the events to register the I/O device for.

     **************************************************************************/

    public override Event events ( )
    {
        return this.events_;
    }

    /**************************************************************************

        Returns:
            current socket error code, if available, or 0 otherwise.

     **************************************************************************/

    public override int error_code ( )
    {
        return this.error_e.error_code;
    }

    /**************************************************************************

        Resumes the fiber coroutine and handle the events reported for the
        conduit. The fiber must be suspended (HOLD state).

        Note that the fiber coroutine keeps going after this method has finished
        if there is another instance of this class which shares the fiber with
        this instance and is invoked in the coroutine after this instance has
        done its job.

        Params:
            events = events reported for the conduit; stored so that
                     transmitLoop() can pass them to transmit()

        Returns:
            false if the fiber is finished or true if it keeps going

        Throws:
            IOException on I/O error

     **************************************************************************/

    final override protected bool handle ( Event events )
    {
        verify(this.fiber.waiting);

        this.events_reported = events;

        debug ( SelectFiber ) Stderr.formatln("{}.handle: fd {} fiber resumed",
                typeof(this).stringof, this.conduit.fileHandle);
        SelectFiber.Message message = this.fiber.resume(IOReady, this); // SmartUnion
        debug ( SelectFiber ) Stderr.formatln("{}.handle: fd {} fiber yielded, message type = {}",
                typeof(this).stringof, this.conduit.fileHandle, message.active);

        // Only a numeric message is interpreted: non-zero means "keep going".
        // Any other active message type is treated as "finished".
        return (message.active == message.active.num)? message.num != 0 : false;
    }

    /**************************************************************************

        Registers this instance in the select dispatcher and repeatedly calls
        transmit() until the transmission is finished.

        Throws:
            IOException on I/O error, KillableFiber.KilledException if the
            fiber was killed.

        In:
            The fiber must be running.

     **************************************************************************/

    protected void transmitLoop ( )
    {
        verify(this.fiber.running);

        // The reported events are reset at this point to avoid using the events
        // set by a previous run of this method.

        try for (bool more = this.transmit(this.events_reported = this.events_reported.init);
                      more;
                      more = this.transmit(this.events_reported))
        {
            super.fiber.register(this);

            // Calling suspend() triggers an epoll wait, which will in turn call
            // handle() (above) when an event fires for this client. handle()
            // sets this.events_reported to the event reported by epoll.
            super.fiber.suspend(IOReady, this, fiber.Message(true));

            this.error_e.enforce(!(this.events_reported & Event.EPOLLERR), "I/O error");
        }
        catch (SelectFiber.KilledException e)
        {
            // Rethrown as is, so that the generic handler below does not
            // suspend the fiber again.
            throw e;
        }
        catch (Exception e)
        {
            if (super.fiber.isRegistered(this))
            {
                debug ( SelectFiber) Stderr.formatln("{}.transmitLoop: suspending fd {} fiber ({} @ {}:{})",
                    typeof(this).stringof, this.conduit.fileHandle, e.message(), e.file, e.line);

                // Exceptions thrown by transmit() or in the case of the Error
                // event are passed to the fiber resume() to be rethrown in
                // handle(), above.
                super.fiber.suspend(IOReady, e);

                debug ( SelectFiber) Stderr.formatln("{}.transmitLoop: resumed fd {} fiber, rethrowing ({} @ {}:{})",
                    typeof(this).stringof, this.conduit.fileHandle, e.message(), e.file, e.line);
            }

            throw e;
        }
    }

    /**************************************************************************

        Reads/writes data from/to this.conduit for which events have been
        reported.

        Called by transmitLoop(): the first call of a run receives the initial
        (reset) event value, subsequent calls receive the events last reported
        to handle().

        Params:
            events = events reported for this.conduit

        Returns:
            true to be invoked again (after an epoll wait) or false if finished

     **************************************************************************/

    abstract protected bool transmit ( Event events );
}