/*******************************************************************************

    Fiberless SelectReader

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.io.select.protocol.SelectReader;


import ocean.core.Verify;
import ocean.io.select.client.model.ISelectClient;
import ocean.io.device.IODevice;
import ocean.io.select.protocol.generic.ErrnoIOException;

import core.stdc.errno;

debug (Raw)         import ocean.io.Stdout : Stderr;
debug (SelectFiber) import ocean.io.Stdout : Stderr;

/*******************************************************************************

    SelectReader without Fiber

    This is useful for when you want to read when there is something to read but
    you don't want to block/wait/suspend your fiber when there is nothing.

    Usage outline, as far as this class shows: call `read(dg)` to install the
    delegate that receives incoming data; every subsequent `handle()` call
    performs one read on the input device and forwards any bytes received to
    that delegate.

*******************************************************************************/

class SelectReader : IAdvancedSelectClient
{
    /***************************************************************************

        Reader device

    ***************************************************************************/

    private IInputDevice input;

    /***************************************************************************

        Reader buffer. Allocated once in the constructor; each read overwrites
        it from the start, and the delegate receives a slice of it.

    ***************************************************************************/

    private ubyte[] buffer;

    /***************************************************************************

        Reader delegate, will be called with new data. It is null until
        `read(dg)` has been called.

    ***************************************************************************/

    private void delegate ( void[] data ) reader;

    /**************************************************************************

        IOWarning exception instance

     **************************************************************************/

    protected IOWarning warning_e;

    /**************************************************************************

        IOError exception instance

     **************************************************************************/

    protected IOError error_e;

    /***************************************************************************

        Events we are interested in: readable data and peer hang-up.

    ***************************************************************************/

    private Event events_ = Event.EPOLLIN | Event.EPOLLRDHUP;

    /***************************************************************************

        Constructor

        Params:
            input       = input device to use
            buffer_size = buffer size to use
            warning_e   = instance of a reusable exception to use, will be
                          allocated if null
            error_e     = instance of a reusable exception to use, will be
                          allocated if null

    ***************************************************************************/

    public this ( IInputDevice input, size_t buffer_size,
                  IOWarning warning_e = null, IOError error_e = null )
    {
        this.input  = input;
        this.buffer = new ubyte[buffer_size];

        this.warning_e = warning_e is null ? new IOWarning(input) : warning_e;
        this.error_e   = error_e   is null ? new IOError(input)   : error_e;
    }


    /**************************************************************************

        Returns:
            the events to register the I/O device for.

     **************************************************************************/

    public override Event events ( )
    {
        return this.events_;
    }


    /**************************************************************************

        Returns:
            the I/O device file handle.

     **************************************************************************/

    public override Handle fileHandle ( )
    {
        return this.input.fileHandle();
    }


    /***************************************************************************

        Feed delegate with data that was read.

        Stores `dg` as the reader delegate and immediately attempts one read
        (with no events set), so data that is already available is passed to
        `dg` before this method returns.

        NOTE(review): `dg` is declared `scope` but is kept in a member and
        invoked again from `handle()`. Callers should presumably pass a
        delegate whose context outlives this object's registration (e.g. a
        member function delegate) -- confirm against callers.

        Params:
            dg = delegate to call with new data

    ***************************************************************************/

    public void read ( scope void delegate ( void[] data ) dg )
    {
        this.reader = dg;

        this.read(Event.None);
    }


    /***************************************************************************

        Read data if events don't indicate end of connection

        Performs a single `read()` on the input device. If bytes were received
        they are passed to the reader delegate. If nothing was received, the
        outcome depends on the cause:
            - a device error, a hang-up event or end of flow is reported
              through the `error_e` / `warning_e` exception instances;
            - EINTR / EAGAIN / EWOULDBLOCK mean "no data right now" and the
              method returns without calling the delegate;
            - any other errno is thrown via `error_e`.

        Params:
            events = events reported for the device (Event.None if called
                     outside of event handling)

    ***************************************************************************/

    private void read ( Event events )
    {
        // Reset errno so that a value inspected below stems from this read().
        .errno = 0;

        input.ssize_t n = this.input.read(this.buffer);

        if (n <= 0)
        {
            // EOF or error: Check for socket error and hung-up event first.

            this.error_e.checkDeviceError(n? "read error" : "end of flow whilst reading", __FILE__, __LINE__);

            this.warning_e.enforce(!(events & Event.EPOLLRDHUP), "connection hung up on read");
            this.warning_e.enforce(!(events & Event.EPOLLHUP), "connection hung up");

            if (n)
            {
                // read() error and no socket error or hung-up event: Check
                // errno. Carry on if there are just currently no data available
                // (EAGAIN/EWOULDBLOCK/EINTR) or throw error otherwise.

                int errnum = .errno;

                switch (errnum)
                {
                    default:
                        throw this.error_e.set(errnum, "read error");

                    case EINTR, EAGAIN:
                        static if ( EAGAIN != EWOULDBLOCK )
                        {
                            case EWOULDBLOCK:
                        }

                        // EAGAIN/EWOULDBLOCK: currently no data available.
                        // EINTR: read() was interrupted by a signal before data
                        // became available.

                        n = 0;
                }
            }
            else
            {
                // EOF and no socket error or hung-up event: Throw EOF warning.

                this.warning_e.enforce(false, "end of flow whilst reading");
            }
        }
        else
        {
            debug (Raw) Stderr.formatln("[{}] Read {:X2} ({} bytes)",
                                        this.fileHandle,
                                        this.buffer[0 .. n], n);
        }

        verify(n >= 0);

        if ( n > 0 )
        {
            // handle() may fire before read(dg) has installed a delegate;
            // fail with a diagnosable error instead of calling a null
            // delegate.
            verify(this.reader !is null,
                   "SelectReader: data received but no reader delegate set");

            this.reader(this.buffer[0 .. n]);
        }
    }


    /***************************************************************************

        Handle socket events

        Performs one read attempt, forwarding the reported events so that
        hang-up conditions can be detected.

        Params:
            events = events to handle

        Returns:
            true, so it stays registered

    ***************************************************************************/

    final override protected bool handle ( Event events )
    {
        this.read(events);
        debug ( SelectFiber ) Stderr.formatln("{}.handle: fd {} read() called",
                typeof(this).stringof, this.fileHandle);

        return true;
    }
}