1 module async.event.kqueue;
2 
3 debug import std.stdio;
4 
5 version (Posix)
6 {
7     import core.sys.darwin.sys.event;
8 }
9 else version (FreeBSD)
10 {
11     import core.sys.freebsd.sys.event;
12 }
13 else version (DragonFlyBSD)
14 {
15     import core.sys.dragonflybsd.sys.event;
16 }
17 
18 version (OSX)
19 {
20     version = KQUEUE;
21 }
22 else version (iOS)
23 {
24     version = KQUEUE;
25 }
26 else version (TVOS)
27 {
28     version = KQUEUE;
29 }
30 else version (WatchOS)
31 {
32     version = KQUEUE;
33 }
34 else version (FreeBSD)
35 {
36     version = KQUEUE;
37 }
38 else version (OpenBSD)
39 {
40     version = KQUEUE;
41 }
42 else version (DragonFlyBSD)
43 {
44     version = KQUEUE;
45 }
46 
47 version (KQUEUE):
48 
49 import core.stdc.errno;
50 import core.sys.posix.signal;
51 import core.sys.posix.netinet.tcp;
52 import core.sys.posix.netinet.in_;
53 import core.sys.posix.time;
54 
55 import std.socket;
56 
57 import async.event.selector;
58 import async.net.tcpstream;
59 import async.net.tcplistener;
60 import async.net.tcpclient;
61 
62 alias LoopSelector = Kqueue;
63 
64 class Kqueue : Selector
65 {
66     this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum)
67     {
68         super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, workerThreadNum);
69 
70         _eventHandle = kqueue();
71         register(_listener.fd, EventType.ACCEPT);
72     }
73 
74     override bool register(int fd, EventType et)
75     {
76         kevent_t[2] ev = void;
77         short  filter;
78         ushort flags;
79 
80         if (et == EventType.ACCEPT)
81         {
82             filter = EVFILT_READ;
83             flags  = EV_ADD | EV_ENABLE;
84             EV_SET(&(ev[0]), fd, filter, flags, 0, 0, null);
85 
86             return (kevent(_eventHandle, &(ev[0]), 1, null, 0, null) >= 0);
87         }
88         else
89         {
90             filter = EVFILT_READ;
91             flags  = EV_ADD | EV_ENABLE | EV_CLEAR;
92             EV_SET(&(ev[0]), fd, filter, flags, 0, 0, null);
93 
94             filter = EVFILT_WRITE;
95             flags  = EV_ADD | EV_CLEAR;
96             flags |= (et == EventType.READ) ? EV_DISABLE : EV_ENABLE;
97             EV_SET(&(ev[1]), fd, filter, flags, 0, 0, null);
98 
99             return (kevent(_eventHandle, &(ev[0]), 2, null, 0, null) >= 0);
100         }
101     }
102 
103     override bool reregister(int fd, EventType et)
104     {
105         if (fd < 0)
106         {
107             return false;
108         }
109 
110         return register(fd, et);
111     }
112 
113     override bool unregister(int fd)
114     {
115         if (fd < 0)
116         {
117             return false;
118         }
119 
120         kevent_t[2] ev = void;
121         EV_SET(&(ev[0]), fd, EVFILT_READ,  EV_DELETE, 0, 0, null);
122         EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, null);
123 
124         if (fd == _listener.fd)
125         {
126             return (kevent(_eventHandle, &(ev[0]), 1, null, 0, null) >= 0);
127         }
128         else
129         {
130             return (kevent(_eventHandle, &(ev[0]), 2, null, 0, null) >= 0);
131         }
132     }
133 
134     override protected void handleEvent()
135     {
136         kevent_t[64] events;
137         //auto tspec = timespec(1, 1000 * 10);
138         auto len = kevent(_eventHandle, null, 0, events.ptr, events.length, null);//&tspec);
139 
140         foreach (i; 0 .. len)
141         {
142             auto fd = cast(int)events[i].ident;
143 
144             if ((events[i].flags & EV_EOF) || (events[i].flags & EV_ERROR))
145             {
146                 if (fd == _listener.fd)
147                 {
148                     debug writeln("Listener event error.", fd);
149                 }
150                 else
151                 {
152                     if (events[i].flags & EV_ERROR)
153                     {
154                         int err;
155                         socklen_t errlen = err.sizeof;
156                         getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
157                         removeClient(fd, err);
158                     }
159                     else
160                     {
161                         removeClient(fd);
162                     }
163 
164                     debug writeln("Close event: ", fd);
165                 }
166 
167                 continue;
168             }
169 
170             if (fd == _listener.fd)
171             {
172                 accept();
173             }
174             else if (events[i].filter == EVFILT_READ)
175             {
176                 read(fd);
177             }
178             else if (events[i].filter == EVFILT_WRITE)
179             {
180                 write(fd);
181             }
182         }
183     }
184 }
185 
186 extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) @nogc nothrow
187 {
188     *kevp = kevent_t(args);
189 }