1 module async.event.kqueue; 2 3 debug import std.stdio; 4 5 version (Posix) 6 { 7 import core.sys.darwin.sys.event; 8 } 9 else version (FreeBSD) 10 { 11 import core.sys.freebsd.sys.event; 12 } 13 else version (DragonFlyBSD) 14 { 15 import core.sys.dragonflybsd.sys.event; 16 } 17 18 version (OSX) 19 { 20 version = KQUEUE; 21 } 22 else version (iOS) 23 { 24 version = KQUEUE; 25 } 26 else version (TVOS) 27 { 28 version = KQUEUE; 29 } 30 else version (WatchOS) 31 { 32 version = KQUEUE; 33 } 34 else version (FreeBSD) 35 { 36 version = KQUEUE; 37 } 38 else version (OpenBSD) 39 { 40 version = KQUEUE; 41 } 42 else version (DragonFlyBSD) 43 { 44 version = KQUEUE; 45 } 46 47 version (KQUEUE): 48 49 import core.stdc.errno; 50 import core.sys.posix.signal; 51 import core.sys.posix.netinet.tcp; 52 import core.sys.posix.netinet.in_; 53 import core.sys.posix.time; 54 55 import std.socket; 56 57 import async.event.selector; 58 import async.net.tcpstream; 59 import async.net.tcplistener; 60 import async.net.tcpclient; 61 62 alias LoopSelector = Kqueue; 63 64 class Kqueue : Selector 65 { 66 this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum) 67 { 68 super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, workerThreadNum); 69 70 _eventHandle = kqueue(); 71 register(_listener.fd, EventType.ACCEPT); 72 } 73 74 override bool register(int fd, EventType et) 75 { 76 kevent_t[2] ev = void; 77 short filter; 78 ushort flags; 79 80 if (et == EventType.ACCEPT) 81 { 82 filter = EVFILT_READ; 83 flags = EV_ADD | EV_ENABLE; 84 EV_SET(&(ev[0]), fd, filter, flags, 0, 0, null); 85 86 return (kevent(_eventHandle, &(ev[0]), 1, null, 0, null) >= 0); 87 } 88 else 89 { 90 filter = EVFILT_READ; 91 flags = EV_ADD | EV_ENABLE | EV_CLEAR; 92 EV_SET(&(ev[0]), fd, filter, flags, 0, 0, null); 93 94 filter = EVFILT_WRITE; 95 flags = EV_ADD | EV_CLEAR; 96 flags |= (et == EventType.READ) ? EV_DISABLE : EV_ENABLE; 97 EV_SET(&(ev[1]), fd, filter, flags, 0, 0, null); 98 99 return (kevent(_eventHandle, &(ev[0]), 2, null, 0, null) >= 0); 100 } 101 } 102 103 override bool reregister(int fd, EventType et) 104 { 105 if (fd < 0) 106 { 107 return false; 108 } 109 110 return register(fd, et); 111 } 112 113 override bool unregister(int fd) 114 { 115 if (fd < 0) 116 { 117 return false; 118 } 119 120 kevent_t[2] ev = void; 121 EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, null); 122 EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, null); 123 124 if (fd == _listener.fd) 125 { 126 return (kevent(_eventHandle, &(ev[0]), 1, null, 0, null) >= 0); 127 } 128 else 129 { 130 return (kevent(_eventHandle, &(ev[0]), 2, null, 0, null) >= 0); 131 } 132 } 133 134 override protected void handleEvent() 135 { 136 kevent_t[64] events; 137 //auto tspec = timespec(1, 1000 * 10); 138 auto len = kevent(_eventHandle, null, 0, events.ptr, events.length, null);//&tspec); 139 140 foreach (i; 0 .. len) 141 { 142 auto fd = cast(int)events[i].ident; 143 144 if ((events[i].flags & EV_EOF) || (events[i].flags & EV_ERROR)) 145 { 146 if (fd == _listener.fd) 147 { 148 debug writeln("Listener event error.", fd); 149 } 150 else 151 { 152 if (events[i].flags & EV_ERROR) 153 { 154 int err; 155 socklen_t errlen = err.sizeof; 156 getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen); 157 removeClient(fd, err); 158 } 159 else 160 { 161 removeClient(fd); 162 } 163 164 debug writeln("Close event: ", fd); 165 } 166 167 continue; 168 } 169 170 if (fd == _listener.fd) 171 { 172 accept(); 173 } 174 else if (events[i].filter == EVFILT_READ) 175 { 176 read(fd); 177 } 178 else if (events[i].filter == EVFILT_WRITE) 179 { 180 write(fd); 181 } 182 } 183 } 184 } 185 186 extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) @nogc nothrow 187 { 188 *kevp = kevent_t(args); 189 }