module async.net.tcpclient;

debug import std.stdio;

import core.stdc.errno;
import core.stdc.string;
import core.thread;
import core.sync.rwmutex;

import std.socket;
import std.conv;
import std.string;

import async.event.selector;
import async.eventloop;
import async.net.tcpstream;
import async.container.bytebuffer;

/**
 * A TCP connection whose reads and writes are executed as tasks on the
 * owning selector's worker pool.
 *
 * The event loop calls `weakup` when the socket becomes readable or writable;
 * application code calls `send` to queue outgoing data. At most one read task
 * and one write task are scheduled at a time (guarded by `_reading` and
 * `_writing`). An event that arrives while the matching task is still running
 * is remembered in `_hasReadEvent` / `_hasWriteEvent` and replayed when that
 * task finishes.
 *
 * NOTE(review): the busy/pending flags are plain `shared bool`s that are
 * checked and set without atomic operations; this assumes callers tolerate the
 * small check-then-set window - confirm against the selector implementation.
 */
class TcpClient : TcpStream
{
    /**
     * Wraps an already connected socket.
     *
     * Params:
     *   selector = selector that owns this client; supplies the worker pool
     *              and the `onReceive` / `onSendCompleted` callbacks.
     *   socket   = the connected socket, handed to the `TcpStream` base class.
     */
    this(Selector selector, Socket socket)
    {
        super(socket);

        _selector         = selector;
        _hasReadEvent     = false;
        _hasWriteEvent    = false;
        _reading          = false;
        _writing          = false;
        _sendLock         = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_WRITERS);

        // Cached so they can still be reported to callbacks later on.
        _remoteAddress    = remoteAddress.toString();
        _fd               = fd;
        _currentEventType = EventType.READ;
        _closing          = false;

        _lastWriteOffset  = 0;
    }

    /**
     * Notifies the client that `event` is ready on its socket.
     *
     * READ and WRITE record the event and try to start the matching worker
     * task; ACCEPT and READWRITE are ignored.
     * (The method name is kept as-is for compatibility with existing callers.)
     */
    void weakup(EventType event)
    {
        final switch (event)
        {
        case EventType.READ:
            _hasReadEvent = true;
            beginRead();
            break;
        case EventType.WRITE:
            _hasWriteEvent = true;
            beginWrite();
            break;
        case EventType.ACCEPT:
        case EventType.READWRITE:
            break;
        }
    }

private:

    /**
     * Starts a read task unless one is already running.
     *
     * The pending flag is cleared only when a task is actually started. If a
     * task is still running the flag is left set, so that `readCallback`
     * schedules another pass and the event is not lost.
     */
    void beginRead()
    {
        if (_reading)
        {
            return;
        }

        _hasReadEvent = false;
        _reading      = true;
        _selector.workerPool.run!read(this);
    }

    /**
     * Worker task: drains the socket until it would block, then delivers the
     * accumulated bytes to `Selector.onReceive` (if set).
     *
     * A zero-length receive is reported to `readCallback` as -1, and any errno
     * other than EINTR / EAGAIN / EWOULDBLOCK is reported as that errno; in
     * both cases the data collected so far is not delivered.
     */
    protected static void read(TcpClient client)
    {
        ubyte[]     data;
        ubyte[4096] buffer;

        while (!client._closing && client.isAlive)
        {
            immutable long len = client._socket.receive(buffer);

            if (len > 0)
            {
                data ~= buffer[0 .. cast(size_t) len];
                continue;
            }

            if (len == 0)
            {
                client.readCallback(-1);
                return;
            }

            // Capture errno right away, before any other call can overwrite it.
            immutable int err = errno;

            if (err == EINTR)
            {
                continue;
            }

            if (err == EAGAIN || err == EWOULDBLOCK)
            {
                break; // Nothing more to read for now.
            }

            client.readCallback(err);
            return;
        }

        if ((data.length > 0) && (client._selector.onReceive !is null))
        {
            client._selector.onReceive(client, data);
        }

        client.readCallback(0);
    }

    /**
     * Completion hook of the read task.
     *
     * Params:
     *   err = 0: OK, -1: client disconnection, 1,2...: errno.
     *
     * On Linux a disconnection (-1) removes the client from the selector.
     * Afterwards the busy flag is released and, if a read event arrived while
     * the task was running, another read is started.
     */
    void readCallback(int err)
    {
        version (linux)
        {
            if (err == -1)
            {
                _selector.removeClient(fd, err);
            }
        }

        _reading = false;

        if (_hasReadEvent)
        {
            beginRead();
        }
    }

    /**
     * Starts a write task unless one is already running.
     *
     * As in `beginRead`, the pending flag survives while a task is running so
     * that `writeCallback` can start a follow-up pass for data queued in the
     * meantime.
     */
    void beginWrite()
    {
        if (_writing)
        {
            return;
        }

        _hasWriteEvent = false;
        _writing       = true;
        _selector.workerPool.run!write(this);
    }

    /**
     * Worker task: sends the chunk in progress and then every queued chunk
     * until the queue is empty, the socket would block, or an error occurs.
     *
     * `Selector.onSendCompleted` (if set) is invoked with the chunk and the
     * number of bytes sent, both when a chunk was fully sent and when `send`
     * returned 0 before the chunk was finished.
     */
    protected static void write(TcpClient client)
    {
        // A chunk that was dequeued but has not been (fully) sent yet is
        // detected through `_writingData.length`, not through the offset: the
        // offset is still 0 when the very first send of a chunk hit EAGAIN.
        while (!client._closing && client.isAlive
            && (!client._writeQueue.empty() || (client._writingData.length > 0)))
        {
            if (client._writingData.length == 0)
            {
                synchronized (client._sendLock.writer)
                {
                    client._writingData     = client._writeQueue.front;
                    client._writeQueue.popFront();
                    client._lastWriteOffset = 0;
                }
            }

            while (!client._closing && client.isAlive && (client._lastWriteOffset < client._writingData.length))
            {
                // `_lastWriteOffset` is a size_t already; slicing with it
                // directly avoids truncating the offset to 32 bits.
                immutable long len = client._socket.send(client._writingData[client._lastWriteOffset .. $]);

                if (len > 0)
                {
                    client._lastWriteOffset += cast(size_t) len;
                    continue;
                }

                if (len == 0)
                {
                    if (client._lastWriteOffset < client._writingData.length)
                    {
                        if (client._selector.onSendCompleted !is null)
                        {
                            client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, client._lastWriteOffset);
                        }

                        debug writefln("The sending is incomplete, the total length is %d, but actually sent only %d.", client._writingData.length, client._lastWriteOffset);
                    }

                    client._writingData.length = 0;
                    client._lastWriteOffset    = 0;

                    client.writeCallback(-1); // Sending was interrupted and is incomplete.
                    return;
                }

                // Capture errno right away, before any other call can overwrite it.
                immutable int err = errno;

                if (err == EINTR)
                {
                    continue;
                }

                if (err == EAGAIN || err == EWOULDBLOCK)
                {
                    // Ask the selector for write notifications; the unsent
                    // remainder stays in `_writingData` for the next pass.
                    if (client._currentEventType != EventType.READWRITE)
                    {
                        client._selector.reregister(client.fd, EventType.READWRITE);
                        client._currentEventType = EventType.READWRITE;
                    }

                    client.writeCallback(0); // Wait for the event loop to notify us before continuing.
                    return;
                }

                client._writingData.length = 0;
                client._lastWriteOffset    = 0;

                client.writeCallback(err); // Some other error.
                return;
            }

            if (client._lastWriteOffset == client._writingData.length)
            {
                if (client._selector.onSendCompleted !is null)
                {
                    client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, client._lastWriteOffset);
                }

                client._writingData.length = 0;
                client._lastWriteOffset    = 0;
            }
        }

        // Everything has been sent: stop listening for write readiness.
        if (client._writeQueue.empty() && (client._writingData.length == 0) && (client._currentEventType == EventType.READWRITE))
        {
            client._selector.reregister(client.fd, EventType.READ);
            client._currentEventType = EventType.READ;
        }

        client.writeCallback(0);
    }

    /**
     * Completion hook of the write task.
     *
     * Params:
     *   err = 0: OK, -1: client disconnection, 1,2...: errno (currently unused).
     *
     * Releases the busy flag and starts another write if a write event or a
     * `send` call arrived while the task was running.
     */
    void writeCallback(int err)
    {
        _writing = false;

        if (_hasWriteEvent)
        {
            beginWrite();
        }
    }

public:

    /**
     * Queues `data` for sending and tries to start writing immediately.
     *
     * Returns: 0 when the data was queued, -1 when `data` is empty,
     *          -2 when the connection is no longer alive.
     */
    int send(ubyte[] data)
    {
        if (data.length == 0)
        {
            return -1;
        }

        if (!isAlive())
        {
            return -2;
        }

        synchronized (_sendLock.writer)
        {
            _writeQueue ~= data;
        }

        // Try to write directly first; write notifications are only enabled
        // once a send runs into EAGAIN.
        weakup(EventType.WRITE);

        return 0;
    }

    /// Marks the client as closing, then shuts down and closes the socket.
    void close()
    {
        _closing = true;

        _socket.shutdown(SocketShutdown.BOTH);
        _socket.close();
    }

    /**
     * Emergency shutdown for the application layer.
     *
     * Call this when the peer sent a message that does not meet the
     * application's requirements, to avoid wasting resources on the
     * connection. If the connection is still alive it is removed from the
     * selector.
     */
    void forceClose()
    {
        if (isAlive)
        {
            _selector.removeClient(fd);
        }
    }

public:

    string           _remoteAddress;    /// Remote address as text, captured in the constructor.
    int              _fd;               /// Descriptor value captured in the constructor.

private:

    Selector         _selector;
    shared bool      _hasReadEvent;     // A read event arrived and has not been served yet.
    shared bool      _hasWriteEvent;    // A write event / send request has not been served yet.
    shared bool      _reading;          // A read task is scheduled or running.
    shared bool      _writing;          // A write task is scheduled or running.

    ByteBuffer       _writeQueue;       // Chunks waiting to be sent.
    ubyte[]          _writingData;      // Chunk currently being sent (empty when none).
    size_t           _lastWriteOffset;  // Bytes of `_writingData` already sent.
    ReadWriteMutex   _sendLock;         // Guards `_writeQueue` between `send` and the write task.

    EventType        _currentEventType; // Event set last registered with the selector by this class.
    shared bool      _closing;          // Set by `close`; makes the worker loops stop.
}