1 /****************************************************************************** 2 3 Fiber/coroutine based non-blocking input select client base class 4 5 Base class for a non-blocking input select client using a fiber/coroutine to 6 suspend operation while waiting for the read event and resume on that event. 7 Provides a stream-like interface with consumer delegate invocation to 8 receive and consume data from the input until the consumer indicates it has 9 finished. 10 11 Copyright: 12 Copyright (c) 2009-2016 dunnhumby Germany GmbH. 13 All rights reserved. 14 15 License: 16 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. 17 Alternatively, this file may be distributed under the terms of the Tango 18 3-Clause BSD License (see LICENSE_BSD.txt for details). 19 20 ******************************************************************************/ 21 22 module ocean.io.select.protocol.fiber.FiberSelectReader; 23 24 25 import ocean.core.Verify; 26 27 import ocean.io.select.protocol.fiber.model.IFiberSelectProtocol; 28 29 import ocean.io.device.IODevice: IInputDevice; 30 31 import core.stdc.errno: errno, EAGAIN, EWOULDBLOCK, EINTR; 32 33 debug (Raw) import ocean.io.Stdout: Stderr; 34 35 36 /******************************************************************************/ 37 38 class FiberSelectReader : IFiberSelectProtocol 39 { 40 import ocean.core.Enforce: enforce; 41 42 /************************************************************************** 43 44 Default input buffer size (16 kB). 45 46 **************************************************************************/ 47 48 public static immutable size_t default_buffer_size = 16 * 1024; 49 50 /************************************************************************** 51 52 Consumer callback delegate type 53 54 Params: 55 data = data to consume 56 57 Returns: 58 - if finished, a value of [0, data.length] reflecting the number of 59 elements (bytes) consumed or 60 - a value greater than data.length if more data is required. 61 62 **************************************************************************/ 63 64 alias size_t delegate ( void[] data ) Consumer; 65 66 /************************************************************************** 67 68 Input device 69 70 **************************************************************************/ 71 72 public alias .IInputDevice IInputDevice; 73 74 private IInputDevice input; 75 76 /************************************************************************** 77 78 Data buffer 79 80 **************************************************************************/ 81 82 private void[] data; 83 84 /************************************************************************** 85 86 End index of available and consumed data. 87 88 Available data is data received by receive() or read() and not yet 89 consumed by the consumer delegate passed to consume() or read(). 90 Consumed data is data received by receive() or read() and already 91 consumed by the consumer delegate passed to consume() or read(). 92 93 **************************************************************************/ 94 95 private size_t available = 0, 96 consumed = 0; 97 98 /************************************************************************** 99 100 Invariant to assure consumed/available are in correct order and range 101 102 **************************************************************************/ 103 104 invariant() 105 { 106 assert (available <= this.data.length); 107 assert (consumed <= this.data.length); 108 assert (consumed <= available); 109 } 110 111 /************************************************************************** 112 113 Constructor. 114 115 error_e and warning_e may be the same object if distinguishing between 116 error and warning is not required. 117 118 Params: 119 input = input device 120 fiber = input reading fiber 121 warning_e = exception to throw on end-of-flow condition or if the 122 remote hung up 123 error_e = exception to throw on I/O error 124 buffer_size = input buffer size 125 126 In: 127 buffer_size must not be 0. 128 129 **************************************************************************/ 130 131 public this ( IInputDevice input, SelectFiber fiber, 132 IOWarning warning_e, IOError error_e, 133 size_t buffer_size = this.default_buffer_size ) 134 { 135 verify(buffer_size > 0, "zero input buffer size specified"); 136 137 super(this.input = input, Event.EPOLLIN | Event.EPOLLRDHUP, 138 fiber, warning_e, error_e); 139 140 this.data = new ubyte[buffer_size]; 141 } 142 143 /************************************************************************** 144 145 Constructor 146 147 Uses the conduit, fiber and exceptions from the other 148 IFiberSelectProtocol instance. This is useful when this instance shares 149 the conduit and fiber with another IFiberSelectProtocol instance, e.g. 150 a FiberSelectWriter. 151 152 The conduit owned by the other instance must have been downcast from 153 IInputDevice. 154 155 Params: 156 other = other instance of this class 157 buffer_size = input buffer size 158 159 In: 160 buffer_size must not be 0. 161 162 **************************************************************************/ 163 164 public this ( typeof (super) other, 165 size_t buffer_size = this.default_buffer_size ) 166 { 167 verify(buffer_size > 0, "zero input buffer size specified"); 168 169 super(other, Event.EPOLLIN | Event.EPOLLRDHUP); 170 171 this.data = new ubyte[buffer_size]; 172 173 this.input = cast (IInputDevice) this.conduit; 174 175 verify(this.input !is null, typeof (this).stringof ~ ": the conduit of " 176 ~ "the other " ~ typeof (super).stringof ~ " instance must be a " 177 ~ IInputDevice.stringof); 178 } 179 180 /************************************************************************** 181 182 Resets the amount of consumed/available data to 0. 183 184 Returns: 185 this instance 186 187 **************************************************************************/ 188 189 public typeof (this) reset ( ) 190 { 191 this.consumed = 0; 192 this.available = 0; 193 194 return this; 195 } 196 197 /************************************************************************** 198 199 Returns: 200 data in buffer available and consumed so far 201 202 **************************************************************************/ 203 204 public void[] consumed_data ( ) 205 { 206 return this.data[0 .. this.consumed]; 207 } 208 209 /************************************************************************** 210 211 Returns: 212 data in buffer available but not consumed so far 213 214 **************************************************************************/ 215 216 public void[] remaining_data ( ) 217 { 218 return this.data[this.consumed .. this.available]; 219 } 220 221 /************************************************************************** 222 223 Invokes consume with the data that are currently available and haven't 224 yet been consumed. 225 If the amount of data is sufficient, consume must return the number of 226 bytes it consumed. Otherwise if consume consumed all data and needs more 227 input data to be read from the I/O device, it must return a value 228 greater than the number of data bytes passed to it. 229 230 Invokes consume to consume the available data until consume indicates to 231 be finished or all available data is consumed. 232 233 Params: 234 consume = consumer callback delegate 235 236 Returns: 237 - true if all available data in buffer has been consumed and consume 238 indicated that it requires more or 239 - false if consume indicated to be finished. 240 241 **************************************************************************/ 242 243 public bool consume ( scope Consumer consume ) 244 { 245 size_t n = consume(this.data[this.consumed .. this.available]), 246 end = this.consumed + n; 247 248 // n can be very big (size_t.max) so end may overflow, n will be greater 249 // than this.available in this case. 250 251 if (end <= this.available && n <= this.available) 252 { 253 this.consumed = end; 254 255 return false; 256 } 257 else 258 { 259 // All data consumed: reset and return false if end == available. 260 261 this.reset(); 262 263 return end != this.available; 264 } 265 } 266 267 /************************************************************************** 268 269 Reads T.sizeof bytes from the socket and writes it to 'value' 270 Suspends if not enough data is available and resumes when 271 the data became available 272 273 Params: 274 value = reference to a variable to be filled 275 276 Returns: 277 this instance 278 279 Throws: 280 IOException if no data were received and won't arrive later: 281 - IOWarning on end-of-flow condition or if the remote hung up, 282 - IOError (IOWarning subclass) on I/O error. 283 284 **************************************************************************/ 285 286 public typeof (this) read ( T ) ( ref T value ) 287 { 288 return this.readRaw((cast(ubyte*)&value)[0 .. value.sizeof]); 289 } 290 291 /************************************************************************** 292 293 Reads data.length bytes from the socket and writes them to the array. 294 295 Will only return once enough data is available and the array could 296 be filled. 297 298 Params: 299 data_out = pre-allocated array which will be filled 300 301 Returns: 302 this instance 303 304 Throws: 305 IOException if no data were received and won't arrive later: 306 - IOWarning on end-of-flow condition or if the remote hung up, 307 - IOError (IOWarning subclass) on I/O error. 308 309 **************************************************************************/ 310 311 public typeof (this) readRaw ( ubyte[] data_out ) 312 { 313 auto available_data = this.data[this.consumed .. this.available]; 314 315 if (available_data.length >= data_out.length) 316 { 317 // Enough data are already in the buffer: Copy them into data_out 318 // and return. 319 data_out[] = cast(ubyte[])available_data[0 .. data_out.length]; 320 this.consumed += data_out.length; 321 if (this.consumed == this.available) 322 this.reset(); 323 } 324 else 325 { 326 // Not enough data in the buffer: First copy all buffered data to 327 // data_out. 328 data_out[0 .. available_data.length] = cast(ubyte[])available_data; 329 data_out = data_out[available_data.length .. $]; 330 this.reset(); 331 if (data_out.length > this.data.length) 332 { 333 // Read straight into data_out, circumventing the internal 334 // buffer, as long as the amount of data left to read is greater 335 // than the buffer size. 336 auto this_data = this.data; 337 this.data = data_out; 338 try 339 { 340 while ((this.data.length - this.available) > this_data.length) 341 this.receive(); 342 data_out = cast(ubyte[])this.data[this.available .. $]; 343 } 344 finally 345 { 346 this.data = this_data; 347 this.reset(); 348 } 349 } 350 351 // Read the remaining data. 352 while (this.available < data_out.length) 353 this.receive(); 354 data_out[] = cast(ubyte[]) 355 this.data[0 .. this.consumed = data_out.length]; 356 } 357 358 return this; 359 } 360 361 /************************************************************************** 362 363 Reads data from the input conduit and appends them to the data buffer, 364 waiting for data to be read from the input conduit if 365 366 If no data is available from the input conduit, the input reading fiber 367 is suspended and continues reading on resume. 368 369 Returns: 370 number of bytes read 371 372 Throws: 373 IOException if no data were received and won't arrive later: 374 - IOWarning on end-of-flow condition or if the remote hung up, 375 - IOError (IOWarning subclass) on I/O error. 376 377 **************************************************************************/ 378 379 public size_t receive ( ) 380 { 381 if (this.available >= this.data.length) 382 { 383 this.reset(); 384 } 385 386 size_t available_before = this.available; 387 388 this.transmitLoop(); 389 390 return this.available - available_before; 391 } 392 393 /************************************************************************** 394 395 Reads data from the input conduit, appends it to the data buffer and 396 invokes consume with the data that is currently available and hasn't 397 yet been consumed. 398 If consume feels that the amount of data passed to it is sufficient, 399 it must return the number of bytes it consumed. Otherwise if consume 400 consumed all the data and still needs more input data to be read from 401 the I/O device, it must return a value greater than length of the the 402 data passed to it. The fiber is then suspended to wait for more data 403 to be available from the input device, and consume is invoked again 404 with the newly available data. 405 406 Params: 407 consume = consumer callback delegate 408 409 Returns: 410 this instance 411 412 Throws: 413 IOException if no data was received and none will arrive later: 414 - IOWarning on end-of-flow condition or if the remote hung up, 415 - IOError (IOWarning subclass) on an I/O error. 416 417 **************************************************************************/ 418 419 public typeof (this) readConsume ( scope Consumer consume ) 420 { 421 bool more; 422 423 do 424 { 425 if (this.consumed >= this.available) // only this.consumed == this.available possible 426 { 427 this.receive(); 428 } 429 430 more = this.consume(consume); 431 } 432 while (more); 433 434 return this; 435 } 436 437 /************************************************************************** 438 439 Reads data from the input conduit and appends them to the data buffer. 440 441 Params: 442 events = events reported for the input device 443 444 Returns: 445 true if data were received or false to retry later. 446 447 Throws: 448 IOException if no data were received and won't arrive later: 449 - IOWarning on end-of-flow condition or if the remote hung up, 450 - IOError (IOWarning subclass) on I/O error. 451 452 Note: POSIX says the following about the return value of read(): 453 454 When attempting to read from an empty pipe or FIFO [remark: includes 455 sockets]: 456 457 - If no process has the pipe open for writing, read() shall return 0 458 to indicate end-of-file. 459 460 - If some process has the pipe open for writing and O_NONBLOCK is 461 set, read() shall return -1 and set errno to [EAGAIN]. 462 463 - If some process has the pipe open for writing and O_NONBLOCK is 464 clear, read() shall block the calling thread until some data is 465 written or the pipe is closed by all processes that had the pipe 466 open for writing. 467 468 @see http://pubs.opengroup.org/onlinepubs/009604499/functions/read.html 469 470 **************************************************************************/ 471 472 protected override bool transmit ( Event events ) 473 out (received) 474 { 475 assert (received <= data.length, "received length too high"); 476 } 477 do 478 { 479 verify(this.available < this.data.length, "requested to receive nothing"); 480 481 .errno = 0; 482 483 input.ssize_t n = this.input.read(this.data[this.available .. $]); 484 485 if (n <= 0) 486 { 487 // EOF or error: Check for socket error and hung-up event first. 488 489 this.error_e.checkDeviceError(n? "read error" : "end of flow whilst reading", __FILE__, __LINE__); 490 491 enforce(this.warning_e, !(events & events.EPOLLRDHUP), "connection hung up on read"); 492 enforce(this.warning_e, !(events & events.EPOLLHUP), "connection hung up"); 493 494 if (n) 495 { 496 // read() error and no socket error or hung-up event: Check 497 // errno. Carry on if there are just currently no data available 498 // (EAGAIN/EWOULDBLOCK/EINTR) or throw error otherwise. 499 500 switch (.errno) 501 { 502 default: 503 this.error_e.enforce(false, "read error"); 504 assert (false); 505 506 case EINTR, EAGAIN: 507 static if ( EAGAIN != EWOULDBLOCK ) 508 { 509 case EWOULDBLOCK: 510 } 511 512 // EAGAIN/EWOULDBLOCK: currently no data available. 513 // EINTR: read() was interrupted by a signal before data 514 // became available. 515 516 n = 0; 517 } 518 } 519 else 520 { 521 // EOF and no socket error or hung-up event: Throw EOF warning. 522 enforce(this.warning_e, false, "end of flow whilst reading"); 523 } 524 } 525 else 526 { 527 debug (Raw) Stderr.formatln("[{}] Read {:X2} ({} bytes)", 528 super.conduit.fileHandle, 529 this.data[this.available .. this.available + n], n); 530 531 this.available += n; 532 } 533 534 verify(n >= 0); 535 536 return !n; 537 } 538 } 539 540 /******************************************************************************/ 541 542 version (unittest) 543 { 544 import ocean.io.select.client.model.ISelectClient; 545 import ocean.io.select.fiber.SelectFiber; 546 import core.sys.posix.stdlib; 547 import ocean.core.Test; 548 import ocean.meta.types.Qualifiers; 549 } 550 551 unittest 552 { 553 static class UnSelectFiber: SelectFiber 554 { 555 this ( ) { super(null, {assert(false);}); } 556 override: 557 Message start (Message) { assert(false); } 558 Message suspend (Token, Object, Message) { assert(false); } 559 Message resume (Token, Object, Message) { assert(false); } 560 void kill (istring, long) { assert(false); } 561 bool register (ISelectClient) { assert(false); } 562 bool unregister () { assert(false); } 563 } 564 565 // All numeric constants related to buffer sizes and amounts of data below 566 // are tuned to test the internals of readRaw(). 567 568 static class Input: IInputDevice 569 { 570 /// Input data for `read()`. 571 void[] data; 572 573 /// Copies `data[0 .. n]` into `dst[0 .. n]` where `n` is the minimum of 574 /// `data.length`, `dst.length` and 15, and advances 575 /// `data = data[n .. $]`. Returns `n`. 576 ssize_t read ( void[] dst ) 577 { 578 if (dst.length > this.data.length) 579 dst = dst[0 .. this.data.length]; 580 if (dst.length > 15) 581 dst = dst[0 .. 15]; 582 583 dst[] = this.data[0 .. dst.length]; 584 this.data = this.data[dst.length .. $]; 585 return dst.length; 586 } 587 588 Handle fileHandle ( ) { return cast(Handle)4711; } /// Interface method 589 } 590 591 static class TestFiberSelectReader: FiberSelectReader 592 { 593 this ( IInputDevice input, SelectFiber fiber ) 594 { 595 auto e = new IOError(input); 596 super(input, fiber, e, e, 50); 597 } 598 599 override protected void transmitLoop ( ) 600 { 601 while (this.transmit(Event.init)) {} 602 } 603 } 604 605 // The test starts here. 606 607 scope input = new Input, 608 fiber = new UnSelectFiber, 609 reader = new TestFiberSelectReader(input, fiber); 610 611 ubyte[100] input_data, output_data; 612 613 ushort[3] xsubi; 614 xsubi[0] = 0x330E; // see jrand48() documentation 615 foreach (ref n; cast(uint[])input_data) 616 n = cast(uint)jrand48(xsubi); 617 618 input.data = input_data; 619 620 reader.readRaw(output_data[0 .. 10]); 621 test!("==")(input_data[0 .. 10], output_data[0 .. 10]); 622 623 reader.readRaw(output_data[10 .. $ - 3]); 624 test!("==")(input_data[10 .. $ - 3], output_data[10 .. $ - 3]); 625 626 reader.readRaw(output_data[$ - 3 .. $]); 627 test!("==")(input_data[$ - 3 .. $], output_data[$ - 3 .. $]); 628 }