1 module async.posix;
2 
3 import core.thread;
4 
5 version (Posix):
6 
7 import async.types;
import std.string : toStringz;
9 import std.conv : to;
10 import std.datetime : Duration, msecs, seconds, SysTime;
11 import std.traits : isIntegral;
12 import std.typecons : Tuple, tuple;
13 import std.container : Array;
14 import std.exception;
15 
16 import core.stdc.errno;
17 import async.events;
18 import async.internals.path;
19 import core.sys.posix.signal;
20 import async.posix2;
21 import async.internals.logging;
22 import core.sync.mutex;
23 import memutils.utils;
24 import memutils.hashmap;
25 
26 alias fd_t = int;
27 
28 
29 version(linux) {
30 	import async.internals.epoll;
31 	const EPOLL = true;
32 	extern(C) nothrow @nogc {
33 		int __libc_current_sigrtmin();
34 		int __libc_current_sigrtmax();
35 	}
36 	bool g_signalsBlocked;
37 	package nothrow void blockSignals() {
38 		try {
			/// Block signals to reserve SIGRTMIN .. SIGRTMIN+30 for AsyncSignal
40 			sigset_t mask;
41 			// todo: use more signals for more event loops per thread.. (is this necessary?)
42 			//foreach (j; __libc_current_sigrtmin() .. __libc_current_sigrtmax() + 1) {
43 			//import std.stdio : writeln;
44 			//try writeln("Blocked signal " ~ (__libc_current_sigrtmin() + j).to!string ~ " in instance " ~ m_instanceId.to!string); catch {}
45 			sigemptyset(&mask);
46 			sigaddset(&mask, cast(int) __libc_current_sigrtmin());
47 			pthread_sigmask(SIG_BLOCK, &mask, null);
48 			
49 			sigset_t mask1;
			sigemptyset(&mask1);
			sigaddset(&mask1, SIGPIPE);
			sigaddset(&mask1, SIGILL);
			sigprocmask(SIG_BLOCK, &mask1, null);
54 			//}
55 		} catch (Throwable) {}
56 	}
57 	static this() {
58 		blockSignals();
59 		g_signalsBlocked = true;
60 	}
61 }
62 version(OSX) {
63 	import async.internals.kqueue;
64 	const EPOLL = false;
65 }
66 version(FreeBSD) {
67 	import async.internals.kqueue;
68 	const EPOLL = false;
69 }
70 
71 __gshared Mutex g_mutex;
72 
73 static if (!EPOLL) {
74 	private struct DWFileInfo {
75 		fd_t folder;
76 		Path path;
77 		SysTime lastModified;
78 		bool is_dir;
79 	}
80 }
81 
82 private struct DWFolderInfo {
83 	WatchInfo wi;
84 	fd_t fd;
85 }
86 
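/// POSIX backend of the event loop: epoll on Linux, kqueue on OSX/FreeBSD.
/// Owns the kernel queue descriptor and the bookkeeping maps used for signals,
/// directory watchers and the completed socket request queues.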
87 package struct EventLoopImpl {
88 	static if (EPOLL) {
89 		pragma(msg, "Using Linux EPOLL for events");
90 	}
91 	else /* if KQUEUE */
92 	{
		pragma(msg, "Using kqueue (OSX/FreeBSD) for events");
94 	}
95 
96 package:
97 	alias error_t = EPosix;
98 
99 nothrow:
100 private:
101 
102 	/// members
103 	EventLoop m_evLoop;
104 	ushort m_instanceId;
105 	bool m_started;
106 	StatusInfo m_status;
107 	error_t m_error = EPosix.EOK;
108 	EventInfo* m_evSignal;
109 	static if (EPOLL){
110 		fd_t m_epollfd;
111 		HashMap!(Tuple!(fd_t, uint), DWFolderInfo) m_dwFolders; // uint = inotify_add_watch(Path)
112 	}
113 	else /* if KQUEUE */
114 	{
115 		fd_t m_kqueuefd;
116 		HashMap!(fd_t, EventInfo*) m_watchers; // fd_t = id++ per AsyncDirectoryWatcher
117 		HashMap!(fd_t, DWFolderInfo) m_dwFolders; // fd_t = open(folder)
118 		HashMap!(fd_t, DWFileInfo) m_dwFiles; // fd_t = open(file)
119 		HashMap!(fd_t, Array!(DWChangeInfo)*) m_changes; // fd_t = id++ per AsyncDirectoryWatcher
120 
121 	}
122 
123 	AsyncAcceptRequest.Queue m_completedSocketAccepts;
124 	AsyncReceiveRequest.Queue m_completedSocketReceives;
125 	AsyncSendRequest.Queue m_completedSocketSends;
126 
127 package:
128 
129 	/// workaround for IDE indent bug on too big files
130 	mixin RunKill!();
131 
132 	@property bool started() const {
133 		return m_started;
134 	}
135 
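	/// Creates the kernel event queue (epoll_create1 / kqueue) and registers the
	/// signal used for cross-thread notifications: a signalfd on SIGRTMIN under
	/// Linux, an EVFILT_SIGNAL filter on SIGXCPU under kqueue. Returns false if
	/// the queue or the signal handling could not be set up.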
136 	bool init(EventLoop evl)
137 	in { assert(!m_started); }
138 	body
139 	{
140 
141 		import core.atomic;
142 		shared static ushort i;
143 		string* failer = null;
144 
145 
146 		m_instanceId = i;
147 		static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);
148 
149 		core.atomic.atomicOp!"+="(i, cast(ushort) 1);
150 		m_evLoop = evl;
151 
152 		import core.thread;
153 		try Thread.getThis().priority = Thread.PRIORITY_MAX;
154 		catch (Exception e) { assert(false, "Could not set thread priority"); }
155 
156 		try
157 			if (!g_mutex)
158 				g_mutex = new Mutex;
159 		catch (Throwable) {}
160 
161 		static if (EPOLL)
162 		{
163 
164 			if (!g_signalsBlocked)
165 				blockSignals();
166 			assert(m_instanceId <= __libc_current_sigrtmax(), "An additional event loop is unsupported due to SIGRTMAX restrictions in Linux Kernel");
167 			m_epollfd = epoll_create1(EPOLL_CLOEXEC);
168 
169 			if (catchError!"epoll_create1"(m_epollfd))
170 				return false;
171 
172 			import core.sys.linux.sys.signalfd;
173 			import core.thread : getpid;
174 
175 			fd_t err;
176 			fd_t sfd;
177 
178 			sigset_t mask;
179 
180 			try {
181 				sigemptyset(&mask);
182 				sigaddset(&mask, __libc_current_sigrtmin());
183 				err = pthread_sigmask(SIG_BLOCK, &mask, null);
184 				if (catchError!"sigprocmask"(err))
185 				{
186 					m_status.code = Status.EVLOOP_FAILURE;
187 					return false;
188 				}
189 			} catch (Throwable) { }
190 
191 
192 
193 			sfd = signalfd(-1, &mask, SFD_NONBLOCK);
194 			assert(sfd > 0, "Failed to setup signalfd in epoll");
195 
196 			EventType evtype;
197 
198 			epoll_event _event;
199 			_event.events = EPOLLIN;
200 			evtype = EventType.Signal;
201 			try
202 				m_evSignal = ThreadMem.alloc!EventInfo(sfd, evtype, EventObject.init, m_instanceId);
203 			catch (Exception e){
204 				assert(false, "Allocation error");
205 			}
206 			_event.data.ptr = cast(void*) m_evSignal;
207 
208 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, sfd, &_event);
209 			if (catchError!"EPOLL_CTL_ADD(sfd)"(err))
210 			{
211 				return false;
212 			}
213 
214 		}
215 			else /* if KQUEUE */
216 		{
217 			try {
218 				if (!gs_queueMutex) {
219 					gs_queueMutex = ThreadMem.alloc!ReadWriteMutex();
220 					gs_signalQueue = Array!(Array!AsyncSignal)();
221 					gs_idxQueue = Array!(Array!size_t)();
222 				}
223 				if (g_evIdxAvailable.empty) {
224 					g_evIdxAvailable.reserve(32);
225 
226 					foreach (k; g_evIdxAvailable.length .. g_evIdxAvailable.capacity) {
227 						g_evIdxAvailable.insertBack(k + 1);
228 					}
229 					g_evIdxCapacity = 32;
230 					g_idxCapacity = 32;
231 				}
232 			} catch (Throwable) { assert(false, "Initialization failed"); }
233 			m_kqueuefd = kqueue();
234 			int err;
235 			try {
236 				sigset_t mask;
237 				sigemptyset(&mask);
238 				sigaddset(&mask, SIGXCPU);
239 
240 				err = sigprocmask(SIG_BLOCK, &mask, null);
241 			} catch (Throwable) {}
242 
243 			EventType evtype = EventType.Signal;
244 
245 			// use GC because ThreadMem fails at emplace for shared objects
246 			try
247 				m_evSignal = ThreadMem.alloc!EventInfo(SIGXCPU, evtype, EventObject.init, m_instanceId);
248 			catch (Exception e) {
249 				assert(false, "Failed to allocate resources");
250 			}
251 
			if (catchError!"sigprocmask"(err))
				return false;
254 
255 			kevent_t _event;
256 			EV_SET(&_event, SIGXCPU, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, m_evSignal);
257 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
258 			if (catchError!"kevent_add(SIGXCPU)"(err))
259 				assert(false, "Add SIGXCPU failed at kevent call");
260 		}
261 
262 		static if (LOG) try log("init in thread " ~ Thread.getThis().name); catch (Throwable) {}
263 
264 		return true;
265 	}
266 
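	/// Closes the epoll/kqueue descriptor, releasing the kernel event queue.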
267 	void exit() {
268 		import core.sys.posix.unistd : close;
269 		static if (EPOLL) {
270 			close(m_epollfd); // not necessary?
271 
272 			// not necessary:
273 			//try ThreadMem.free(m_evSignal);
274 			//catch (Exception e) { assert(false, "Failed to free resources"); }
275 
276 		}
277 		else
278 			close(m_kqueuefd);
279 	}
280 
281 	@property const(StatusInfo) status() const {
282 		return m_status;
283 	}
284 
285 	@property string error() const {
286 		string* ptr;
287 		return ((ptr = (m_error in EPosixMessages)) !is null) ? *ptr : string.init;
288 	}
289 
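	/// Runs one iteration of the event loop: waits up to `timeout` for kernel events
	/// (0.seconds polls without blocking, -1.seconds waits indefinitely), dispatches
	/// them to their handlers, then drains the queues of completed socket accepts,
	/// receives and sends. Returns false if polling or a handler reported a failure.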
290 	bool loop(Duration timeout = 0.seconds)
291 		//in { assert(Fiber.getThis() is null); }
292 	{
293 		import async.internals.memory;
294 
295 		int num = void;
296 
297 		static if (EPOLL) {
298 			static align(1) epoll_event[] events;
299 			if (events is null)
300 			{
301 				try events = new epoll_event[128];
302 				catch (Exception e) {
303 					assert(false, "Could not allocate events array: " ~ e.msg);
304 				}
305 			}
306 		} else /* if KQUEUE */ {
307 			import core.sys.posix.time : time_t;
308 			import core.sys.posix.config : c_long;
309 
310 			static kevent_t[] events;
311 			if (events.length == 0) {
312 				try events = allocArray!kevent_t(manualAllocator(), 128);
313 				catch (Exception e) { assert(false, "Could not allocate events array"); }
314 			}
315 		}
316 
317 		auto waitForEvents(Duration timeout)
318 		{
319 			static if (EPOLL) {
320 				int timeout_ms;
321 				if (timeout == 0.seconds) // return immediately
322 					timeout_ms = 0;
323 				else if (timeout == -1.seconds) // wait indefinitely
324 					timeout_ms = -1;
325 				else timeout_ms = cast(int) timeout.total!"msecs";
326 				/// Retrieve pending events
327 				scope (exit) assert(events !is null && events.length <= 128);
328 				return epoll_wait(m_epollfd, cast(epoll_event*) &events[0], 128, timeout_ms);
329 			} else /* if KQUEUE */ {
330 				if (timeout != -1.seconds) {
331 					time_t secs = timeout.split!("seconds", "nsecs")().seconds;
332 					c_long ns = timeout.split!("seconds", "nsecs")().nsecs;
333 					auto tspec = async.internals.kqueue.timespec(secs, ns);
334 
335 					return kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, &tspec);
336 				} else {
337 					return kevent(m_kqueuefd, null, 0, cast(kevent_t*) events, cast(int) events.length, null);
338 				}
339 			}
340 		}
341 
342 		auto handleEvents()
343 		{
344 			bool success = true;
345 
346 			static Tuple!(int, Status)[] errors = [	tuple(EINTR, Status.EVLOOP_TIMEOUT) ];
347 
348 			if (catchEvLoopErrors!"event_poll'ing"(num, errors))
349 				return false;
350 
351 			if (num > 0)
352 				static if (LOG) log("Got " ~ num.to!string ~ " event(s)");
353 
354 			foreach(i; 0 .. num) {
355 				success = false;
356 				m_status = StatusInfo.init;
357 				static if (EPOLL)
358 				{
359 					epoll_event _event = events[i];
360 					static if (LOG) try log("Event " ~ i.to!string ~ " of: " ~ events.length.to!string); catch {}
361 					EventInfo* info = cast(EventInfo*) _event.data.ptr;
362 					int event_flags = cast(int) _event.events;
363 				}
364 				else /* if KQUEUE */
365 				{
366 					kevent_t _event = events[i];
367 					EventInfo* info = cast(EventInfo*) _event.udata;
368 					//log("Got info");
369 					int event_flags = (_event.filter << 16) | (_event.flags & 0xffff);
370 					//log("event flags");
371 				}
372 
373 				//if (info.owner != m_instanceId)
374 				//	static if (LOG) try log("Event " ~ (cast(int)(info.evType)).to!string ~ " is invalid: supposidly created in instance #" ~ info.owner.to!string ~ ", received in " ~ m_instanceId.to!string ~ " event: " ~ event_flags.to!string);
375 				//	catch{}
376 				//log("owner");
377 				switch (info.evType) {
378 					case EventType.Event:
379 						if (info.fd == 0)
380 							break;
381 
382 						import core.sys.posix.unistd : close;
383 						success = onEvent(info.fd, info.evObj.eventHandler, event_flags);
384 
385 						if (!success) {
386 							close(info.fd);
387 							assumeWontThrow(ThreadMem.free(info));
388 						}
389 						break;
390 					case EventType.Socket:
391 						auto socket = info.evObj.socket;
392 						if (socket.passive) {
393 							success = onCOPSocketEvent(socket, event_flags);
394 						} else if (socket.connectionOriented) {
395 							success = onCOASocketEvent(socket, event_flags);
396 						} else {
397 							success = onCLSocketEvent(socket, event_flags);
398 						}
399 						break;
400 					case EventType.TCPAccept:
401 						if (info.fd == 0)
402 							break;
403 						success = onTCPAccept(info.fd, info.evObj.tcpAcceptHandler, event_flags);
404 						break;
405 
406 					case EventType.Notifier:
407 
408 						static if (LOG) log("Got notifier!");
409 						try info.evObj.notifierHandler();
410 						catch (Exception e) {
411 							setInternalError!"notifierHandler"(Status.ERROR);
412 						}
413 						break;
414 
415 					case EventType.DirectoryWatcher:
416 						static if (LOG) log("Got DirectoryWatcher event!");
417 						static if (!EPOLL) {
418 							// in KQUEUE all events will be consumed here, because they must be pre-processed
419 							try {
420 								DWFileEvent fevent;
421 								if (_event.fflags & (NOTE_LINK | NOTE_WRITE))
422 									fevent = DWFileEvent.CREATED;
423 								else if (_event.fflags & NOTE_DELETE)
424 									fevent = DWFileEvent.DELETED;
425 								else if (_event.fflags & (NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE))
426 									fevent = DWFileEvent.MODIFIED;
427 								else if (_event.fflags & NOTE_RENAME)
428 									fevent = DWFileEvent.MOVED_FROM;
429 								else if (_event.fflags & NOTE_RENAME)
430 									fevent = DWFileEvent.MOVED_TO;
431 								else
432 									assert(false, "No event found?");
433 
434 								DWFolderInfo fi = m_dwFolders.get(cast(fd_t)_event.ident, DWFolderInfo.init);
435 
436 								if (fi == DWFolderInfo.init) {
437 									DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
438 									assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
439 									fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
440 									assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
441 								}
442 
443 								// all recursive events will be generated here
444 								if (!compareFolderFiles(fi, fevent)) {
445 									continue;
446 								}
447 
448 							} catch (Exception e) {
449 								static if (LOG) log("Could not process DirectoryWatcher event: " ~ e.msg);
450 								break;
451 							}
452 
453 						}
454 
455 						try info.evObj.dwHandler();
456 						catch (Exception e) {
457 							setInternalError!"dwHandler"(Status.ERROR);
458 						}
459 						break;
460 
461 					case EventType.Timer:
462 						static if (LOG) try log("Got timer! " ~ info.fd.to!string); catch {}
463 						static if (EPOLL) {
464 							static long val;
465 							import core.sys.posix.unistd : read;
466 							read(info.evObj.timerHandler.ctxt.id, &val, long.sizeof);
467 						} else {
468 						}
469 						try info.evObj.timerHandler();
470 						catch (Exception e) {
471 							setInternalError!"timerHandler"(Status.ERROR);
472 						}
473 						static if (!EPOLL) {
474 							auto ctxt = info.evObj.timerHandler.ctxt;
475 							if (ctxt && ctxt.oneShot && !ctxt.rearmed) {
476 								kevent_t __event;
477 								EV_SET(&__event, ctxt.id, EVFILT_TIMER, EV_DELETE, 0, 0, null);
478 								int err = kevent(m_kqueuefd, &__event, 1, null, 0, null);
479 								if (catchError!"kevent_del(timer)"(err))
480 									return false;
481 							}
482 						}
483 						break;
484 
485 					case EventType.Signal:
486 						static if (LOG) try log("Got signal!"); catch {}
487 
488 						static if (EPOLL) {
489 
490 							static if (LOG) try log("Got signal: " ~ info.fd.to!string ~ " of type: " ~ info.evType.to!string); catch {}
491 							import core.sys.linux.sys.signalfd : signalfd_siginfo;
492 							import core.sys.posix.unistd : read;
493 							signalfd_siginfo fdsi;
494 							fd_t err = cast(fd_t)read(info.fd, &fdsi, fdsi.sizeof);
495 							shared AsyncSignal sig = cast(shared AsyncSignal) cast(void*) fdsi.ssi_ptr;
496 
497 							try sig.handler();
498 							catch (Exception e) {
499 								setInternalError!"signal handler"(Status.ERROR);
500 							}
501 
502 
503 						}
504 						else /* if KQUEUE */
505 						{
506 							static AsyncSignal[] sigarr;
507 
508 							if (sigarr.length == 0) {
509 								try sigarr = new AsyncSignal[32];
510 								catch (Exception e) { assert(false, "Could not allocate signals array"); }
511 							}
512 
513 							bool more = popSignals(sigarr);
514 							foreach (AsyncSignal sig; sigarr)
515 							{
516 								shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
517 								if (ptr is null)
518 									break;
519 								try (cast(shared AsyncSignal)sig).handler();
520 								catch (Exception e) {
521 									setInternalError!"signal handler"(Status.ERROR);
522 								}
523 							}
524 						}
525 						break;
526 
527 					case EventType.UDPSocket:
528 						import core.sys.posix.unistd : close;
529 						success = onUDPTraffic(info.fd, info.evObj.udpHandler, event_flags);
530 
531 						nothrow void abortHandler(bool graceful) {
532 
533 							close(info.fd);
534 							info.evObj.udpHandler.conn.socket = 0;
535 							try info.evObj.udpHandler(UDPEvent.ERROR);
536 							catch (Exception e) { }
537 							try ThreadMem.free(info);
538 							catch (Exception e){ assert(false, "Error freeing resources"); }
539 						}
540 
541 						if (!success && m_status.code == Status.ABORT) {
542 							abortHandler(true);
543 
544 						}
545 						else if (!success && m_status.code == Status.ERROR) {
546 							abortHandler(false);
547 						}
548 						break;
549 					case EventType.TCPTraffic:
550 						assert(info.evObj.tcpEvHandler.conn !is null, "TCP Connection invalid");
551 
552 						success = onTCPTraffic(info.fd, info.evObj.tcpEvHandler, event_flags, info.evObj.tcpEvHandler.conn);
553 
554 						nothrow void abortTCPHandler(bool graceful) {
555 
556 							nothrow void closeAll() {
557 								static if (LOG) try log("closeAll()"); catch {}
558 								if (info.evObj.tcpEvHandler.conn.connected)
559 									closeSocket(info.fd, true, true);
560 
561 								info.evObj.tcpEvHandler.conn.socket = 0;
562 							}
563 
564 							/// Close the connection after an unexpected socket error
565 							if (graceful) {
566 								try info.evObj.tcpEvHandler(TCPEvent.CLOSE);
567 								catch (Exception e) { static if(LOG) log("Close failure"); }
568 								closeAll();
569 							}
570 
571 							/// Kill the connection after an internal error
572 							else {
573 								try info.evObj.tcpEvHandler(TCPEvent.ERROR);
574 								catch (Exception e) { static if(LOG) log("Error failure"); }
575 								closeAll();
576 							}
577 
578 							if (info.evObj.tcpEvHandler.conn.inbound) {
579 								static if (LOG) log("Freeing inbound connection FD#" ~ info.fd.to!string);
580 								try ThreadMem.free(info.evObj.tcpEvHandler.conn);
581 								catch (Exception e){ assert(false, "Error freeing resources"); }
582 							}
583 							try ThreadMem.free(info);
584 							catch (Exception e){ assert(false, "Error freeing resources"); }
585 						}
586 
587 						if (!success && m_status.code == Status.ABORT) {
588 							abortTCPHandler(true);
589 						}
590 						else if (!success && m_status.code == Status.ERROR) {
591 							abortTCPHandler(false);
592 						}
593 						break;
594 					default:
595 						break;
596 				}
597 
598 			}
599 
600 			return success;
601 		}
602 
603 		if (m_completedSocketAccepts.empty && m_completedSocketReceives.empty && m_completedSocketSends.empty) {
604 			num = waitForEvents(timeout);
605 			return handleEvents();
606 		} else {
607 			num = waitForEvents(0.seconds);
608 			if (num != 0 && !handleEvents()) return false;
609 
610 			foreach (request; m_completedSocketAccepts) {
611 				m_completedSocketAccepts.removeFront();
612 				auto socket = request.socket;
613 				auto peer = request.onComplete(request.peer, request.family, socket.info.type, socket.info.protocol);
614 				assumeWontThrow(AsyncAcceptRequest.free(request));
615 				if (!peer.run) {
616 					m_status.code = Status.ABORT;
617 					peer.kill();
618 					peer.handleError();
619 					return false;
620 				}
621 			}
622 
623 			foreach (request; m_completedSocketReceives) {
624 				if (request.socket.receiveContinuously) {
625 					m_completedSocketReceives.removeFront();
626 					assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
627 					if (request.socket.receiveContinuously && request.socket.alive) {
628 						request.message.count = 0;
629 						submitRequest(request);
630 					} else {
631 						assumeWontThrow(NetworkMessage.free(request.message));
632 						assumeWontThrow(AsyncReceiveRequest.free(request));
633 					}
634 				} else {
635 					m_completedSocketReceives.removeFront();
636 					if (request.message) {
637 						assumeWontThrow(request.onComplete.get!0)(request.message.transferred);
638 						assumeWontThrow(NetworkMessage.free(request.message));
639 					} else {
640 						assumeWontThrow(request.onComplete.get!1)();
641 					}
642 					assumeWontThrow(AsyncReceiveRequest.free(request));
643 				}
644 			}
645 
646 			foreach (request; m_completedSocketSends) {
647 				m_completedSocketSends.removeFront();
648 				request.onComplete();
649 				assumeWontThrow(NetworkMessage.free(request.message));
650 				assumeWontThrow(AsyncSendRequest.free(request));
651 			}
652 
653 			return true;
654 		}
655 	}
656 
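	/// Applies a TCP or socket-level option on `fd` through setsockopt. The expected
	/// value type depends on the option (bool, integral, Duration or Tuple!(bool, int))
	/// and is checked at compile time. Returns false when setsockopt fails.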
657 	bool setOption(T)(fd_t fd, TCPOption option, in T value) {
658 		m_status = StatusInfo.init;
659 		import std.traits : isIntegral;
660 
661 		import async.internals.socket_compat : socklen_t, setsockopt, SO_REUSEADDR, SO_KEEPALIVE, SO_RCVBUF, SO_SNDBUF, SO_RCVTIMEO, SO_SNDTIMEO, SO_LINGER, SOL_SOCKET, IPPROTO_TCP, TCP_NODELAY, TCP_QUICKACK, TCP_KEEPCNT, TCP_KEEPINTVL, TCP_KEEPIDLE, TCP_CONGESTION, TCP_CORK, TCP_DEFER_ACCEPT;
662 		int err;
663 		nothrow bool errorHandler() {
664 			if (catchError!"setOption:"(err)) {
665 				try m_status.text ~= option.to!string;
666 				catch (Exception e){ assert(false, "to!string conversion failure"); }
667 				return false;
668 			}
669 
670 			return true;
671 		}
672 		final switch (option) {
673 			case TCPOption.NODELAY: // true/false
674 				static if (!is(T == bool))
675 					assert(false, "NODELAY value type must be bool, not " ~ T.stringof);
676 				else {
677 					int val = value?1:0;
678 					socklen_t len = val.sizeof;
679 					err = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len);
680 					return errorHandler();
681 				}
682 			case TCPOption.REUSEADDR: // true/false
683 				static if (!is(T == bool))
684 					assert(false, "REUSEADDR value type must be bool, not " ~ T.stringof);
685 				else {
686 					int val = value?1:0;
687 					socklen_t len = val.sizeof;
688 					err = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len);
689 					if (!errorHandler())
690 						return false;
691 					version (Posix) {
692 						version (linux) {
693 							return true;
694 						} else {
695 							// BSD systems have SO_REUSEPORT
696 							import async.internals.socket_compat : SO_REUSEPORT;
697 							err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, len);
698 							return errorHandler();
699 						}
700 					}
701 				}
702 			case TCPOption.REUSEPORT: // true/false
703 				// use a standalone REUSEPORT option to handle SO_REUSEPORT on linux
704 				version (linux) {
705 					static if (!is(T == bool))
706 						assert(false, "REUSEPORT value type must be bool, not " ~ T.stringof);
707 					else {
708 						// BSD systems have SO_REUSEPORT
709 						import async.internals.socket_compat : SO_REUSEPORT;
710 						int val = value?1:0;
711 						err = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, val.sizeof);
712 
713 						// Not all linux kernels support SO_REUSEPORT
714 						// ignore invalid and not supported errors on linux
715 						if (errno == EINVAL || errno == ENOPROTOOPT) {
716 							return true;
717 						}
718 
719 						return errorHandler();
720 					}
721 				} else return true;
722 			case TCPOption.QUICK_ACK:
723 				static if (!is(T == bool))
724 					assert(false, "QUICK_ACK value type must be bool, not " ~ T.stringof);
725 				else {
726 					static if (EPOLL) {
727 						int val = value?1:0;
728 						socklen_t len = val.sizeof;
729 						err = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, len);
730 						return errorHandler();
731 					}
732 					else /* not linux */ {
733 						return false;
734 					}
735 				}
736 			case TCPOption.KEEPALIVE_ENABLE: // true/false
737 				static if (!is(T == bool))
738 					assert(false, "KEEPALIVE_ENABLE value type must be bool, not " ~ T.stringof);
739 				else
740 				{
741 					int val = value?1:0;
742 					socklen_t len = val.sizeof;
743 					err = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len);
744 					return errorHandler();
745 				}
746 			case TCPOption.KEEPALIVE_COUNT: // ##
747 				static if (!isIntegral!T)
748 					assert(false, "KEEPALIVE_COUNT value type must be integral, not " ~ T.stringof);
749 				else {
					int val = value.to!int;
751 					socklen_t len = val.sizeof;
752 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, len);
753 					return errorHandler();
754 				}
755 			case TCPOption.KEEPALIVE_INTERVAL: // wait ## seconds
756 				static if (!is(T == Duration))
757 					assert(false, "KEEPALIVE_INTERVAL value type must be Duration, not " ~ T.stringof);
758 				else {
759 					int val;
760 					try val = value.total!"seconds".to!uint; catch { return false; }
761 					socklen_t len = val.sizeof;
762 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, len);
763 					return errorHandler();
764 				}
765 			case TCPOption.KEEPALIVE_DEFER: // wait ## seconds until start
766 				static if (!is(T == Duration))
767 					assert(false, "KEEPALIVE_DEFER value type must be Duration, not " ~ T.stringof);
768 				else {
769 					int val;
770 					try val = value.total!"seconds".to!uint; catch { return false; }
771 					socklen_t len = val.sizeof;
772 					err = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, len);
773 					return errorHandler();
774 				}
775 			case TCPOption.BUFFER_RECV: // bytes
776 				static if (!isIntegral!T)
777 					assert(false, "BUFFER_RECV value type must be integral, not " ~ T.stringof);
778 				else {
779 					int val = value.to!int;
780 					socklen_t len = val.sizeof;
781 					err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len);
782 					return errorHandler();
783 				}
784 			case TCPOption.BUFFER_SEND: // bytes
785 				static if (!isIntegral!T)
786 					assert(false, "BUFFER_SEND value type must be integral, not " ~ T.stringof);
787 				else {
788 					int val = value.to!int;
789 					socklen_t len = val.sizeof;
790 					err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len);
791 					return errorHandler();
792 				}
793 			case TCPOption.TIMEOUT_RECV:
794 				static if (!is(T == Duration))
795 					assert(false, "TIMEOUT_RECV value type must be Duration, not " ~ T.stringof);
796 				else {
797 					import core.sys.posix.sys.time : timeval;
798 					time_t secs = cast(time_t) value.split!("seconds", "usecs")().seconds;
799 					suseconds_t us;
800 					try us = value.split!("seconds", "usecs")().usecs.to!suseconds_t; catch {}
801 					timeval t = timeval(secs, us);
802 					socklen_t len = t.sizeof;
803 					err = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &t, len);
804 					return errorHandler();
805 				}
806 			case TCPOption.TIMEOUT_SEND:
807 				static if (!is(T == Duration))
808 					assert(false, "TIMEOUT_SEND value type must be Duration, not " ~ T.stringof);
809 				else {
810 					import core.sys.posix.sys.time : timeval;
811 					auto timeout = value.split!("seconds", "usecs")();
812 					timeval t;
813 					try t = timeval(timeout.seconds.to!time_t, timeout.usecs.to!suseconds_t);
814 					catch (Exception) { return false; }
815 					socklen_t len = t.sizeof;
816 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &t, len);
817 					return errorHandler();
818 				}
819 			case TCPOption.TIMEOUT_HALFOPEN:
820 				static if (!is(T == Duration))
					assert(false, "TIMEOUT_HALFOPEN value type must be Duration, not " ~ T.stringof);
822 				else {
823 					uint val;
824 					try val = value.total!"msecs".to!uint; catch {
825 						return false;
826 					}
827 					socklen_t len = val.sizeof;
828 					err = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &val, len);
829 					return errorHandler();
830 				}
831 			case TCPOption.LINGER: // bool onOff, int seconds
832 				static if (!is(T == Tuple!(bool, int)))
833 					assert(false, "LINGER value type must be Tuple!(bool, int), not " ~ T.stringof);
834 				else {
					linger l = linger(value[0]?1:0, value[1]);
836 					socklen_t llen = l.sizeof;
837 					err = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, llen);
838 					return errorHandler();
839 				}
840 			case TCPOption.CONGESTION:
841 				static if (!isIntegral!T)
842 					assert(false, "CONGESTION value type must be integral, not " ~ T.stringof);
843 				else {
844 					int val = value.to!int;
					socklen_t len = val.sizeof;
846 					err = setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, &val, len);
847 					return errorHandler();
848 				}
849 			case TCPOption.CORK:
850 				static if (!isIntegral!T)
851 					assert(false, "CORK value type must be int, not " ~ T.stringof);
852 				else {
853 					static if (EPOLL) {
854 						int val = value.to!int;
855 						socklen_t len = val.sizeof;
856 						err = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, len);
857 						return errorHandler();
858 					}
859 					else /* if KQUEUE */ {
860 						int val = value.to!int;
861 						socklen_t len = val.sizeof;
862 						err = setsockopt(fd, IPPROTO_TCP, TCP_NOPUSH, &val, len);
863 						return errorHandler();
864 
865 					}
866 				}
867 			case TCPOption.DEFER_ACCEPT: // seconds
868 				static if (!isIntegral!T)
869 					assert(false, "DEFER_ACCEPT value type must be integral, not " ~ T.stringof);
870 				else {
871 					static if (EPOLL) {
872 						int val = value.to!int;
873 						socklen_t len = val.sizeof;
874 						err = setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
875 						return errorHandler();
876 					}
877 					else /* if KQUEUE */ {
878 						// todo: Emulate DEFER_ACCEPT with ACCEPT_FILTER(9)
879 						/*int val = value.to!int;
880 						 socklen_t len = val.sizeof;
881 						 err = setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &val, len);
882 						 return errorHandler();
883 						 */
884 						assert(false, "TCPOption.DEFER_ACCEPT is not implemented");
885 					}
886 				}
887 		}
888 
889 	}
890 
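	/// Reads up to data.length bytes from the non-blocking socket `fd`. Returns the
	/// number of bytes received, or -1 with Status.ASYNC set when the call would
	/// block and with the error recorded otherwise. Interrupted calls are retried.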
891 	long recv(in fd_t fd, ubyte[] data)
892 	{
893 		static if (LOG) try log("Recv from FD: " ~ fd.to!string); catch {}
894 		m_status = StatusInfo.init;
895 		import async.internals.socket_compat : recv;
896 		retry:
897 			auto ret = cast(long)recv(fd, cast(void*) data.ptr, data.length, 0);
898 
899 		static if (LOG) try log(".recv " ~ ret.to!string ~ " bytes of " ~ data.length.to!string ~ " @ " ~ fd.to!string); catch {}
900 		if (catchError!".recv"(ret)) {
901 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
902 				m_status.code = Status.ASYNC;
905 			} else switch (m_error) with (EPosix) {
906 				case EINTR:
907 					goto retry;
908 				case EBADF, EFAULT, EINVAL, ENOTCONN, ENOTSOCK:
909 					// Encountering any of these in the wild means it's bug hunting season
910 					// assert(false, ".recv encountered terminal socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string);
911 				default:
912 					static if (LOG) try log(".recv encountered socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string); catch {}
913 					break;
914 			}
915 
916 			return -1;
917 		}
918 
919 		m_status.code = Status.OK;
920 		// FIXME: This may overflow
921 		return ret;
922 	}
923 
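	/// Writes `data` to the non-blocking socket `fd`. Returns the number of bytes
	/// accepted by the kernel, or -1 with Status.ASYNC set when the call would
	/// block and with the error recorded otherwise. Interrupted calls are retried.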
924 	long send(in fd_t fd, in ubyte[] data)
925 	{
926 		static if (LOG) try log("Send to FD: " ~ fd.to!string); catch {}
927 		m_status = StatusInfo.init;
928 		import async.internals.socket_compat : send;
929 		retry:
930 			auto ret = cast(long)send(fd, cast(const(void)*) data.ptr, data.length, 0);
931 
932 		static if (LOG) try log("Sent: " ~ ret.to!string); catch {}
933 		if (catchError!".send"(ret)) {
934 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
935 				m_status.code = Status.ASYNC;
938 			} else switch (m_error) with (EPosix) {
939 				case EINTR:
940 					goto retry;
941 				case EBADF, ECONNRESET, EDESTADDRREQ, EFAULT, EINVAL, EISCONN, EMSGSIZE, ENOTCONN, ENOTSOCK, EOPNOTSUPP, EPIPE:
942 					// Encountering any of these in the wild means it's bug hunting season
943 					// assert(false, ".send encountered terminal socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string);
944 				default:
945 					static if (LOG) try log(".send encountered socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string); catch {}
946 					break;
947 			}
948 
949 			return -1;
950 		}
951 
952 		m_status.code = Status.OK;
953 		// FIXME: This may overflow
954 		return ret;
955 	}
956 
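	/// Receives a datagram or stream chunk described by `msg` via recvmsg, retrying
	/// on EINTR. Returns the number of bytes received, or 0 with Status.ASYNC set
	/// when no data is ready.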
957 	size_t recvMsg(in fd_t fd, NetworkMessage* msg)
958 	{
959 		import async.internals.socket_compat : recvmsg, msghdr, iovec, sockaddr_storage;
960 
961 		while (true) {
962 			auto err = recvmsg(fd, msg.header, 0);
963 
964 			.tracef("recvmsg system call on FD %d returned %d", fd, err);
965 			if (err == SOCKET_ERROR) {
966 				m_error = lastError();
967 
968 				if (m_error == EPosix.EINTR) {
					.tracef("recvmsg system call on FD %d was interrupted before any transfer occurred", fd);
970 					continue;
971 				} else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
972 					.tracef("recvmsg system call on FD %d would have blocked", fd);
973 					m_status.code = Status.ASYNC;
					return 0;
976 				} else if (m_error == EBADF ||
977 				           m_error == EFAULT ||
978 				           m_error == EINVAL ||
979 				           m_error == ENOTCONN ||
980 				           m_error == ENOTSOCK) {
981 					.errorf("recvmsg system call on FD %d encountered fatal socket error: %s", fd, this.error);
982 					return -1; //assert(false);
983 				} else if (catchError!"Receive message"(err)) {
984 					.errorf("recvmsg system call on FD %d encountered socket error: %s", fd, this.error);
985 					return -1;
986 				}
987 			} else {
988 				.tracef("Received %d bytes on FD %d", err, fd);
989 				m_status.code = Status.OK;
990 				return err;
991 			}
992 		}
993 	}
994 
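	/// Sends the message described by `msg` via sendmsg, retrying on EINTR. Returns
	/// the number of bytes sent, or 0 with Status.ASYNC set when the call would
	/// block; ECONNRESET and EPIPE are likewise reported as a zero-byte send.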
995 	size_t sendMsg(in fd_t fd, NetworkMessage* msg) {
996 		import async.internals.socket_compat : sendmsg;
997 
998 		.tracef("Send message on FD %d with size %d", fd, msg.header.msg_iov.iov_len);
999 		m_status = StatusInfo.init;
1000 
1001 		while (true) {
1002 			auto err = sendmsg(fd, msg.header, 0);
1003 
1004 			.tracef("sendmsg system call on FD %d returned %d", fd, err);
1005 			if (err == SOCKET_ERROR) {
1006 				m_error = lastError();
1007 
1008 				if (m_error == EPosix.EINTR) {
					.tracef("sendmsg system call on FD %d was interrupted before any transfer occurred", fd);
1010 					continue;
1011 				} else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
1012 					.tracef("sendmsg system call on FD %d would have blocked", fd);
1013 					m_status.code = Status.ASYNC;
					return 0;
1016 				} else if (m_error == ECONNRESET ||
1017 				           m_error == EPIPE) {
1018 					return 0;
1019 				} else if (m_error == EBADF ||
1020 				           m_error == EDESTADDRREQ ||
1021 				           m_error == EFAULT ||
1022 				           m_error == EINVAL ||
1023 				           m_error == EISCONN ||
1024 				           m_error == ENOTSOCK ||
1025 				           m_error == EOPNOTSUPP) {
1026 					.errorf("sendmsg system call on FD %d encountered fatal socket error: %s", fd, this.error);
1027 					return -1;//assert(false);
1028 				// ENOTCONN, EMSGSIZE
1029 				} else if (catchError!"Send message"(err)) {
1030 					.errorf("sendmsg system call on FD %d encountered socket error: %s", fd, this.error);
1031 					return 0;
1032 				}
1033 			} else {
1034 				.tracef("Sent %d bytes on FD %d", err, fd);
1035 				m_status.code = Status.OK;
1036 				return err;
1037 			}
1038 		}
1039 	}
1040 
1041 	long recvFrom(in fd_t fd, ubyte[] data, ref NetworkAddress addr)
1042 	{
1043 		import async.internals.socket_compat : recvfrom, AF_INET6, AF_INET, socklen_t;
1044 
1045 		m_status = StatusInfo.init;
1046 
1047 		retry:
1048 			auto addrLen = NetworkAddress.sockAddrMaxLen();
1049 			auto ret = cast(long)recvfrom(fd, cast(void*) data.ptr, data.length, 0, addr.sockAddr, &addrLen);
1050 
1051 		static if (LOG) log(".recvFrom " ~ ret.to!string ~ " bytes @ " ~ fd.to!string);
1052 		if (catchError!".recvFrom"(ret)) {
1053 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
1054 				m_status.code = Status.ASYNC;
1058 			} else switch (m_error) with (EPosix) {
1059 				case EINTR:
1060 					goto retry;
1061 				case EBADF, EFAULT, EINVAL, ENOTCONN, ENOTSOCK:
1062 					// Encountering any of these in the wild means it's bug hunting season
1063 					// assert(false, ".recvFrom encountered terminal socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string);
1064 				default:
1065 					static if (LOG) try log(".recvFrom encountered socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string); catch {}
1066 					break;
1067 			}
1068 
1069 			return -1;
1070 		}
1071 
1072 		m_status.code = Status.OK;
1073 		// FIXME: This may overflow
1074 		return ret;
1075 	}
1076 
1077 	long sendTo(in fd_t fd, in ubyte[] data, in NetworkAddress addr)
1078 	{
1079 		import async.internals.socket_compat : sendto;
1080 
1081 		m_status = StatusInfo.init;
1082 
1083 		static if (LOG) try log(".sendTo " ~ data.length.to!string ~ "bytes"); catch{}
1084 		retry:
1085 			auto ret = cast(long)sendto(fd, data.ptr, data.length, 0, addr.sockAddr, addr.sockAddrLen);
1086 
1087 		if (catchError!".sendTo"(ret)) {
1088 			if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
1089 				m_status.code = Status.ASYNC;
1092 			} else switch (m_error) with (EPosix) {
1093 				case EINTR:
1094 					goto retry;
1095 				case EBADF, ECONNRESET, EDESTADDRREQ, EFAULT, EINVAL, EISCONN, EMSGSIZE, ENOTCONN, ENOTSOCK, EOPNOTSUPP, EPIPE:
1096 					// Encountering any of these in the wild means it's bug hunting season
1097 					// assert(false, ".sendTo encountered terminal socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string);
1098 				default:
1099 					static if (LOG) try log(".sendTo encountered socket error " ~ m_error.to!string ~ " @ " ~ fd.to!string); catch {}
1100 					break;
1101 			}
1102 
1103 			return -1;
1104 		}
1105 
1106 		m_status.code = Status.OK;
1107 		// FIXME: This may overflow
1108 		return ret;
1109 	}
1110 
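	/// Returns the locally bound address of `fd` obtained through getsockname,
	/// or NetworkAddress.init on failure.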
1111 	NetworkAddress localAddr(in fd_t fd, bool ipv6) {
1112 		NetworkAddress ret;
1113 		import async.internals.socket_compat : getsockname, AF_INET, AF_INET6, socklen_t, sockaddr;
1114 
1115 		if (ipv6)
1116 			ret.family = AF_INET6;
1117 		else
1118 			ret.family = AF_INET;
1119 
1120 		socklen_t len = ret.sockAddrLen;
1121 		int err = getsockname(fd, ret.sockAddr, &len);
1122 		if (catchError!"getsockname"(err))
1123 			return NetworkAddress.init;
1124 		if (len > ret.sockAddrLen)
1125 			ret.family = AF_INET6;
1126 
1127 		return ret;
1128 	}
1129 
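	/// Wakes the event loop for an AsyncNotifier: writes an 8-byte counter value to
	/// the notifier's descriptor on Linux, posts a NOTE_TRIGGER user event on kqueue.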
1130 	bool notify(in fd_t fd, AsyncNotifier ctxt)
1131 	{
1132 		static if (EPOLL)
1133 		{
1134 			import core.sys.posix.unistd : write;
1135 
1136 			long val = 1;
1137 			fd_t err = cast(fd_t) write(fd, &val, long.sizeof);
1138 
1139 			if (catchError!"write(notify)"(err)) {
1140 				return false;
1141 			}
1142 			return true;
1143 		}
1144 		else /* if KQUEUE */
1145 		{
1146 			kevent_t _event;
1147 			EV_SET(&_event, fd, EVFILT_USER, EV_ENABLE | EV_CLEAR, NOTE_TRIGGER | 0x1, 0, ctxt.evInfo);
1148 			int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1149 
1150 			if (catchError!"kevent_notify"(err)) {
1151 				return false;
1152 			}
1153 			return true;
1154 		}
1155 	}
1156 
1157 	bool notify(in fd_t fd, shared AsyncSignal ctxt)
1158 	{
1159 		static if (EPOLL)
1160 		{
1161 
1162 			sigval sigvl;
1163 			fd_t err;
1164 			sigvl.sival_ptr = cast(void*) ctxt;
1165 			try err = pthread_sigqueue(ctxt.pthreadId, fd, sigvl); catch (Throwable) {}
1166 			if (catchError!"sigqueue"(err)) {
1167 				return false;
1168 			}
1169 		}
1170 		else /* if KQUEUE */
1171 		{
1172 
1173 			import core.thread : getpid;
1174 
1175 			addSignal(ctxt);
1176 
1177 			try {
1178 				static if (LOG) log("Notified fd: " ~ fd.to!string ~ " of PID " ~ getpid().to!string);
1179 				int err = core.sys.posix.signal.kill(getpid(), SIGXCPU);
1180 				if (catchError!"notify(signal)"(err))
1181 					assert(false, "Signal could not be raised");
1182 			} catch (Throwable) {}
1183 		}
1184 
1185 		return true;
1186 	}
1187 
1188 	// no known uses
1189 	uint read(in fd_t fd, ref ubyte[] data)
1190 	{
1191 		m_status = StatusInfo.init;
1192 		return 0;
1193 	}
1194 
1195 	// no known uses
1196 	uint write(in fd_t fd, in ubyte[] data)
1197 	{
1198 		m_status = StatusInfo.init;
1199 		return 0;
1200 	}
1201 
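	/// Starts watching a path for the directory watcher `fd`. On Linux this adds
	/// inotify watches, recursing into subdirectories when requested; on kqueue each
	/// folder and file is opened with O_EVTONLY and registered for EVFILT_VNODE
	/// events. Returns the watch descriptor, or 0 on failure.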
1202 	uint watch(in fd_t fd, in WatchInfo info) {
1203 		// note: info.wd is still 0 at this point.
1204 		m_status = StatusInfo.init;
1205 		import core.sys.linux.sys.inotify;
1206 		import std.file : dirEntries, isDir, SpanMode;
1207 
1208 		static if (EPOLL) {
1209 			// Manually handle recursivity... All events show up under the same inotify
1210 			uint events = info.events; // values for this API were pulled from inotify
1211 			if (events & IN_DELETE)
1212 				events |= IN_DELETE_SELF;
1213 			if (events & IN_MOVED_FROM)
1214 				events |= IN_MOVE_SELF;
1215 
1216 			nothrow fd_t addFolderRecursive(Path path) {
1217 				fd_t ret;
1218 				try {
1219 					ret = inotify_add_watch(fd, path.toNativeString().toStringz, events);
1220 					if (catchError!"inotify_add_watch"(ret))
1221 						return fd_t.init;
1222 					static if (LOG) try log("inotify_add_watch(" ~ DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd).to!string ~ ")"); catch (Throwable) {}
1223 					assert(m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint)ret), DWFolderInfo.init) == DWFolderInfo.init, "Could not get a unique watch descriptor for path, got: " ~ m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)].to!string);
1224 					m_dwFolders[tuple(cast(fd_t)fd, cast(uint)ret)] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
1225 				} catch (Exception e) {
1226 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch (Throwable) {}
1227 					return 0;
1228 				}
1229 
1230 				if (info.recursive) {
1231 					try {
1232 						foreach (de; path.toNativeString().dirEntries(SpanMode.shallow))
1233 						{
1234 							Path de_path = Path(de.name);
1235 							if (!de_path.absolute)
1236 								de_path = path ~ Path(de.name);
1237 							if (isDir(de_path.toNativeString()))
1238 								if (addFolderRecursive(de_path) == 0)
1239 									continue;
1240 						}
1241 					} catch (Exception e) {
1242 						try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add sub-directories of " ~ path.toNativeString() ~ ": " ~ e.toString() ); catch (Throwable) {}
1243 					}
1244 				}
1245 
1246 				return ret;
1247 			}
1248 
1249 			return addFolderRecursive(info.path);
1250 
1251 		} else /* if KQUEUE */ {
1252 			/// Manually handle recursivity & file tracking. Each folder is an event!
1253 			/// E.g. file creation shows up as a folder change, we must be prepared to seek the file.
1254 			import core.sys.posix.fcntl;
1255 			import async.internals.kqueue;
1256 
1257 			uint events;
1258 			if (info.events & DWFileEvent.CREATED)
1259 				events |= NOTE_LINK | NOTE_WRITE;
1260 			if (info.events & DWFileEvent.DELETED)
1261 				events |= NOTE_DELETE;
1262 			if (info.events & DWFileEvent.MODIFIED)
1263 				events |= NOTE_ATTRIB | NOTE_EXTEND | NOTE_WRITE;
1264 			if (info.events & DWFileEvent.MOVED_FROM)
1265 				events |= NOTE_RENAME;
1266 			if (info.events & DWFileEvent.MOVED_TO)
1267 				events |= NOTE_RENAME;
1268 
1269 			EventInfo* evinfo;
1270 			try {
1271 				evinfo = m_watchers[fd];
1272 			} catch (Throwable) {
				assert(false, "Could not retrieve event info: the directory watcher was not initialized properly, or you are operating on a closed directory watcher.");
1274 			}
1275 
1276 			/// we need a file descriptor for the containers, so we open files but we don't monitor them
1277 			/// todo: track indexes internally?
1278 			nothrow fd_t addRecursive(Path path, bool is_dir) {
1279 				int ret;
1280 				try {
1281 					static if (LOG) log("Adding path: " ~ path.toNativeString());
1282 
1283 					ret = open(path.toNativeString().toStringz, O_EVTONLY);
1284 					if (catchError!"open(watch)"(ret))
1285 						return 0;
1286 
1287 					if (is_dir)
1288 						m_dwFolders[ret] = DWFolderInfo(WatchInfo(info.events, path, info.recursive, ret), fd);
1289 
1290 					kevent_t _event;
1291 
1292 					EV_SET(&_event, ret, EVFILT_VNODE, EV_ADD | EV_CLEAR, events, 0, cast(void*) evinfo);
1293 
1294 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1295 
1296 					if (catchError!"kevent_timer_add"(err))
1297 						return 0;
1298 
1299 
1300 					if (is_dir) foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
1301 						Path filePath = Path(de.name);
1302 						if (!filePath.absolute)
1303 							filePath = path ~ filePath;
1304 						fd_t fwd;
1305 						if (info.recursive && isDir(filePath.toNativeString()))
1306 							fwd = addRecursive(filePath, true);
1307 						else {
1308 							fwd = addRecursive(filePath, false); // gets an ID but will not scan
1309 							m_dwFiles[fwd] = DWFileInfo(ret, filePath, de.timeLastModified, isDir(filePath.toNativeString()));
1310 						}
1311 
1312 					}
1313 
1314 				} catch (Exception e) {
1315 					try setInternalError!"inotify_add_watch"(Status.ERROR, "Could not add directory " ~ path.toNativeString() ~ ": " ~ e.msg); 
1316 					catch (Throwable) {}
1317 					return 0;
1318 				}
1319 				return ret;
1320 			}
1321 
1322 			fd_t wd;
1323 
1324 			try {
1325 				wd = addRecursive(info.path, isDir(info.path.toNativeString()));
1326 
1327 				if (wd == 0)
1328 					return 0;
1329 
1330 			}
1331 			catch (Exception e) {
1332 				setInternalError!"dw.watch"(Status.ERROR, "Failed to watch directory: " ~ e.msg);
1333 			}
1334 
1335 			return cast(uint) wd;
1336 		}
1337 	}
1338 
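	/// Stops watching the watch descriptor `wd` of watcher `fd`, recursively removing
	/// any subfolder watches that were added for it (and, on kqueue, closing their
	/// file descriptors).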
1339 	bool unwatch(in fd_t fd, in uint wd) {
1340 		// the wd can be used with m_dwFolders to find the DWFolderInfo
1341 		// and unwatch everything recursively.
1342 
1343 		m_status = StatusInfo.init;
1344 		static if (EPOLL) {
1345 			/// If recursive, all subfolders must also be unwatched recursively by removing them
1346 			/// from containers and from inotify
1347 			import core.sys.linux.sys.inotify;
1348 
1349 			nothrow bool removeAll(DWFolderInfo fi) {
1350 				int err;
1351 				try {
1352 
1353 					bool inotify_unwatch(uint wd) {
1354 						err = inotify_rm_watch(fd, wd);
1355 
1356 						if (catchError!"inotify_rm_watch"(err))
1357 							return false;
1358 						return true;
1359 					}
1360 
1361 					if (!inotify_unwatch(fi.wi.wd))
1362 						return false;
1363 
1364 					/*foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles)
1365 					 {
1366 					 if (file.folder == fi.wi.wd) {
1367 					 inotify_unwatch(id);
1368 					 m_dwFiles.remove(id);
1369 					 }
1370 					 }*/
1371 					m_dwFolders.remove(tuple(cast(fd_t)fd, fi.wi.wd));
1372 
1373 					if (fi.wi.recursive) {
1374 						// find all subdirectories by comparing the path
1375 						Array!(Tuple!(fd_t, uint)) remove_list;
1376 						foreach (ref const key, ref const DWFolderInfo folder; m_dwFolders) {
1377 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1378 
1379 								if (!inotify_unwatch(folder.wi.wd))
1380 									return false;
1381 
1382 								remove_list.insertBack(key);
1383 							}
1384 						}
1385 						foreach (rm_wd; remove_list[])
1386 							m_dwFolders.remove(rm_wd);
1387 
1388 					}
1389 					return true;
1390 				} catch (Exception e) {
1391 					try setInternalError!"inotify_rm_watch"(Status.ERROR, "Could not unwatch directory: " ~ e.toString());
1392 					catch (Throwable) {}
1393 					return false;
1394 				}
1395 			}
1396 
1397 			DWFolderInfo info;
1398 
1399 			try {
1400 				info = m_dwFolders.get(tuple(cast(fd_t) fd, cast(uint) wd), DWFolderInfo.init);
1401 				if (info == DWFolderInfo.init) {
1402 					setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not find watch info for wd " ~ wd.to!string);
1403 					return false;
1404 				}
1405 			} catch (Throwable) { }
1406 
1407 			return removeAll(info);
1408 		}
1409 		else /* if KQUEUE */ {
1410 
1411 			/// Recursivity must be handled manually, so we must unwatch subfiles and subfolders
1412 			/// recursively, remove the container entries, close the file descriptor, and disable the vnode events.
1413 
1414 			nothrow bool removeAll(DWFolderInfo fi) {
1415 				import core.sys.posix.unistd : close;
1416 
1417 
1418 				bool event_unset(uint id) {
1419 					kevent_t _event;
1420 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
1421 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
1422 					if (catchError!"kevent_unwatch"(err))
1423 						return false;
1424 					return true;
1425 				}
1426 
				bool removeFolder(uint wd) {
					if (!event_unset(wd))
						return false;
					m_dwFolders.remove(wd);
					int err = close(wd);
1432 					if (catchError!"close dir"(err))
1433 						return false;
1434 					return true;
1435 				}
1436 
1437 				try {
1438 					removeFolder(fi.wi.wd);
1439 
1440 					if (fi.wi.recursive) {
1441 						import std.container.array;
1442 						Array!fd_t remove_list; // keep track of unwatched folders recursively
1443 						Array!fd_t remove_file_list;
1444 						// search for subfolders and unset them / close their wd
1445 						foreach (ref const DWFolderInfo folder; m_dwFolders) {
1446 							if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
1447 
1448 								if (!event_unset(folder.wi.wd))
1449 									return false;
1450 
1451 								// search for subfiles, close their descriptors and remove them from the file list
1452 								foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {
1453 									if (file.folder == folder.wi.wd) {
1454 										close(fwd);
1455 										remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
1456 									}
1457 								}
1458 
1459 								remove_list.insertBack(folder.wi.wd); // to be removed from m_dwFolders without affecting the loop
1460 							}
1461 						}
1462 
1463 						foreach (wd; remove_file_list[])
1464 							m_dwFiles.remove(wd);
1465 
1466 						foreach (rm_wd; remove_list[])
1467 							removeFolder(rm_wd);
1468 
1469 
1470 					}
1471 				} catch (Exception e) {
1472 					try setInternalError!"dwFolders.get(wd)"(Status.ERROR, "Could not close the folder " ~ fi.to!string ~ ": " ~ e.toString());
1473 					catch (Throwable) {}
1474 					return false;
1475 				}
1476 
1477 				return true;
1478 			}
1479 
1480 			DWFolderInfo info;
1481 			try info = m_dwFolders.get(wd, DWFolderInfo.init); catch (Throwable) {}
1482 
1483 			if (!removeAll(info))
1484 				return false;
1485 			return true;
1486 		}
1487 	}
1488 
	/// Copies pending directory changes for watcher `fd` into `dst` and returns the
	/// number of entries written. On Linux this drains the inotify descriptor and
	/// generates recursive events for newly created subdirectories; on kqueue it pops
	/// entries from the pre-processed change queue.
1490 	uint readChanges(in fd_t fd, ref DWChangeInfo[] dst) {
1491 		m_status = StatusInfo.init;
1492 
1493 		static if (EPOLL) {
1494 			assert(dst.length > 0, "DirectoryWatcher called with 0 length DWChangeInfo array");
1495 			import core.sys.linux.sys.inotify;
1496 			import core.sys.posix.unistd : read;
1497 			import core.stdc.stdio : FILENAME_MAX;
			import core.stdc.string : strlen;
1499 			ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
1500 			ssize_t nread = read(fd, buf.ptr, cast(uint)buf.sizeof);
1501 			if (catchError!"read()"(nread))
1502 			{
1503 				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
1504 					m_status.code = Status.ASYNC;
1505 				return 0;
1506 			}
1507 			assert(nread > 0);
1508 
1509 
1510 			/// starts (recursively) watching all newly created folders in a recursive entry,
			/// creates events for additional files/folders found, and unwatches all deleted folders
1512 			void recurseInto(DWFolderInfo fi, DWFileEvent ev, ref Array!DWChangeInfo changes) {
1513 				import std.file : dirEntries, SpanMode, isDir;
1514 				assert(fi.wi.recursive);
1515 				// get a list of stuff in the created/moved folder
1516 				if (ev == DWFileEvent.CREATED || ev == DWFileEvent.MOVED_TO) {
1517 					foreach (de; dirEntries(fi.wi.path.toNativeString(), SpanMode.shallow)) {
1518 						Path entryPath = Path(de.name);
1519 						if (!entryPath.absolute)
1520 							entryPath = fi.wi.path ~ entryPath;
1521 
1522 						if (fi.wi.recursive && isDir(entryPath.toNativeString())) {
1523 
1524 							watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
1525 							void genEvents(Path subpath) {
1526 								foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
1527 									auto subsubpath = Path(de.name);
1528 									if (!subsubpath.absolute)
1529 										subsubpath = subpath ~ subsubpath;
1530 									changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
1531 									if (isDir(subsubpath.toNativeString()))
1532 										genEvents(subsubpath);
1533 								}
1534 							}
1535 
1536 							genEvents(entryPath);
1537 
1538 						}
1539 					}
1540 				}
1541 			}
1542 
1543 			size_t i;
1544 			do
1545 			{
1546 				for (auto p = buf.ptr; p < buf.ptr + nread; )
1547 				{
1548 					inotify_event* ev = cast(inotify_event*)p;
1549 					p += inotify_event.sizeof + ev.len;
1550 
1551 					DWFileEvent evtype;
1552 					evtype = DWFileEvent.CREATED;
1553 					if (ev.mask & IN_CREATE)
1554 						evtype = DWFileEvent.CREATED;
1555 					if (ev.mask & IN_DELETE || ev.mask & IN_DELETE_SELF)
1556 						evtype = DWFileEvent.DELETED;
1557 					if (ev.mask & IN_MOVED_FROM || ev.mask & IN_MOVE_SELF)
1558 						evtype = DWFileEvent.MOVED_FROM;
1559 					if (ev.mask & (IN_MOVED_TO))
1560 						evtype = DWFileEvent.MOVED_TO;
1561 					if (ev.mask & IN_MODIFY)
1562 						evtype = DWFileEvent.MODIFIED;
1563 
1564 					import std.path : buildPath;
					import core.stdc.string : strlen;
1566 					string name = cast(string) ev.name.ptr[0 .. cast(size_t) ev.name.ptr.strlen].idup;
1567 					DWFolderInfo fi;
1568 					Path path;
1569 					try {
1570 						fi = m_dwFolders.get(tuple(cast(fd_t)fd,cast(uint)ev.wd), DWFolderInfo.init);
1571 						if (fi == DWFolderInfo.init) {
1572 							setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders: " ~ ev.wd.to!string);
1573 							continue;
1574 						}
1575 						path = fi.wi.path ~ Path(name);
1576 					}
1577 					catch (Exception e) {
1578 						setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders");
1579 						return 0;
1580 					}
1581 
1582 					dst[i] = DWChangeInfo(evtype, path);
1583 					import std.file : isDir;
1584 					bool is_dir;
1585 					try is_dir = isDir(path.toNativeString()); catch (Throwable) {}
1586 					if (fi.wi.recursive && is_dir) {
1587 
1588 						try {
1589 							Array!DWChangeInfo changes;
1590 							recurseInto(fi, evtype, changes);
1591 							// stop watching if the folder was deleted
1592 							if (evtype == DWFileEvent.DELETED || evtype == DWFileEvent.MOVED_FROM) {
1593 								unwatch(fi.fd, fi.wi.wd);
1594 							}
1595 							foreach (change; changes[]) {
1596 								i++;
1597 								if (dst.length <= i)
1598 									dst ~= change;
1599 								else dst[i] = change;
1600 							}
1601 						}
1602 						catch (Exception e) {
1603 							setInternalError!"recurseInto"(Status.ERROR, "Failed to watch/unwatch contents of folder recursively.");
1604 							return 0;
1605 						}
1606 
1607 					}
1608 
1609 
1610 					i++;
1611 					if (i >= dst.length)
1612 						return cast(uint) i;
1613 				}
1614 				static if (LOG) foreach (j; 0 .. i) {
1615 					static if (LOG) try log("Change occured for FD#" ~ fd.to!string ~ ": " ~ dst[j].to!string); catch {}
1616 				}
1617 				nread = read(fd, buf.ptr, buf.sizeof);
1618 				if (catchError!"read()"(nread)) {
1619 					if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN)
1620 						m_status.code = Status.ASYNC;
1621 					return cast(uint) i;
1622 				}
1623 			} while (nread > 0);
1624 
1625 			return cast(uint) i;
1626 		}
1627 		else /* if KQUEUE */ {
1628 			Array!(DWChangeInfo)* changes;
1629 			size_t i;
1630 			try {
1631 				changes = m_changes[fd];
1632 				import std.algorithm : min;
1633 				size_t cnt = min(dst.length, changes.length);
1634 				foreach (DWChangeInfo change; (*changes)[0 .. cnt]) {
					dst[i] = change;
1636 					i++;
1637 				}
1638 				changes.linearRemove((*changes)[0 .. cnt]);
1639 			}
1640 			catch (Exception e) {
1641 				setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
				return 0;
1643 			}
1644 			return cast(uint) i;
1645 		}
1646 	}
1647 
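	// Request submission: each overload below queues the request on the
	// socket's pending list and immediately tries to make progress on it.
	// Requests that complete right away are moved to the m_completedSocket*
	// queues for delivery by the event loop; requests that would block stay
	// queued until a readiness event clears the socket's read/write-blocked
	// flag and the matching processPending* helper runs again.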
1648 	void submitRequest(AsyncAcceptRequest* request)
1649 	{
1650 		request.socket.m_pendingAccepts.insertBack(request);
1651 		processPendingAccepts(request.socket);
1652 	}
1653 
1654 	void submitRequest(AsyncReceiveRequest* request)
1655 	{
1656 		request.socket.m_pendingReceives.insertBack(request);
1657 		processPendingReceives(request.socket);
1658 	}
1659 
1660 	void submitRequest(AsyncSendRequest* request)
1661 	{
1662 		request.socket.m_pendingSends.insertBack(request);
1663 		processPendingSends(request.socket);
1664 	}
1665 
1666 	bool broadcast(in fd_t fd, bool b) {
1667 		m_status = StatusInfo.init;
1668 
1669 		import async.internals.socket_compat : socklen_t, setsockopt, SO_BROADCAST, SOL_SOCKET;
1670 
1671 		int val = b?1:0;
1672 		socklen_t len = val.sizeof;
1673 		int err = setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &val, len);
1674 		if (catchError!"setsockopt"(err))
1675 			return false;
1676 
1677 		return true;
1678 	}
1679 
1680 	private bool closeRemoteSocket(fd_t fd, bool forced) {
1681 
1682 		int err;
1683 		static if (LOG) log("shutdown");
1684 		import async.internals.socket_compat : shutdown, SHUT_WR, SHUT_RDWR, SHUT_RD;
1685 		if (forced)
1686 			err = shutdown(fd, SHUT_RDWR);
1687 		else
1688 			err = shutdown(fd, SHUT_WR);
1689 
1690 		static if (!EPOLL) {
1691 			kevent_t[2] events;
1692 			static if (LOG) try log("!!DISC delete events"); catch {}
1693 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
1694 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
1695 			kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
1696 		}
1697 
1698 		if (err == SOCKET_ERROR && errno == ENOTCONN) {
1699 			// The socket has already been shut down, we can recover from that
1700 		} else  if (catchError!"shutdown"(err)) {
1701 			return false;
1702 		}
1703 
1704 		return true;
1705 	}
1706 
1707 	// for connected sockets
1708 	bool closeSocket(fd_t fd, bool connected, bool forced = false)
1709 	{
1710 		static if (LOG) log("closeSocket");
1711 		if (connected && !closeRemoteSocket(fd, forced) && !forced)
1712 			return false;
1713 
1714 		if (!connected || forced) {
1715 			// todo: flush the socket here?
1716 
1717 			import core.sys.posix.unistd : close;
1718 			static if (LOG) log("close");
1719 			int err = close(fd);
1720 			if (catchError!"closesocket"(err))
1721 				return false;
1722 		}
1723 		return true;
1724 	}
1725 
1726 
1727 	NetworkAddress getAddressFromIP(in string ipAddr, in ushort port = 0, in bool ipv6 = false, in bool tcp = true)
1728 	{
1729 		import async.internals.socket_compat : addrinfo, AI_NUMERICHOST, AI_NUMERICSERV;
1730 		addrinfo hints;
1731 		hints.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV; // Specific to an IP resolver!
1732 
1733 		return getAddressInfo(ipAddr, port, ipv6, tcp, hints);
1734 	}
1735 
1736 
1737 	NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true)
1738 		/*in {
1739 		 debug import async.internals.validator : validateHost;
1740 		 debug assert(validateHost(host), "Trying to connect to an invalid domain");
1741 		 }
1742 		body */{
1743 		import async.internals.socket_compat : addrinfo;
1744 		addrinfo hints;
1745 		return getAddressInfo(host, port, ipv6, tcp, hints);
1746 	}
1747 
1748 	void setInternalError(string TRACE)(in Status s, string details = "", error_t error = cast(EPosix) errno())
1749 	{
1750 		if (details.length > 0)
1751 			m_status.text = TRACE ~ ": " ~ details;
1752 		else m_status.text = TRACE;
1753 		m_error = error;
1754 		m_status.code = s;
1755 		static if(LOG) log(m_status);
1756 	}
1757 private:
1758 
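	// The processPending* helpers below walk a socket's pending queue until
	// it is empty or the OS reports that the operation would block, in which
	// case the socket's readBlocked/writeBlocked flag is set and the loop
	// stops. Every request completed here is appended to the corresponding
	// m_completedSocket* queue and its callback is fired later by the event
	// loop.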
1759 	void processPendingAccepts(AsyncSocket socket)
1760 	{
1761 		if (socket.readBlocked) return;
1762 		foreach (request; socket.m_pendingAccepts) {
1763 			// Try to accept a single connection on the socket
1764 			auto result = attemptConnectionAcceptance(socket);
1765 			request.peer = result[0];
1766 			request.family = result[1];
1767 
1768 			if (status.code != Status.OK && !socket.readBlocked) {
1769 				socket.kill();
1770 				socket.handleError();
1771 				return;
1772 			} else if (request.peer != INVALID_SOCKET) {
1773 				socket.m_pendingAccepts.removeFront();
1774 				m_completedSocketAccepts.insertBack(request);
1775 			} else {
1776 				break;
1777 			}
1778 		}
1779 	}
1780 
1781 	void processPendingReceives(AsyncSocket socket)
1782 	{
1783 		if (socket.readBlocked) return;
1784 		foreach (request; socket.m_pendingReceives) {
			// Try to fit all bytes available in the OS receive buffer
			// into the current request's message buffer, or perform a
			// zero-byte receive if the request carries no message.
1788 			bool received = void;
1789 			if (request.message) received = attemptMessageReception(socket, request.message);
1790 			else received = attemptZeroByteReceive(socket);
1791 
1792 			if (status.code != Status.OK && !socket.readBlocked) {
1793 				if (received) m_completedSocketReceives.insertBack(request);
1794 				socket.kill();
1795 				socket.handleError();
1796 				return;
1797 			} else if (request.exact) {
1798 				if (request.message.receivedAll) {
1799 					socket.m_pendingReceives.removeFront();
1800 					m_completedSocketReceives.insertBack(request);
1801 				} else {
1802 					break;
1803 				}
1804 			// New bytes or zero-sized connectionless datagram
1805 			} else if (received || !socket.connectionOriented && !socket.readBlocked) {
1806 				socket.m_pendingReceives.removeFront();
1807 				m_completedSocketReceives.insertBack(request);
1808 			} else {
1809 				break;
1810 			}
1811 		}
1812 	}
1813 
1814 	void processPendingSends(AsyncSocket socket)
1815 	{
1816 		if (socket.writeBlocked) return;
1817 		foreach (request; socket.m_pendingSends) {
1818 			// Try to fit all bytes of the current request's buffer
1819 			// into the OS send buffer.
1820 			auto sent = attemptMessageTransmission(socket, request.message);
1821 
1822 			if (status.code != Status.OK && !socket.writeBlocked) {
1823 				socket.kill();
1824 				socket.handleError();
1825 				return;
1826 			} else if (sent) {
1827 				socket.m_pendingSends.removeFront();
1828 				m_completedSocketSends.insertBack(request);
1829 			} else {
1830 				break;
1831 			}
1832 		}
1833 	}
1834 
1835 	auto attemptConnectionAcceptance(AsyncSocket socket)
1836 	{
1837 		import core.sys.posix.fcntl : O_NONBLOCK;
1838 		import async.internals.socket_compat : accept, accept4, sockaddr_storage, socklen_t;
1839 
1840 		fd_t peer = void;
1841 		sockaddr_storage remote = void;
1842 		socklen_t remoteLength = remote.sizeof;
1843 
1844 		enum common = q{
1845 			if (peer == SOCKET_ERROR) {
1846 				m_error = lastError();
1847 
1848 				if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
1849 					m_status.code = Status.ASYNC;
1850 					socket.readBlocked = true;
1851 					return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
1852 				} else if (m_error == EBADF ||
1853 				           m_error == EINTR ||
1854 				           m_error == EINVAL ||
1855 				           m_error == ENOTSOCK ||
1856 				           m_error == EOPNOTSUPP ||
1857 				           m_error == EFAULT) {
1858 					assert(false, "accept{4} system call on FD " ~ socket.handle.to!string ~ " encountered fatal socket error: " ~ this.error);
1859 				} else if (catchError!"accept"(peer)) {
1860 					.errorf("accept{4} system call on FD %d encountered socket error: %s", socket.handle, this.error);
1861 					return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
1862 				}
1863 			}
1864 		};
1865 
1866 		version (linux) {
1867 			peer = accept4(socket.handle, cast(sockaddr*) &remote, &remoteLength, O_NONBLOCK);
1868 			mixin(common);
1869 		} else {
1870 			peer = accept(socket.handle, cast(sockaddr*) &remote, &remoteLength);
1871 			mixin(common);
1872 			if (!setNonBlock(peer)) {
1873 				.error("Failed to set accepted peer socket non-blocking");
1874 				return tuple(INVALID_SOCKET, sockaddr.sa_family.init);
1875 			}
1876 		}
1877 
1878 		return tuple(peer, remote.ss_family);
1879 	}
1880 
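	/**
	 * Probes the OS receive buffer with a one-byte $(D MSG_PEEK) recv, so no
	 * data is consumed. Used for zero-byte receive requests: sets
	 * $(D readBlocked) and $(D Status.ASYNC) on EWOULDBLOCK/EAGAIN, and for
	 * connection-oriented sockets a zero-length result (orderly shutdown by
	 * the peer) also sets $(D readBlocked).
	 * Returns: $(D true) if at least one byte is available to be read.
	 */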
1881 	bool attemptZeroByteReceive(AsyncSocket socket)
1882 	{
1883 		import async.internals.socket_compat : recv, MSG_PEEK;
1884 
1885 		ubyte buffer = void;
1886 		auto fd = socket.handle;
1887 
1888 		while (true) {
1889 			auto err = recv(fd, &buffer, 1, MSG_PEEK);
1890 
1891 			.tracef("recv system call on FD %d returned %d", fd, err);
1892 			if (err == SOCKET_ERROR) {
1893 				m_error = lastError();
1894 
1895 				if (m_error == EPosix.EINTR) {
1896 					.tracef("recv system call on FD %d was interrupted before any transfer occured", fd);
1897 					continue;
1898 				} else if (m_error == EPosix.EWOULDBLOCK || m_error == EPosix.EAGAIN) {
1899 					.tracef("recv system call on FD %d would have blocked", fd);
1900 					m_status.code = Status.ASYNC;
1901 					socket.readBlocked = true;
1902 					return false;
1903 				} else if (m_error == EBADF ||
1904 				           m_error == EFAULT ||
1905 				           m_error == EINVAL ||
1906 				           m_error == ENOTCONN ||
1907 				           m_error == ENOTSOCK) {
1908 					.errorf("recv system call on FD %d encountered fatal socket error: %s", fd, this.error);
1909 					assert(false);
1910 				} else if (catchError!"Receive message"(err)) {
1911 					.errorf("recv system call on FD %d encountered socket error: %s", fd, this.error);
1912 					return false;
1913 				}
1914 			} else {
1915 				.tracef("Received %d bytes on FD %d", err, fd);
1916 				m_status.code = Status.OK;
1917 				if (socket.connectionOriented && !err) {
1918 					socket.readBlocked = true;
1919 				}
1920 				return err > 0;
1921 			}
1922 		}
1923 	}
1924 
1925 	/**
	 * Transfers as many of the bytes currently available in the OS receive
	 * buffer into the given message's buffer as its remaining free space
	 * allows, advancing the message's count of transferred bytes in the
	 * process.
	 * Sets $(D readBlocked) when the OS indicates that no more bytes were
	 * available in the OS receive buffer.
1932 	 * Returns: $(D true) if any bytes were transferred.
1933 	 */
1934 	bool attemptMessageReception(AsyncSocket socket, NetworkMessage* msg)
1935 	in {
1936 		assert(socket.connectionOriented && !msg.receivedAll || !msg.receivedAny, "Message already received");
1937 	} body {
1938 		bool received;
1939 		size_t recvCount = void;
1940 
1941 		if (socket.datagramOriented) {
1942 			recvCount = recvMsg(socket.handle, msg);
1943 			msg.count = msg.count + recvCount;
1944 			received = received || recvCount > 0;
1945 		} else do {
1946 			recvCount = recvMsg(socket.handle, msg);
1947 			msg.count = msg.count + recvCount;
1948 			received = received || recvCount > 0;
1949 		} while (recvCount > 0 && !msg.receivedAll);
1950 
1951 		// More bytes may yet become available in the future
1952 		if (status.code == Status.ASYNC) {
1953 			socket.readBlocked = true;
1954 		// Connection was shutdown in an orderly fashion by the remote peer
1955 		} else if (socket.connectionOriented && status.code == Status.OK && !recvCount) {
1956 			socket.readBlocked = true;
1957 		}
1958 
1959 		return received;
1960 	}
1961 
1962 	/**
1963 	 * Transfers as much of the given message's untransferred bytes
1964 	 * into the OS send buffer as the latter's state allows for,
1965 	 * advancing the message's count of transferred bytes in the process.
	 * Sets $(D writeBlocked) on indication by the OS that
1967 	 * there was not enough space available in the OS send buffer.
1968 	 * Returns: $(D true) if all of the message's bytes
1969 	 *          have been transferred.
1970 	 */
1971 	bool attemptMessageTransmission(AsyncSocket socket, NetworkMessage* msg)
1972 	in { assert(!msg.sent, "Message already sent"); }
1973 	body {
1974 		size_t sentCount = void;
1975 
1976 		do {
1977 			sentCount = sendMsg(socket.handle, msg);
1978 			msg.count = msg.count + sentCount;
1979 		} while (sentCount > 0 && !msg.sent);
1980 
1981 		if (status.code == Status.ASYNC) {
1982 			socket.writeBlocked = true;
1983 		}
1984 
1985 		return msg.sent;
1986 	}
1987 
	/// For DirectoryWatcher
	/// With kqueue/EVFILT_VNODE, all we are told is the folder in which changes occurred.
	/// We have to figure out exactly what changed and store the results in a container
	/// for the readChanges call.
1992 	static if (!EPOLL) bool compareFolderFiles(DWFolderInfo fi, DWFileEvent events) {
1993 		import std.file;
1994 		import std.path : buildPath;
1995 		try {
1996 			Array!Path currFiles;
1997 			auto wd = fi.wi.wd;
1998 			auto path = fi.wi.path;
1999 			auto fd = fi.fd;
2000 			Array!(DWChangeInfo)* changes = m_changes.get(fd, null);
2001 			assert(changes !is null, "Invalid wd, could not find changes array.");
2002 			//import std.stdio : writeln;
2003 			//writeln("Scanning path: ", path.toNativeString());
2004 			//writeln("m_dwFiles length: ", m_dwFiles.length);
2005 
2006 			// get a list of the folder
2007 			foreach (de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
2008 				//writeln(de.name);
2009 				Path entryPath = Path(de.name);
2010 				if (!entryPath.absolute)
2011 					entryPath = path ~ entryPath;
2012 				bool found;
2013 
2014 				if (!de.isDir()) {
					// Compare it to the cached list. FIXME: make this faster with a better-suited container?
2016 					foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
2017 						if (file.folder != wd) continue; // this file isn't in the evented folder
2018 						if (file.path == entryPath) {
2019 							found = true;
2020 							static if (LOG) log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
2021 							// Check if it was modified
2022 							if (!isDir(entryPath.toNativeString()) && de.timeLastModified > file.lastModified)
2023 							{
2024 								DWFileInfo dwf = file;
2025 								dwf.lastModified = de.timeLastModified;
2026 								m_dwFiles[id] = dwf;
2027 								changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
2028 							}
2029 							break;
2030 						}
2031 					}
2032 				} else {
2033 					foreach (ref const DWFolderInfo folder; m_dwFolders) {
2034 						if (folder.wi.path == entryPath) {
2035 							found = true;
2036 							break;
2037 						}
2038 					}
2039 				}
2040 
2041 				// This file/folder is new in the folder
2042 				if (!found) {
2043 					changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, entryPath));
2044 
2045 					if (fi.wi.recursive && de.isDir()) {
						/// This is the complicated part. The folder needs to be watched, and CREATED events
						/// must be generated for every file/folder found recursively inside it.
						/// Useful e.g. when mkdir -p is used.
2049 						watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, wd) );
2050 						void genEvents(Path subpath) {
2051 							foreach (de; dirEntries(subpath.toNativeString(), SpanMode.shallow)) {
2052 								auto subsubpath = Path(de.name);
2053 								if (!subsubpath.absolute())
2054 									subsubpath = subpath ~ subsubpath;
2055 								changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
2056 								if (isDir(subsubpath.toNativeString()))
2057 									genEvents(subsubpath);
2058 							}
2059 						}
2060 
2061 						genEvents(entryPath);
2062 
2063 					}
2064 					else {
2065 						EventInfo* evinfo;
						try evinfo = m_watchers[fd]; catch(Throwable) { assert(false, "Could not retrieve event info; the directory watcher was not initialized properly, or you are operating on a closed directory watcher."); }
2067 
2068 						static if (LOG) log("Adding path: " ~ path.toNativeString());
2069 
2070 						import core.sys.posix.fcntl : open;
2071 						fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
2072 						if (catchError!"open(watch)"(fwd))
							return false;
2074 
2075 						kevent_t _event;
2076 
2077 						EV_SET(&_event, fwd, EVFILT_VNODE, EV_ADD | EV_CLEAR, fi.wi.events, 0, cast(void*) evinfo);
2078 
2079 						int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
2080 
2081 						if (catchError!"kevent_timer_add"(err))
2082 							return 0;
2083 
2084 						m_dwFiles[fwd] = DWFileInfo(fi.wi.wd, entryPath, de.timeLastModified, false);
2085 
2086 					}
2087 				}
2088 
2089 				// This file/folder is now current. This avoids a deletion event.
2090 				currFiles.insert(entryPath);
2091 			}
2092 
			/// Now search for files/folders that were deleted from this directory (no recursion needed).
			/// Unwatch deleted sub-directories and generate a DELETED event for each entry that disappeared.
2095 			foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
2096 				if (file.folder != wd) continue; // skip those files in another folder than the evented one
2097 				bool found;
2098 				foreach (Path curr; currFiles) {
2099 					if (file.path == curr){
2100 						found = true;
2101 						break;
2102 					}
2103 				}
2104 				// this file/folder was in the folder but it's not there anymore
2105 				if (!found) {
2106 					// writeln("Deleting: ", file.path.toNativeString());
2107 					kevent_t _event;
2108 					EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
2109 					int err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
2110 					if (catchError!"kevent_unwatch"(err))
2111 						return false;
2112 					import core.sys.posix.unistd : close;
2113 					err = close(id);
2114 					if (catchError!"close(dwFile)"(err))
2115 						return false;
2116 					changes.insert(DWChangeInfo(DWFileEvent.DELETED, file.path));
2117 
2118 					if (fi.wi.recursive && file.is_dir)
2119 						unwatch(fd, id);
2120 
2121 					m_dwFiles.remove(id);
2122 
2123 				}
2124 
2125 			}
2126 			if(changes.empty)
2127 				return false; // unhandled event, skip the callback
2128 
2129 			// fixme: how to implement moved_from moved_to for rename?
2130 		}
2131 		catch (Exception e)
2132 		{
2133 			try setInternalError!"compareFiles"(Status.ERROR, "Fatal error in file comparison: " ~ e.toString()); catch(Throwable) {}
2134 			return false;
2135 		}
2136 		return true;
2137 	}
2138 
	// makes the descriptor non-blocking; closes the socket if fcntl() fails
2140 	bool setNonBlock(fd_t fd) {
2141 		import core.sys.posix.fcntl : fcntl, F_GETFL, F_SETFL, O_NONBLOCK;
2142 		int flags = fcntl(fd, F_GETFL);
2143 		flags |= O_NONBLOCK;
2144 		int err = fcntl(fd, F_SETFL, flags);
2145 		if (catchError!"F_SETFL O_NONBLOCK"(err)) {
2146 			closeSocket(fd, false);
2147 			return false;
2148 		}
2149 		return true;
2150 	}
2151 
2152 	bool onTCPAccept(fd_t fd, TCPAcceptHandler del, int events)
2153 	{
2154 		import async.internals.socket_compat : AF_INET, AF_INET6, socklen_t, accept4, accept;
2155 		enum O_NONBLOCK     = 0x800;    // octal    04000
2156 
2157 		static if (EPOLL)
2158 		{
2159 			const uint epoll_events = cast(uint) events;
2160 			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
2161 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2162 		}
2163 		else
2164 		{
2165 			const short kqueue_events = cast(short) (events >> 16);
2166 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2167 			const bool incoming = cast(bool)(kqueue_events & EVFILT_READ);
2168 			const bool error = cast(bool)(kqueue_flags & EV_ERROR);
2169 		}
2170 
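		// With the edge-triggered epoll registration (EPOLLIN | EPOLLET) a
		// single readiness notification may cover several queued connections,
		// so keep accepting until accept() fails; catchError then turns the
		// EWOULDBLOCK/EAGAIN into an early return from the loop below.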
2171 		if (incoming) { // accept incoming connection
2172 			do {
2173 				NetworkAddress addr;
2174 				addr.family = AF_INET;
2175 				socklen_t addrlen = addr.sockAddrLen;
2176 
2177 				bool ret;
2178 				static if (EPOLL) {
2179 					/// Accept the connection and create a client socket
2180 					fd_t csock = accept4(fd, addr.sockAddr, &addrlen, O_NONBLOCK);
2181 
2182 					if (catchError!".accept"(csock)) {
						return true; // this way we know there's nothing left to accept
2184 					}
2185 				} else /* if KQUEUE */ {
2186 					fd_t csock = accept(fd, addr.sockAddr, &addrlen);
2187 
2188 					if (catchError!".accept"(csock)) {
2189 						return true;
2190 					}
2191 
2192 					// Make non-blocking so subsequent calls to recv/send return immediately
2193 					if (!setNonBlock(csock)) {
2194 						continue;
2195 					}
2196 				}
2197 
2198 				// Set client address family based on address length
2199 				if (addrlen > addr.sockAddrLen)
2200 					addr.family = AF_INET6;
2201 				if (addrlen == socklen_t.init) {
2202 					setInternalError!"addrlen"(Status.ABORT);
2203 					import core.sys.posix.unistd : close;
2204 					close(csock);
2205 					continue;
2206 				}
2207 
2208 				// Allocate a new connection handler object
2209 				AsyncTCPConnection conn;
2210 				try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop);
2211 				catch (Exception e){ assert(false, "Allocation failure"); }
2212 				conn.peer = addr;
2213 				conn.socket = csock;
2214 				conn.inbound = true;
2215 
2216 				nothrow void closeClient() {
2217 					try ThreadMem.free(conn);
2218 					catch (Exception e){ assert(false, "Free failure"); }
2219 					closeSocket(csock, true, true);
2220 				}
2221 
2222 				// Get the connection handler from the callback
2223 				TCPEventHandler evh;
2224 				try {
2225 					evh = del(conn);
2226 					if (evh == TCPEventHandler.init || !initTCPConnection(csock, conn, evh, true)) {
2227 						static if (LOG) try log("Failed to connect"); catch {}
2228 						closeClient();
2229 						continue;
2230 					}
2231 					static if (LOG) try log("Connection Started with " ~ csock.to!string); catch {}
2232 				}
2233 				catch (Exception e) {
2234 					static if (LOG) log("Close socket");
2235 					closeClient();
2236 					continue;
2237 				}
2238 
2239 				// Announce connection state to the connection handler
2240 				try {
2241 					static if (LOG) log("Connected to: " ~ addr.toString());
2242 					evh.conn.connected = true;
2243 					evh(TCPEvent.CONNECT);
2244 				}
2245 				catch (Exception e) {
2246 					closeClient();
2247 					setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
2248 				}
2249 				/*if (m_status.code == Status.ABORT)
2250 				{
2251 					try evh(TCPEvent.ERROR);
2252 					catch {}
2253 				}*/
2254 			} while(true);
2255 
2256 		}
2257 
2258 		if (error) { // socket failure
2259 			m_status.text = "listen socket error";
2260 			int err;
2261 			import async.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
2262 			socklen_t len = int.sizeof;
2263 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
2264 			m_error = cast(error_t) err;
2265 			m_status.code = Status.ABORT;
2266 			static if(LOG) log(m_status);
2267 
2268 			// call with null to announce a failure
2269 			try del(null);
2270 			catch(Exception e){ assert(false, "Failure calling TCPAcceptHandler(null)"); }
2271 
2272 			/// close the listener?
2273 			// closeSocket(fd, false);
2274 		}
2275 		return true;
2276 	}
2277 
2278 	bool onUDPTraffic(fd_t fd, UDPHandler del, int events)
2279 	{
2280 		static if (EPOLL)
2281 		{
2282 			const uint epoll_events = cast(uint) events;
2283 			const bool read = cast(bool) (epoll_events & EPOLLIN);
2284 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
2285 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2286 		}
2287 		else
2288 		{
2289 			const short kqueue_events = cast(short) (events >> 16);
2290 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2291 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
2292 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2293 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2294 		}
2295 
2296 		if (read) {
2297 			try {
2298 				del(UDPEvent.READ);
2299 			}
2300 			catch (Exception e) {
2301 				setInternalError!"del@UDPEvent.READ"(Status.ABORT);
2302 				return false;
2303 			}
2304 		}
2305 
2306 		if (write) {
2307 
2308 			try {
2309 				del(UDPEvent.WRITE);
2310 			}
2311 			catch (Exception e) {
2312 				setInternalError!"del@UDPEvent.WRITE"(Status.ABORT);
2313 				return false;
2314 			}
2315 		}
2316 
2317 		if (error) // socket failure
2318 		{
2319 
2320 			import async.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
2321 			import core.sys.posix.unistd : close;
2322 			int err;
2323 			socklen_t errlen = err.sizeof;
2324 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
2325 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
2326 			close(fd);
2327 			return false;
2328 		}
2329 
2330 		return true;
2331 	}
2332 
2333 	bool onEvent(fd_t fd, EventHandler del, int events)
2334 	{
2335 		bool connect = void, close = void;
2336 		auto conn = del.ev;
2337 
2338 		static if (EPOLL)
2339 		{
2340 			const uint epoll_events = cast(uint) events;
2341 			const bool read = cast(bool) (epoll_events & EPOLLIN);
2342 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
2343 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2344 			if (conn.stateful) {
2345 				connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
2346 				close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
2347 			}
2348 		}
2349 		else
2350 		{
2351 			const short kqueue_events = cast(short) (events >> 16);
2352 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2353 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
2354 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2355 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2356 			if (conn.stateful) {
2357 				connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
2358 				close = cast(bool) (kqueue_flags & EV_EOF);
2359 			}
2360 		}
2361 
2362 		if (write && (!conn.stateful || conn.connected && !conn.disconnecting && conn.writeBlocked)) {
2363 			if (conn.stateful) conn.writeBlocked = false;
2364 			static if (LOG) try log("!write"); catch {}
2365 			try {
2366 				del(EventCode.WRITE);
2367 			}
2368 			catch (Exception e) {
2369 				setInternalError!"del@Event.WRITE"(Status.ABORT);
2370 				return false;
2371 			}
2372 		}
2373 
2374 		if (read && (!conn.stateful || conn.connected && !conn.disconnecting)) {
2375 			static if (LOG) try log("!read"); catch {}
2376 			try {
2377 				del(EventCode.READ);
2378 			}
2379 			catch (Exception e) {
2380 				setInternalError!"del@Event.READ"(Status.ABORT);
2381 				return false;
2382 			}
2383 		}
2384 
2385 		if (conn.stateful && close && conn.connected && !conn.disconnecting)
2386 		{
2387 			static if (LOG) try log("!close"); catch {}
2388 			// todo: See if this hack is still necessary
2389 			if (!conn.connected && conn.disconnecting)
2390 				return true;
2391 
2392 			try del(EventCode.CLOSE);
2393 			catch (Exception e) {
2394 				setInternalError!"del@Event.CLOSE"(Status.ABORT);
2395 				return false;
2396 			}
2397 
2398 			// Careful here, the delegate might have closed the connection already
2399 			if (conn.connected) {
2400 				closeSocket(fd, !conn.disconnecting, conn.connected);
2401 
2402 				m_status.code = Status.ABORT;
2403 				conn.disconnecting = true;
2404 				conn.connected = false;
2405 				conn.writeBlocked = true;
2406 				conn.id = 0;
2407 
2408 				try ThreadMem.free(conn.evInfo);
2409 				catch (Exception e){ assert(false, "Error freeing resources"); }
2410 			}
2411 			return true;
2412 		}
2413 
2414 		if (error) // failure
2415 		{
2416 			setInternalError!"EPOLLERR"(Status.ABORT, null);
2417 			try {
2418 				del(EventCode.ERROR);
2419 			}
2420 			catch (Exception e)
2421 			{
2422 				setInternalError!"del@Event.ERROR"(Status.ABORT);
2423 				// ignore failure...
2424 			}
2425 			return false;
2426 		}
2427 
2428 		if (conn.stateful && connect) {
2429 			static if (LOG) try log("!connect"); catch {}
2430 			conn.connected = true;
2431 			try del(EventCode.CONNECT);
2432 			catch (Exception e) {
2433 				setInternalError!"del@Event.CONNECT"(Status.ABORT);
2434 				return false;
2435 			}
2436 			return true;
2437 		}
2438 
2439 		return true;
2440 	}
2441 
2442 	/// Handle an event for a connectionless socket
2443 	bool onCLSocketEvent(AsyncSocket socket, int events)
2444 	{
2445 		static if (EPOLL)
2446 		{
2447 			const uint epoll_events = cast(uint) events;
2448 			const bool read = cast(bool) (epoll_events & EPOLLIN);
2449 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
2450 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2451 		}
2452 		else
2453 		{
2454 			const short kqueue_events = cast(short) (events >> 16);
2455 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2456 			const bool read = cast(bool) (kqueue_events & EVFILT_READ);
2457 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2458 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2459 		}
2460 
2461 		if (read) {
2462 			tracef("Read on FD %d", socket.handle);
2463 
2464 			socket.readBlocked = false;
2465 			processPendingReceives(socket);
2466 		}
2467 
2468 		if (write) {
2469 			tracef("Write on FD %d", socket.handle);
2470 
2471 			socket.writeBlocked = false;
2472 			processPendingSends(socket);
2473 		}
2474 
2475 		if (error) {
2476 			tracef("Error on FD %d", socket.handle);
2477 
2478 			auto err = cast(error_t) socket.lastError;
2479 			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, cast(error_t) err);
2480 			socket.kill();
2481 			socket.handleError();
2482 			return false;
2483 		}
2484 
2485 		return true;
2486 	}
2487 
2488 	/// Handle an event for a connection-oriented, active socket
2489 	bool onCOASocketEvent(AsyncSocket socket, int events)
2490 	{
2491 		static if (EPOLL) {
2492 			const uint epoll_events = cast(uint) events;
2493 			bool read = cast(bool) (epoll_events & EPOLLIN);
2494 			bool write = cast(bool) (epoll_events & EPOLLOUT);
2495 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2496 			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !socket.disconnecting && !socket.connected;
2497 			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
2498 		} else {
2499 			const short kqueue_events = cast(short) (events >> 16);
2500 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2501 			bool read = cast(bool) (kqueue_events & EVFILT_READ);
2502 			bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2503 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2504 			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !socket.disconnecting && !socket.connected);
2505 			const bool close = cast(bool) (kqueue_flags & EV_EOF);
2506 		}
2507 
2508 		tracef("AsyncSocket events: (read: %s, write: %s, error: %s, connect: %s, close: %s)", read, write, error, connect, close);
2509 
2510 		if (error) {
2511 			tracef("Error on FD %d", socket.handle);
2512 
2513 			auto err = cast(error_t) socket.lastError;
2514 			if (err == ECONNRESET ||
2515 			    err == EPIPE) {
2516 				socket.kill();
2517 				socket.handleClose();
2518 				return true;
2519 			}
2520 
2521 			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, cast(error_t) err);
2522 			socket.kill();
2523 			socket.handleError();
2524 			return false;
2525 		}
2526 
2527 		if (connect) {
2528 			tracef("Connect on FD %d", socket.handle);
2529 
2530 			socket.connected = true;
2531 			socket.readBlocked = false;
2532 			socket.writeBlocked = false;
2533 			socket.handleConnect();
2534 			read = false;
2535 			write = false;
2536 		}
2537 
2538 		if ((/+read ||+/ write) && socket.connected && !socket.disconnecting && socket.writeBlocked) {
2539 			tracef("Write on FD %d", socket.handle);
2540 
2541 			socket.writeBlocked = false;
2542 			processPendingSends(socket);
2543 		}/+ else {
2544 			read = true;
2545 		}+/
2546 
2547 		if (read && socket.connected && !socket.disconnecting && socket.readBlocked) {
2548 			tracef("Read on FD %d", socket.handle);
2549 
2550 			socket.readBlocked = false;
2551 			processPendingReceives(socket);
2552 		}
2553 
2554 		if (close && socket.connected && !socket.disconnecting)
2555 		{
2556 			tracef("Close on FD %d", socket.handle);
2557 			socket.kill();
2558 			socket.handleClose();
2559 			return true;
2560 		}
2561 
2562 		return true;
2563 	}
2564 
2565 	/// Handle an event for a connection-oriented, passive socket
2566 	bool onCOPSocketEvent(AsyncSocket socket, int events)
2567 	{
2568 		import core.sys.posix.fcntl : O_NONBLOCK;
2569 		import async.internals.socket_compat : accept, accept4, sockaddr, socklen_t;
2570 
2571 		static if (EPOLL) {
2572 			const uint epoll_events = cast(uint) events;
2573 			const bool incoming = cast(bool) (epoll_events & EPOLLIN);
2574 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2575 		} else {
2576 			const short kqueue_events = cast(short) (events >> 16);
2577 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2578 			const bool incoming = cast(bool) (kqueue_events & EVFILT_READ);
2579 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2580 		}
2581 
2582 		tracef("AsyncSocket events: (incoming: %s, error: %s)", incoming, error);
2583 
2584 		if (incoming) {
2585 			tracef("Incoming on FD %d", socket.handle);
2586 
2587 			socket.readBlocked = false;
2588 			processPendingAccepts(socket);
2589 		}
2590 
2591 		if (error) {
2592 			tracef("Error on FD %d", socket.handle);
2593 
2594 			auto err = cast(error_t) socket.lastError;
2595 			setInternalError!"AsyncSocket.ERROR"(Status.ABORT, null, cast(error_t) err);
2596 			socket.kill();
2597 			socket.handleError();
2598 			return false;
2599 		}
2600 
2601 		return true;
2602 	}
2603 
2604 	bool onTCPTraffic(fd_t fd, TCPEventHandler del, int events, AsyncTCPConnection conn)
2605 	{
2606 		//log("TCP Traffic at FD#" ~ fd.to!string);
2607 
2608 		static if (EPOLL)
2609 		{
2610 			const uint epoll_events = cast(uint) events;
2611 			const bool connect = ((cast(bool) (epoll_events & EPOLLIN)) || (cast(bool) (epoll_events & EPOLLOUT))) && !conn.disconnecting && !conn.connected;
2612 			bool read = cast(bool) (epoll_events & EPOLLIN);
2613 			const bool write = cast(bool) (epoll_events & EPOLLOUT);
2614 			const bool error = cast(bool) (epoll_events & EPOLLERR);
2615 			const bool close = (cast(bool) (epoll_events & EPOLLRDHUP)) || (cast(bool) (events & EPOLLHUP));
2616 		}
2617 		else /* if KQUEUE */
2618 		{
2619 			const short kqueue_events = cast(short) (events >> 16);
2620 			const ushort kqueue_flags = cast(ushort) (events & 0xffff);
2621 			const bool connect = cast(bool) ((kqueue_events & EVFILT_READ || kqueue_events & EVFILT_WRITE) && !conn.disconnecting && !conn.connected);
2622 			bool read = cast(bool) (kqueue_events & EVFILT_READ) && !connect;
2623 			const bool write = cast(bool) (kqueue_events & EVFILT_WRITE);
2624 			const bool error = cast(bool) (kqueue_flags & EV_ERROR);
2625 			const bool close = cast(bool) (kqueue_flags & EV_EOF);
2626 		}
2627 
2628 		if (error)
2629 		{
2630 			import async.internals.socket_compat : socklen_t, getsockopt, SOL_SOCKET, SO_ERROR;
2631 			int err;
2632 			static if (LOG) try log("Also got events: " ~ connect.to!string ~ " c " ~ read.to!string ~ " r " ~ write.to!string ~ " write"); catch {}
2633 			socklen_t errlen = err.sizeof;
2634 			getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
2635 			setInternalError!"EPOLLERR"(Status.ABORT, null, cast(error_t)err);
2636 			try
2637 				del(TCPEvent.ERROR);
2638 			catch (Exception e)
2639 			{
2640 				setInternalError!"del@TCPEvent.ERROR"(Status.ABORT);
2641 				// ignore failure...
2642 			}
2643 			return false;
2644 		}
2645 
2646 
2647 		if (connect)
2648 		{
2649 			static if (LOG) try log("!connect"); catch {}
2650 			conn.connected = true;
2651 			try del(TCPEvent.CONNECT);
2652 			catch (Exception e) {
2653 				setInternalError!"del@TCPEvent.CONNECT"(Status.ABORT);
2654 				return false;
2655 			}
2656 			return true;
2657 		}
2658 
2659 
2660 		if ((read || write) && conn.connected && !conn.disconnecting && conn.writeBlocked)
2661 		{
2662 			conn.writeBlocked = false;
2663 			static if (LOG) try log("!write"); catch {}
2664 			try del(TCPEvent.WRITE);
2665 			catch (Exception e) {
2666 				setInternalError!"del@TCPEvent.WRITE"(Status.ABORT);
2667 				return false;
2668 			}
2669 		}
2670 		else {
2671 			read = true;
2672 		}
2673 
2674 		if (read && conn.connected && !conn.disconnecting)
2675 		{
2676 			static if (LOG) try log("!read"); catch {}
2677 			try del(TCPEvent.READ);
2678 			catch (Exception e) {
2679 				setInternalError!"del@TCPEvent.READ"(Status.ABORT);
2680 				return false;
2681 			}
2682 		}
2683 
2684 		if (close && conn.connected && !conn.disconnecting)
2685 		{
2686 			static if (LOG) try log("!close"); catch {}
2687 			// todo: See if this hack is still necessary
2688 			if (!conn.connected && conn.disconnecting)
2689 				return true;
2690 
2691 			try del(TCPEvent.CLOSE);
2692 			catch (Exception e) {
2693 				setInternalError!"del@TCPEvent.CLOSE"(Status.ABORT);
2694 				return false;
2695 			}
2696 
2697 			// Careful here, the delegate might have closed the connection already
2698 			if (conn.connected) {
2699 				closeSocket(fd, !conn.disconnecting, conn.connected);
2700 
2701 				m_status.code = Status.ABORT;
2702 				conn.disconnecting = true;
2703 				conn.connected = false;
2704 				conn.writeBlocked = true;
2705 				del.conn.socket = 0;
2706 
2707 				try ThreadMem.free(del.conn.evInfo);
2708 				catch (Exception e){ assert(false, "Error freeing resources"); }
2709 
2710 				if (del.conn.inbound) {
2711 					static if (LOG) log("Freeing inbound connection");
2712 					try ThreadMem.free(del.conn);
2713 					catch (Exception e){ assert(false, "Error freeing resources"); }
2714 				}
2715 			}
2716 		}
2717 		return true;
2718 	}
2719 
2720 	bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt, UDPHandler del)
2721 	{
2722 		import async.internals.socket_compat : bind;
2723 		import core.sys.posix.unistd;
2724 
2725 		fd_t err;
2726 
2727 		EventObject eo;
2728 		eo.udpHandler = del;
2729 		EventInfo* ev;
2730 		try ev = ThreadMem.alloc!EventInfo(fd, EventType.UDPSocket, eo, m_instanceId);
2731 		catch (Exception e){ assert(false, "Allocation error"); }
2732 		ctxt.evInfo = ev;
2733 		nothrow bool closeAll() {
2734 			try ThreadMem.free(ev);
2735 			catch(Exception e){ assert(false, "Failed to free resources"); }
2736 			ctxt.evInfo = null;
			// the socket will be closed by the caller when we return false
2738 			return false;
2739 		}
2740 
2741 		static if (EPOLL)
2742 		{
2743 			epoll_event _event;
2744 			_event.data.ptr = ev;
2745 			_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
2746 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2747 			if (catchError!"epoll_ctl"(err)) {
2748 				return closeAll();
2749 			}
2750 			nothrow void deregisterEvent()
2751 			{
2752 				epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &_event);
2753 			}
2754 		}
2755 		else /* if KQUEUE */
2756 		{
2757 			kevent_t[2] _event;
2758 			EV_SET(&(_event[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
2759 			EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, ev);
2760 			err = kevent(m_kqueuefd, &(_event[0]), 2, null, 0, null);
2761 			if (catchError!"kevent_add_udp"(err))
2762 				return closeAll();
2763 
2764 			nothrow void deregisterEvent() {
2765 				EV_SET(&(_event[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
2766 				EV_SET(&(_event[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
2767 				kevent(m_kqueuefd, &(_event[0]), 2, null, 0, cast(async.internals.kqueue.timespec*) null);
2768 			}
2769 
2770 		}
2771 
		/// Bind the socket so it starts receiving packets
2773 		err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2774 		if (catchError!"bind"(err)) {
2775 			deregisterEvent();
2776 			return closeAll();
2777 		}
2778 
2779 		return true;
2780 	}
2781 
2782 	bool initTCPListener(fd_t fd, AsyncTCPListener ctxt, TCPAcceptHandler del, bool reusing = false)
2783 	in {
2784 		assert(ctxt.local !is NetworkAddress.init);
2785 	}
2786 	body {
2787 		import async.internals.socket_compat : bind, listen, SOMAXCONN;
2788 		fd_t err;
2789 
2790 		/// Create callback object
2791 		EventObject eo;
2792 		eo.tcpAcceptHandler = del;
2793 		EventInfo* ev;
2794 
2795 		try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPAccept, eo, m_instanceId);
2796 		catch (Exception e){ assert(false, "Allocation error"); }
2797 		ctxt.evInfo = ev;
2798 		nothrow bool closeAll() {
2799 			try ThreadMem.free(ev);
2800 			catch(Exception e){ assert(false, "Failed free"); }
2801 			ctxt.evInfo = null;
2802 			// Socket is closed by run()
2803 			//closeSocket(fd, false);
2804 			return false;
2805 		}
2806 
2807 		/// Add socket to event loop
2808 		static if (EPOLL)
2809 		{
2810 			epoll_event _event;
2811 			_event.data.ptr = ev;
2812 			_event.events = EPOLLIN | EPOLLET;
2813 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2814 			if (catchError!"epoll_ctl_add"(err))
2815 				return closeAll();
2816 
2817 			nothrow void deregisterEvent() {
2818 				// epoll cleans itself when closing the socket
2819 			}
2820 		}
2821 		else /* if KQUEUE */
2822 		{
2823 			kevent_t _event;
2824 			EV_SET(&_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, ev);
2825 			err = kevent(m_kqueuefd, &_event, 1, null, 0, null);
2826 			if (catchError!"kevent_add_listener"(err))
2827 				return closeAll();
2828 
2829 			nothrow void deregisterEvent() {
2830 				EV_SET(&_event, fd, EVFILT_READ, EV_CLEAR | EV_DISABLE, 0, 0, null);
2831 				kevent(m_kqueuefd, &_event, 1, null, 0, null);
2832 				// wouldn't know how to deal with errors here...
2833 			}
2834 		}
2835 
2836 		/// Bind and listen to socket
2837 		if (!reusing) {
2838 			err = bind(fd, ctxt.local.sockAddr, ctxt.local.sockAddrLen);
2839 			if (catchError!"bind"(err)) {
2840 				deregisterEvent();
2841 				return closeAll();
2842 			}
2843 
2844 			err = listen(fd, SOMAXCONN);
2845 			if (catchError!"listen"(err)) {
2846 				deregisterEvent();
2847 				return closeAll();
2848 			}
2849 
2850 		}
2851 		return true;
2852 	}
2853 
2854 	bool initTCPConnection(fd_t fd, AsyncTCPConnection ctxt, TCPEventHandler del, bool inbound = false)
2855 	in {
2856 		assert(ctxt.peer.port != 0, "Connecting to an invalid port");
2857 	}
2858 	body {
2859 
2860 		fd_t err;
2861 
2862 		/// Create callback object
2863 		import async.internals.socket_compat : connect;
2864 		EventObject eo;
2865 		eo.tcpEvHandler = del;
2866 		EventInfo* ev;
2867 
2868 		try ev = ThreadMem.alloc!EventInfo(fd, EventType.TCPTraffic, eo, m_instanceId);
2869 		catch (Exception e){ assert(false, "Allocation error"); }
2870 		assert(ev !is null);
2871 		ctxt.evInfo = ev;
2872 		nothrow bool destroyEvInfo() {
2873 			try ThreadMem.free(ev);
2874 			catch(Exception e){ assert(false, "Failed to free resources"); }
2875 			ctxt.evInfo = null;
2876 
2877 			// Socket will be closed by run()
2878 			// closeSocket(fd, false);
2879 			return false;
2880 		}
2881 
2882 		/// Add socket and callback object to event loop
2883 		static if (EPOLL)
2884 		{
2885 			epoll_event _event = void;
2886 			_event.data.ptr = ev;
			_event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
2888 			err = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &_event);
2889 			static if (LOG) log("Connection FD#" ~ fd.to!string ~ " added to " ~ m_epollfd.to!string);
2890 			if (catchError!"epoll_ctl_add"(err))
2891 				return destroyEvInfo();
2892 
2893 			nothrow void deregisterEvent() {
2894 				// will be handled automatically when socket is closed
2895 			}
2896 		}
2897 		else /* if KQUEUE */
2898 		{
2899 			kevent_t[2] events = void;
2900 			static if (LOG) try log("Register event ptr " ~ ev.to!string); catch {}
2901 			assert(ev.evType == EventType.TCPTraffic, "Bad event type for TCP Connection");
2902 			EV_SET(&(events[0]), fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2903 			EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, cast(void*) ev);
2904 			assert((cast(EventInfo*)events[0].udata) == ev && (cast(EventInfo*)events[1].udata) == ev);
2905 			assert((cast(EventInfo*)events[0].udata).owner == m_instanceId && (cast(EventInfo*)events[1].udata).owner == m_instanceId);
2906 			err = kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2907 			if (catchError!"kevent_add_tcp"(err))
2908 				return destroyEvInfo();
2909 
			// todo: verify whether this nested function's closure allocates on the GC heap
2911 			nothrow void deregisterEvent() {
2912 				EV_SET(&(events[0]), fd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, null);
2913 				EV_SET(&(events[1]), fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, null);
2914 				kevent(m_kqueuefd, &(events[0]), 2, null, 0, null);
2915 				// wouldn't know how to deal with errors here...
2916 			}
2917 		}
2918 
2919 		// Inbound objects are already connected
2920 		if (inbound) return true;
2921 
		// Make the socket non-blocking so connect() returns EINPROGRESS instead of blocking,
		// and later send/recv calls never block either
2923 		if (!setNonBlock(fd)) {
2924 			deregisterEvent();
2925 			return destroyEvInfo();
2926 		}
2927 
2928 		/// Start the connection
2929 		err = connect(fd, ctxt.peer.sockAddr, ctxt.peer.sockAddrLen);
2930 		if (catchErrorsEq!"connect"(err, [ tuple(cast(fd_t)SOCKET_ERROR, EPosix.EINPROGRESS, Status.ASYNC) ]))
2931 			return true;
2932 		if (catchError!"connect"(err)) {
2933 			deregisterEvent();
2934 			return destroyEvInfo();
2935 		}
2936 
2937 		return true;
2938 	}
2939 
2940 	pragma(inline, true)
2941 	bool catchError(string TRACE, T)(T val, T cmp = SOCKET_ERROR)
2942 		if (isIntegral!T)
2943 	{
2944 		if (val == cmp) {
2945 			m_status.text = TRACE;
2946 			m_error = lastError();
2947 			m_status.code = Status.ABORT;
2948 			static if(LOG) log(m_status);
2949 			return true;
2950 		}
2951 		return false;
2952 	}
2953 
2954 	pragma(inline, true)
2955 	bool catchSocketError(string TRACE)(fd_t fd)
2956 	{
2957 		m_status.text = TRACE;
2958 		int err;
2959 		import async.internals.socket_compat : getsockopt, socklen_t, SOL_SOCKET, SO_ERROR;
2960 		socklen_t len = int.sizeof;
2961 		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
2962 		m_error = cast(error_t) err;
2963 		if (m_error != EPosix.EOK) {
2964 			m_status.code = Status.ABORT;
2965 			static if(LOG) log(m_status);
2966 			return true;
2967 		}
2968 
2969 		return false;
2970 	}
2971 
2972 	bool catchEvLoopErrors(string TRACE, T)(T val, Tuple!(T, Status)[] cmp ...)
2973 		if (isIntegral!T)
2974 	{
2975 		if (val == SOCKET_ERROR) {
2976 			int err = errno;
2977 			foreach (validator ; cmp) {
				if (err == validator[0]) {
2979 					m_status.text = TRACE;
2980 					m_error = lastError();
2981 					m_status.code = validator[1];
2982 					static if(LOG) log(m_status);
2983 					return true;
2984 				}
2985 			}
2986 
2987 			m_status.text = TRACE;
2988 			m_status.code = Status.EVLOOP_FAILURE;
2989 			m_error = lastError();
2990 			static if (LOG) log(m_status);
2991 			return true;
2992 		}
2993 		return false;
2994 	}
2995 
2996 	/**
2997 	 * If the value at val matches the tuple first argument T, get the last error,
2998 	 * and if the last error matches tuple second argument error_t, set the Status as
2999 	 * tuple third argument Status.
3000 	 *
3001 	 * Repeats for each comparison tuple until a match in which case returns true.
3002 	 */
3003 	bool catchErrorsEq(string TRACE, T)(T val, Tuple!(T, error_t, Status)[] cmp ...)
3004 		if (isIntegral!T)
3005 	{
3006 		error_t err;
3007 		foreach (validator ; cmp) {
3008 			if (val == validator[0]) {
3009 				if (err is EPosix.init) err = lastError();
3010 				if (err == validator[1]) {
3011 					m_status.text = TRACE;
3012 					m_status.code = validator[2];
3013 					if (m_status.code == Status.EVLOOP_TIMEOUT) {
3014 						static if (LOG) log(m_status);
3015 						break;
3016 					}
3017 					m_error = lastError();
3018 					static if(LOG) log(m_status);
3019 					return true;
3020 				}
3021 			}
3022 		}
3023 		return false;
3024 	}
3025 
3026 	pragma(inline, true)
3027 	error_t lastError() {
3028 		try {
3029 			return cast(error_t) errno;
3030 		} catch(Exception e) {
3031 			return EPosix.EACCES;
3032 		}
3033 
3034 	}
3035 
3036 	void log(StatusInfo val)
3037 	{
3038 		static if (LOG) {
3039 			import std.stdio;
3040 			try {
3041 				writeln("Backtrace: ", m_status.text);
3042 				writeln(" | Status:  ", m_status.code);
3043 				writeln(" | Error: " , m_error);
3044 				if ((m_error in EPosixMessages) !is null)
3045 					writeln(" | Message: ", EPosixMessages[m_error]);
3046 			} catch(Exception e) {
3047 				return;
3048 			}
3049 		}
3050 	}
3051 
3052 	void log(T)(T val)
3053 	{
3054 		static if (LOG) {
3055 			import std.stdio;
3056 			try {
3057 				writeln(val);
3058 			} catch(Exception e) {
3059 				return;
3060 			}
3061 		}
3062 	}
3063 
3064 	NetworkAddress getAddressInfo(addrinfo)(in string host, ushort port, bool ipv6, bool tcp, ref addrinfo hints)
3065 	{
3066 		m_status = StatusInfo.init;
3067 		import async.internals.socket_compat : AF_INET, AF_INET6, SOCK_DGRAM, SOCK_STREAM, IPPROTO_TCP, IPPROTO_UDP, freeaddrinfo, getaddrinfo;
3068 
3069 		NetworkAddress addr;
3070 		addrinfo* infos;
3071 		error_t err;
3072 		if (ipv6) {
3073 			addr.family = AF_INET6;
3074 			hints.ai_family = AF_INET6;
3075 		}
3076 		else {
3077 			addr.family = AF_INET;
3078 			hints.ai_family = AF_INET;
3079 		}
3080 		if (tcp) {
3081 			hints.ai_socktype = SOCK_STREAM;
3082 			hints.ai_protocol = IPPROTO_TCP;
3083 		}
3084 		else {
3085 			hints.ai_socktype = SOCK_DGRAM;
3086 			hints.ai_protocol = IPPROTO_UDP;
3087 		}
3088 
3089 		static if (LOG) {
3090 			log("Resolving " ~ host ~ ":" ~ port.to!string);
3091 		}
3092 
3093 		auto chost = host.toStringz();
3094 
3095 		if (port != 0) {
3096 			addr.port = port;
3097 			const(char)* cPort = cast(const(char)*) port.to!string.toStringz;
3098 			err = cast(error_t) getaddrinfo(chost, cPort, &hints, &infos);
3099 		}
3100 		else {
3101 			err = cast(error_t) getaddrinfo(chost, null, &hints, &infos);
3102 		}
3103 
3104 		if (err != EPosix.EOK) {
3105 			setInternalError!"getAddressInfo"(Status.ERROR, string.init, err);
3106 			return NetworkAddress.init;
3107 		}
3108 		ubyte* pAddr = cast(ubyte*) infos.ai_addr;
3109 		ubyte* data = cast(ubyte*) addr.sockAddr;
3110 		data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
3111 		freeaddrinfo(infos);
3112 		return addr;
3113 	}
3114 
3115 
3116 
3117 }
3118 
3119 
3120 static if (!EPOLL)
3121 {
3122 	import std.container : Array;
3123 	import core.sync.mutex : Mutex;
3124 	import core.sync.rwmutex : ReadWriteMutex;
3125 	size_t g_evIdxCapacity;
3126 	Array!size_t g_evIdxAvailable;
3127 
3128 	// called on run
3129 	nothrow size_t createIndex() {
3130 		size_t idx;
3131 		import std.algorithm : max;
3132 		try {
3133 
3134 			size_t getIdx() {
3135 
3136 				if (!g_evIdxAvailable.empty) {
3137 					immutable size_t ret = g_evIdxAvailable.back;
3138 					g_evIdxAvailable.removeBack();
3139 					return ret;
3140 				}
3141 				return 0;
3142 			}
3143 
3144 			idx = getIdx();
3145 			if (idx == 0) {
3146 				import std.range : iota;
3147 				g_evIdxAvailable.insert( iota(g_evIdxCapacity, max(32, g_evIdxCapacity * 2), 1) );
3148 				g_evIdxCapacity = max(32, g_evIdxCapacity * 2);
3149 				idx = getIdx();
3150 			}
3151 
3152 		} catch (Throwable e) {
3153 			static if (DEBUG) {
3154 				import std.stdio : writeln;
3155 				try writeln(e.toString()); catch {}
3156 			}
3157 
3158 		}
3159 		return idx;
3160 	}
3161 
3162 	nothrow void destroyIndex(AsyncNotifier ctxt) {
3163 		try {
3164 			g_evIdxAvailable.insert(ctxt.id);
3165 		}
3166 		catch (Exception e) {
3167 			assert(false, "Error destroying index: " ~ e.msg);
3168 		}
3169 	}
3170 
3171 	nothrow void destroyIndex(AsyncTimer ctxt) {
3172 		try {
3173 			g_evIdxAvailable.insert(ctxt.id);
3174 		}
3175 		catch (Exception e) {
3176 			assert(false, "Error destroying index: " ~ e.msg);
3177 		}
3178 	}
3179 
3180 	size_t* g_threadId;
3181 	size_t g_idxCapacity;
3182 	Array!size_t g_idxAvailable;
3183 
3184 	__gshared ReadWriteMutex gs_queueMutex;
3185 	__gshared Array!(Array!AsyncSignal) gs_signalQueue;
3186 	__gshared Array!(Array!size_t) gs_idxQueue; // signals notified
3187 
3188 
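	// Cross-thread signal handoff for the kqueue backend: addSignal() enqueues
	// a signal's id for its target thread under the writer lock, and that
	// thread's event loop drains the ids into a fixed-size AsyncSignal slice
	// via popSignals(), which returns true when more signals remain than fit
	// in one pass.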
3189 	// loop
3190 	nothrow bool popSignals(ref AsyncSignal[] sigarr) {
3191 		bool more;
3192 		try {
3193 			foreach (ref AsyncSignal sig; sigarr) {
3194 				if (!sig)
3195 					break;
3196 				sig = null;
3197 			}
3198 			size_t len;
3199 			synchronized(gs_queueMutex.reader) {
3200 
3201 				if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty)
3202 					return false;
3203 
3204 				len = gs_idxQueue[*g_threadId].length;
3205 				import std.stdio;
3206 				if (sigarr.length < len) {
3207 					more = true;
3208 					len = sigarr.length;
3209 				}
3210 
3211 				size_t i;
3212 				foreach (size_t idx; gs_idxQueue[*g_threadId][0 .. len]){
3213 					sigarr[i] = gs_signalQueue[*g_threadId][idx];
3214 					i++;
3215 				}
3216 			}
3217 
3218 			synchronized (gs_queueMutex.writer) {
3219 				gs_idxQueue[*g_threadId].linearRemove(gs_idxQueue[*g_threadId][0 .. len]);
3220 			}
3221 		}
3222 		catch (Exception e) {
3223 			assert(false, "Could not get pending signals: " ~ e.msg);
3224 		}
3225 		return more;
3226 	}
3227 
3228 	// notify
3229 	nothrow void addSignal(shared AsyncSignal ctxt) {
3230 		try {
3231 			size_t thread_id = ctxt.threadId;
3232 			bool must_resize;
3233 			import std.stdio;
3234 			synchronized (gs_queueMutex.writer) {
3235 				if (gs_idxQueue.empty || gs_idxQueue.length < thread_id + 1) {
3236 					gs_idxQueue.reserve(thread_id + 1);
3237 					foreach (i; gs_idxQueue.length .. gs_idxQueue.capacity) {
3238 						gs_idxQueue.insertBack(Array!size_t.init);
3239 					}
3240 				}
3241 				if (gs_idxQueue[thread_id].empty)
3242 				{
3243 					gs_idxQueue[thread_id].reserve(32);
3244 				}
3245 
3246 				gs_idxQueue[thread_id].insertBack(ctxt.id);
3247 
3248 			}
3249 
3250 		}
3251 		catch (Exception e) {
3252 			assert(false, "Array error: " ~ e.msg);
3253 		}
3254 	}
3255 
3256 	// called on run
3257 	nothrow size_t createIndex(shared AsyncSignal ctxt) {
3258 		size_t idx;
3259 		import std.algorithm : max;
3260 		try {
3261 			bool must_resize;
3262 
3263 			synchronized (gs_queueMutex.reader) {
3264 				if (gs_signalQueue.length < *g_threadId)
3265 					must_resize = true;
3266 			}
3267 
3268 			/// make sure the signal queue is big enough for this thread ID
3269 			if (must_resize) {
3270 				synchronized (gs_queueMutex.writer) {
3271 					while (gs_signalQueue.length <= *g_threadId)
3272 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
3273 				}
3274 			}
3275 
3276 			size_t getIdx() {
3277 
3278 				if (!g_idxAvailable.empty) {
3279 					immutable size_t ret = g_idxAvailable.back;
3280 					g_idxAvailable.removeBack();
3281 					return ret;
3282 				}
3283 				return 0;
3284 			}
3285 
3286 			idx = getIdx();
3287 			if (idx == 0) {
3288 				import std.range : iota;
3289 				g_idxAvailable.insert( iota(g_idxCapacity + 1,  max(32, g_idxCapacity * 2), 1) );
3290 				g_idxCapacity = g_idxAvailable[$-1];
3291 				idx = getIdx();
3292 			}
3293 
3294 			synchronized (gs_queueMutex.writer) {
3295 				if (gs_signalQueue.empty || gs_signalQueue.length < *g_threadId + 1) {
3296 
3297 					gs_signalQueue.reserve(*g_threadId + 1);
3298 					foreach (i; gs_signalQueue.length .. gs_signalQueue.capacity) {
3299 						gs_signalQueue.insertBack(Array!AsyncSignal.init);
3300 					}
3301 
3302 				}
3303 
3304 				if (gs_signalQueue[*g_threadId].empty || gs_signalQueue[*g_threadId].length < idx + 1) {
3305 
3306 					gs_signalQueue[*g_threadId].reserve(idx + 1);
3307 					foreach (i; gs_signalQueue[*g_threadId].length .. gs_signalQueue[*g_threadId].capacity) {
3308 						gs_signalQueue[*g_threadId].insertBack(cast(AsyncSignal)null);
3309 					}
3310 
3311 				}
3312 
3313 				gs_signalQueue[*g_threadId][idx] = cast(AsyncSignal) ctxt;
3314 			}
3315 		} catch(Throwable) {}
3316 
3317 		return idx;
3318 	}
3319 
	// called on kill: return the slot index to the free list and clear the queue entry
3321 	nothrow void destroyIndex(shared AsyncSignal ctxt) {
3322 		try {
3323 			g_idxAvailable.insert(ctxt.id);
3324 			synchronized (gs_queueMutex.writer) {
3325 				gs_signalQueue[*g_threadId][ctxt.id] = null;
3326 			}
3327 		}
3328 		catch (Exception e) {
3329 			assert(false, "Error destroying index: " ~ e.msg);
3330 		}
3331 	}
3332 }
3333 
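/// State shared by connection-oriented socket wrappers: connection and
/// disconnection flags, read/write back-pressure flags, and the EventInfo
/// pointer the event loop uses to dispatch callbacks.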
3334 mixin template COSocketMixins() {
3335 
3336 	private CleanupData m_impl;
3337 
3338 	struct CleanupData {
3339 		EventInfo* evInfo;
3340 		bool connected;
3341 		bool disconnecting;
3342 		bool writeBlocked;
3343 		bool readBlocked;
3344 	}
3345 
3346 	@property bool disconnecting() const @safe pure @nogc {
3347 		return m_impl.disconnecting;
3348 	}
3349 
3350 	@property void disconnecting(bool b) @safe pure @nogc {
3351 		m_impl.disconnecting = b;
3352 	}
3353 
3354 	@property bool connected() const @safe pure @nogc {
3355 		return m_impl.connected;
3356 	}
3357 
3358 	@property void connected(bool b) @safe pure @nogc {
3359 		m_impl.connected = b;
3360 	}
3361 
3362 	@property bool writeBlocked() const @safe pure @nogc {
3363 		return m_impl.writeBlocked;
3364 	}
3365 
3366 	@property void writeBlocked(bool b) @safe pure @nogc {
3367 		m_impl.writeBlocked = b;
3368 	}
3369 
3370 	@property bool readBlocked() const @safe pure @nogc {
3371 		return m_impl.readBlocked;
3372 	}
3373 
3374 	@property void readBlocked(bool b) @safe pure @nogc {
3375 		m_impl.readBlocked = b;
3376 	}
3377 
3378 	@property EventInfo* evInfo() @safe pure @nogc {
3379 		return m_impl.evInfo;
3380 	}
3381 
3382 	@property void evInfo(EventInfo* info) @safe pure @nogc {
3383 		m_impl.evInfo = info;
3384 	}
3385 
3386 }
3387 
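/// EventInfo bookkeeping for event objects used as `shared` across threads:
/// holds the shared EventInfo pointer together with the owner's identity
/// (pthread id under epoll, owner thread index under kqueue).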
3388 mixin template EvInfoMixinsShared() {
3389 
3390 	private CleanupData m_impl;
3391 
3392 	shared struct CleanupData {
3393 		EventInfo* evInfo;
3394 	}
3395 
3396 	static if (EPOLL) {
3397 		import core.sys.posix.pthread : pthread_t;
3398 		private pthread_t m_pthreadId;
3399 		synchronized @property pthread_t pthreadId() {
3400 			return cast(pthread_t) m_pthreadId;
3401 		}
3402 		/* todo: support multiple event loops per thread?
3403 		 private ushort m_sigId;
3404 		 synchronized @property ushort sigId() {
3405 		 return cast(ushort)m_loopId;
3406 		 }
3407 		 synchronized @property void sigId(ushort id) {
3408 		 m_loopId = cast(shared)id;
3409 		 }
3410 		 */
3411 	}
3412 	else /* if KQUEUE */
3413 	{
3414 		private shared(size_t)* m_owner_id;
3415 		synchronized @property size_t threadId() {
3416 			return cast(size_t) *m_owner_id;
3417 		}
3418 	}
3419 
3420 	@property shared(EventInfo*) evInfo() {
3421 		return m_impl.evInfo;
3422 	}
3423 
3424 	@property void evInfo(shared(EventInfo*) info) {
3425 		m_impl.evInfo = info;
3426 	}
3427 
3428 }
3429 
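/// EventInfo bookkeeping for thread-local event objects.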
3430 mixin template EvInfoMixins() {
3431 
3432 	private CleanupData m_impl;
3433 
3434 	struct CleanupData {
3435 		EventInfo* evInfo;
3436 	}
3437 
3438 	@property EventInfo* evInfo() {
3439 		return m_impl.evInfo;
3440 	}
3441 
3442 	@property void evInfo(EventInfo* info) {
3443 		m_impl.evInfo = info;
3444 	}
3445 }
3446 
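/// Handler or socket payload attached to an EventInfo; exactly one member is
/// valid, selected by the accompanying EventType.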
3447 union EventObject {
3448 	TCPAcceptHandler tcpAcceptHandler;
3449 	TCPEventHandler tcpEvHandler;
3450 	AsyncSocket socket;
3451 	TimerHandler timerHandler;
3452 	DWHandler dwHandler;
3453 	UDPHandler udpHandler;
3454 	NotifierHandler notifierHandler;
3455 	EventHandler eventHandler;
3456 }
3457 
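/// Discriminator telling the event loop which EventObject member is active.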
3458 enum EventType : char {
3459 	TCPAccept,
3460 	TCPTraffic,
3461 	UDPSocket,
3462 	Socket,
3463 	Notifier,
3464 	Signal,
3465 	Timer,
3466 	DirectoryWatcher,
3467 	Event // custom
3468 }
3469 
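/// Descriptor kept for every registered event source.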
3470 struct EventInfo {
	fd_t fd;            // file descriptor (or generated identifier) of the event source
	EventType evType;   // selects the active member of evObj
	EventObject evObj;  // handler or socket payload for this event
	ushort owner;       // id of the event-loop instance that registered the event
3475 }