1 module async.windows;
2 
3 version (Windows):
4 
5 import core.atomic;
6 import core.thread : Fiber;
7 import async.types;
8 import std.container : Array;
9 import std..string : toStringz;
10 import std.conv : to;
11 import std.datetime : Duration, msecs, seconds;
12 import std.algorithm : min;
13 import std.exception;
14 import async.internals.win32;
15 import async.internals.logging;
16 import std.traits : isIntegral;
17 import std.typecons : Tuple, tuple;
18 import std.utf : toUTFz;
19 import core.sync.mutex;
20 import async.events;
21 import memutils.utils;
22 import memutils.hashmap;
23 import memutils.vector;
24 pragma(lib, "ws2_32");
25 pragma(lib, "ole32");
26 alias fd_t = SIZE_T;
27 alias error_t = EWIN;
28 
29 //todo :  see if new connections with SO_REUSEADDR are evenly distributed between threads
30 
31 
32 package struct EventLoopImpl {
33 	pragma(msg, "Using Windows message-based notifications and alertable IO for events");
34 
35 private:
36 	HashMap!(fd_t, TCPAcceptHandler) m_connHandlers; // todo: Change this to an array
37 	HashMap!(fd_t, TCPEventHandler) m_tcpHandlers;
38 	HashMap!(fd_t, TimerHandler) m_timerHandlers;
39 	HashMap!(fd_t, UDPHandler) m_udpHandlers;
40 	HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too
41 	HashMap!(uint, DWFolderWatcher) m_dwFolders;
42 	HashMap!(fd_t, tcp_keepalive)* kcache;
43 	~this() { kcache.destroy(); }
44 nothrow:
45 private:
46 	struct TimerCache {
47 		TimerHandler cb;
48 		fd_t fd;
49 	}
50 	TimerCache m_timer;
51 
52 	EventLoop m_evLoop;
53 	bool m_started;
54 	wstring m_window;
55 	HWND m_hwnd;
56 	DWORD m_threadId;
57 	ushort m_instanceId;
58 	StatusInfo m_status;
59 	error_t m_error = EWIN.WSA_OK;
60 	__gshared Mutex gs_mtx;
61 
62 	HANDLE[] m_waitObjects;
63 	AsyncOverlapped*[AsyncSocket] m_pendingConnects;
64 	bool[AsyncOverlapped*] m_pendingAccepts;
65 
66 	@property HANDLE pendingConnectEvent()
67 	{ return m_waitObjects[0]; }
68 
69 	@property HANDLE pendingAcceptEvent()
70 	{ return m_waitObjects[1]; }
71 
72 	AsyncAcceptRequest.Queue  m_completedSocketAccepts;
73 	AsyncReceiveRequest.Queue m_completedSocketReceives;
74 	AsyncSendRequest.Queue    m_completedSocketSends;
75 package:
76 	@property bool started() const {
77 		return m_started;
78 	}
79 	bool init(EventLoop evl)
80 	in { assert(!m_started); }
81 	body
82 	{
83 		try if (!gs_mtx)
84 			gs_mtx = new Mutex; catch (Throwable) {}
85 		static ushort j;
86 		assert (j == 0, "Current implementation is only tested with 1 event loop per thread. There are known issues with signals on linux.");
87 		j += 1;
88 		m_status = StatusInfo.init;
89 
90 		import core.thread;
91 		//try Thread.getThis().priority = Thread.PRIORITY_MAX;
92 		//catch (Exception e) { assert(false, "Could not set thread priority"); }
93 		SetThreadPriority(GetCurrentThread(), 31);
94 		m_evLoop = evl;
95 		shared static ushort i;
96 		m_instanceId = i;
97 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
98 		wstring inststr;
99 		import std.conv : to;
100 		try { inststr = m_instanceId.to!wstring; }
101 		catch (Exception e) {
102 			return false;
103 		}
104 		m_window = "VibeWin32MessageWindow" ~ inststr;
105 		wstring classname = "VibeWin32MessageWindow" ~ inststr;
106 
107 		LPCWSTR wnz;
108 		LPCWSTR clsn;
109 		try {
110 			wnz = cast(LPCWSTR) m_window.toUTFz!(immutable(wchar)*);
111 			clsn = cast(LPCWSTR) classname.toUTFz!(immutable(wchar)*);
112 		} catch (Exception e) {
113 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
114 			return false;
115 		}
116 
117 		m_threadId = GetCurrentThreadId();
118 		WNDCLASSW wc;
119 		wc.lpfnWndProc = &wndProc;
120 		wc.lpszClassName = clsn;
121 		RegisterClassW(&wc);
122 		m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE,
123 		                       cast(HMENU) null, null, null);
124 		static if (LOG) try log("Window registered: " ~ m_hwnd.to!string); catch (Throwable) {}
125 		auto ptr = cast(ULONG_PTR)cast(void*)&this;
126 		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr);
127 		assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this );
128 		WSADATA wd;
129 		m_error = cast(error_t) WSAStartup(0x0202, &wd);
130 		if (m_error == EWIN.WSA_OK)
131 			m_status.code = Status.OK;
132 		else {
133 			m_status.code = Status.ABORT;
134 			static if(LOG) log(m_status);
135 			return false;
136 		}
137 		assert(wd.wVersion == 0x0202);
138 
139 		auto dummySocket = socket(AF_INET6, SOCK_STREAM, 0);
140 		if (dummySocket == INVALID_SOCKET) return false;
141 		scope (exit) closesocket(dummySocket);
142 
143 		DWORD bytesReturned;
144 
145 		if (WSAIoctl(dummySocket,
146 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
147 		             &WSAID_ACCEPTEX, GUID.sizeof,
148 		             &AcceptEx, AcceptEx.sizeof,
149 		             &bytesReturned,
150 		             null, null) == SOCKET_ERROR) {
151 			m_error = WSAGetLastErrorSafe();
152 			m_status.code = Status.ABORT;
153 			return false;
154 		}
155 
156 		if (WSAIoctl(dummySocket,
157 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
158 		             &WSAID_GETACCEPTEXSOCKADDRS, GUID.sizeof,
159 		             &GetAcceptExSockaddrs, GetAcceptExSockaddrs.sizeof,
160 		             &bytesReturned,
161 		             null, null) == SOCKET_ERROR) {
162 			m_error = WSAGetLastErrorSafe();
163 			m_status.code = Status.ABORT;
164 			return false;
165 		}
166 
167 		if (WSAIoctl(dummySocket,
168 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
169 		             &WSAID_CONNECTEX, GUID.sizeof,
170 		             &ConnectEx, ConnectEx.sizeof,
171 		             &bytesReturned,
172 		             null, null) == SOCKET_ERROR) {
173 			m_error = WSAGetLastErrorSafe();
174 			m_status.code = Status.ABORT;
175 			return false;
176 		}
177 
178 		if (WSAIoctl(dummySocket,
179 		             SIO_GET_EXTENSION_FUNCTION_POINTER,
180 		             &WSAID_DISCONNECTEX, GUID.sizeof,
181 		             &DisconnectEx, DisconnectEx.sizeof,
182 		             &bytesReturned,
183 		             null, null) == SOCKET_ERROR) {
184 			m_error = WSAGetLastErrorSafe();
185 			m_status.code = Status.ABORT;
186 			return false;
187 		}
188 
189 		// Event for pending ConnectEx requests
190 		m_waitObjects ~= CreateEvent(null, false, false, null);
191 		// Event for pending AcceptEx requests
192 		m_waitObjects ~= CreateEvent(null, false, false, null);
193 
194 		m_started = true;
195 		return true;
196 	}
197 
198 	// todo: find where to call this
199 	void exit() {
200 		cast(void)PostThreadMessageW(m_threadId, WM_QUIT, 0, 0);
201 	}
202 
203 	@property StatusInfo status() const {
204 		return m_status;
205 	}
206 
207 	@property string error() const {
208 		string* ptr;
209 		string pv = ((ptr = (m_error in EWSAMessages)) !is null) ? *ptr : string.init;
210 		return pv;
211 	}
212 
213 	bool loop(Duration timeout = 0.seconds)
214 	in {
215 		assert(Fiber.getThis() is null);
216 		assert(m_started);
217 	}
218 	body {
219 		DWORD msTimeout;
220 
221 		if (timeout == -1.seconds)
222 			msTimeout = DWORD.max;
223 		else msTimeout = cast(DWORD) min(timeout.total!"msecs", DWORD.max);
224 
225 		/*
226 		 * Waits until one or all of the specified objects are in the signaled state
227 		 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms684245%28v=vs.85%29.aspx
228 		*/
229 		m_status = StatusInfo.init;
230 		DWORD signal = MsgWaitForMultipleObjectsEx(
231 			cast(DWORD) m_waitObjects.length,
232 			m_waitObjects.ptr,
233 			msTimeout,
234 			QS_ALLEVENTS,
235 			MWMO_ALERTABLE | MWMO_INPUTAVAILABLE		// MWMO_ALERTABLE: Wakes up to execute overlapped hEvent (i/o completion)
236 			// MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
237 			);
238 
239 		auto errors =
240 		[ tuple(WAIT_FAILED, Status.EVLOOP_FAILURE) ];	/* WAIT_FAILED: Failed to call MsgWait..() */
241 
242 		if (signal == WAIT_TIMEOUT) {
243 			return true;
244 		}
245 
246 		if (signal == WAIT_IO_COMPLETION) {
247 			if (m_status.code != Status.OK) return false;
248 
249 			foreach (request; m_completedSocketReceives) {
250 				if (request.socket.receiveContinuously) {
251 					m_completedSocketReceives.removeFront();
252 					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
253 					if (request.socket.receiveContinuously && request.socket.alive) {
254 						request.message.count = 0;
255 						submitRequest(request);
256 					} else {
257 						assumeWontThrow(NetworkMessage.free(request.message));
258 						assumeWontThrow(AsyncReceiveRequest.free(request));
259 					}
260 				} else {
261 					m_completedSocketReceives.removeFront();
262 					if (request.message) {
263 						assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
264 						assumeWontThrow(NetworkMessage.free(request.message));
265 					} else {
266 						assumeWontThrow(request.onComplete.get!1)();
267 					}
268 					assumeWontThrow(AsyncReceiveRequest.free(request));
269 				}
270 			}
271 
272 			foreach (request; m_completedSocketSends) {
273 				m_completedSocketSends.removeFront();
274 				request.onComplete();
275 				assumeWontThrow(NetworkMessage.free(request.message));
276 				assumeWontThrow(AsyncSendRequest.free(request));
277 			}
278 
279 			signal = MsgWaitForMultipleObjectsEx(
280 				cast(DWORD) m_waitObjects.length,
281 				m_waitObjects.ptr,
282 				0,
283 				QS_ALLEVENTS,
284 				MWMO_INPUTAVAILABLE // MWMO_INPUTAVAILABLE: Processes key/mouse input to avoid window ghosting
285 				);
286 			if (signal == WAIT_TIMEOUT) {
287 				return true;
288 			}
289 		}
290 
291 		if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) {
292 			static if (LOG) log("Event Loop Exiting because of error");
293 			return false;
294 		}
295 
296 		// Input messages
297 		if (signal == WAIT_OBJECT_0 + m_waitObjects.length) {
298 			MSG msg;
299 			while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) {
300 				m_status = StatusInfo.init;
301 				TranslateMessage(&msg);
302 				DispatchMessageW(&msg);
303 
304 				if (m_status.code == Status.ERROR) {
305 					static if (LOG) log(m_status.text);
306 					return false;
307 				}
308 			}
309 			return true;
310 		}
311 
312 		// Events
313 		DWORD transferred, flags;
314 		switch (signal - WAIT_OBJECT_0) {
315 			// ConnectEx completion
316 			case 0:
317 				foreach (ref pendingConnect; m_pendingConnects.byKeyValue()) {
318 					auto socket = pendingConnect.key;
319 					auto overlapped = pendingConnect.value;
320 
321 					if (WSAGetOverlappedResult(socket.handle,
322 					                           &overlapped.overlapped,
323 					                           &transferred,
324 					                           false,
325 					                           &flags)) {
326 						m_pendingConnects.remove(socket);
327 						assumeWontThrow(AsyncOverlapped.free(overlapped));
328 						if (updateConnectContext(socket.handle)) {
329 							socket.handleConnect();
330 							return true;
331 						} else {
332 							socket.kill();
333 							socket.handleError();
334 							return false;
335 						}
336 					} else {
337 						m_error = WSAGetLastErrorSafe();
338 						if (m_error == WSA_IO_INCOMPLETE) {
339 							continue;
340 						} else {
341 							m_status.code = Status.ABORT;
342 							socket.kill();
343 							socket.handleError();
344 							return false;
345 						}
346 					}
347 				}
348 				break;
349 			// AcceptEx completion
350 			case 1:
351 				foreach (overlapped; cast(AsyncOverlapped*[]) m_pendingAccepts.keys) {
352 					auto request = overlapped.accept;
353 					auto socket = request.socket;
354 
355 					if (WSAGetOverlappedResult(socket.handle,
356 					                           &overlapped.overlapped,
357 											   &transferred,
358 											   false,
359 											   &flags)) {
360 						m_pendingAccepts.remove(overlapped);
361 						assumeWontThrow(AsyncOverlapped.free(overlapped));
362 						m_completedSocketAccepts.insertBack(request);
363 					} else {
364 						m_error = WSAGetLastErrorSafe();
365 						if (m_error == WSA_IO_INCOMPLETE) {
366 							continue;
367 						} else {
368 							m_status.code = Status.ABORT;
369 							m_pendingAccepts.remove(overlapped);
370 							assumeWontThrow(AsyncOverlapped.free(overlapped));
371 							assumeWontThrow(AsyncAcceptRequest.free(request));
372 							socket.kill();
373 							socket.handleError();
374 							return false;
375 						}
376 					}
377 				}
378 				foreach (request; m_completedSocketAccepts) {
379 					sockaddr* localAddress, remoteAddress;
380 					socklen_t localAddressLength, remoteAddressLength;
381 
382 					GetAcceptExSockaddrs(request.buffer.ptr,
383 										 0,
384 										 cast(DWORD) request.buffer.length / 2,
385 										 cast(DWORD) request.buffer.length / 2,
386 										 &localAddress,
387 										 &localAddressLength,
388 										 &remoteAddress,
389 										 &remoteAddressLength);
390 
391 					m_completedSocketAccepts.removeFront();
392 					if (!onAccept(request.socket.handle, request, remoteAddress)) {
393 						return false;
394 					}
395 				}
396 				break;
397 			default:
398 				.warning("Unknown event was triggered: ", signal);
399 				break;
400 		}
401 
402 		return true;
403 	}
404 
405 	bool run(AsyncEvent ctxt, EventHandler del)
406 	{
407 		return true;
408 	}
409 
410 	fd_t run(AsyncTCPListener ctxt, TCPAcceptHandler del)
411 	{
412 		m_status = StatusInfo.init;
413 		fd_t fd = ctxt.socket;
414 		bool reusing;
415 		if (fd == fd_t.init) {
416 
417 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
418 
419 			if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
420 				return 0;
421 
422 			if (!setOption(fd, TCPOption.REUSEADDR, true)) {
423 				closeSocket(fd, false);
424 				return 0;
425 			}
426 			// todo: defer accept?
427 
428 			if (ctxt.noDelay) {
429 				if (!setOption(fd, TCPOption.NODELAY, true)) {
430 					closeSocket(fd, false);
431 					return 0;
432 				}
433 			}
434 		} else reusing = true;
435 
436 		if (initTCPListener(fd, ctxt, reusing))
437 		{
438 			try {
439 				static if (LOG) log("Running listener on socket fd#" ~ fd.to!string);
440 				m_connHandlers[fd] = del;
441 				version(Distributed)ctxt.init(m_hwnd, fd);
442 			}
443 			catch (Exception e) {
444 				setInternalError!"m_connHandlers assign"(Status.ERROR, e.msg);
445 				closeSocket(fd, false);
446 				return 0;
447 			}
448 		}
449 		else
450 		{
451 			return 0;
452 		}
453 
454 
455 		return fd;
456 	}
457 
458 	fd_t run(AsyncTCPConnection ctxt, TCPEventHandler del)
459 	in {
460 		assert(ctxt.socket == fd_t.init);
461 		assert(ctxt.peer.family != AF_UNSPEC);
462 	}
463 	body {
464 		m_status = StatusInfo.init;
465 		fd_t fd = ctxt.preInitializedSocket;
466 
467 		if (fd == fd_t.init)
468 			fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
469 		static if (LOG) log("Starting connection at: " ~ fd.to!string);
470 		if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET))
471 			return 0;
472 
473 		try {
474 			(m_tcpHandlers)[fd] = del;
475 		}
476 		catch (Exception e) {
477 			setInternalError!"m_tcpHandlers assign"(Status.ERROR, e.msg);
478 			closeSocket(fd, false);
479 			return 0;
480 		}
481 
482 		nothrow void closeAll() {
483 			try {
484 				static if (LOG) log("Remove event handler for " ~ fd.to!string);
485 				m_tcpHandlers.remove(fd);
486 			}
487 			catch (Exception e) {
488 				setInternalError!"m_tcpHandlers remove"(Status.ERROR, e.msg);
489 			}
490 			closeSocket(fd, false);
491 		}
492 
493 		if (ctxt.noDelay) {
494 			if (!setOption(fd, TCPOption.NODELAY, true)) {
495 				closeAll();
496 				return 0;
497 			}
498 		}
499 
500 		if (!initTCPConnection(fd, ctxt)) {
501 			closeAll();
502 			return 0;
503 		}
504 
505 
506 		static if (LOG) try log("Client started FD#" ~ fd.to!string);
507 		catch (Throwable) {}
508 		return fd;
509 	}
510 
511 	fd_t run(AsyncUDPSocket ctxt, UDPHandler del) {
512 		m_status = StatusInfo.init;
513 		fd_t fd = ctxt.preInitializedSocket;
514 
515 		if (fd == fd_t.init)
516 			fd = WSASocketW(cast(int)ctxt.local.family, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED);
517 
518 		if (catchSocketError!("run AsyncUDPSocket")(fd, INVALID_SOCKET))
519 			return 0;
520 
521 		if (initUDPSocket(fd, ctxt))
522 		{
523 			try {
524 				(m_udpHandlers)[fd] = del;
525 			}
526 			catch (Exception e) {
527 				setInternalError!"m_udpHandlers assign"(Status.ERROR, e.msg);
528 				closesocket(fd);
529 				return 0;
530 			}
531 		}
532 		else return 0;
533 
534 		static if (LOG) try log("UDP Socket started FD#" ~ fd.to!string);
535 		catch (Throwable) {}
536 
537 		return fd;
538 	}
539 
540 	fd_t run(shared AsyncSignal ctxt) {
541 		m_status = StatusInfo.init;
542 		static if (LOG) try log("Signal subscribed to: " ~ m_hwnd.to!string); catch (Throwable) {}
543 		return (cast(fd_t)m_hwnd);
544 	}
545 
546 	fd_t run(AsyncNotifier ctxt) {
547 		m_status = StatusInfo.init;
548 		//static if (LOG) try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch (Throwable) {}
549 		return cast(fd_t) m_hwnd;
550 	}
551 
552 	fd_t run(AsyncTimer ctxt, TimerHandler del, Duration timeout) {
553 		if (timeout < 0.seconds)
554 			timeout = 0.seconds;
555 		m_status = StatusInfo.init;
556 		fd_t timer_id = ctxt.id;
557 		if (timer_id == fd_t.init) {
558 			timer_id = createIndex();
559 		}
560 		static if (LOG) try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch (Throwable) {}
561 
562 		BOOL err;
563 		try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null);
564 		catch(Exception e) {
565 			setInternalError!"SetTimer"(Status.ERROR);
566 			return 0;
567 		}
568 
569 		if (err == 0)
570 		{
571 			m_error = GetLastErrorSafe();
572 			m_status.code = Status.ERROR;
573 			m_status.text = "kill(AsyncTimer)";
574 			static if (LOG) log(m_status);
575 			return 0;
576 		}
577 
578 		if (m_timer.fd == fd_t.init || m_timer.fd == timer_id)
579 		{
580 			m_timer.fd = timer_id;
581 			m_timer.cb = del;
582 		}
583 		else {
584 			try
585 			{
586 				(m_timerHandlers)[timer_id] = del;
587 			}
588 			catch (Exception e) {
589 				setInternalError!"HashMap assign"(Status.ERROR);
590 				return 0;
591 			}
592 		}
593 
594 
595 		return timer_id;
596 	}
597 
598 	fd_t run(AsyncDirectoryWatcher ctxt, DWHandler del)
599 	{
600 		static fd_t ids;
601 		auto fd = ++ids;
602 
603 		try (m_dwHandlers)[fd] = new DWHandlerInfo(del);
604 		catch (Exception e) {
605 			setInternalError!"AsyncDirectoryWatcher.hashMap(run)"(Status.ERROR, "Could not add handler to hashmap: " ~ e.msg);
606 		}
607 
608 		return fd;
609 
610 	}
611 
612 	bool kill(AsyncDirectoryWatcher ctxt) {
613 
614 		try {
615 			Array!DWFolderWatcher toFree;
616 			foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
617 				if (v.fd == ctxt.fd) {
618 					CloseHandle(v.handle);
619 					m_dwFolders.remove(k);
620 				}
621 			}
622 
623 			foreach (DWFolderWatcher obj; toFree[])
624 				ThreadMem.free(obj);
625 
626 			// todo: close all the handlers...
627 			m_dwHandlers.remove(ctxt.fd);
628 		}
629 		catch (Exception e) {
630 			setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
631 			return false;
632 		}
633 
634 		return true;
635 	}
636 
637 	bool kill(AsyncTCPConnection ctxt, bool forced = false)
638 	{
639 
640 		m_status = StatusInfo.init;
641 		fd_t fd = ctxt.socket;
642 
643 		static if (LOG) log("Killing socket "~ fd.to!string);
644 		try {
645 			auto cb = m_tcpHandlers.get(ctxt.socket);
646 			if (cb != TCPEventHandler.init){
647 				*cb.conn.connected = false;
648 				*cb.conn.connecting = false;
649 				return closeSocket(fd, true, forced);
650 			}
651 		} catch (Exception e) {
652 			setInternalError!"in m_tcpHandlers"(Status.ERROR, e.msg);
653 			assert(false);
654 			//return false;
655 		}
656 
657 		return true;
658 	}
659 
660 	bool kill(AsyncTCPListener ctxt)
661 	{
662 		m_status = StatusInfo.init;
663 		fd_t fd = ctxt.socket;
664 		try {
665 			if ((ctxt.socket in m_connHandlers) !is null) {
666 				return closeSocket(fd, false, true);
667 			}
668 		} catch (Exception e) {
669 			setInternalError!"in m_connHandlers"(Status.ERROR, e.msg);
670 			return false;
671 		}
672 
673 		return true;
674 	}
675 
676 	bool kill(shared AsyncSignal ctxt) {
677 		return true;
678 	}
679 
680 	bool kill(AsyncNotifier ctxt) {
681 		return true;
682 	}
683 
684 	bool kill(AsyncTimer ctxt) {
685 		m_status = StatusInfo.init;
686 
687 		static if (LOG) try log("Kill timer" ~ ctxt.id.to!string); catch (Throwable) {}
688 
689 		BOOL err = KillTimer(m_hwnd, ctxt.id);
690 		if (err == 0)
691 		{
692 			m_error = GetLastErrorSafe();
693 			m_status.code = Status.ERROR;
694 			m_status.text = "kill(AsyncTimer)";
695 			static if (LOG) log(m_status);
696 			return false;
697 		}
698 
699 		destroyIndex(ctxt);
700 		scope(exit)
701 			ctxt.id = fd_t.init;
702 		if (m_timer.fd == ctxt.id) {
703 			ctxt.id = 0;
704 			m_timer = TimerCache.init;
705 		} else {
706 			try {
707 				m_timerHandlers.remove(ctxt.id);
708 			}
709 			catch (Exception e) {
710 				setInternalError!"HashMap remove"(Status.ERROR);
711 				return 0;
712 			}
713 		}
714 
715 
716 		return true;
717 	}
718 
719 	bool kill(AsyncEvent ctxt, bool forced = false) {
720 		return true;
721 	}
722 
723 	bool kill(AsyncUDPSocket ctxt) {
724 		m_status = StatusInfo.init;
725 
726 		fd_t fd = ctxt.socket;
727 		INT err = closesocket(fd);
728 		if (catchSocketError!"closesocket"(err))
729 			return false;
730 
731 		try m_udpHandlers.remove(ctxt.socket);
732 		catch (Exception e) {
733 			setInternalError!"HashMap remove"(Status.ERROR);
734 			return 0;
735 		}
736 
737 		return true;
738 	}
739 
740 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
741 		m_status = StatusInfo.init;
742 		int err;
743 		try {
744 			nothrow bool errorHandler() {
745 				if (catchSocketError!"setOption:"(err)) {
746 					try m_status.text ~= option.to!string;
747 					catch (Exception e){ assert(false, "to!string conversion failure"); }
748 					return false;
749 				}
750 
751 				return true;
752 			}
753 
754 
755 
756 			final switch (option) {
757 
758 				case TCPOption.NODELAY: // true/false
759 					static if (!is(T == bool))
760 						assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
761 					else {
762 						BOOL val = value?1:0;
763 						socklen_t len = val.sizeof;
764 						err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
765 						return errorHandler();
766 					}
767 				case TCPOption.REUSEPORT:
768 				case TCPOption.REUSEADDR: // true/false
769 					static if (!is(T == bool))
770 						assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
771 					else
772 					{
773 						BOOL val = value?1:0;
774 						socklen_t len = val.sizeof;
775 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
776 						return errorHandler();
777 					}
778 				case TCPOption.QUICK_ACK:
779 					static if (!is(T == bool))
780 						assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
781 					else {
782 						m_status.code = Status.NOT_IMPLEMENTED;
783 						return false; // quick ack is not implemented
784 					}
785 				case TCPOption.KEEPALIVE_ENABLE: // true/false
786 					static if (!is(T == bool))
787 						assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
788 					else
789 					{
790 						BOOL val = value?1:0;
791 						socklen_t len = val.sizeof;
792 						err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
793 						return errorHandler();
794 					}
795 				case TCPOption.KEEPALIVE_COUNT: // retransmit 10 times before dropping half-open conn
796 					static if (!isIntegral!T)
797 						assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
798 					else {
799 						m_status.code = Status.NOT_IMPLEMENTED;
800 						return false;
801 					}
802 				case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds between each keepalive packets
803 					static if (!is(T == Duration))
804 						assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
805 					else {
806 
807 						if (!kcache)
808 							kcache = new HashMap!(fd_t, tcp_keepalive)();
809 
810 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
811 						tcp_keepalive sReturned;
812 						DWORD dwBytes;
813 						kaSettings.onoff = ULONG(1);
814 						if (kaSettings.keepalivetime == ULONG.init) {
815 							kaSettings.keepalivetime = 1000;
816 						}
817 						kaSettings.keepaliveinterval = value.total!"msecs".to!ULONG;
818 						(*kcache)[fd] = kaSettings;
819 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
820 
821 						return errorHandler();
822 					}
823 				case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
824 					static if (!is(T == Duration))
825 						assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
826 					else {
827 
828 						if (!kcache)
829 							kcache = new HashMap!(fd_t, tcp_keepalive)();
830 
831 						tcp_keepalive kaSettings = kcache.get(fd, tcp_keepalive.init);
832 						tcp_keepalive sReturned;
833 						DWORD dwBytes;
834 						kaSettings.onoff = ULONG(1);
835 						if (kaSettings.keepaliveinterval == ULONG.init) {
836 							kaSettings.keepaliveinterval = 75*1000;
837 						}
838 						kaSettings.keepalivetime = value.total!"msecs".to!ULONG;
839 
840 						(*kcache)[fd] = kaSettings;
841 						err = WSAIoctl(fd, SIO_KEEPALIVE_VALS, &kaSettings, tcp_keepalive.sizeof, &sReturned, tcp_keepalive.sizeof, &dwBytes, null, null);
842 
843 						return errorHandler();
844 					}
845 				case TCPOption.BUFFER_RECV: // bytes
846 					static if (!isIntegral!T)
847 						assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
848 					else {
849 						int val = value.to!int;
850 						socklen_t len = val.sizeof;
851 						err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
852 						return errorHandler();
853 					}
854 				case TCPOption.BUFFER_SEND: // bytes
855 					static if (!isIntegral!T)
856 						assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
857 					else {
858 						int val = value.to!int;
859 						socklen_t len = val.sizeof;
860 						err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
861 						return errorHandler();
862 					}
863 				case TCPOption.TIMEOUT_RECV:
864 					static if (!is(T == Duration))
865 						assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
866 					else {
867 						DWORD val = value.total!"msecs".to!DWORD;
868 						socklen_t len = val.sizeof;
869 						err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &val, len);
870 						return errorHandler();
871 					}
872 				case TCPOption.TIMEOUT_SEND:
873 					static if (!is(T == Duration))
874 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
875 					else {
876 						DWORD val = value.total!"msecs".to!DWORD;
877 						socklen_t len = val.sizeof;
878 						err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
879 						return errorHandler();
880 					}
881 				case TCPOption.TIMEOUT_HALFOPEN:
882 					static if (!is(T == Duration))
883 						assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
884 					else {
885 						m_status.code = Status.NOT_IMPLEMENTED;
886 						return false;
887 					}
888 				case TCPOption.LINGER: // bool onOff, int seconds
889 					static if (!is(T == Tuple!(bool, int)))
890 						assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
891 					else {
892 						linger l = linger(val[0]?1:0, val[1].to!USHORT);
893 						socklen_t llen = l.sizeof;
894 						err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
895 						return errorHandler();
896 					}
897 				case TCPOption.CONGESTION:
898 					static if (!isIntegral!T)
899 						assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
900 					else {
901 						m_status.code = Status.NOT_IMPLEMENTED;
902 						return false;
903 					}
904 				case TCPOption.CORK:
905 					static if (!isIntegral!T)
906 						assert(false, "CORK value type must be int, not " ~ T.stringof);
907 					else {
908 						m_status.code = Status.NOT_IMPLEMENTED;
909 						return false;
910 					}
911 				case TCPOption.DEFER_ACCEPT: // seconds
912 					static if (!isIntegral!T)
913 						assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
914 					else {
915 						int val = value.to!int;
916 						socklen_t len = val.sizeof;
917 						err = setsockopt(fd, SOL_SOCKET, SO_CONDITIONAL_ACCEPT, &val, len);
918 						return errorHandler();
919 					}
920 			}
921 
922 		}
923 		catch (Exception e) {
924 			return false;
925 		}
926 
927 	}
928 
929 	uint read(in fd_t fd, ref ubyte[] data)
930 	{
931 		return 0;
932 	}
933 
934 	uint write(in fd_t fd, in ubyte[] data)
935 	{
936 		return 0;
937 	}
938 
939 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
940 		size_t i;
941 		Array!DWChangeInfo* changes;
942 		try {
943 			changes = &(m_dwHandlers.get(fd, DWHandlerInfo.init).buffer);
944 			if ((*changes).empty)
945 				return 0;
946 
947 			import std.algorithm : min;
948 			size_t cnt = min(dst.length, changes.length);
949 			foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
950 				static if (LOG) try log("reading change: " ~ change.path); catch (Throwable) {}
951 				dst[i] = (*changes)[i];
952 				i++;
953 			}
954 			changes.linearRemove((*changes)[0 .. cnt]);
955 		}
956 		catch (Exception e) {
957 			setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
958 			return 0;
959 		}
960 		static if (LOG) try log("Changes returning with: " ~ i.to!string); catch (Throwable) {}
961 		return cast(uint) i;
962 	}
963 
964 	uint watch(in fd_t fd, in WatchInfo info) {
965 		m_status = StatusInfo.init;
966 		uint wd;
967 		try {
968 			HANDLE hndl = CreateFileW(toUTFz!(const(wchar)*)(info.path.toNativeString()),
969 			                          FILE_LIST_DIRECTORY,
970 			                          FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
971 			                          null,
972 			                          OPEN_EXISTING,
973 			                          FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
974 			                          null);
975 			wd = cast(uint) hndl;
976 			DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init);
977 			assert(handler !is null);
978 			static if (LOG) log("Watching: " ~ info.path.toNativeString());
979 			(m_dwFolders)[wd] = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);
980 		} catch (Exception e) {
981 			setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg);
982 			return 0;
983 		}
984 		return wd;
985 	}
986 
987 	bool unwatch(in fd_t fd, in fd_t _wd) {
988 		uint wd = cast(uint) _wd;
989 		m_status = StatusInfo.init;
990 		try {
991 			DWFolderWatcher fw = m_dwFolders.get(wd, null);
992 			assert(fw !is null);
993 			m_dwFolders.remove(wd);
994 			fw.close();
995 			ThreadMem.free(fw);
996 		} catch (Exception e) {
997 			setInternalError!"unwatch"(Status.ERROR, "Failed when unwatching directory: " ~ e.msg);
998 			return false;
999 		}
1000 		return true;
1001 	}
1002 
1003 	bool notify(T)(in fd_t fd, in T payload)
1004 		if (is(T == shared AsyncSignal) || is(T == AsyncNotifier))
1005 	{
1006 		m_status = StatusInfo.init;
1007 		import std.conv;
1008 
1009 		auto payloadPtr = cast(ubyte*)payload;
1010 		auto payloadAddr = cast(ulong)payloadPtr;
1011 
1012 		WPARAM wparam = payloadAddr & 0xffffffff;
1013 		LPARAM lparam = cast(uint) (payloadAddr >> 32);
1014 
1015 		BOOL err;
1016 		static if (is(T == AsyncNotifier))
1017 			err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam);
1018 		else
1019 			err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam);
1020 		static if (LOG) try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch (Throwable) {}
1021 		if (err == 0)
1022 		{
1023 			m_error = GetLastErrorSafe();
1024 			m_status.code = Status.ERROR;
1025 			m_status.text = "notify";
1026 			static if (LOG) log(m_status);
1027 			return false;
1028 		}
1029 		return true;
1030 	}
1031 
1032 	fd_t run(AsyncSocket ctxt)
1033 	{
1034 		m_status = StatusInfo.init;
1035 
1036 		auto fd = ctxt.preInitializedHandle;
1037 
1038 		if (fd == INVALID_SOCKET) {
1039 			fd = WSASocketW(ctxt.info.domain, ctxt.info.type, ctxt.info.protocol, null, 0, WSA_FLAG_OVERLAPPED);
1040 		}
1041 
1042 		if (catchErrors!"socket"(fd)) {
1043 			.error("Failed to create socket: ", error);
1044 			return INVALID_SOCKET;
1045 		}
1046 
1047 		return fd;
1048 	}
1049 
1050 	bool kill(AsyncSocket ctxt, bool forced = false)
1051 	{
1052 		m_status = StatusInfo.init;
1053 
1054 		auto handle = ctxt.resetHandle();
1055 
1056 		if (ctxt.connectionOriented && ctxt.passive) {
1057 			foreach (request; m_completedSocketAccepts) if (request.socket is ctxt) {
1058 				sockaddr* localAddress, remoteAddress;
1059 				socklen_t localAddressLength, remoteAddressLength;
1060 
1061 				GetAcceptExSockaddrs(request.buffer.ptr,
1062 									 0,
1063 									 cast(DWORD) request.buffer.length / 2,
1064 									 cast(DWORD) request.buffer.length / 2,
1065 									 &localAddress,
1066 									 &localAddressLength,
1067 									 &remoteAddress,
1068 									 &remoteAddressLength);
1069 
1070 				m_completedSocketAccepts.removeFront();
1071 				if (!onAccept(handle, request, remoteAddress)) {
1072 					.warning("Failed to accept incoming connection request while killing listener");
1073 				}
1074 			}
1075 		}
1076 
1077 		if (!ctxt.passive) {
1078 			foreach (request; m_completedSocketReceives) if (request.socket is ctxt) {
1079 				m_completedSocketReceives.removeFront();
1080 				if (request.message) {
1081 					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
1082 					assumeWontThrow(NetworkMessage.free(request.message));
1083 				} else {
1084 					assumeWontThrow(request.onComplete.get!1)();
1085 				}
1086 				assumeWontThrow(AsyncReceiveRequest.free(request));
1087 			}
1088 
1089 			foreach (request; m_completedSocketSends) if (request.socket is ctxt) {
1090 				m_completedSocketSends.removeFront();
1091 				request.onComplete();
1092 				assumeWontThrow(NetworkMessage.free(request.message));
1093 				assumeWontThrow(AsyncSendRequest.free(request));
1094 			}
1095 
1096 			if(!CancelIo(cast(HANDLE) handle)) {
1097 				m_status.code = Status.ABORT;
1098 				m_error = GetLastErrorSafe();
1099 				.error("Failed to cancel outstanding overlapped I/O requests: ", this.error);
1100 				return false;
1101 			}
1102 		}
1103 
1104 		if (ctxt.connectionOriented && ctxt.passive) {
1105 			foreach (overlapped; cast(AsyncOverlapped*[]) m_pendingAccepts.keys) {
1106 				if (overlapped.accept.socket is ctxt) {
1107 					m_pendingAccepts.remove(overlapped);
1108 					assumeWontThrow(AsyncOverlapped.free(overlapped));
1109 				}
1110 			}
1111 		} else if (ctxt.connectionOriented && !ctxt.passive && ctxt in m_pendingConnects) {
1112 			auto overlapped = cast(AsyncOverlapped*) m_pendingConnects[ctxt];
1113 			m_pendingConnects.remove(ctxt);
1114 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1115 		}
1116 
1117 		if (ctxt.connectionOriented && !ctxt.passive) {
1118 			*ctxt.connected = false;
1119 		}
1120 
1121 		INT err;
1122 		if (ctxt.connectionOriented) {
1123 			if (forced) {
1124 				err = shutdown(handle, SD_BOTH);
1125 				closesocket(ctxt.handle);
1126 			} else {
1127 				err = shutdown(handle, SD_SEND);
1128 			}
1129 			if (catchSocketError!"shutdown"(err)) {
1130 				return false;
1131 			}
1132 		} else {
1133 			closesocket(handle);
1134 		}
1135 
1136 		return true;
1137 	}
1138 
1139 	bool bind(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
1140 	{
1141 		import async.internals.socket_compat : bind;
1142 
1143 		auto err = bind(ctxt.handle, addr, addrlen);
1144 		if (catchSocketError!"bind"(err)) {
1145 			.error("Failed to bind socket: ", error);
1146 			return false;
1147 		}
1148 
1149 		return true;
1150 	}
1151 
1152 	bool connect(AsyncSocket ctxt, sockaddr* addr, socklen_t addrlen)
1153 	{
1154 		m_status = StatusInfo.init;
1155 
1156 		// Connectionless sockets can be connected immediately,
1157 		// as this only sets the default remote address.
1158 		if (!ctxt.connectionOriented) {
1159 			import async.internals.socket_compat : connect;
1160 
1161 			auto err = connect(ctxt.handle, addr, addrlen);
1162 			if (catchSocketError!"connect"(err)) {
1163 				.error("Failed to connect socket: ", error);
1164 				return false;
1165 			}
1166 			return true;
1167 		}
1168 
1169 		// ConnectEx requires a bound connection-oriented socket.
1170 		try ctxt.localAddress; catch (SocketOSException) {
1171 			NetworkAddress local;
1172 			switch (ctxt.info.domain) {
1173 				case AF_INET:
1174 					local.addr_ip4.sin_family = AF_INET;
1175 					local.addr_ip4.sin_addr.s_addr = INADDR_ANY;
1176 					local.addr_ip4.sin_port = 0;
1177 					break;
1178 				case AF_INET6:
1179 					local.addr_ip6.sin6_family = AF_INET6;
1180 					local.addr_ip6.sin6_addr = IN6ADDR_ANY;
1181 					local.addr_ip6.sin6_port = 0;
1182 					break;
1183 				default:
1184 					assert(false, "Unsupported address family");
1185 			}
1186 
1187 			if (!bind(ctxt, local.sockAddr, local.sockAddrLen)) {
1188 				return false;
1189 			}
1190 		} catch (Exception e) assert(false);
1191 
1192 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1193 		overlapped.hEvent = pendingConnectEvent;
1194 		if (ConnectEx(ctxt.handle, addr, addrlen, null, 0, null, &overlapped.overlapped)) {
1195 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1196 			if (updateConnectContext(ctxt.handle)) {
1197 				ctxt.handleConnect();
1198 				return true;
1199 			} else {
1200 				ctxt.kill();
1201 				ctxt.handleError();
1202 				return false;
1203 			}
1204 		} else {
1205 			m_error = WSAGetLastErrorSafe();
1206 			if (m_error == WSA_IO_PENDING) {
1207 				m_pendingConnects[ctxt] = overlapped;
1208 				return true;
1209 			} else {
1210 				m_status.code = Status.ABORT;
1211 				ctxt.kill();
1212 				ctxt.handleError();
1213 				return false;
1214 			}
1215 		}
1216 	}
1217 
1218 	auto updateAcceptContext(fd_t listener, fd_t socket)
1219 	{
1220 		auto err = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &listener, listener.sizeof);
1221 		if (catchSocketError!"accept"(err)) {
1222 			.error("Failed to setup accepted socket: ", error);
1223 			return false;
1224 		}
1225 		else return true;
1226 	}
1227 
1228 	auto updateConnectContext(fd_t socket)
1229 	{
1230 		auto err = setsockopt(socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, null, 0);
1231 		if (catchSocketError!"connect"(err)) {
1232 			.error("Failed to setup connected socket: ", error);
1233 			return false;
1234 		}
1235 		else return true;
1236 	}
1237 
1238 	/+
1239 	bool setupConnectedCOASocket(AsyncSocket ctxt, AsyncSocket incomingOn = null)
1240 	{
1241 		fd_t err;
1242 
1243 		*ctxt.connected = true;
1244 
1245 		if (incomingOn) {
1246 			auto listenerHandle = incomingOn.handle;
1247 			err = setsockopt(ctxt.handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &listenerHandle, listenerHandle.sizeof);
1248 			if (catchSocketError!"connect"(err)) {
1249 				.error("Failed to setup connected socket: ", error);
1250 				ctxt.handleError();
1251 				return false;
1252 			}
1253 		} else {
1254 			err = setsockopt(ctxt.handle, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, null, 0);
1255 			if (catchSocketError!"connect"(err)) {
1256 				.error("Failed to setup connected socket: ", error);
1257 				ctxt.handleError();
1258 				return false;
1259 			}
1260 		}
1261 
1262 		return true;
1263 	}
1264 	+/
1265 
1266 	bool listen(AsyncSocket ctxt, int backlog)
1267 	{
1268 		import async.internals.socket_compat : listen;
1269 
1270 		auto err = listen(ctxt.handle, backlog);
1271 		if (catchSocketError!"listen"(err)) {
1272 			.error("Failed to listen on socket: ", error);
1273 			return false;
1274 		}
1275 		return true;
1276 	}
1277 
1278 	bool onAccept(fd_t listener, AsyncAcceptRequest* request, sockaddr* remoteAddress)
1279 	{
1280 		auto socket = request.socket;
1281 		scope (exit) assumeWontThrow(AsyncAcceptRequest.free(request));
1282 
1283 		if (!updateAcceptContext(listener, request.peer)) {
1284 			if (socket.alive) {
1285 				m_status.code = Status.ABORT;
1286 				socket.kill();
1287 				socket.handleError();
1288 			}
1289 			return false;
1290 		}
1291 
1292 		auto peer = request.onComplete(request.peer, remoteAddress.sa_family, socket.info.type, socket.info.protocol);
1293 		if (peer.run()) {
1294 			peer.handleConnect();
1295 			return true;
1296 		} else {
1297 			peer.kill();
1298 			peer.handleError();
1299 			return false;
1300 		}
1301 	}
1302 
1303 	void submitRequest(AsyncAcceptRequest* request)
1304 	{
1305 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1306 		overlapped.accept = request;
1307 		overlapped.hEvent = pendingAcceptEvent;
1308 
1309 		auto socket = request.socket;
1310 
1311 		request.peer = WSASocketW(request.socket.info.domain,
1312 								  request.socket.info.type,
1313 								  request.socket.info.protocol,
1314 								  null, 0, WSA_FLAG_OVERLAPPED);
1315 
1316 		if (request.peer == SOCKET_ERROR) {
1317 			m_error = WSAGetLastErrorSafe();
1318 
1319 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1320 			assumeWontThrow(AsyncAcceptRequest.free(request));
1321 
1322 			.errorf("Failed to create peer socket with WSASocket: %s", error);
1323 			m_status.code = Status.ABORT;
1324 			socket.kill();
1325 			socket.handleError();
1326 			return;
1327 		}
1328 
1329 		DWORD bytesReceived;
1330 	retry:
1331 		if (AcceptEx(socket.handle,
1332 		             request.peer,
1333 		             request.buffer.ptr,
1334 		             0,
1335 		             cast(DWORD) request.buffer.length / 2,
1336 		             cast(DWORD) request.buffer.length / 2,
1337 		             &bytesReceived,
1338 		             &overlapped.overlapped)) {
1339 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1340 			m_completedSocketAccepts.insertBack(request);
1341 			return;
1342 		} else {
1343 			m_error = WSAGetLastErrorSafe();
1344 			if (m_error == WSA_IO_PENDING) {
1345 				m_pendingAccepts[overlapped] = true;
1346 				return;
1347 			// AcceptEx documentation states this error happens if "an incoming connection was indicated,
1348 			// but was subsequently terminated by the remote peer prior to accepting the call".
1349 			// This means there is no pending accept and we have to call AcceptEx again; this,
1350 			// however, is a potential avenue for a denial-of-service attack, in which clients start
1351 			// a connection to us but immediately terminate it, resulting in a (theoretically) infinite
1352 			// loop here. The alternative to continuous resubmitting is closing the socket
1353 			// (either immediately, or after a finite amount of tries to resubmit); that however, also opens up
1354 			// a denial-of-service attack vector (a finite amount of such malicous connection attempts
1355 			// can bring down any of our listening sockets). Of the two, the latter is a lot easier to exploit,
1356 			// so for now we go with the first option of continuous resubmission.
1357 			// TODO: Try to think of an better way to handle this.
1358 			} else if (m_error == WSAECONNRESET) {
1359 				goto retry;
1360 			} else {
1361 				m_status.code = Status.ABORT;
1362 				assumeWontThrow(AsyncOverlapped.free(overlapped));
1363 				assumeWontThrow(AsyncAcceptRequest.free(request));
1364 				socket.kill();
1365 				socket.handleError();
1366 			}
1367 		}
1368 	}
1369 
1370 	void submitRequest(AsyncReceiveRequest* request)
1371 	{
1372 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1373 		overlapped.receive = request;
1374 		auto socket = request.socket;
1375 
1376 		int err = void;
1377 		if (!request.message) {
1378 			.tracef("WSARecv on FD %s with zero byte buffer", socket.handle);
1379 			WSABUF buffer;
1380 			DWORD flags;
1381 			err = WSARecv(socket.handle,
1382 			              &buffer,
1383 			              1,
1384 			              null,
1385 			              &flags,
1386 			              cast(const(WSAOVERLAPPEDX*)) overlapped,
1387 			              cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete);
1388 		} else if (request.message.name) {
1389 			.tracef("WSARecvFrom on FD %s with buffer size %s",
1390 			        socket.handle, request.message.header.msg_iov.len);
1391 			err = WSARecvFrom(socket.handle,
1392 			                  request.message.buffers,
1393 			                  cast(DWORD) request.message.bufferCount,
1394 			                  null,
1395 			                  &request.message.header.msg_flags,
1396 			                  request.message.name,
1397 			                  &request.message.header.msg_namelen,
1398 			                  cast(const(WSAOVERLAPPEDX*)) overlapped,
1399 			                  cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete);
1400 		} else {
1401 			.tracef("WSARecv on FD %s with buffer size %s",
1402 			        socket.handle, request.message.header.msg_iov.len);
1403 			err = WSARecv(socket.handle,
1404 			              request.message.buffers,
1405 			              cast(DWORD) request.message.bufferCount,
1406 			              null,
1407 			              &request.message.header.msg_flags,
1408 			              cast(const(WSAOVERLAPPEDX*)) overlapped,
1409 			              cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedReceiveComplete);
1410 		}
1411 		if (err == SOCKET_ERROR) {
1412 			m_error = WSAGetLastErrorSafe();
1413 			if (m_error == WSA_IO_PENDING) return;
1414 
1415 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1416 			if (request.message) assumeWontThrow(NetworkMessage.free(request.message));
1417 			assumeWontThrow(AsyncReceiveRequest.free(request));
1418 
1419 			// TODO: Possibly deal with WSAEWOULDBLOCK, which supposedly signals
1420 			//       too many pending overlapped I/O requests.
1421 			if (m_error == WSAECONNRESET ||
1422 			    m_error == WSAECONNABORTED ||
1423 			    m_error == WSAENOTSOCK) {
1424 				socket.handleClose();
1425 
1426 				*socket.connected = false;
1427 
1428 				closesocket(socket.handle);
1429 				return;
1430 			}
1431 
1432 			.errorf("WSARecv* on FD %d encountered socket error: %s", socket.handle, this.error);
1433 			m_status.code = Status.ABORT;
1434 			socket.kill();
1435 			socket.handleError();
1436 		}
1437 	}
1438 
1439 	void submitRequest(AsyncSendRequest* request)
1440 	{
1441 		auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
1442 		overlapped.send = request;
1443 		auto socket = request.socket;
1444 
1445 		int err = void;
1446 		if (request.message.name) {
1447 			.tracef("WSASendTo on FD %s for %s with buffer size %s",
1448 			        socket.handle,
1449 			        NetworkAddress(request.message.name, request.message.header.msg_namelen),
1450 			        request.message.header.msg_iov.len);
1451 			err = WSASendTo(socket.handle,
1452 		                    request.message.buffers,
1453 		                    cast(DWORD) request.message.bufferCount,
1454 		                    null,
1455 		                    request.message.header.msg_flags,
1456 		                    request.message.name,
1457 		                    request.message.nameLength,
1458 		                    cast(const(WSAOVERLAPPEDX*)) overlapped,
1459 		                    cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedSendComplete);
1460 		} else {
1461 			.tracef("WSASend on FD %s with buffer size %s", socket.handle, request.message.header.msg_iov.len);
1462 			err = WSASend(socket.handle,
1463 		                    request.message.buffers,
1464 		                    cast(DWORD) request.message.bufferCount,
1465 		                    null,
1466 		                    request.message.header.msg_flags,
1467 		                    cast(const(WSAOVERLAPPEDX*)) overlapped,
1468 		                    cast(LPWSAOVERLAPPED_COMPLETION_ROUTINEX) &onOverlappedSendComplete);
1469 		}
1470 
1471 		if (err == SOCKET_ERROR) {
1472 			m_error = WSAGetLastErrorSafe();
1473 			if (m_error == WSA_IO_PENDING) return;
1474 
1475 			assumeWontThrow(AsyncOverlapped.free(overlapped));
1476 			assumeWontThrow(NetworkMessage.free(request.message));
1477 			assumeWontThrow(AsyncSendRequest.free(request));
1478 
1479 			// TODO: Possibly deal with WSAEWOULDBLOCK, which supposedly signals
1480 			//       too many pending overlapped I/O requests.
1481 			if (m_error == WSAECONNRESET ||
1482 			    m_error == WSAECONNABORTED ||
1483 			    m_error == WSAENOTSOCK) {
1484 				socket.handleClose();
1485 
1486 				*socket.connected = false;
1487 
1488 				closesocket(socket.handle);
1489 				return;
1490 			}
1491 
1492 			.errorf("WSASend* on FD %d encountered socket error: %s", socket.handle, this.error);
1493 			m_status.code = Status.ABORT;
1494 			socket.kill();
1495 			socket.handleError();
1496 		}
1497 	}
1498 
1499 	pragma(inline, true)
1500 	long recv(in fd_t fd, void[] data)
1501 	{
1502 		m_status = StatusInfo.init;
1503 		retry:
1504             	long ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
1505 
1506 		//static if (LOG) try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch (Throwable) {}
1507 		if (catchSocketError!".recv"(ret)) { // ret == -1
1508 			if (m_error == error_t.WSAEWOULDBLOCK)
1509 			{
1510 				m_status.code = Status.ASYNC;
1511 				Thread.sleep(1.seconds);
1512 				goto retry;
1513 			}
1514 			else if (m_error == error_t.WSAEINTR)
1515 			{
1516 			    goto retry;
1517 			}
1518 			return -1; // TODO: handle some errors more specifically
1519 		}
1520 
1521         m_status.code = Status.OK;
1522 		return ret;
1523 	}
1524 
1525 	pragma(inline, true)
1526 	long send(in fd_t fd, in void[] data)
1527 	{
1528 		m_status = StatusInfo.init;
1529 		static if (LOG) try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string);
1530 		catch (Throwable) {}
1531 		
1532 		retry:
1533             	long ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0);
1534 
1535 		if (catchSocketError!"send"(ret)) {
1536 			if (m_error == error_t.WSAEWOULDBLOCK)
1537 			{
1538 				m_status.code = Status.ASYNC;
1539 				Thread.sleep(1.seconds);
1540 				goto retry;
1541 			}
1542 			else if (m_error == error_t.WSAEINTR)
1543 			{
1544 			    goto retry;
1545 			}
1546 			return -1; // TODO: handle some errors more specifically
1547 		}
1548 		
1549 		m_status.code = Status.OK;
1550 		return ret;
1551 	}
1552 
1553 	bool broadcast(in fd_t fd, bool b) {
1554 	
1555 		int val = b?1:0;
1556 		socklen_t len = val.sizeof;
1557 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1558 		if (catchSocketError!"setsockopt"(err))
1559 			return false;
1560 	
1561 		return true;
1562 
1563 	}
1564 
1565 	long recvFrom(in fd_t fd, void[] data, ref NetworkAddress addr)
1566 	{
1567 		m_status = StatusInfo.init;
1568 
1569 		addr.family = AF_INET6;
1570 		socklen_t addrLen = addr.sockAddrLen;
1571 		
1572 		retry:
1573         		long ret = .recvfrom(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, &addrLen);
1574 
1575 		if (addrLen < addr.sockAddrLen) {
1576 			addr.family = AF_INET;
1577 		}
1578 
1579 		static if (LOG) try log("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable) {}
1580 		if (catchSocketError!".recvfrom"(ret)) { // ret == -1
1581 			if (m_error == WSAEWOULDBLOCK)
1582 			{
1583 				m_status.code = Status.ASYNC;
1584 				Thread.sleep(1.seconds);
1585 				goto retry;
1586 			}
1587 			else if (m_error == error_t.WSAEINTR)
1588 			{
1589 			    goto retry;
1590 			}
1591 			return -1; // TODO: handle some errors more specifically
1592 		}
1593 		
1594 		m_status.code = Status.OK;
1595 		return ret;
1596 	}
1597 
1598 	long sendTo(in fd_t fd, in void[] data, in NetworkAddress addr)
1599 	{
1600 		m_status = StatusInfo.init;
1601 		static if (LOG) try log("SENDTO " ~ data.length.to!string ~ "B " ~ addr.toString()); catch (Throwable) {}
1602 		
1603 		retry:
1604 		long ret;
1605 		if (addr != NetworkAddress.init)
1606 			ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen);
1607 		else
1608 			ret = .send(fd, cast(void*) data.ptr, cast(INT) data.length, 0);
1609 
1610 		if (catchSocketError!".sendTo"(ret)) { // ret == -1
1611 			if (m_error == WSAEWOULDBLOCK)
1612 			{
1613 				m_status.code = Status.ASYNC;
1614 				Thread.sleep(1.seconds);
1615 				goto retry;
1616 			}
1617 			else if (m_error == error_t.WSAEINTR)
1618 			{
1619 			    goto retry;
1620 			}
1621 			return -1; // TODO: handle some errors more specifically
1622 		}
1623 
1624 		m_status.code = Status.OK;
1625 		return ret;
1626 	}
1627 
1628 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
1629 		NetworkAddress ret;
1630 		import async.internals.win32 : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
1631 		if (ipv6)
1632 			ret.family = AF_INET6;
1633 		else
1634 			ret.family = AF_INET;
1635 		socklen_t len = ret.sockAddrLen;
1636 		int err = getsockname(fd, ret.sockAddr, &len);
1637 		if (catchSocketError!"getsockname"(err))
1638 			return NetworkAddress.init;
1639 		if (len > ret.sockAddrLen)
1640 			ret.family = AF_INET6;
1641 		return ret;
1642 	}
1643 
1644 	void noDelay(in fd_t fd, bool b) {
1645 		m_status = StatusInfo.init;
1646 		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &b, b.sizeof);
1647 	}
1648 
1649 	private bool closeRemoteSocket(fd_t fd, bool forced) {
1650 
1651 		INT err;
1652 
1653 		static if (LOG) try log("Shutdown FD#" ~ fd.to!string);
1654 		catch (Throwable) {}
1655 		if (forced) {
1656 			err = shutdown(fd, SD_BOTH);
1657 			closesocket(fd);
1658 		}
1659 		else
1660 			err = shutdown(fd, SD_SEND);
1661 
1662 		try {
1663 			TCPEventHandler* evh = fd in m_tcpHandlers;
1664 			if (evh) {
1665 				if (evh.conn.inbound) {
1666 					try ThreadMem.free(evh.conn);
1667 					catch(Exception e) { assert(false, "Failed to free resources"); }
1668 				}
1669 
1670 				evh.conn = null;
1671 				//static if (LOG) log("Remove event handler for " ~ fd.to!string);
1672 				m_tcpHandlers.remove(fd);
1673 			}
1674 		}
1675 		catch (Exception e) {
1676 			setInternalError!"m_tcpHandlers.remove"(Status.ERROR);
1677 			return false;
1678 		}
1679 		if (catchSocketError!"shutdown"(err))
1680 			return false;
1681 		return true;
1682 	}
1683 
1684 	// for connected sockets
1685 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1686 	{
1687 		m_status = StatusInfo.init;
1688 		if (!connected && forced) {
1689 			try {
1690 				if (fd in m_connHandlers) {
1691 					static if (LOG) log("Removing connection handler for: " ~ fd.to!string);
1692 					m_connHandlers.remove(fd);
1693 				}
1694 			}
1695 			catch (Exception e) {
1696 				setInternalError!"m_connHandlers.remove"(Status.ERROR);
1697 				return false;
1698 			}
1699 		}
1700 		else if (connected)
1701 			closeRemoteSocket(fd, forced);
1702 
1703 		if (!connected || forced) {
1704 			// todo: flush the socket here?
1705 
1706 			INT err = closesocket(fd);
1707 			if (catchSocketError!"closesocket"(err))
1708 				return false;
1709 
1710 		}
1711 		return true;
1712 	}
1713 
1714 	bool closeConnection(fd_t fd) {
1715 		return closeSocket(fd, true);
1716 	}
1717 
1718 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
1719 	{
1720 		m_status = StatusInfo.init;
1721 
1722 		NetworkAddress addr;
1723 		WSAPROTOCOL_INFOW hints;
1724 		import std.conv : to;
1725 		if (ipv6) {
1726 			addr.family = AF_INET6;
1727 		}
1728 		else {
1729 			addr.family = AF_INET;
1730 		}
1731 
1732 		INT addrlen = addr.sockAddrLen;
1733 
1734 		LPWSTR str;
1735 		try {
1736 			str = cast(LPWSTR) toUTFz!(wchar*)(ipAddr);
1737 		} catch (Exception e) {
1738 			setInternalError!"toStringz"(Status.ERROR, e.msg);
1739 			return NetworkAddress.init;
1740 		}
1741 
1742 		INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen);
1743 		if (port != 0) addr.port = port;
1744 		static if (LOG) try log(addr.toString());
1745 		catch (Throwable) {}
1746 		if( catchSocketError!"getAddressFromIP"(err) )
1747 			return NetworkAddress.init;
1748 		else assert(addrlen == addr.sockAddrLen);
1749 		return addr;
1750 	}
1751 
1752 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
1753 		/*in {
1754 		debug import async.internals.validator : validateHost;
1755 		debug assert(validateHost(host), "Trying to connect to an invalid domain");
1756 	}
1757 	body */{
1758 		m_status = StatusInfo.init;
1759 		import std.conv : to;
1760 		NetworkAddress addr;
1761 		ADDRINFOW hints;
1762 		ADDRINFOW* infos;
1763 		LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*);
1764 		if (ipv6) {
1765 			hints.ai_family = AF_INET6;
1766 			addr.family = AF_INET6;
1767 		}
1768 		else {
1769 			hints.ai_family = AF_INET;
1770 			addr.family = AF_INET;
1771 		}
1772 
1773 		if (tcp) {
1774 			hints.ai_protocol = IPPROTO_TCP;
1775 			hints.ai_socktype = SOCK_STREAM;
1776 		}
1777 		else {
1778 			hints.ai_protocol = IPPROTO_UDP;
1779 			hints.ai_socktype = SOCK_DGRAM;
1780 		}
1781 		if (port != 0) addr.port = port;
1782 
1783 		LPCWSTR str;
1784 
1785 		try {
1786 			str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host);
1787 		} catch (Exception e) {
1788 			setInternalError!"toUTFz"(Status.ERROR, e.msg);
1789 			return NetworkAddress.init;
1790 		}
1791 
1792 		error_t err = cast(error_t) GetAddrInfoW(str, cast(LPCWSTR) wPort, &hints, &infos);
1793 		scope(exit) FreeAddrInfoW(infos);
1794 		if (err != EWIN.WSA_OK) {
1795 			setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err);
1796 			return NetworkAddress.init;
1797 		}
1798 
1799 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
1800 		ubyte* data = cast(ubyte*) addr.sockAddr;
1801 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
1802 		static if (LOG) try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString());
1803 		catch (Exception e){}
1804 		return addr;
1805 	}
1806 
1807 	pragma(inline, true)
1808 	void setInternalError(string TRACE)(in Status s, in string details = "", in error_t error = EWIN.ERROR_ACCESS_DENIED)
1809 	{
1810 		if (details.length > 0)
1811 			m_status.text = TRACE ~ ": " ~ details;
1812 		else
1813 			m_status.text = TRACE;
1814 		m_error = error;
1815 		m_status.code = s;
1816 		static if(LOG) log(m_status);
1817 	}
1818 private:
1819 	bool onMessage(MSG msg)
1820 	{
1821 		m_status = StatusInfo.init;
1822 		switch (msg.message) {
1823 			case WM_TCP_SOCKET:
1824 				auto evt = LOWORD(msg.lParam);
1825 				auto err = HIWORD(msg.lParam);
1826 				if (!onTCPEvent(evt, err, cast(fd_t)msg.wParam)) {
1827 
1828 					if (evt == FD_ACCEPT)
1829 						setInternalError!"del@TCPAccept.ERROR"(Status.ERROR);
1830 					else {
1831 						try {
1832 							TCPEventHandler cb = m_tcpHandlers.get(cast(fd_t)msg.wParam);
1833 							cb(TCPEvent.ERROR);
1834 						}
1835 						catch (Exception e) {
1836 							// An Error callback should never fail...
1837 							setInternalError!"del@TCPEvent.ERROR"(Status.ERROR);
1838 							// assert(false, evt.to!string ~ " & " ~ m_status.to!string ~ " & " ~ m_error.to!string);
1839 						}
1840 					}
1841 				}
1842 				break;
1843 			case WM_UDP_SOCKET:
1844 				auto evt = LOWORD(msg.lParam);
1845 				auto err = HIWORD(msg.lParam);
1846 				if (!onUDPEvent(evt, err, cast(fd_t)msg.wParam)) {
1847 					try {
1848 						UDPHandler cb = m_udpHandlers.get(cast(fd_t)msg.wParam);
1849 						cb(UDPEvent.ERROR);
1850 					}
1851 					catch (Exception e) {
1852 						// An Error callback should never fail...
1853 						setInternalError!"del@UDPEvent.ERROR"(Status.ERROR);
1854 					}
1855 				}
1856 				break;
1857 			case WM_TIMER:
1858 				static if (LOG) try log("Timer callback: " ~ m_timer.fd.to!string); catch (Throwable) {}
1859 				TimerHandler cb;
1860 				bool cached = (m_timer.fd == cast(fd_t)msg.wParam);
1861 				try {
1862 					if (cached)
1863 						cb = m_timer.cb;
1864 					else
1865 						cb = m_timerHandlers.get(cast(fd_t)msg.wParam);
1866 					cb.ctxt.rearmed = false;
1867 					cb();
1868 
1869 					if (cb.ctxt.oneShot && !cb.ctxt.rearmed)
1870 						kill(cb.ctxt);
1871 
1872 				}
1873 				catch (Exception e) {
1874 					// An Error callback should never fail...
1875 					setInternalError!"del@TimerHandler"(Status.ERROR, e.msg);
1876 				}
1877 
1878 				break;
1879 			case WM_USER_EVENT:
1880 				static if (LOG) log("User event");
1881 
1882 				ulong uwParam = cast(ulong)msg.wParam;
1883 				ulong ulParam = cast(ulong)msg.lParam;
1884 
1885 				ulong payloadAddr = (ulParam << 32) | uwParam;
1886 				void* payloadPtr = cast(void*) payloadAddr;
1887 				shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr;
1888 
1889 				static if (LOG) try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch (Throwable) {}
1890 				try {
1891 					assert(ctxt.id != 0);
1892 					ctxt.handler();
1893 				}
1894 				catch (Exception e) {
1895 					setInternalError!"WM_USER_EVENT@handler"(Status.ERROR);
1896 				}
1897 				break;
1898 			case WM_USER_SIGNAL:
1899 				static if (LOG) log("User signal");
1900 
1901 				ulong uwParam = cast(ulong)msg.wParam;
1902 				ulong ulParam = cast(ulong)msg.lParam;
1903 
1904 				ulong payloadAddr = (ulParam << 32) | uwParam;
1905 				void* payloadPtr = cast(void*) payloadAddr;
1906 				AsyncNotifier ctxt = cast(AsyncNotifier) payloadPtr;
1907 
1908 				try {
1909 					ctxt.handler();
1910 				}
1911 				catch (Exception e) {
1912 					setInternalError!"WM_USER_SIGNAL@handler"(Status.ERROR);
1913 				}
1914 				break;
1915 			default: return false; // not handled, sends to wndProc
1916 		}
1917 		return true;
1918 	}
1919 
1920 	bool onUDPEvent(WORD evt, WORD err, fd_t sock) {
1921 		m_status = StatusInfo.init;
1922 		try{
1923 			if (m_udpHandlers.get(sock) == UDPHandler.init)
1924 				return false;
1925 		}	catch (Throwable) {}
1926 		if (sock == 0) { // highly unlikely...
1927 			setInternalError!"onUDPEvent"(Status.ERROR, "no socket defined");
1928 			return false;
1929 		}
1930 		if (err) {
1931 			setInternalError!"onUDPEvent"(Status.ERROR, string.init, cast(error_t)err);
1932 			try {
1933 				//log("CLOSE FD#" ~ sock.to!string);
1934 				(m_udpHandlers)[sock](UDPEvent.ERROR);
1935 			} catch (Throwable) { // can't do anything about this...
1936 			}
1937 			return false;
1938 		}
1939 
1940 		UDPHandler cb;
1941 		switch(evt) {
1942 			default: break;
1943 			case FD_READ:
1944 				try {
1945 					static if (LOG) log("READ FD#" ~ sock.to!string);
1946 					cb = m_udpHandlers.get(sock);
1947 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1948 					cb(UDPEvent.READ);
1949 				}
1950 				catch (Exception e) {
1951 					setInternalError!"del@TCPEvent.READ"(Status.ABORT);
1952 					return false;
1953 				}
1954 				break;
1955 			case FD_WRITE:
1956 				try {
1957 					static if (LOG) log("WRITE FD#" ~ sock.to!string);
1958 					cb = m_udpHandlers.get(sock);
1959 					assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
1960 					cb(UDPEvent.WRITE);
1961 				}
1962 				catch (Exception e) {
1963 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
1964 					return false;
1965 				}
1966 				break;
1967 		}
1968 		return true;
1969 	}
1970 
1971 	bool onTCPEvent(WORD evt, WORD err, fd_t sock) {
1972 		m_status = StatusInfo.init;
1973 		try{
1974 			if (m_tcpHandlers.get(sock) == TCPEventHandler.init && m_connHandlers.get(sock) == TCPAcceptHandler.init)
1975 				return false;
1976 		} catch (Throwable) {}
1977 		if (sock == 0) { // highly unlikely...
1978 			setInternalError!"onTCPEvent"(Status.ERROR, "no socket defined");
1979 			return false;
1980 		}
1981 		if (err) {
1982 			setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err);
1983 			try {
1984 				//log("CLOSE FD#" ~ sock.to!string);
1985 				(m_tcpHandlers)[sock](TCPEvent.ERROR);
1986 			} catch (Throwable) { // can't do anything about this...
1987 			}
1988 			return false;
1989 		}
1990 
1991 		TCPEventHandler cb;
1992 		switch(evt) {
1993 			default: break;
1994 			case FD_ACCEPT:
1995 				version(Distributed) gs_mtx.lock_nothrow();
1996 
1997 				static if (LOG) log("TCP Handlers: " ~ m_tcpHandlers.length.to!string);
1998 				static if (LOG) log("Accepting connection");
1999 				/// Let another listener take the next connection
2000 				TCPAcceptHandler list;
2001 				try list = m_connHandlers[sock]; catch (Throwable) { assert(false, "Listening on an invalid socket..."); }
2002 				scope(exit) {
2003 					/// The connection rotation mechanism is handled by the TCPListenerDistMixins
2004 					/// when registering the same AsyncTCPListener object on multiple event loops.
2005 					/// This allows to even out the CPU usage on a server instance.
2006 					version(Distributed)
2007 					{
2008 						HWND hwnd = list.ctxt.next(m_hwnd);
2009 						if (hwnd !is HWND.init) {
2010 							int error = WSAAsyncSelect(sock, hwnd, WM_TCP_SOCKET, FD_ACCEPT);
2011 							if (catchSocketError!"WSAAsyncSelect.NEXT()=> HWND"(error)) {
2012 								error = WSAAsyncSelect(sock, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
2013 								if (catchSocketError!"WSAAsyncSelect"(error))
2014 									assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string);
2015 							}
2016 						}
2017 						else static if (LOG) log("Returned init!!");
2018 						gs_mtx.unlock_nothrow();
2019 					}
2020 				}
2021 
2022 				NetworkAddress addr;
2023 				addr.family = AF_INET;
2024 				int addrlen = addr.sockAddrLen;
2025 				fd_t csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
2026 
2027 				if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET)) {
2028 					if (m_error == WSAEFAULT) { // not enough space for sockaddr
2029 						addr.family = AF_INET6;
2030 						addrlen = addr.sockAddrLen;
2031 						csock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
2032 						if (catchSocketError!"WSAAccept"(csock, INVALID_SOCKET))
2033 							return false;
2034 					}
2035 					else return false;
2036 				}
2037 
2038 				int ok = WSAAsyncSelect(csock, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
2039 				if ( catchSocketError!"WSAAsyncSelect"(ok) )
2040 					return false;
2041 
2042 				static if (LOG) log("Connection accepted: " ~ csock.to!string);
2043 
2044 				AsyncTCPConnection conn;
2045 				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
2046 				catch (Exception e) { assert(false, "Failed allocation"); }
2047 				conn.peer = addr;
2048 				conn.socket = csock;
2049 				conn.inbound = true;
2050 
2051 				try {
2052 					// Do the callback to get a handler
2053 					cb = list(conn);
2054 				}
2055 				catch(Exception e) {
2056 					setInternalError!"onConnected"(Status.EVLOOP_FAILURE);
2057 					return false;
2058 				}
2059 
2060 				try {
2061 					m_tcpHandlers[csock] = cb; // keep the handler to setup the connection
2062 					static if (LOG) log("ACCEPT&CONNECT FD#" ~ csock.to!string);
2063 					*conn.connected = true;
2064 					cb(TCPEvent.CONNECT);
2065 				}
2066 				catch (Exception e) {
2067 					setInternalError!"m_tcpHandlers.opIndexAssign"(Status.ABORT);
2068 					return false;
2069 				}
2070 				break;
2071 			case FD_CONNECT:
2072 				try {
2073 					static if (LOG) log("CONNECT FD#" ~ sock.to!string);
2074 					cb = m_tcpHandlers.get(sock);
2075 					if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback");
2076 					*cb.conn.connecting = true;
2077 				}
2078 				catch(Exception e) {
2079 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
2080 					return false;
2081 				}
2082 				break;
2083 			case FD_READ:
2084 				try {
2085 					static if (LOG) log("READ FD#" ~ sock.to!string);
2086 					cb = m_tcpHandlers.get(sock);
2087 					if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback");
2088 					if (!cb.conn) break;
2089 					if (*cb.conn.connected == false && *cb.conn.connecting) {
2090 						static if (LOG) log("TCPEvent CONNECT FD#" ~ sock.to!string);
2091 
2092 						*cb.conn.connecting = false;
2093 						*cb.conn.connected = true;
2094 						cb(TCPEvent.CONNECT);
2095 					}
2096 					else {
2097 						static if (LOG) log("TCPEvent READ FD#" ~ sock.to!string);
2098 						cb(TCPEvent.READ);
2099 					}
2100 				}
2101 				catch (Exception e) {
2102 					setInternalError!"del@TCPEvent.READ"(Status.ABORT);
2103 					return false;
2104 				}
2105 				break;
2106 			case FD_WRITE:
2107 				// todo: don't send the first write for consistency with epoll?
2108 
2109 				try {
2110 					//import std.stdio;
2111 					static if (LOG) log("WRITE FD#" ~ sock.to!string);
2112 					cb = m_tcpHandlers.get(sock);
2113 					if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback");
2114 					if (!cb.conn) break;
2115 					if (*cb.conn.connected == false && *cb.conn.connecting) {
2116 						*cb.conn.connecting = false;
2117 						*cb.conn.connected = true;
2118 						cb(TCPEvent.CONNECT);
2119 					}
2120 					else {
2121 						cb(TCPEvent.WRITE);
2122 					}
2123 				}
2124 				catch (Exception e) {
2125 					setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
2126 					return false;
2127 				}
2128 				break;
2129 			case FD_CLOSE:
2130 				// called after shutdown()
2131 				INT ret;
2132 				bool connected = true;
2133 				try {
2134 					static if (LOG) log("CLOSE FD#" ~ sock.to!string);
2135 					if (sock in m_tcpHandlers) {
2136 						cb = m_tcpHandlers.get(sock);
2137 						if (*cb.conn.connected || *cb.conn.connecting) {
2138 							cb(TCPEvent.CLOSE);
2139 							*cb.conn.connecting = false;
2140 							*cb.conn.connected = false;
2141 						} else
2142 							connected = false;
2143 					}
2144 					else
2145 						connected = false;
2146 				}
2147 				catch (Exception e) {
2148 					if (m_status.code == Status.OK)
2149 						setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
2150 					return false;
2151 				}
2152 
2153 				closeSocket(sock, connected, true); // as necessary: invokes m_tcpHandlers.remove(fd), shutdown, closesocket
2154 
2155 				break;
2156 		}
2157 		return true;
2158 	}
2159 
2160 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt)
2161 	{
2162 		INT err;
2163 		static if (LOG) log("Binding to UDP " ~ ctxt.local.toString());
2164 
2165 		if (!setOption(fd, TCPOption.REUSEADDR, true)) {
2166 			closesocket(fd);
2167 			return false;
2168 		}
2169 
2170 		err = .bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2171 		if (catchSocketError!"bind"(err)) {
2172 			closesocket(fd);
2173 			return false;
2174 		}
2175 		err = WSAAsyncSelect(fd, m_hwnd, WM_UDP_SOCKET, FD_READ | FD_WRITE);
2176 		if (catchSocketError!"WSAAsyncSelect"(err)) {
2177 			closesocket(fd);
2178 			return false;
2179 		}
2180 
2181 		return true;
2182 	}
2183 
2184 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, bool reusing = false)
2185 	in {
2186 		assert(m_threadId == GetCurrentThreadId());
2187 		assert(ctxt.local !is NetworkAddress.init);
2188 	}
2189 	body {
2190 		INT err;
2191 		if (!reusing) {
2192 			err = .bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2193 			if (catchSocketError!"bind"(err)) {
2194 				closesocket(fd);
2195 				return false;
2196 			}
2197 
2198 			err = .listen(fd, 128);
2199 			if (catchSocketError!"listen"(err)) {
2200 				closesocket(fd);
2201 				return false;
2202 			}
2203 
2204 			err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_ACCEPT);
2205 			if (catchSocketError!"WSAAsyncSelect"(err)) {
2206 				closesocket(fd);
2207 				return false;
2208 			}
2209 		}
2210 
2211 		return true;
2212 	}
2213 
2214 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt)
2215 	in {
2216 		assert(ctxt.peer !is NetworkAddress.init);
2217 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
2218 	}
2219 	body {
2220 		INT err;
2221 		NetworkAddress bind_addr;
2222 		bind_addr.family = ctxt.peer.family;
2223 
2224 		if (ctxt.peer.family == AF_INET)
2225 			bind_addr.sockAddrInet4.sin_addr.s_addr = 0;
2226 		else if (ctxt.peer.family == AF_INET6)
2227 			bind_addr.sockAddrInet6.sin6_addr.s6_addr[] = 0;
2228 		else {
2229 			status.code = Status.ERROR;
2230 			status.text = "Invalid NetworkAddress.family " ~ ctxt.peer.family.to!string;
2231 			return false;
2232 		}
2233 
2234 		err = .bind(fd, bind_addr.sockAddr, bind_addr.sockAddrLen);
2235 		if ( catchSocketError!"bind"(err) )
2236 			return false;
2237 		err = WSAAsyncSelect(fd, m_hwnd, WM_TCP_SOCKET, FD_CONNECT|FD_READ|FD_WRITE|FD_CLOSE);
2238 		if ( catchSocketError!"WSAAsyncSelect"(err) )
2239 			return false;
2240 		err = .connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
2241 
2242 		auto errors = [	tuple(cast(size_t) SOCKET_ERROR, EWIN.WSAEWOULDBLOCK, Status.ASYNC) ];
2243 
2244 		if (catchSocketErrorsEq!"connectEQ"(err, errors)) {
2245 			*ctxt.connecting = true;
2246 			return true;
2247 		}
2248 		else if (catchSocketError!"connect"(err))
2249 			return false;
2250 
2251 		return true;
2252 	}
2253 
2254 	bool catchErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2255 		if (isIntegral!T)
2256 	{
2257 		foreach (validator ; cmp) {
2258 			if (val == validator[0]) {
2259 				m_status.text = TRACE;
2260 				m_status.code = validator[1];
2261 				if (m_status.code == Status.EVLOOP_TIMEOUT) {
2262 					static if (LOG) log(m_status);
2263 					break;
2264 				}
2265 				m_error = GetLastErrorSafe();
2266 				static if(LOG) log(m_status);
2267 				return true;
2268 			}
2269 		}
2270 		return false;
2271 	}
2272 
2273 	pragma(inline, true)
2274 	bool catchSocketErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2275 		if (isIntegral!T)
2276 	{
2277 		foreach (validator ; cmp) {
2278 			if (val == validator[0]) {
2279 				m_status.text = TRACE;
2280 				m_error = WSAGetLastErrorSafe();
2281 				m_status.status = validator[1];
2282 				static if(LOG) log(m_status);
2283 				return true;
2284 			}
2285 		}
2286 		return false;
2287 	}
2288 
2289 	bool catchSocketErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
2290 		if (isIntegral!T)
2291 	{
2292 		error_t err;
2293 		foreach (validator ; cmp) {
2294 			if (val == validator[0]) {
2295 				if (err is EWIN.init) err = WSAGetLastErrorSafe();
2296 				if (err == validator[1]) {
2297 					m_status.text = TRACE;
2298 					m_error = WSAGetLastErrorSafe();
2299 					m_status.code = validator[2];
2300 					static if(LOG) log(m_status);
2301 					return true;
2302 				}
2303 			}
2304 		}
2305 		return false;
2306 	}
2307 
2308 	pragma(inline, true)
2309 	bool catchSocketError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2310 		if (isIntegral!T)
2311 	{
2312 		if (val == cmp) {
2313 			m_status.text = TRACE;
2314 			m_error = WSAGetLastErrorSafe();
2315 			m_status.code = Status.ABORT;
2316 			static if(LOG) log(m_status);
2317 			return true;
2318 		}
2319 		return false;
2320 	}
2321 
2322 	pragma(inline, true)
2323 	error_t WSAGetLastErrorSafe() {
2324 		try {
2325 			return cast(error_t) WSAGetLastError();
2326 		} catch(Exception e) {
2327 			return EWIN.ERROR_ACCESS_DENIED;
2328 		}
2329 	}
2330 
2331 	pragma(inline, true)
2332 	error_t GetLastErrorSafe() {
2333 		try {
2334 			return cast(error_t) GetLastError();
2335 		} catch(Exception e) {
2336 			return EWIN.ERROR_ACCESS_DENIED;
2337 		}
2338 	}
2339 
2340 	void log(StatusInfo val)
2341 	{
2342 		static if (LOG) {
2343 			import std.stdio;
2344 			try {
2345 				writeln("Backtrace: ", m_status.text);
2346 				writeln(" | Status:  ", m_status.code);
2347 				writeln(" | Error: " , m_error);
2348 				if ((m_error in EWSAMessages) !is null)
2349 					writeln(" | Message: ", EWSAMessages[m_error]);
2350 			} catch(Exception e) {
2351 				return;
2352 			}
2353 		}
2354 	}
2355 
2356 	void log(T)(lazy T val)
2357 	{
2358 		static if (LOG) {
2359 			import std.stdio;
2360 			try {
2361 				writeln(val);
2362 			} catch(Exception e) {
2363 				return;
2364 			}
2365 		}
2366 	}
2367 
2368 }
2369 
2370 mixin template COSocketMixins() {
2371 
2372 	private CleanupData m_impl;
2373 
2374 	struct CleanupData {
2375 		bool connected;
2376 		bool connecting;
2377 	}
2378 
2379 	@property bool* connecting() {
2380 		return &m_impl.connecting;
2381 	}
2382 
2383 	@property bool* connected() {
2384 		return &m_impl.connected;
2385 	}
2386 
2387 }
2388 /*
2389 mixin template TCPListenerDistMixins()
2390 {
2391 	import core.sys.windows.windows : HWND;
2392 	import async.internals.hashmap : HashMap;
2393 	import core.sync.mutex;
2394 	private {
2395 		bool m_dist;
2396 
2397 		Tuple!(WinReference, bool*) m_handles;
2398 		__gshared HashMap!(fd_t, Tuple!(WinReference, bool*)) gs_dist;
2399 		__gshared Mutex gs_mutex;
2400 	}
2401 
2402 	/// The TCP Listener schedules distributed connection handlers based on
2403 	/// the event loops that are using the same AsyncTCPListener object.
2404 	/// This is done by using WSAAsyncSelect on a different window after each
2405 	/// accept TCPEvent.
2406 	class WinReference {
2407 		private {
2408 			struct Item {
2409 				HWND handle;
2410 				bool active;
2411 			}
2412 
2413 			Item[] m_items;
2414 		}
2415 
2416 		this(HWND hndl, bool b) {
2417 			append(hndl, b);
2418 		}
2419 
2420 		void append(HWND hndl, bool b) {
2421 			m_items ~= Item(hndl, b);
2422 		}
2423 
2424 		HWND next(HWND me) {
2425 			Item[] items;
2426 			synchronized(gs_mutex)
2427 				items = m_items;
2428 			if (items.length == 1)
2429 				return me;
2430 			foreach (i, item; items) {
2431 				if (item.active == true) {
2432 					m_items[i].active = false; // remove responsibility
2433 					if (m_items.length <= i + 1) {
2434 						m_items[0].active = true; // set responsibility
2435 						auto ret = m_items[0].handle;
2436 						return ret;
2437 					}
2438 					else {
2439 						m_items[i + 1].active = true;
2440 						auto ret = m_items[i + 1].handle;
2441 						return ret;
2442 					}
2443 				}
2444 
2445 			}
2446 			assert(false);
2447 		}
2448 
2449 	}
2450 
2451 	void init(HWND hndl, fd_t sock) {
2452 		try {
2453 			if (!gs_mutex) {
2454 				gs_mutex = new Mutex;
2455 			}
2456 			synchronized(gs_mutex) {
2457 				m_handles = gs_dist.get(sock);
2458 				if (m_handles == typeof(m_handles).init) {
2459 					gs_dist[sock] = Tuple!(WinReference, bool*)(new WinReference(hndl, true), &m_dist);
2460 					m_handles = gs_dist.get(sock);
2461 					assert(m_handles != typeof(m_handles).init);
2462 				}
2463 				else {
2464 					m_handles[0].append(hndl, false);
2465 					*m_handles[1] = true; // set first thread to dist
2466 					m_dist = true; // set this thread to dist
2467 				}
2468 			}
2469 		} catch (Exception e) {
2470 			assert(false, e.toString());
2471 		}
2472 
2473 	}
2474 
2475 	HWND next(HWND me) {
2476 		try {
2477 			if (!m_dist)
2478 				return HWND.init;
2479 			return m_handles[0].next(me);
2480 		}
2481 		catch (Exception e) {
2482 			assert(false, e.toString());
2483 		}
2484 	}
2485 
2486 }*/
2487 private class DWHandlerInfo {
2488 	DWHandler handler;
2489 	Array!DWChangeInfo buffer;
2490 
2491 	this(DWHandler cb) {
2492 		handler = cb;
2493 	}
2494 }
2495 
2496 private final class DWFolderWatcher {
2497 	import async.internals.path;
2498 private:
2499 	EventLoop m_evLoop;
2500 	fd_t m_fd;
2501 	bool m_recursive;
2502 	HANDLE m_handle;
2503 	Path m_path;
2504 	DWFileEvent m_events;
2505 	DWHandlerInfo m_handler; // contains buffer
2506 	shared AsyncSignal m_signal;
2507 	ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
2508 	DWORD m_bytesTransferred;
2509 public:
2510 	this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
2511 		m_fd = fd;
2512 		m_recursive = recursive;
2513 		m_handle = cast(HANDLE)hndl;
2514 		m_evLoop = evl;
2515 		m_path = path;
2516 		m_handler = handler;
2517 
2518 		m_signal = new shared AsyncSignal(m_evLoop);
2519 		m_signal.run(&onChanged);
2520 		triggerWatch();
2521 	}
2522 package:
2523 	void close() {
2524 		CloseHandle(m_handle);
2525 		m_signal.kill();
2526 	}
2527 
2528 	void triggerChanged() {
2529 		m_signal.trigger();
2530 	}
2531 
2532 	void onChanged() {
2533 		ubyte[] result = m_buffer.ptr[0 .. m_bytesTransferred];
2534 		do {
2535 			assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof);
2536 			auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr;
2537 			DWFileEvent kind;
2538 			switch( fni.Action ){
2539 				default: kind = DWFileEvent.MODIFIED; break;
2540 				case 0x1: kind = DWFileEvent.CREATED; break;
2541 				case 0x2: kind = DWFileEvent.DELETED; break;
2542 				case 0x3: kind = DWFileEvent.MODIFIED; break;
2543 				case 0x4: kind = DWFileEvent.MOVED_FROM; break;
2544 				case 0x5: kind = DWFileEvent.MOVED_TO; break;
2545 			}
2546 			string filename = to!string(fni.FileName.ptr[0 .. fni.FileNameLength/2]); // FileNameLength = #bytes, FileName=WCHAR[]
2547 			m_handler.buffer.insert(DWChangeInfo(kind, m_path ~ Path(filename)));
2548 			if( fni.NextEntryOffset == 0 ) break;
2549 			result = result[fni.NextEntryOffset .. $];
2550 		} while(result.length > 0);
2551 
2552 		triggerWatch();
2553 
2554 		m_handler.handler();
2555 	}
2556 
2557 	void triggerWatch() {
2558 
2559 		static UINT notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME|
2560 			FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE;
2561 
2562 		OVERLAPPED* overlapped = ThreadMem.alloc!OVERLAPPED();
2563 		overlapped.Internal = 0;
2564 		overlapped.InternalHigh = 0;
2565 		overlapped.Offset = 0;
2566 		overlapped.OffsetHigh = 0;
2567 		overlapped.Pointer = cast(void*)this;
2568 		import std.stdio;
2569 		DWORD bytesReturned;
2570 		BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted);
2571 
2572 		static if (DEBUG) {
2573 			import std.stdio;
2574 			if (!success)
2575 				writeln("Failed to call ReadDirectoryChangesW: " ~ EWSAMessages[GetLastError().to!EWIN]);
2576 		}
2577 	}
2578 
2579 	@property fd_t fd() const {
2580 		return m_fd;
2581 	}
2582 
2583 	@property HANDLE handle() const {
2584 		return cast(HANDLE) m_handle;
2585 	}
2586 
2587 	static nothrow extern(System)
2588 	{
2589 		void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
2590 		{
2591 			import std.stdio;
2592 			DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer);
2593 			watcher.m_bytesTransferred = cbTransferred;
2594 			try ThreadMem.free(overlapped); catch (Throwable) {}
2595 
2596 			static if (DEBUG) {
2597 				if (dwError != 0)
2598 					try writeln("Diretory watcher error: "~EWSAMessages[dwError.to!EWIN]); catch (Throwable) {}
2599 			}
2600 			try watcher.triggerChanged();
2601 			catch (Exception e) {
2602 				static if (DEBUG) {
2603 					try writeln("Failed to trigger change"); catch (Throwable) {}
2604 				}
2605 			}
2606 		}
2607 	}
2608 }
2609 
2610 /// Information for a single Windows overlapped I/O request;
2611 /// uses a freelist to minimize allocations.
2612 struct AsyncOverlapped
2613 {
2614 	align (1):
2615 	/// Required for Windows overlapped I/O requests
2616 	OVERLAPPED overlapped;
2617 	align:
2618 
2619 	union
2620 	{
2621 		AsyncAcceptRequest* accept;
2622 		AsyncReceiveRequest* receive;
2623 		AsyncSendRequest* send;
2624 	}
2625 
2626 	@property void hEvent(HANDLE hEvent) @safe pure @nogc nothrow
2627 	{ overlapped.hEvent = hEvent; }
2628 
2629 	import async.internals.freelist;
2630 	mixin FreeList!1_000;
2631 }
2632 
2633 nothrow extern(System)
2634 {
2635 	void onOverlappedReceiveComplete(error_t error, DWORD recvCount, AsyncOverlapped* overlapped, DWORD flags)
2636 	{
2637 		.tracef("onOverlappedReceiveComplete: error: %s, recvCount: %s, flags: %s", error, recvCount, flags);
2638 
2639 		auto request = overlapped.receive;
2640 
2641 		if (error == EWIN.WSA_OPERATION_ABORTED) {
2642 			if (request.message) assumeWontThrow(NetworkMessage.free(request.message));
2643 			assumeWontThrow(AsyncReceiveRequest.free(request));
2644 			return;
2645 		}
2646 
2647 		auto socket = overlapped.receive.socket;
2648 		auto eventLoop = &socket.m_evLoop.m_evLoop;
2649 		if (eventLoop.m_status.code != Status.OK) return;
2650 
2651 		eventLoop.m_status = StatusInfo.init;
2652 
2653 		assumeWontThrow(AsyncOverlapped.free(overlapped));
2654 		if (error == 0) {
2655 			if (!request.message) {
2656 				eventLoop.m_completedSocketReceives.insertBack(request);
2657 				return;
2658 			} else if (recvCount > 0 || !socket.connectionOriented) {
2659 				request.message.count = request.message.count + recvCount;
2660 				if (request.exact && !request.message.receivedAll) {
2661 					eventLoop.submitRequest(request);
2662 					return;
2663 				} else {
2664 					eventLoop.m_completedSocketReceives.insertBack(request);
2665 					return;
2666 				}
2667 			} 
2668 		} else if (recvCount > 0) {
2669 			eventLoop.m_completedSocketReceives.insertBack(request);
2670 			return;
2671 		}
2672 
2673 		assumeWontThrow(NetworkMessage.free(request.message));
2674 		assumeWontThrow(AsyncReceiveRequest.free(request));
2675 
2676 		if (error == WSAECONNRESET || error == WSAECONNABORTED || recvCount == 0) {
2677 			socket.kill();
2678 			socket.handleClose();
2679 			return;
2680 		}
2681 
2682 		eventLoop.m_status.code = Status.ABORT;
2683 		socket.kill();
2684 		socket.handleError();
2685 	}
2686 
2687 	void onOverlappedSendComplete(error_t error, DWORD sentCount, AsyncOverlapped* overlapped, DWORD flags)
2688 	{
2689 		.tracef("onOverlappedSendComplete: error: %s, sentCount: %s, flags: %s", error, sentCount, flags);
2690 
2691 		auto request = overlapped.send;
2692 
2693 		if (error == EWIN.WSA_OPERATION_ABORTED) {
2694 			assumeWontThrow(NetworkMessage.free(request.message));
2695 			assumeWontThrow(AsyncSendRequest.free(request));
2696 			return;
2697 		}
2698 
2699 		auto socket = overlapped.send.socket;
2700 		auto eventLoop = &socket.m_evLoop.m_evLoop;
2701 		if (eventLoop.m_status.code != Status.OK) return;
2702 
2703 		eventLoop.m_status = StatusInfo.init;
2704 
2705 		assumeWontThrow(AsyncOverlapped.free(overlapped));
2706 		if (error == 0) {
2707 			request.message.count = request.message.count + sentCount;
2708 			assert(request.message.sent);
2709 			eventLoop.m_completedSocketSends.insertBack(request);
2710 			return;
2711 		}
2712 
2713 		assumeWontThrow(NetworkMessage.free(request.message));
2714 		assumeWontThrow(AsyncSendRequest.free(request));
2715 
2716 		if (error == WSAECONNRESET || error == WSAECONNABORTED) {
2717 			socket.kill();
2718 			socket.handleClose();
2719 			return;
2720 		}
2721 
2722 		eventLoop.m_status.code = Status.ABORT;
2723 		socket.kill();
2724 		socket.handleError();
2725 	}
2726 }
2727 
2728 enum WM_TCP_SOCKET = WM_USER+102;
2729 enum WM_UDP_SOCKET = WM_USER+103;
2730 enum WM_USER_EVENT = WM_USER+104;
2731 enum WM_USER_SIGNAL = WM_USER+105;
2732 
2733 nothrow:
2734 
2735 __gshared Vector!(size_t, Malloc) gs_availID;
2736 __gshared size_t gs_maxID;
2737 __gshared core.sync.mutex.Mutex gs_mutex;
2738 
2739 private size_t createIndex() {
2740 	size_t idx;
2741 	import std.algorithm : max;
2742 	try {
2743 		size_t getIdx() {
2744 			if (!gs_availID.empty) {
2745 				immutable size_t ret = gs_availID.back;
2746 				gs_availID.removeBack();
2747 				return ret;
2748 			}
2749 			return 0;
2750 		}
2751 
2752 		synchronized(gs_mutex) {
2753 			idx = getIdx();
2754 			if (idx == 0) {
2755 				import std.range : iota;
2756 				gs_availID.insert( iota(gs_maxID + 1, max(32, gs_maxID * 2 + 1), 1) );
2757 				gs_maxID = gs_availID[$-1];
2758 				idx = getIdx();
2759 			}
2760 		}
2761 	} catch (Exception e) {
2762 		assert(false, "Failed to generate necessary ID for Manual Event waiters: " ~ e.msg);
2763 	}
2764 
2765 	return idx;
2766 }
2767 
2768 void destroyIndex(AsyncTimer ctxt) {
2769 	try {
2770 		synchronized(gs_mutex) gs_availID ~= ctxt.id;
2771 	}
2772 	catch (Exception e) {
2773 		assert(false, "Error destroying index: " ~ e.msg);
2774 	}
2775 }
2776 
2777 shared static this() {
2778 
2779 	try {
2780 		if (!gs_mutex) {
2781 			import core.sync.mutex;
2782 			gs_mutex = new core.sync.mutex.Mutex;
2783 
2784 			gs_availID.reserve(32);
2785 
2786 			foreach (i; gs_availID.length .. gs_availID.capacity) {
2787 				gs_availID.insertBack(i + 1);
2788 			}
2789 
2790 			gs_maxID = 32;
2791 		}
2792 	}
2793 	catch (Throwable) {
2794 		assert(false, "Couldn't reserve necessary space for available Manual Events");
2795 	}
2796 
2797 }
2798 
2799 nothrow extern(System) {
2800 	LRESULT wndProc(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
2801 	{
2802 		auto ptr = cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
2803 		if (ptr is null)
2804 			return DefWindowProcA(wnd, msg, wparam, lparam);
2805 		auto appl = cast(EventLoopImpl*)ptr;
2806 		MSG obj = MSG(wnd, msg, wparam, lparam, DWORD.init, POINT.init);
2807 		if (appl.onMessage(obj)) {
2808 			static if (DEBUG) {
2809 				if (appl.status.code != Status.OK && appl.status.code != Status.ASYNC) {
2810 					import std.stdio : writeln;
2811 					try { writeln(appl.error, ": ", appl.m_status.text); } catch (Throwable) {}
2812 				}
2813 			}
2814 			return 0;
2815 		}
2816 		else return DefWindowProcA(wnd, msg, wparam, lparam);
2817 	}
2818 
2819 	BOOL PostMessageA(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam);
2820 
2821 }