1 module async.event.epoll;
2 
3 debug import std.stdio;
4 
5 version (linux):
6 
7 import core.sys.linux.epoll;
8 import core.stdc.errno;
9 import core.sys.posix.signal;
10 import core.sys.posix.netinet.tcp;
11 import core.sys.posix.netinet.in_;
12 import core.sys.posix.time;
13 
14 import std.socket;
15 
16 import async.event.selector;
17 import async.net.tcpstream;
18 import async.net.tcplistener;
19 import async.net.tcpclient;
20 
21 alias LoopSelector = Epoll;
22 
23 class Epoll : Selector
24 {
25     this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum)
26     {
27         super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, workerThreadNum);
28 
29         _eventHandle = epoll_create1(0);
30         register(_listener.fd, EventType.ACCEPT);
31     }
32 
33     override bool register(int fd, EventType et)
34     {
35         if (fd < 0)
36         {
37             return false;
38         }
39 
40         epoll_event ev;
41         ev.events  = EPOLLHUP | EPOLLERR;
42         ev.data.fd = fd;
43 
44         if (et != EventType.ACCEPT)
45         {
46             ev.events |= EPOLLET;
47         }
48         if (et == EventType.ACCEPT || et == EventType.READ || et == EventType.READWRITE)
49         {
50             ev.events |= EPOLLIN;
51         }
52         if ((et == EventType.WRITE) || (et == EventType.READWRITE))
53         {
54             ev.events |= EPOLLOUT;
55         }
56 
57         if (epoll_ctl(_eventHandle, EPOLL_CTL_ADD, fd, &ev) != 0)
58         {
59             if (errno != EEXIST)
60             {
61                 return false;
62             }
63         }
64 
65         return true;
66     }
67 
68     override bool reregister(int fd, EventType et)
69     {
70         if (fd < 0)
71         {
72             return false;
73         }
74 
75         epoll_event ev;
76         ev.events  = EPOLLHUP | EPOLLERR;
77         ev.data.fd = fd;
78 
79         if (et != EventType.ACCEPT)
80         {
81             ev.events |= EPOLLET;
82         }
83         if (et == EventType.ACCEPT || et == EventType.READ || et == EventType.READWRITE)
84         {
85             ev.events |= EPOLLIN;
86         }
87         if ((et == EventType.WRITE) || (et == EventType.READWRITE))
88         {
89             ev.events |= EPOLLOUT;
90         }
91 
92         return (epoll_ctl(_eventHandle, EPOLL_CTL_MOD, fd, &ev) == 0);
93     }
94 
95     override bool unregister(int fd)
96     {
97         if (fd < 0)
98         {
99             return false;
100         }
101 
102         return (epoll_ctl(_eventHandle, EPOLL_CTL_DEL, fd, null) == 0);
103     }
104 
105     override protected void handleEvent()
106     {
107         epoll_event[64] events;
108         const int len = epoll_wait(_eventHandle, events.ptr, events.length, -1);
109 
110         foreach (i; 0 .. len)
111         {
112             auto fd = events[i].data.fd;
113 
114             if ((events[i].events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0)
115             {
116                 if (fd == _listener.fd)
117                 {
118                     debug writeln("Listener event error.", fd);
119                 }
120                 else
121                 {
122                     int err;
123                     socklen_t errlen = err.sizeof;
124                     getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
125                     removeClient(fd, err);
126 
127                     debug writeln("Close event: ", fd);
128                 }
129 
130                 continue;
131             }
132 
133             if (fd == _listener.fd)
134             {
135                 accept();
136             }
137             else if (events[i].events & EPOLLIN)
138             {
139                 read(fd);
140             }
141             else if (events[i].events & EPOLLOUT)
142             {
143                 write(fd);
144             }
145         }
146     }
147 }