1 module async.net.tcpclient;
2 
3 debug import std.stdio;
4 
5 import core.stdc.errno;
6 import core.stdc.string;
7 import core.thread;
8 import core.sync.rwmutex;
9 
10 import std.socket;
11 import std.conv;
12 import std.string;
13 import std.typecons : Tuple;
14 
15 import async.event.selector;
16 import async.eventloop;
17 import async.net.tcpstream;
18 import async.container.bytebuffer;
19 
20 class TcpClient : TcpStream
21 {
22     this(Selector selector, Socket socket)
23     {
24         super(socket);
25 
26         _selector           = selector;
27         _remoteAddress      = remoteAddress.toString();
28         _fd                 = fd;
29         _closing            = false;
30 
31         version (Windows) { } else
32         {
33             _hasReadEvent       = false;
34             _hasWriteEvent      = false;
35             _reading            = false;
36             _writing            = false;
37             _sendLock           = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_WRITERS);
38             _currentEventType   = EventType.READ;
39             _lastWriteOffset    = 0;
40         }
41     }
42 
43     version (Windows) { } else void weakup(EventType event)
44     {
45         final switch (event)
46         {
47             case EventType.READ:
48                 _hasReadEvent = true;
49                 beginRead();
50                 break;
51             case EventType.WRITE:
52                 _hasWriteEvent = true;
53                 beginWrite();
54                 break;
55             case EventType.ACCEPT:
56             case EventType.READWRITE:
57                 break;
58         }
59     }
60 
61 private:
62 
63     version (Windows) { } else
64     {
65         void beginRead()
66         {
67             _hasReadEvent = false;
68 
69             if (_reading)
70             {
71                 return;
72             }
73 
74             _reading = true;
75             _selector.workerPool.run!read(this);
76         }
77 
78         protected static void read(TcpClient client)
79         {
80             ubyte[]     data;
81             ubyte[4096] buffer;
82 
83             while (!client._closing && client.isAlive)
84             {
85                 long len = client._socket.receive(buffer);
86 
87                 if (len > 0)
88                 {
89                     data ~= buffer[0 .. cast(uint)len];
90 
91                     continue;
92                 }
93                 else if (len == 0)
94                 {
95                     client.readCallback(-1);
96                     return;
97                 }
98                 else
99                 {
100                     if (errno == EINTR)
101                     {
102                         continue;
103                     }
104                     else if (errno == EAGAIN/* || errno == EWOULDBLOCK*/)
105                     {
106                         break;
107                     }
108                     else
109                     {
110                         client.readCallback(errno);
111                         return;
112                     }
113                 }
114             }
115 
116             if ((data.length > 0) && (client._selector.onReceive !is null))
117             {
118                 if (client._selector.codec is null)
119                 {
120                     client._selector.onReceive(client, data);
121                 }
122                 else
123                 {
124                     client._receiveBuffer ~= data;
125                     
126                     label_parseOne:
127                     const Tuple!(long, size_t) ret = client._selector.codec.decode(client._receiveBuffer);
128 
129                     if (ret[0] >= 0)
130                     {
131                         const ubyte[] message = client._receiveBuffer[0 .. ret[0]];
132                         client._receiveBuffer.popFront(ret[0] + ret[1]);
133                         client._selector.onReceive(client, message);
134                         goto label_parseOne;
135                     }
136                     else if (ret[0] == -2) // The magic is error.
137                     {
138                         client.forceClose();
139                         return;
140                     }
141                 }
142             }
143 
144             client.readCallback(0);
145         }
146 
147         void readCallback(const int err)  // err: 0: OK, -1: client disconnection, 1,2... errno
148         {
149             version (linux)
150             {
151                 if (err == -1)
152                 {
153                     _selector.removeClient(fd, err);
154                 }
155             }
156 
157             _reading = false;
158 
159             if (_hasReadEvent)
160             {
161                 beginRead();
162             }
163         }
164 
165         void beginWrite()
166         {
167             _hasWriteEvent = false;
168 
169             if (_writing)
170             {
171                 return;
172             }
173 
174             _writing = true;
175             _selector.workerPool.run!write(this);
176         }
177 
178         protected static void write(TcpClient client)
179         {
180             while (!client._closing && client.isAlive && (!client._writeQueue.empty() || (client._lastWriteOffset > 0)))
181             {
182                 if (client._writingData.length == 0)
183                 {
184                     synchronized (client._sendLock.writer)
185                     {
186                         client._writingData     = client._writeQueue.front;
187                         client._writeQueue.popFront();
188                         client._lastWriteOffset = 0;
189                     }
190                 }
191 
192                 while (!client._closing && client.isAlive && (client._lastWriteOffset < client._writingData.length))
193                 {
194                     long len = client._socket.send(client._writingData[cast(uint)client._lastWriteOffset .. $]);
195 
196                     if (len > 0)
197                     {
198                         client._lastWriteOffset += len;
199 
200                         continue;
201                     }
202                     else if (len == 0)
203                     {
204                         //client._selector.removeClient(fd);
205 
206                         if (client._lastWriteOffset < client._writingData.length)
207                         {
208                             if (client._selector.onSendCompleted !is null)
209                             {
210                                 client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, cast(size_t)client._lastWriteOffset);
211                             }
212 
213                             debug writefln("The sending is incomplete, the total length is %d, but actually sent only %d.", client._writingData.length, client._lastWriteOffset);
214                         }
215 
216                         client._writingData.length = 0;
217                         client._lastWriteOffset    = 0;
218 
219                         client.writeCallback(-1);  // sending is break and incomplete.
220                         return;
221                     }
222                     else
223                     {
224                         if (errno == EINTR)
225                         {
226                             continue;
227                         }
228                         else if (errno == EAGAIN/* || errno == EWOULDBLOCK*/)
229                         {
230                             if (client._currentEventType != EventType.READWRITE)
231                             {
232                                 client._selector.reregister(client.fd, EventType.READWRITE);
233                                 client._currentEventType = EventType.READWRITE;
234                             }
235 
236                             client.writeCallback(0);  // Wait eventloop notify to continue again;
237                             return;
238                         }
239                         else
240                         {
241                             client._writingData.length = 0;
242                             client._lastWriteOffset    = 0;
243 
244                             client.writeCallback(errno);  // Some error.
245                             return;
246                         }
247                     }
248                 }
249 
250                 if (client._lastWriteOffset == client._writingData.length)
251                 {
252                     if (client._selector.onSendCompleted !is null)
253                     {
254                         client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, cast(size_t)client._lastWriteOffset);
255                     }
256 
257                     client._writingData.length = 0;
258                     client._lastWriteOffset    = 0;
259                 }
260             }
261 
262             if (client._writeQueue.empty() && (client._writingData.length == 0) && (client._currentEventType == EventType.READWRITE))
263             {
264                 client._selector.reregister(client.fd, EventType.READ);
265                 client._currentEventType = EventType.READ;
266             }
267 
268             client.writeCallback(0);
269             return;
270         }
271 
272         void writeCallback(const int err)  // err: 0: OK, -1: client disconnection, 1,2... errno
273         {
274             _writing = false;
275 
276             if (_hasWriteEvent)
277             {
278                 beginWrite();
279             }
280         }
281     }
282 
283 public:
284 
285     version (Windows)
286     {
287         int send(const scope ubyte[] data)
288         {
289             if (data.length == 0)
290             {
291                 return -1;
292             }
293 
294             if (!isAlive())
295             {
296                 return -2;
297             }
298 
299             _selector.iocp_send(_fd, data);
300 
301             return 0;
302         }
303     }
304     else
305     {
306         int send(const scope ubyte[] data)
307         {
308             if (data.length == 0)
309             {
310                 return -1;
311             }
312 
313             if (!isAlive())
314             {
315                 return -2;
316             }
317 
318             synchronized (_sendLock.writer)
319             {
320                 _writeQueue ~= data;
321             }
322 
323             weakup(EventType.WRITE);  // First write direct, and when it encounter EAGAIN, it will open the EVENT notification.
324 
325             return 0;
326         }
327     }
328 
329     void close()
330     {
331         _closing = true;
332 
333         _socket.shutdown(SocketShutdown.BOTH);
334         _socket.close();
335     }
336 
337     /*
338     Important:
339 
340     The method for emergency shutdown of the application layer is close the socket.
341     When a message that does not meet the requirements is sent to the server,
342     this method should be called to avoid the waste of resources.
343     */
344     void forceClose()
345     {
346         if (isAlive)
347         {
348             _selector.removeClient(fd);
349         }
350     }
351 
352 public:
353 
354     string           _remoteAddress;
355     int              _fd;
356 
357 private:
358 
359     Selector         _selector;
360     shared bool      _closing;
361 
362     version (Windows) { } else
363     {
364         shared bool      _hasReadEvent;
365         shared bool      _hasWriteEvent;
366         shared bool      _reading;
367         shared bool      _writing;
368 
369         ByteBuffer       _writeQueue;
370         ubyte[]          _writingData;
371         size_t           _lastWriteOffset;
372         ReadWriteMutex   _sendLock;
373 
374         EventType        _currentEventType;
375     }
376 
377     ByteBuffer       _receiveBuffer;
378 }