1 module async.event.epoll;
2 
3 debug import std.stdio;
4 
5 version (linux):
6 
7 import core.stdc.errno;
8 
9 import core.sys.linux.epoll;
10 import core.sys.posix.signal;
11 import core.sys.posix.netinet.tcp;
12 import core.sys.posix.netinet.in_;
13 import core.sys.posix.time;
14 
15 import std.socket;
16 
17 import async.event.selector;
18 import async.net.tcpstream;
19 import async.net.tcplistener;
20 import async.net.tcpclient;
21 import async.codec;
22 
23 alias LoopSelector = Epoll;
24 
/**
 * `Selector` implementation backed by the Linux `epoll` facility.
 *
 * The listener socket is registered level-triggered; every other descriptor
 * is registered edge-triggered (`EPOLLET`), so each reported readiness flag
 * has to be acted upon when it is delivered.
 */
class Epoll : Selector
{
    /**
     * Creates the epoll instance and registers the listener socket for
     * accept events. All arguments are forwarded unchanged to `Selector`.
     *
     * Throws:
     *     `ErrnoException` if `epoll_create1` fails;
     *     `Exception` if the listener descriptor cannot be registered.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        import std.exception : enforce, errnoEnforce;

        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);

        // Without a valid epoll descriptor every later epoll_ctl/epoll_wait
        // call fails, so report the failure here instead of running dead.
        _eventHandle = epoll_create1(0);
        errnoEnforce(_eventHandle >= 0, "epoll_create1 failed");

        enforce(register(_listener.fd, EventType.ACCEPT),
            "Failed to register the listener socket with epoll");
    }

    /**
     * Adds `fd` to the epoll interest list for the readiness described by `et`.
     *
     * Returns: `true` if the descriptor was added or was already present
     *          (`EEXIST`; the existing event mask is left as it was),
     *          `false` if `fd` is negative or `epoll_ctl` failed otherwise.
     */
    override bool register(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        epoll_event ev = buildEpollEvent(fd, et);

        if (epoll_ctl(_eventHandle, EPOLL_CTL_ADD, fd, &ev) == 0)
        {
            return true;
        }

        // An already-registered descriptor is treated as success.
        return errno == EEXIST;
    }

    /**
     * Replaces the event mask of an already registered `fd` with the one
     * described by `et`.
     *
     * Returns: `true` on success, `false` if `fd` is negative or
     *          `epoll_ctl(EPOLL_CTL_MOD)` failed.
     */
    override bool reregister(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        epoll_event ev = buildEpollEvent(fd, et);

        return (epoll_ctl(_eventHandle, EPOLL_CTL_MOD, fd, &ev) == 0);
    }

    /**
     * Removes `fd` from the epoll interest list.
     *
     * Returns: `true` on success, `false` if `fd` is negative or
     *          `epoll_ctl(EPOLL_CTL_DEL)` failed.
     */
    override bool unregister(const int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        return (epoll_ctl(_eventHandle, EPOLL_CTL_DEL, fd, null) == 0);
    }

    /**
     * Blocks until at least one event is available (no timeout), then
     * dispatches up to 64 events: error/hang-up conditions remove the client,
     * listener readiness triggers `accept`, and client readiness triggers
     * `read` and/or `write`.
     */
    override protected void handleEvent()
    {
        epoll_event[64] events;
        const int len = epoll_wait(_eventHandle, events.ptr, events.length, -1);

        // -1 signals an error (for example EINTR); there is nothing to
        // dispatch in that case.
        if (len <= 0)
        {
            return;
        }

        foreach (i; 0 .. len)
        {
            auto fd = events[i].data.fd;
            const uint flags = events[i].events;

            // NOTE: EPOLLRDHUP is only reported when it was requested in the
            // event mask, which buildEpollEvent does not do; in practice this
            // branch is taken for EPOLLHUP / EPOLLERR.
            if ((flags & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0)
            {
                if (fd == _listener.fd)
                {
                    debug writeln("Listener event error.", fd);
                }
                else
                {
                    // Fetch the pending socket error (stays 0 if none is
                    // reported) and hand it to removeClient.
                    int err;
                    socklen_t errlen = err.sizeof;
                    getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
                    removeClient(fd, err);

                    debug writeln("Close event: ", fd);
                }

                continue;
            }

            if (fd == _listener.fd)
            {
                accept();
                continue;
            }

            // Client descriptors are edge-triggered: when one event carries
            // both EPOLLIN and EPOLLOUT, both must be serviced now, because
            // the write-readiness edge will not be reported again.
            if (flags & EPOLLIN)
            {
                read(fd);
            }
            if (flags & EPOLLOUT)
            {
                write(fd);
            }
        }
    }

    /// Builds the `epoll_event` (mask plus fd payload) shared by `register` and `reregister`.
    private static epoll_event buildEpollEvent(const int fd, EventType et)
    {
        epoll_event ev;
        ev.events  = EPOLLHUP | EPOLLERR;
        ev.data.fd = fd;

        // Only the accepting (listener) registration stays level-triggered.
        if (et != EventType.ACCEPT)
        {
            ev.events |= EPOLLET;
        }
        if (et == EventType.ACCEPT || et == EventType.READ || et == EventType.READWRITE)
        {
            ev.events |= EPOLLIN;
        }
        if (et == EventType.WRITE || et == EventType.READWRITE)
        {
            ev.events |= EPOLLOUT;
        }

        return ev;
    }
}