1 ///
2 module async.bufferedtcp;
3 
4 import std.algorithm : copy;
5 import std.array     : array, empty, front, popFront;
6 import std.range     : isInputRange, isOutputRange;
7 
8 import memutils.circularbuffer : CircularBuffer;
9 
10 import async.events;
11 import async.tcp    : AsyncTCPConnection, TCPEvent, TCPEventHandler;
12 import async.types  : StatusInfo;
13 
/**
 * Wraps an AsyncTCPConnection with a read buffer and a write buffer of `size`
 * bytes each.
 *
 * Reads are requested with `read(len, cb)`: once `len` bytes have been
 * buffered, `cb` is invoked with exactly those bytes. Writes are queued with
 * `write(msg, len, cb)`: `cb` (if any) is invoked once all `len` bytes have
 * been handed to the underlying connection. Events received from the event
 * loop are fed in through `handle`.
 */
final class BufferedTCPConnection(size_t size = 4092)
{
    /// Callback for connect/close/error events and for completed writes.
    alias OnEvent = void delegate(BufferedTCPConnection!size conn);
    /// Callback for a completed read request; `msg` holds the requested bytes.
    alias OnRead  =
        void delegate(BufferedTCPConnection!size conn, in ubyte[] msg);

    private
    {
        AsyncTCPConnection asyncConn;

        OnEvent       onConnectCb;
        OnEvent       onCloseCb;
        OnEvent       onErrorCb;
        OnReadInfo[]  onReadCbs;
        OnWriteInfo[] onWriteCbs;

        CircularBuffer!(ubyte, size) readBuffer;
        CircularBuffer!(ubyte, size) writeBuffer;

        // Scratch space used to move data between the circular buffers and
        // the connection. Allocated in the constructors: a `new` expression
        // used as a field initializer is evaluated once at compile time, so
        // all instances would end up sharing the very same array.
        ubyte[] workBuffer;

        // Bytes already handed to the connection that have not yet been
        // attributed to a completed entry of onWriteCbs.
        size_t sentCredit;
    }

    /**
     * Creates a buffered connection on top of a new AsyncTCPConnection.
     *
     * Params:
     *   evl                  = event loop the connection is bound to
     *                          (must not be null)
     *   preInitializedSocket = forwarded to AsyncTCPConnection
     *   onConnectCb          = invoked on TCPEvent.CONNECT (may be null)
     *   onCloseCb            = invoked on TCPEvent.CLOSE and by close()
     *                          (may be null)
     *   onErrorCb            = invoked on TCPEvent.ERROR (may be null)
     */
    this(
        EventLoop evl,
        fd_t preInitializedSocket = fd_t.init,
        in OnEvent onConnectCb = null,
        in OnEvent onCloseCb   = null,
        in OnEvent onErrorCb   = null)
    in
    {
        assert(evl !is null);
    }
    body
    {
        asyncConn  = new AsyncTCPConnection(evl, preInitializedSocket);
        workBuffer = new ubyte[size];

        this.onConnectCb = onConnectCb;
        this.onCloseCb   = onCloseCb;
        this.onErrorCb   = onErrorCb;
    }

    /**
     * Creates a buffered connection on top of an existing AsyncTCPConnection.
     *
     * Params:
     *   conn        = connection to wrap (must not be null)
     *   onConnectCb = invoked on TCPEvent.CONNECT (may be null)
     *   onCloseCb   = invoked on TCPEvent.CLOSE and by close() (may be null)
     *   onErrorCb   = invoked on TCPEvent.ERROR (may be null)
     */
    this(
        AsyncTCPConnection conn,
        in OnEvent onConnectCb = null,
        in OnEvent onCloseCb   = null,
        in OnEvent onErrorCb   = null)
    in
    {
        assert(conn !is null);
    }
    body
    {
        asyncConn  = conn;
        workBuffer = new ubyte[size];

        this.onConnectCb = onConnectCb;
        this.onCloseCb   = onCloseCb;
        this.onErrorCb   = onErrorCb;
    }

    /// Forwards to the `hasError` property of the wrapped connection.
    @property bool hasError() const
    {
        return asyncConn.hasError;
    }

    /**
     * The status code is Status.ASYNC if the call is delayed (yield),
     * Status.ABORT if an unrecoverable socket/fd error occurs (throw), or
     * Status.ERROR if an internal error occurred (assert).
     */
    @property StatusInfo status() const
    {
        return asyncConn.status;
    }

    /**
     * Returns: Human-readable error message from the underlying operating
     *          system.
     */
    @property string error() const
    {
        return asyncConn.error;
    }

    /// Forwards to the `isConnected` property of the wrapped connection.
    @property bool isConnected() const nothrow
    {
        return asyncConn.isConnected;
    }

    /**
     * Returns: true if this connection was accepted by an AsyncTCPListener
     *          instance.
     */
    @property bool inbound() const
    {
        return asyncConn.inbound;
    }

    /// Disables(true)/enables(false) nagle's algorithm (default:enabled).
    @property void noDelay(bool b)
    {
        asyncConn.noDelay(b);
    }

    /// Changes the default OS configurations for this underlying TCP Socket.
    bool setOption(T)(TCPOption op, in T val)
    {
        return asyncConn.setOption(op, val);
    }

    /// Returns the OS-specific structure of the internet address
    /// of the remote network adapter
    @property NetworkAddress peer() const
    {
        return asyncConn.peer;
    }

    /// Returns the OS-specific structure of the internet address
    /// for the local end of the connection.
    @property NetworkAddress local()
    {
        return asyncConn.local;
    }

    /// Sets the remote address as an OS-specific structure (only usable before connecting).
    @property void peer(NetworkAddress addr)
    {
        asyncConn.peer = addr;
    }

    /// (Blocking) Resolves the specified host and resets the peer to this address.
    /// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
    typeof(this) host(string hostname, size_t port)
    {
        asyncConn.host(hostname, port);
        return this;
    }

    /// Sets the peer to the specified IP address and port. (only usable before connecting).
    typeof(this) ip(string ip, size_t port)
    {
        asyncConn.ip(ip, port);
        return this;
    }

    /// Starts the connection by registering the associated callback handler in the
    /// underlying OS event loop.
    bool run(void delegate(TCPEvent) del)
    {
        TCPEventHandler handler;
        handler.conn = asyncConn;
        handler.del  = del;
        return run(handler);
    }

    // Registers an already assembled handler with the wrapped connection.
    private bool run(TCPEventHandler del)
    {
        return asyncConn.run(del);
    }

    // Number of bytes that can still be stored in readBuffer.
    private size_t readRoom()
    {
        return readBuffer.capacity - readBuffer.length;
    }

    /**
     * Receives data from the underlying stream into readBuffer. To be used
     * when TCPEvent.READ is received by the callback handler.
     *
     * At most as many bytes as readBuffer can still hold are requested, so the
     * buffer can never be overrun. IMPORTANT: this must be called until it
     * returns fewer bytes than there was room for.
     *
     * Returns: the value returned by the wrapped connection's recv, or 0
     *          when readBuffer has no room left.
     */
    private long recv()
    in
    {
        assert(isConnected, "No socket to operate on");
    }
    body
    {
        // A local slice: it limits the request to the free space and keeps
        // workBuffer itself untouched whatever the callee does with it.
        ubyte[] target = workBuffer[0 .. readRoom];
        if (target.length == 0)
            return 0;

        long cnt = asyncConn.recv(target);
        if (cnt > 0)
            copy(target[0 .. cast(size_t) cnt], &readBuffer);
        return cnt;
    }

    /**
     * Send data through the underlying stream by moving it into the OS buffer.
     *
     * The whole content of writeBuffer is offered to the connection; the data
     * stays in writeBuffer, the caller removes what was actually accepted.
     *
     * Returns: the value returned by the wrapped connection's send.
     */
    private long send()
    in
    {
        assert(isConnected, "No socket to operate on");
    }
    body
    {
        immutable pending = writeBuffer.length;
        copy(writeBuffer[], workBuffer);
        // workBuffer[0 .. pending] already is a ubyte[]; no extra copy needed.
        return asyncConn.send(workBuffer[0 .. pending]);
    }

    /**
     * Removes the connection from the event loop, closing it if necessary, and
     * cleans up the underlying resources.
     */
    private bool kill(in bool forced = false)
    in
    {
        assert(isConnected);
    }
    body
    {
        return asyncConn.kill(forced);
    }

    /**
     * Queues a request for exactly `len` bytes. `onReadCb` is invoked from
     * the TCPEvent.READ handling once that many bytes are buffered. Requests
     * are served in the order they were queued.
     *
     * Params:
     *   len      = number of bytes to deliver; must not exceed `size`, since
     *              a larger request could never be satisfied
     *   onReadCb = receives this connection and the requested bytes
     */
    void read(in size_t len, in OnRead onReadCb)
    in
    {
        assert(len <= size, "Read request larger than the read buffer");
    }
    body
    {
        onReadCbs ~= OnReadInfo(len, onReadCb);
    }

    // Serves queued read requests, in order, for as long as readBuffer holds
    // enough data for the request at the front of the queue.
    private void dispatchReads()
    {
        // Iterate over the live queue (not a snapshot), so that requests
        // registered from inside a callback are served right away.
        while (!onReadCbs.empty)
        {
            immutable wanted = onReadCbs.front.len;
            if (readBuffer.length < wanted)
                break;

            auto handler = onReadCbs.front.cb;
            const msg = readBuffer[0 .. wanted].array;

            // Update all state before calling out, so the callback sees a
            // consistent queue and buffer.
            readBuffer.popFrontN(wanted);
            onReadCbs.popFront();

            if (handler !is null)
                handler(this, msg);
        }
    }

    // Handles TCPEvent.READ: alternates between receiving and serving read
    // requests until the connection returns less data than there was room
    // for, or until readBuffer is full and no queued request consumes it.
    private void onRead()
    {
        // Requests queued since the last event may already be satisfiable
        // from buffered data; serving them first also frees buffer space.
        dispatchReads();

        while (isConnected)
        {
            immutable room = readRoom;
            if (room == 0)
                break;

            immutable received = recv();
            if (received > 0)
                dispatchReads();

            // A short (or empty) read means there is nothing more to fetch.
            if (received <= 0 || cast(size_t) received < room)
                break;
        }
    }

    /**
     * Queues the first `len` elements of `msg` for sending and immediately
     * tries to send them.
     *
     * NOTE(review): the data is stored in the write buffer first, so `len`
     * presumably must not exceed the free space of that buffer - confirm
     * against CircularBuffer.put.
     *
     * Params:
     *   msg = data to send; must support slicing
     *   len = number of elements of `msg` to send
     *   cb  = invoked once all `len` bytes were handed to the connection
     *         (optional)
     */
    void write(R)(in R msg, in size_t len, in OnEvent cb = null)
    if (isInputRange!R)
    {
        writeBuffer.put(msg[0..len]);

        onWriteCbs ~= OnWriteInfo(len, cb);
        onWrite();
    }

    // Handles TCPEvent.WRITE (and is called by write): sends what is pending
    // in writeBuffer and fires the callbacks of all write requests that are
    // now completely sent.
    private void onWrite()
    {
        if (writeBuffer.length == 0)
            return;

        immutable sent = send();
        if (sent <= 0)
            return;

        writeBuffer.popFrontN(cast(size_t) sent);

        // Accumulate over calls: a request may need several partial sends
        // before all of its bytes have gone out.
        sentCredit += cast(size_t) sent;

        while (!onWriteCbs.empty && sentCredit >= onWriteCbs.front.len)
        {
            auto handler = onWriteCbs.front.cb;

            // Update all state before calling out; the callback may call
            // write(), which re-enters this method.
            sentCredit -= onWriteCbs.front.len;
            onWriteCbs.popFront();

            // write() defaults its callback to null.
            if (handler !is null)
                handler(this);
        }
    }

    // Handles TCPEvent.CONNECT.
    private void onConnect()
    {
        if (onConnectCb !is null)
            onConnectCb(this);
    }

    // Handles TCPEvent.ERROR.
    private void onError()
    {
        if (onErrorCb !is null)
            onErrorCb(this);
    }

    /// Kills the underlying connection, then invokes the close callback
    /// (if any). The connection must be connected.
    void close()
    {
        kill();
        onClose();
    }

    // Handles TCPEvent.CLOSE; also called by close().
    private void onClose()
    {
        if (onCloseCb !is null)
            onCloseCb(this);
    }

    /// Dispatches an event of the underlying connection to the matching
    /// internal handler.
    void handle(TCPEvent ev)
    {
        final switch (ev)
        {
            case TCPEvent.CONNECT:
                onConnect();
                break;
            case TCPEvent.READ:
                onRead();
                break;
            case TCPEvent.WRITE:
                onWrite();
                break;
            case TCPEvent.CLOSE:
                onClose();
                break;
            case TCPEvent.ERROR:
                onError();
                break;
        }
    }

    // A queued read request: deliver `len` bytes to `cb`.
    private struct OnReadInfo
    {
        const size_t len;
        const OnRead cb;
    }

    // A queued write request: call `cb` once `len` bytes have been sent.
    private struct OnWriteInfo
    {
        const size_t len;
        const OnEvent cb;
    }
}