1 module async.event.kqueue;
2 
3 debug import std.stdio;
4 
// Pull in the kqueue bindings (kevent_t, kqueue, kevent, EVFILT_*, EV_*) for
// the current platform.
//
// The Darwin module has to be selected by the Darwin-family identifiers and
// not by `Posix`: `Posix` is also defined on the BSDs, so testing it first
// would always take the first branch there and the BSD-specific imports in
// the rest of the chain could never be reached.
//
// NOTE(review): OpenBSD is flagged as a kqueue platform further down in this
// file, but no binding is imported for it here — confirm which druntime
// module should provide it.
version (OSX)
{
    import core.sys.darwin.sys.event;
}
else version (iOS)
{
    import core.sys.darwin.sys.event;
}
else version (TVOS)
{
    import core.sys.darwin.sys.event;
}
else version (WatchOS)
{
    import core.sys.darwin.sys.event;
}
else version (FreeBSD)
{
    import core.sys.freebsd.sys.event;
}
else version (DragonFlyBSD)
{
    import core.sys.dragonflybsd.sys.event;
}
17 
// Define KQUEUE on every platform this module supports; everything after the
// `version (KQUEUE):` label is compiled only when it is set.
version (OSX) version = KQUEUE;
else version (iOS) version = KQUEUE;
else version (TVOS) version = KQUEUE;
else version (WatchOS) version = KQUEUE;
else version (FreeBSD) version = KQUEUE;
else version (OpenBSD) version = KQUEUE;
else version (DragonFlyBSD) version = KQUEUE;
46 
47 version (KQUEUE):
48 
49 import core.stdc.errno;
50 
51 import core.sys.posix.signal;
52 import core.sys.posix.netinet.tcp;
53 import core.sys.posix.netinet.in_;
54 import core.sys.posix.time;
55 
56 import std.socket;
57 
58 import async.event.selector;
59 import async.net.tcpstream;
60 import async.net.tcplistener;
61 import async.net.tcpclient;
62 import async.codec;
63 
/// Exposes Kqueue under the generic LoopSelector name.
alias LoopSelector = Kqueue;

/**
 * Selector implementation that drives its descriptors through kqueue.
 *
 * ACCEPT registrations use a plain (level-triggered) read filter; every other
 * registration installs read and write filters with EV_CLEAR set.
 */
class Kqueue : Selector
{
    /**
     * Forwards all arguments to the Selector constructor, creates the kqueue
     * descriptor and registers the listener socket for ACCEPT events.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);

        _eventHandle = kqueue();
        register(_listener.fd, EventType.ACCEPT);
    }

    /**
     * Adds (or updates) the kqueue filters for `fd`.
     *
     * Params:
     *   fd = descriptor to watch
     *   et = ACCEPT installs only a read filter; any other value installs a
     *        read filter plus a write filter, the latter disabled when `et`
     *        is READ and enabled otherwise
     *
     * Returns: true when the kevent() call reports success (result >= 0).
     */
    override bool register(const int fd, EventType et)
    {
        kevent_t[2] changes = void;
        int changeCount = 1;

        if (et == EventType.ACCEPT)
        {
            EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, null);
        }
        else
        {
            EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, null);

            // The write filter is always added, but only armed when the
            // caller asked for something other than plain READ.
            ushort writeFlags = EV_ADD | EV_CLEAR;
            writeFlags |= (et == EventType.READ) ? EV_DISABLE : EV_ENABLE;
            EV_SET(&changes[1], fd, EVFILT_WRITE, writeFlags, 0, 0, null);

            changeCount = 2;
        }

        return kevent(_eventHandle, changes.ptr, changeCount, null, 0, null) >= 0;
    }

    /**
     * Re-applies the registration for `fd` by delegating to register().
     *
     * Returns: false for a negative descriptor, otherwise register()'s result.
     */
    override bool reregister(const int fd, EventType et)
    {
        return fd >= 0 && register(fd, et);
    }

    /**
     * Deletes the kqueue filters of `fd`.
     *
     * Only the read filter is deleted for the listener descriptor; for any
     * other descriptor both the read and the write filter are deleted.
     *
     * Returns: false for a negative descriptor, otherwise whether kevent()
     *          reported success (result >= 0).
     */
    override bool unregister(const int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        kevent_t[2] removals = void;
        EV_SET(&removals[0], fd, EVFILT_READ,  EV_DELETE, 0, 0, null);
        EV_SET(&removals[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, null);

        const int changeCount = (fd == _listener.fd) ? 1 : 2;

        return kevent(_eventHandle, removals.ptr, changeCount, null, 0, null) >= 0;
    }

    /**
     * Collects up to 64 events from the kqueue descriptor and dispatches each
     * of them to accept(), read(), write() or removeClient().
     */
    override protected void handleEvent()
    {
        kevent_t[64] ready;

        // No timeout is passed (null).
        auto count = kevent(_eventHandle, null, 0, ready.ptr, ready.length, null);

        // Nothing to process on an empty result or on failure (-1).
        if (count <= 0)
        {
            return;
        }

        foreach (ref event; ready[0 .. count])
        {
            int fd = cast(int) event.ident;
            const bool isListener = (fd == _listener.fd);

            if (event.flags & (EV_EOF | EV_ERROR))
            {
                if (isListener)
                {
                    debug writeln("Listener event error.", fd);
                    continue;
                }

                if (event.flags & EV_ERROR)
                {
                    // Query the socket's pending error and hand it over
                    // together with the descriptor.
                    int err;
                    socklen_t errlen = err.sizeof;
                    getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
                    removeClient(fd, err);
                }
                else
                {
                    removeClient(fd);
                }

                debug writeln("Close event: ", fd);
                continue;
            }

            if (isListener)
            {
                accept();
                continue;
            }

            switch (event.filter)
            {
                case EVFILT_READ:
                    read(fd);
                    break;

                case EVFILT_WRITE:
                    write(fd);
                    break;

                default:
                    break;
            }
        }
    }
}
189 
/**
 * Fills the kevent_t that `kevp` points to.
 *
 * Params:
 *   kevp = destination entry
 *   args = one value per kevent_t field, in declaration order
 */
extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) @nogc nothrow
{
    auto entry = kevent_t(args);
    *kevp = entry;
}