module async.event.kqueue;

debug import std.stdio;

// Fold the Apple targets into one module-local version identifier so the
// platform checks below stay short.
version (OSX)
{
    version = Darwin;
}
else version (iOS)
{
    version = Darwin;
}
else version (TVOS)
{
    version = Darwin;
}
else version (WatchOS)
{
    version = Darwin;
}

// Platforms on which this kqueue-based selector is compiled.
version (Darwin)
{
    version = KQUEUE;
}
else version (FreeBSD)
{
    version = KQUEUE;
}
else version (OpenBSD)
{
    version = KQUEUE;
}
else version (DragonFlyBSD)
{
    version = KQUEUE;
}

// Everything below is compiled only on the platforms selected above.
version (KQUEUE):

// Pick the kqueue binding that matches the target. This must not be keyed on
// `Posix`: that identifier is set on every POSIX system (the BSDs included),
// which would make the BSD branches unreachable.
version (Darwin)
{
    import core.sys.darwin.sys.event;
}
else version (FreeBSD)
{
    import core.sys.freebsd.sys.event;
}
else version (DragonFlyBSD)
{
    import core.sys.dragonflybsd.sys.event;
}
// NOTE(review): OpenBSD enables KQUEUE above but no binding is imported for it
// here; confirm where kevent_t/kevent are expected to come from on OpenBSD.

import core.stdc.errno;

import core.sys.posix.signal;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.time;

import std.socket;

import async.event.selector;
import async.net.tcpstream;
import async.net.tcplistener;
import async.net.tcpclient;
import async.codec;

/// Name under which the rest of the project refers to the platform selector.
alias LoopSelector = Kqueue;

/**
 * Selector implementation backed by kqueue.
 *
 * Registers the listener and client descriptors with a kqueue instance and
 * dispatches the retrieved events to the accept/read/write/removeClient
 * handlers provided by the Selector base class.
 */
class Kqueue : Selector
{
    /**
     * Forwards every argument to the Selector constructor, creates the kqueue
     * descriptor and registers the listener's descriptor for ACCEPT events.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);

        // NOTE(review): neither the kqueue() result nor the register() result
        // is checked here; confirm whether failure should be reported.
        _eventHandle = kqueue();
        register(_listener.fd, EventType.ACCEPT);
    }

    /**
     * Adds (or updates) the kqueue filters for a descriptor.
     *
     * For EventType.ACCEPT a single enabled read filter is installed.
     * For any other event type both a read and a write filter are installed
     * with EV_CLEAR; the write filter is disabled when `et` is
     * EventType.READ and enabled otherwise.
     *
     * Params:
     *   fd = descriptor to register
     *   et = kind of events the caller is interested in
     *
     * Returns: true when the kevent call succeeded, false when it failed or
     *          when `fd` is negative.
     */
    override bool register(const int fd, EventType et)
    {
        // Same guard as unregister(): a negative descriptor can never be
        // registered, so skip the system call.
        if (fd < 0)
        {
            return false;
        }

        kevent_t[2] changes = void;

        if (et == EventType.ACCEPT)
        {
            EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, null);

            return kevent(_eventHandle, &changes[0], 1, null, 0, null) >= 0;
        }

        EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, null);

        // The write filter is always added; it only fires when the caller
        // asked for something other than plain READ.
        ushort writeFlags = EV_ADD | EV_CLEAR;
        writeFlags |= (et == EventType.READ) ? EV_DISABLE : EV_ENABLE;
        EV_SET(&changes[1], fd, EVFILT_WRITE, writeFlags, 0, 0, null);

        return kevent(_eventHandle, &changes[0], 2, null, 0, null) >= 0;
    }

    /**
     * Changes the event interest of a descriptor.
     *
     * Delegates to register(), which re-adds the filters with the flags
     * matching `et`.
     *
     * Returns: false for a negative `fd` or when the kevent call failed.
     */
    override bool reregister(const int fd, EventType et)
    {
        return register(fd, et);
    }

    /**
     * Removes the kqueue filters of a descriptor.
     *
     * Only the read filter is deleted for the listener descriptor; both the
     * read and the write filter are deleted for any other descriptor.
     *
     * Returns: true when the kevent call succeeded, false when it failed or
     *          when `fd` is negative.
     */
    override bool unregister(const int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        kevent_t[2] changes = void;
        EV_SET(&changes[0], fd, EVFILT_READ, EV_DELETE, 0, 0, null);
        EV_SET(&changes[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, null);

        // The listener only ever gets a read filter (see register()).
        const int changeCount = (fd == _listener.fd) ? 1 : 2;

        return kevent(_eventHandle, &changes[0], changeCount, null, 0, null) >= 0;
    }

    /**
     * Waits for events (no timeout is passed to kevent) and dispatches up to
     * 64 of them per call.
     *
     * Events flagged EV_EOF or EV_ERROR remove the client (with the socket's
     * SO_ERROR value in the EV_ERROR case); listener events trigger accept();
     * remaining events are routed to read() or write() by filter.
     */
    override protected void handleEvent()
    {
        kevent_t[64] events;
        //auto tspec = timespec(1, 1000 * 10);
        auto len = kevent(_eventHandle, null, 0, events.ptr, events.length, null);//&tspec);

        // A negative result means kevent failed; there is nothing to dispatch.
        if (len <= 0)
        {
            return;
        }

        foreach (ref event; events[0 .. len])
        {
            auto fd = cast(int)event.ident;

            if (event.flags & (EV_EOF | EV_ERROR))
            {
                if (fd == _listener.fd)
                {
                    debug writeln("Listener event error.", fd);
                }
                else
                {
                    // NOTE(review): the client is removed without calling
                    // read(); kqueue(2) allows an EV_EOF read event to still
                    // carry unread bytes - confirm dropping them is intended.
                    if (event.flags & EV_ERROR)
                    {
                        // Fetch the pending socket error to pass it along.
                        int err;
                        socklen_t errlen = err.sizeof;
                        getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
                        removeClient(fd, err);
                    }
                    else
                    {
                        removeClient(fd);
                    }

                    debug writeln("Close event: ", fd);
                }

                continue;
            }

            if (fd == _listener.fd)
            {
                accept();
            }
            else if (event.filter == EVFILT_READ)
            {
                read(fd);
            }
            else if (event.filter == EVFILT_WRITE)
            {
                write(fd);
            }
        }
    }
}

/**
 * Fills `*kevp` with a kevent_t constructed from `args`, which must match the
 * fields of kevent_t in declaration order.
 */
extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) @nogc nothrow
{
    *kevp = kevent_t(args);
}