1 module async.net.tcpclient;
2 
3 debug import std.stdio;
4 
5 import core.stdc.errno;
6 import core.stdc.string;
7 import core.thread;
8 import core.sync.rwmutex;
9 
10 import std.socket;
11 import std.conv;
12 import std.string;
13 
14 import async.event.selector;
15 import async.eventloop;
16 import async.net.tcpstream;
17 import async.container.bytebuffer;
18 
19 class TcpClient : TcpStream
20 {
21     this(Selector selector, Socket socket)
22     {
23         super(socket);
24 
25         _selector           = selector;
26         _hasReadEvent       = false;
27         _hasWriteEvent      = false;
28         _reading            = false;
29         _writing            = false;
30         _sendLock           = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_WRITERS);
31 
32         _remoteAddress      = remoteAddress.toString();
33         _fd                 = fd;
34         _currentEventType   = EventType.READ;
35         _closing            = false;
36 
37         _lastWriteOffset    = 0;
38     }
39 
40     void weakup(EventType event)
41     {
42         final switch (event)
43         {
44         case EventType.READ:
45             _hasReadEvent = true;
46             beginRead();
47             break;
48         case EventType.WRITE:
49             _hasWriteEvent = true;
50             beginWrite();
51             break;
52         case EventType.ACCEPT:
53         case EventType.READWRITE:
54             break;
55         }
56     }
57 
58 private:
59 
60     void beginRead()
61     {
62         _hasReadEvent = false;
63 
64         if (_reading)
65         {
66             return;
67         }
68 
69         _reading = true;
70         _selector.workerPool.run!read(this);
71     }
72 
73     protected static void read(TcpClient client)
74     {
75         ubyte[]     data;
76         ubyte[4096] buffer;
77 
78         while (!client._closing && client.isAlive)
79         {
80             long len = client._socket.receive(buffer);
81 
82             if (len > 0)
83             {
84                 data ~= buffer[0 .. cast(uint)len];
85 
86                 continue;
87             }
88             else if (len == 0)
89             {
90                 client.readCallback(-1);
91                 return;
92             }
93             else
94             {
95                 if (errno == EINTR)
96                 {
97                     continue;
98                 }
99                 else if (errno == EAGAIN || errno == EWOULDBLOCK)
100                 {
101                     break;
102                 }
103                 else
104                 {
105                     client.readCallback(errno);
106                     return;
107                 }
108             }
109         }
110 
111         if ((data.length > 0) && (client._selector.onReceive !is null))
112         {
113             client._selector.onReceive(client, data);
114         }
115 
116         client.readCallback(0);
117     }
118 
119     void readCallback(int err)  // err: 0: OK, -1: client disconnection, 1,2... errno
120     {
121         version (linux)
122         {
123             if (err == -1)
124             {
125                 _selector.removeClient(fd, err);
126             }
127         }
128 
129         _reading = false;
130 
131         if (_hasReadEvent)
132         {
133             beginRead();
134         }
135     }
136 
137     void beginWrite()
138     {
139         _hasWriteEvent = false;
140 
141         if (_writing)
142         {
143             return;
144         }
145 
146         _writing = true;
147         _selector.workerPool.run!write(this);
148     }
149 
150     protected static void write(TcpClient client)
151     {
152         while (!client._closing && client.isAlive && (!client._writeQueue.empty() || (client._lastWriteOffset > 0)))
153         {
154             if (client._writingData.length == 0)
155             {
156                 synchronized (client._sendLock.writer)
157                 {
158                     client._writingData     = client._writeQueue.front;
159                     client._writeQueue.popFront();
160                     client._lastWriteOffset = 0;
161                 }
162             }
163 
164             while (!client._closing && client.isAlive && (client._lastWriteOffset < client._writingData.length))
165             {
166                 long len = client._socket.send(client._writingData[cast(uint)client._lastWriteOffset .. $]);
167 
168                 if (len > 0)
169                 {
170                     client._lastWriteOffset += len;
171 
172                     continue;
173                 }
174                 else if (len == 0)
175                 {
176                     //client._selector.removeClient(fd);
177 
178                     if (client._lastWriteOffset < client._writingData.length)
179                     {
180                         if (client._selector.onSendCompleted !is null)
181                         {
182                             client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, cast(size_t)client._lastWriteOffset);
183                         }
184 
185                         debug writefln("The sending is incomplete, the total length is %d, but actually sent only %d.", client._writingData.length, client._lastWriteOffset);
186                     }
187 
188                     client._writingData.length = 0;
189                     client._lastWriteOffset    = 0;
190 
191                     client.writeCallback(-1);  // sending is break and incomplete.
192                     return;
193                 }
194                 else
195                 {
196                     if (errno == EINTR)
197                     {
198                         continue;
199                     }
200                     else if (errno == EAGAIN || errno == EWOULDBLOCK)
201                     {
202                         if (client._currentEventType != EventType.READWRITE)
203                         {
204                             client._selector.reregister(client.fd, EventType.READWRITE);
205                             client._currentEventType = EventType.READWRITE;
206                         }
207 
208                         client.writeCallback(0);  // Wait eventloop notify to continue again;
209                         return;
210                     }
211                     else
212                     {
213                         client._writingData.length = 0;
214                         client._lastWriteOffset    = 0;
215 
216                         client.writeCallback(errno);  // Some error.
217                         return;
218                     }
219                 }
220             }
221 
222             if (client._lastWriteOffset == client._writingData.length)
223             {
224                 if (client._selector.onSendCompleted !is null)
225                 {
226                     client._selector.onSendCompleted(client._fd, client._remoteAddress, client._writingData, cast(size_t)client._lastWriteOffset);
227                 }
228 
229                 client._writingData.length = 0;
230                 client._lastWriteOffset    = 0;
231             }
232         }
233 
234         if (client._writeQueue.empty() && (client._writingData.length == 0) && (client._currentEventType == EventType.READWRITE))
235         {
236             client._selector.reregister(client.fd, EventType.READ);
237             client._currentEventType = EventType.READ;
238         }
239 
240         client.writeCallback(0);
241         return;
242     }
243 
244     void writeCallback(int err)  // err: 0: OK, -1: client disconnection, 1,2... errno
245     {
246         _writing = false;
247 
248         if (_hasWriteEvent)
249         {
250             beginWrite();
251         }
252     }
253 
254 public:
255 
256     int send(ubyte[] data)
257     {
258         if (data.length == 0)
259         {
260             return -1;
261         }
262 
263         if (!isAlive())
264         {
265             return -2;
266         }
267 
268         synchronized (_sendLock.writer)
269         {
270             _writeQueue ~= data;
271         }
272 
273         weakup(EventType.WRITE);  // First write direct, and when it encounter EAGAIN, it will open the EVENT notification.
274 
275         return 0;
276     }
277 
278     void close()
279     {
280         _closing = true;
281 
282         _socket.shutdown(SocketShutdown.BOTH);
283         _socket.close();
284     }
285 
286     /*
287     Important:
288 
289     The method for emergency shutdown of the application layer is close the socket.
290     When a message that does not meet the requirements is sent to the server,
291     this method should be called to avoid the waste of resources.
292     */
293     void forceClose()
294     {
295         if (isAlive)
296         {
297             _selector.removeClient(fd);
298         }
299     }
300 
301 public:
302 
303     string           _remoteAddress;
304     int              _fd;
305 
306 private:
307 
308     Selector         _selector;
309     shared bool      _hasReadEvent;
310     shared bool      _hasWriteEvent;
311     shared bool      _reading;
312     shared bool      _writing;
313 
314     ByteBuffer       _writeQueue;
315     ubyte[]          _writingData;
316     size_t           _lastWriteOffset;
317     ReadWriteMutex   _sendLock;
318 
319     EventType        _currentEventType;
320     shared bool      _closing;
321 }