1 /******************************************************************************* 2 3 Copyright: 4 Copyright (c) 2006 Juan Jose Comellas. 5 Some parts copyright (c) 2009-2016 dunnhumby Germany GmbH. 6 All rights reserved. 7 8 License: 9 Tango Dual License: 3-Clause BSD License / Academic Free License v3.0. 10 See LICENSE_TANGO.txt for details. 11 12 Authors: Juan Jose Comellas <juanjo@comellas.com.ar> 13 14 *******************************************************************************/ 15 16 module ocean.sys.Pipe; 17 18 import ocean.meta.types.Qualifiers; 19 20 import ocean.sys.Common; 21 import ocean.io.device.Device; 22 23 import ocean.core.ExceptionDefinitions; 24 25 import core.sys.posix.unistd; 26 27 debug (PipeConduit) 28 { 29 import ocean.io.Stdout; 30 } 31 32 private enum {DefaultBufferSize = 8 * 1024} 33 34 35 /** 36 * Conduit for pipes. 37 * 38 * Each PipeConduit can only read or write, depending on the way it has been 39 * created. 40 */ 41 42 class PipeConduit : Device 43 { 44 private uint _bufferSize; 45 46 47 /** 48 * Create a PipeConduit with the provided handle and access permissions. 49 * 50 * Params: 51 * handle = handle of the operating system pipe we will wrap inside 52 * the PipeConduit. 53 * style = access flags for the pipe (readable, writable, etc.). 54 * bufferSize = buffer size. 55 */ 56 private this(Handle handle, uint bufferSize = DefaultBufferSize) 57 { 58 this.handle = handle; 59 _bufferSize = bufferSize; 60 } 61 62 /** 63 * Destructor. 64 */ 65 public ~this() 66 { 67 close(); 68 } 69 70 /** 71 * Returns the buffer size for the PipeConduit. 72 */ 73 public override size_t bufferSize() 74 { 75 return _bufferSize; 76 } 77 78 /** 79 * Returns the name of the device. 80 */ 81 public override istring toString() 82 { 83 return "<pipe>"; 84 } 85 } 86 87 /** 88 * Factory class for Pipes. 89 */ 90 class Pipe 91 { 92 private PipeConduit _source; 93 private PipeConduit _sink; 94 95 /** 96 * Create a Pipe. 97 */ 98 public this(uint bufferSize = DefaultBufferSize) 99 { 100 int[2] fd; 101 102 if (pipe(fd) == 0) 103 { 104 _source = new PipeConduit(cast(ISelectable.Handle) fd[0], bufferSize); 105 _sink = new PipeConduit(cast(ISelectable.Handle) fd[1], bufferSize); 106 } 107 else 108 { 109 error(); 110 } 111 } 112 113 /* Replaces the old pipe with a new one. No memory allocation is performed. 114 */ 115 public void recreate(uint bufferSize = DefaultBufferSize) 116 { 117 int[2] fd; 118 119 if (pipe(fd) == 0) 120 { 121 _source.reopen(cast(ISelectable.Handle) fd[0]); 122 _source._bufferSize = bufferSize; 123 _sink.reopen(cast(ISelectable.Handle) fd[1]); 124 _sink._bufferSize = bufferSize; 125 } 126 else 127 { 128 error(); 129 } 130 } 131 132 /** 133 * Return the PipeConduit that you can write to. 134 */ 135 public PipeConduit sink() 136 { 137 return _sink; 138 } 139 140 /** 141 * Return the PipeConduit that you can read from. 142 */ 143 public PipeConduit source() 144 { 145 return _source; 146 } 147 148 /** 149 * Closes source and sink conduits. 150 */ 151 public void close() 152 { 153 _source.close(); 154 _sink.close(); 155 } 156 157 /** 158 * 159 */ 160 private final void error () 161 { 162 throw new IOException("Pipe error: " ~ SysError.lastMsg); 163 } 164 } 165