module async.net.tcpclient;

debug import std.stdio;

import core.stdc.errno;
import core.stdc.string;
import core.thread;
import core.sync.rwmutex;

import std.socket;
import std.conv;
import std.string;
import std.typecons : Tuple;

import async.event.selector;
import async.eventloop;
import async.net.tcpstream;
import async.container.bytebuffer;

/**
 * A TCP connection driven by a `Selector`.
 *
 * On Windows, `send` forwards the data to `Selector.iocp_send`.
 * On other platforms the selector calls `weakup` when the socket becomes
 * readable or writable, and the actual `receive`/`send` loops run as tasks
 * submitted to `Selector.workerPool`. Outgoing data is queued in
 * `_writeQueue` (pushes and pops are done under `_sendLock.writer`) and
 * drained by `write`.
 */
class TcpClient : TcpStream
{
    /**
     * Wraps `socket` and binds it to `selector`.
     *
     * Params:
     *   selector = the selector that owns this client; used for callbacks,
     *              (re)registration and the worker pool.
     *   socket   = the connected socket, handed to the `TcpStream` base class.
     */
    this(Selector selector, Socket socket)
    {
        super(socket);

        _selector      = selector;
        _remoteAddress = remoteAddress.toString();
        _fd            = fd;
        _closing       = false;

        version (Windows) { } else
        {
            _hasReadEvent     = false;
            _hasWriteEvent    = false;
            _reading          = false;
            _writing          = false;
            _sendLock         = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_WRITERS);
            _currentEventType = EventType.READ;
            _lastWriteOffset  = 0;
        }
    }

    /**
     * Notifies the client that `event` is pending on its socket.
     *
     * READ and WRITE record the event and start the matching worker task
     * (unless one is already running); ACCEPT and READWRITE are ignored.
     * (The spelling "weakup" is kept because it is part of the public interface.)
     */
    version (Windows) { } else void weakup(EventType event)
    {
        final switch (event)
        {
            case EventType.READ:
                _hasReadEvent = true;
                beginRead();
                break;
            case EventType.WRITE:
                _hasWriteEvent = true;
                beginWrite();
                break;
            case EventType.ACCEPT:
            case EventType.READWRITE:
                break;
        }
    }

private:

    version (Windows) { } else
    {
        /// Consumes the pending read event and schedules `read` on the worker
        /// pool, unless a read task is already in flight.
        void beginRead()
        {
            _hasReadEvent = false;

            if (_reading)
            {
                return;
            }

            _reading = true;
            _selector.workerPool.run!read(this);
        }

        /**
         * Worker task: receives from the socket until it reports EAGAIN, then
         * delivers the collected bytes.
         *
         * Without a codec the raw bytes are passed to `onReceive`. With a codec
         * the bytes are appended to `_receiveBuffer` and every message the codec
         * can decode is delivered in turn.
         */
        protected static void read(TcpClient client)
        {
            ubyte[] data;
            ubyte[4096] buffer;

            while (!client._closing && client.isAlive)
            {
                long len = client._socket.receive(buffer);

                if (len > 0)
                {
                    data ~= buffer[0 .. cast(uint)len];

                    continue;
                }
                else if (len == 0)
                {
                    // receive() returned 0: reported to readCallback as a disconnection (-1).
                    client.readCallback(-1);
                    return;
                }
                else
                {
                    // Capture errno once, right after the failing call, so that
                    // nothing executed afterwards can overwrite it.
                    const int err = errno;

                    if (err == EINTR)
                    {
                        continue;
                    }
                    else if (err == EAGAIN/* || err == EWOULDBLOCK*/)
                    {
                        // Nothing more to read for now.
                        break;
                    }
                    else
                    {
                        client.readCallback(err);
                        return;
                    }
                }
            }

            if ((data.length > 0) && (client._selector.onReceive !is null))
            {
                if (client._selector.codec is null)
                {
                    client._selector.onReceive(client, data);
                }
                else
                {
                    client._receiveBuffer ~= data;

                    // Keep decoding until the codec no longer yields a message.
                    while (true)
                    {
                        const Tuple!(long, size_t) ret = client._selector.codec.decode(client._receiveBuffer);

                        if (ret[0] >= 0)
                        {
                            const ubyte[] message = client._receiveBuffer[0 .. ret[0]];
                            client._receiveBuffer.popFront(ret[0] + ret[1]);
                            client._selector.onReceive(client, message);
                            continue;
                        }
                        else if (ret[0] == -2) // The magic is wrong.
                        {
                            client.forceClose();
                            return;
                        }

                        break;
                    }
                }
            }

            client.readCallback(0);
        }

        /**
         * Finishes a read task.
         *
         * Params:
         *   err = 0: OK, -1: client disconnection, 1, 2, ...: errno.
         *
         * On Linux a disconnection removes the client from the selector. The
         * "reading" flag is then cleared and, if another read event arrived in
         * the meantime, a new read task is started.
         */
        void readCallback(const int err)
        {
            version (linux)
            {
                if (err == -1)
                {
                    _selector.removeClient(fd, err);
                }
            }

            _reading = false;

            if (_hasReadEvent)
            {
                beginRead();
            }
        }

        /// Consumes the pending write event and schedules `write` on the worker
        /// pool, unless a write task is already in flight.
        void beginWrite()
        {
            _hasWriteEvent = false;

            if (_writing)
            {
                return;
            }

            _writing = true;
            _selector.workerPool.run!write(this);
        }

        /**
         * Worker task: sends the chunk in progress (if any) and then every
         * queued chunk, until the queue is drained or the socket reports EAGAIN.
         *
         * On EAGAIN the socket is re-registered for READWRITE and the task
         * returns, leaving `_writingData` / `_lastWriteOffset` in place so the
         * next WRITE event resumes where this one stopped. Once everything has
         * been sent the registration is switched back to READ.
         */
        protected static void write(TcpClient client)
        {
            // A chunk is pending whenever _writingData is non-empty. Testing
            // _lastWriteOffset > 0 instead would miss a chunk whose very first
            // send() hit EAGAIN (offset still 0), leaving it unsent forever.
            while (!client._closing && client.isAlive && (!client._writeQueue.empty() || (client._writingData.length > 0)))
            {
                if (client._writingData.length == 0)
                {
                    synchronized (client._sendLock.writer)
                    {
                        client._writingData     = client._writeQueue.front;
                        client._writeQueue.popFront();
                        client._lastWriteOffset = 0;
                    }
                }

                while (!client._closing && client.isAlive && (client._lastWriteOffset < client._writingData.length))
                {
                    long len = client._socket.send(client._writingData[cast(uint)client._lastWriteOffset .. $]);

                    if (len > 0)
                    {
                        client._lastWriteOffset += len;

                        continue;
                    }
                    else if (len == 0)
                    {
                        //client._selector.removeClient(fd);

                        if (client._lastWriteOffset < client._writingData.length)
                        {
                            if (client._selector.onSendCompleted !is null)
                            {
                                client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, cast(size_t)client._lastWriteOffset);
                            }

                            debug writefln("The sending is incomplete, the total length is %d, but actually sent only %d.", client._writingData.length, client._lastWriteOffset);
                        }

                        client._writingData.length = 0;
                        client._lastWriteOffset    = 0;

                        client.writeCallback(-1); // Sending was interrupted and is incomplete.
                        return;
                    }
                    else
                    {
                        // Capture errno once, right after the failing call, so that
                        // nothing executed afterwards can overwrite it.
                        const int err = errno;

                        if (err == EINTR)
                        {
                            continue;
                        }
                        else if (err == EAGAIN/* || err == EWOULDBLOCK*/)
                        {
                            if (client._currentEventType != EventType.READWRITE)
                            {
                                client._selector.reregister(client.fd, EventType.READWRITE);
                                client._currentEventType = EventType.READWRITE;
                            }

                            client.writeCallback(0); // Wait for the event loop to notify us to continue.
                            return;
                        }
                        else
                        {
                            client._writingData.length = 0;
                            client._lastWriteOffset    = 0;

                            client.writeCallback(err); // Some other error.
                            return;
                        }
                    }
                }

                if (client._lastWriteOffset == client._writingData.length)
                {
                    if (client._selector.onSendCompleted !is null)
                    {
                        client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, cast(size_t)client._lastWriteOffset);
                    }

                    client._writingData.length = 0;
                    client._lastWriteOffset    = 0;
                }
            }

            // Everything has been sent: stop asking for WRITE notifications.
            if (client._writeQueue.empty() && (client._writingData.length == 0) && (client._currentEventType == EventType.READWRITE))
            {
                client._selector.reregister(client.fd, EventType.READ);
                client._currentEventType = EventType.READ;
            }

            client.writeCallback(0);
        }

        /**
         * Finishes a write task.
         *
         * Params:
         *   err = 0: OK, -1: client disconnection, 1, 2, ...: errno.
         *         The value is currently not inspected.
         *
         * Clears the "writing" flag and, if another write event arrived in the
         * meantime, starts a new write task.
         */
        void writeCallback(const int err)
        {
            _writing = false;

            if (_hasWriteEvent)
            {
                beginWrite();
            }
        }
    }

public:

    version (Windows)
    {
        /**
         * Sends `data` through the selector's IOCP path.
         *
         * Returns: 0 on success, -1 if `data` is empty, -2 if the connection
         *          is not alive.
         */
        int send(const scope ubyte[] data)
        {
            if (data.length == 0)
            {
                return -1;
            }

            if (!isAlive())
            {
                return -2;
            }

            _selector.iocp_send(_fd, data);

            return 0;
        }
    }
    else
    {
        /**
         * Queues `data` for sending and kicks off a write task.
         *
         * Returns: 0 once the data is queued, -1 if `data` is empty, -2 if the
         *          connection is not alive.
         */
        int send(const scope ubyte[] data)
        {
            if (data.length == 0)
            {
                return -1;
            }

            if (!isAlive())
            {
                return -2;
            }

            synchronized (_sendLock.writer)
            {
                _writeQueue ~= data;
            }

            // Try to write directly first; WRITE notifications are only enabled
            // once a send() encounters EAGAIN.
            weakup(EventType.WRITE);

            return 0;
        }
    }

    /// Marks the client as closing, then shuts down and closes the socket.
    void close()
    {
        _closing = true;

        _socket.shutdown(SocketShutdown.BOTH);
        _socket.close();
    }

    /*
    Important:

    The application layer's emergency shutdown: asks the selector to remove
    this client if the connection is still alive.
    Call this when a message that does not meet the requirements is sent to
    the server, to avoid wasting resources.
    */
    void forceClose()
    {
        if (isAlive)
        {
            _selector.removeClient(fd);
        }
    }

public:

    string _remoteAddress; // remoteAddress.toString(), captured at construction
    int    _fd;            // fd, captured at construction

private:

    Selector    _selector;
    shared bool _closing;  // set by close(); makes the read/write loops stop

    version (Windows) { } else
    {
        shared bool _hasReadEvent;   // a READ event arrived and has not been consumed yet
        shared bool _hasWriteEvent;  // a WRITE event arrived and has not been consumed yet
        shared bool _reading;        // a read task is scheduled or running
        shared bool _writing;        // a write task is scheduled or running

        ByteBuffer     _writeQueue;      // chunks waiting to be sent
        ubyte[]        _writingData;     // chunk currently being sent (empty when none)
        size_t         _lastWriteOffset; // bytes of _writingData already sent
        ReadWriteMutex _sendLock;        // held (writer side) around _writeQueue push/pop

        EventType _currentEventType;     // event type the socket is currently registered for
    }

    ByteBuffer _receiveBuffer; // received bytes not yet consumed by the codec
}