1 ///
2 module async.tcp;
3 import std.traits : isPointer;
4 import async.types;
5 import async.events;
6 import std.typecons : Tuple;
7 
8 /// Wraps a TCP stream between 2 network adapters, using a custom handler to
9 /// signal related events. Many of these objects can be active concurrently
10 /// in a thread if the event loop is running and the handlers do not block.
11 final class AsyncTCPConnection
12 {
13 package:
14 	EventLoop m_evLoop;
15 
16 private:
17 	NetworkAddress m_peer;
18 
19 nothrow:
20 	fd_t m_socket;
21 	fd_t m_preInitializedSocket;
22 	bool m_noDelay;
23 	bool m_inbound;
24 public:
25 	///
26 	this(EventLoop evl, fd_t preInitializedSocket = fd_t.init)
27 	in { assert(evl !is null); }
28 	body {
29 		m_evLoop = evl;
30 		m_preInitializedSocket = preInitializedSocket;
31 	}
32 
33 	mixin DefStatus;
34 
35 	/// Returns false if the connection has gone.
36 	@property bool isConnected() const {
37 		return m_socket != fd_t.init;
38 	}
39 
40 	/// Returns true if this connection was accepted by an AsyncTCPListener instance.
41 	@property bool inbound() const {
42 		return m_inbound;
43 	}
44 
45 	/// Disables(true)/enables(false) nagle's algorithm (default:enabled).
46 	@property void noDelay(bool b)
47 	{
48 		if (m_socket == fd_t.init)
49 			m_noDelay = b;
50 		else
51 			setOption(TCPOption.NODELAY, true);
52 	}
53 
54 	/// Changes the default OS configurations for this underlying TCP Socket.
55 	bool setOption(T)(TCPOption op, in T val)
56 	in { assert(isConnected, "No socket to operate on"); }
57 	body {
58 		return m_evLoop.setOption(m_socket, op, val);
59 	}
60 
61 	/// Returns the OS-specific structure of the internet address
62 	/// of the remote network adapter
63 	@property NetworkAddress peer() const
64 	{
65 		return m_peer;
66 	}
67 
68 	/// Returns the OS-specific structure of the internet address
69 	/// for the local end of the connection.
70 	@property NetworkAddress local()
71 	in {
72 		assert(isConnected && m_peer != NetworkAddress.init, "Cannot get local address from a non-connected socket");
73 	}
74 	body {
75 		return m_evLoop.localAddr(m_socket, m_peer.ipv6);
76 	}
77 
78 	/// Sets the remote address as an OS-specific structure (only usable before connecting).
79 	@property void peer(NetworkAddress addr)
80 	in {
81 		assert(!isConnected, "Cannot change remote address on a connected socket");
82 		assert(addr != NetworkAddress.init);
83 	}
84 	body {
85 		m_peer = addr;
86 	}
87 
88 	/// (Blocking) Resolves the specified host and resets the peer to this address.
89 	/// Use AsyncDNS for a non-blocking resolver. (only usable before connecting).
90 	typeof(this) host(string hostname, size_t port)
91 	in {
92 		assert(!isConnected, "Cannot change remote address on a connected socket");
93 	}
94 	body {
95 		m_peer = m_evLoop.resolveHost(hostname, cast(ushort) port);
96 		return this;
97 	}
98 
99 	/// Sets the peer to the specified IP address and port. (only usable before connecting).
100 	typeof(this) ip(string ip, size_t port)
101 	in {
102 		assert(!isConnected, "Cannot change remote address on a connected socket");
103 	}
104 	body {
105 		m_peer = m_evLoop.resolveIP(ip, cast(ushort) port);
106 		return this;
107 	}
108 
109 	/// Starts the connection by registering the associated callback handler in the
110 	/// underlying OS event loop.
111 	bool run(void delegate(TCPEvent) del) {
112 		TCPEventHandler handler;
113 		handler.del = del;
114 		handler.conn = this;
115 		return run(handler);
116 	}
117 
118 	///
119 	bool run(TCPEventHandler del)
120 	in { assert(!isConnected); }
121 	body {
122 		m_socket = m_evLoop.run(this, del);
123 		if (m_socket == 0)
124 			return false;
125 		else
126 			return true;
127 	}
128 
129 	/// Receive data from the underlying stream. To be used when TCPEvent.READ is received by the
130 	/// callback handler. IMPORTANT: This must be called until is returns a lower value than the buffer!
131 	final pragma(inline, true)
132 	long recv(ref ubyte[] ub)
133 	//in { assert(isConnected, "No socket to operate on"); }
134 	//body
135 	{
136 		return m_evLoop.recv(m_socket, ub);
137 	}
138 
139 	/// Send data through the underlying stream by moving it into the OS buffer.
140 	final pragma(inline, true)
141 	long send(in ubyte[] ub)
142 	//in { assert(isConnected, "No socket to operate on"); }
143 	//body
144 	{
145 		long ret = m_evLoop.send(m_socket, ub);
146 		version(Posix)
147 			if (m_evLoop.status.code == Status.ASYNC)
148 				this.writeBlocked = true;
149 		return ret;
150 	}
151 
152 	/// Removes the connection from the event loop, closing it if necessary, and
153 	/// cleans up the underlying resources.
154 	bool kill(bool forced = false)
155 	in { assert(isConnected); }
156 	body {
157 		bool ret = m_evLoop.kill(this, forced);
158 		scope(exit) m_socket = 0;
159 		return ret;
160 	}
161 
162 	@property fd_t socket() const {
163 		return m_socket;
164 	}
165 
166     // [shove]
167 	@property StatusInfo status() const
168 	{
169 		return m_evLoop.status;
170 	}
171 
172 	@property string error() const
173 	{
174 		return m_evLoop.error;
175 	}
176 	
177 package:
178 	mixin COSocketMixins;
179 
180 	@property void inbound(bool b) {
181 		m_inbound = b;
182 	}
183 
184 	@property bool noDelay() const
185 	{
186 		return m_noDelay;
187 	}
188 
189 	@property void socket(fd_t sock) {
190 		m_socket = sock;
191 	}
192 
193 	@property fd_t preInitializedSocket() const {
194 		return m_preInitializedSocket;
195 	}
196 }
197 
198 /// Accepts connections on a single IP:PORT tuple by sending a new inbound AsyncTCPConnection
199 /// object to the handler for every newly completed handshake.
200 ///
201 /// Note: If multiple threads are listening to the same IP:PORT tuple, the connections will
202 /// be distributed evenly between them. However, this behavior on Windows is not implemented yet.
203 final class AsyncTCPListener
204 {
205 private:
206 nothrow:
207 	EventLoop m_evLoop;
208 	fd_t m_socket;
209 	NetworkAddress m_local;
210 	bool m_noDelay;
211 	bool m_started;
212 
213 public:
214 
215 	///
216 	this(EventLoop evl, fd_t sock = fd_t.init) { m_evLoop = evl; m_socket = sock; }
217 
218 	mixin DefStatus;
219 
220 	/// Sets the default value for nagle's algorithm on new connections.
221 	@property void noDelay(bool b)
222 	in { assert(!m_started, "Cannot set noDelay on a running object."); }
223 	body {
224 		m_noDelay = b;
225 	}
226 
227 	/// Returns the local internet address as an OS-specific structure.
228 	@property NetworkAddress local() const
229 	{
230 		return m_local;
231 	}
232 
233 	/// Sets the local internet address as an OS-specific structure.
234 	@property void local(NetworkAddress addr)
235 	in { assert(!m_started, "Cannot rebind a listening socket"); }
236 	body {
237 		m_local = addr;
238 	}
239 
240 	/// Sets the local listening interface to the specified hostname/port.
241 	typeof(this) host(string hostname, size_t port)
242 	in { assert(!m_started, "Cannot rebind a listening socket"); }
243 	body {
244 		m_local = m_evLoop.resolveHost(hostname, cast(ushort) port);
245 		return this;
246 	}
247 
248 	/// Sets the local listening interface to the specified ip/port.
249 	typeof(this) ip(string ip, size_t port)
250 	in { assert(!m_started, "Cannot rebind a listening socket"); }
251 	body {
252 		m_local = m_evLoop.resolveIP(ip, cast(ushort) port);
253 		return this;
254 	}
255 
256 	/// Starts accepting connections by registering the given handler with the underlying OS event.
257 	bool run(void delegate(TCPEvent) delegate(AsyncTCPConnection) del) {
258 		TCPAcceptHandler handler;
259 		handler.ctxt = this;
260 		handler.del = del;
261 		return run(handler);
262 	}
263 
264 	private bool run(TCPAcceptHandler del)
265 	in {
266 		assert(m_local != NetworkAddress.init, "Cannot bind without an address. Please run .host() or .ip()");
267 	}
268 	body {
269 		m_socket = m_evLoop.run(this, del);
270 		if (m_socket == fd_t.init)
271 			return false;
272 		else {
273 			if (m_local.port == 0)
274 				m_local = m_evLoop.localAddr(m_socket, m_local.ipv6);
275 			m_started = true;
276 			return true;
277 		}
278 	}
279 
280 	/// Use to implement distributed servicing of connections
281 	@property fd_t socket() const {
282 		return m_socket;
283 	}
284 
285 	/// Stops accepting connections and cleans up the underlying OS resources.
286 	bool kill()
287 	in { assert(m_socket != 0); }
288 	body {
289 		bool ret = m_evLoop.kill(this);
290 		if (ret)
291 			m_started = false;
292 		return ret;
293 	}
294 
295 package:
296 	version(Posix) mixin EvInfoMixins;
297 	version(Distributed) version(Windows) mixin TCPListenerDistMixins;
298 	@property bool noDelay() const
299 	{
300 		return m_noDelay;
301 	}
302 }
303 
304 package struct TCPEventHandler {
305 	AsyncTCPConnection conn;
306 
307 	/// Use getContext/setContext to persist the context in each activity. Using AsyncTCPConnection in args
308 	/// allows the EventLoop implementation to create and pass a new object, which is necessary for listeners.
309 	void delegate(TCPEvent) del;
310 
311 	void opCall(TCPEvent ev){
312 		if (conn is null || !conn.isConnected) return; //, "Connection was disposed before shutdown could be completed");
313 		del(ev);
314 		return;
315 	}
316 }
317 
318 package struct TCPAcceptHandler {
319 	AsyncTCPListener ctxt;
320 	void delegate(TCPEvent) delegate(AsyncTCPConnection) del;
321 
322 	TCPEventHandler opCall(AsyncTCPConnection conn){ // conn is null = error!
323 		assert(ctxt !is null);
324 
325 		void delegate(TCPEvent) ev_handler = del(conn);
326 		TCPEventHandler handler;
327 		handler.del = ev_handler;
328 		handler.conn = conn;
329 		return handler;
330 	}
331 }
332 
333 ///
334 enum TCPEvent : char {
335 	ERROR = 0, /// The connection will be forcefully closed, this is debugging information
336 	CONNECT, /// indicates write will not block, although recv may or may not have data
337 	READ, /// called once when new bytes are in the buffer
338 	WRITE, /// only called when send returned Status.ASYNC
339 	CLOSE /// The connection is being shutdown
340 }
341 
342 ///
343 enum TCPOption : char {
344 	NODELAY = 0,		/// Don't delay send to coalesce packets
345 	REUSEADDR = 1, ///
346 	REUSEPORT, ///
347 	CORK, ///
348 	LINGER, ///
349 	BUFFER_RECV, ///
350 	BUFFER_SEND, ///
351 	TIMEOUT_RECV, ///
352 	TIMEOUT_SEND, ///
353 	TIMEOUT_HALFOPEN, ///
354 	KEEPALIVE_ENABLE, ///
355 	KEEPALIVE_DEFER,	/// Start keeplives after this period
356 	KEEPALIVE_COUNT,	/// Number of keepalives before death
357 	KEEPALIVE_INTERVAL,	/// Interval between keepalives
358 	DEFER_ACCEPT, ///
359 	QUICK_ACK,			/// Bock/reenable quick ACKs.
360 	CONGESTION ///
361 }