module async.event.epoll;

debug import std.stdio;

version (linux):

import core.stdc.errno;

import core.sys.linux.epoll;
import core.sys.posix.signal;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.time;

import std.socket;

import async.event.selector;
import async.net.tcpstream;
import async.net.tcplistener;
import async.net.tcpclient;
import async.codec;

alias LoopSelector = Epoll;

/**
 * Linux epoll(7) based implementation of `Selector`.
 *
 * The listening socket is registered level-triggered; every other
 * descriptor is registered edge-triggered (EPOLLET).
 */
class Epoll : Selector
{
    /**
     * Creates the epoll instance and registers the listener socket for
     * accept events.
     *
     * All arguments are forwarded unchanged to the `Selector` constructor.
     *
     * Throws: `ErrnoException` if the epoll instance cannot be created.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        import std.exception : errnoEnforce;

        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);

        // Without a valid epoll descriptor every later epoll_ctl/epoll_wait
        // call fails, so report the failure here instead of silently
        // running a selector that can never deliver an event.
        const int handle = epoll_create1(0);
        errnoEnforce(handle >= 0, "epoll_create1 failed");

        _eventHandle = handle;
        register(_listener.fd, EventType.ACCEPT);
    }

    /**
     * Adds `fd` to the epoll interest list for the events implied by `et`.
     *
     * Returns: `true` on success or when the descriptor was already
     *          registered (EEXIST); `false` for a negative descriptor or
     *          any other epoll_ctl failure.
     */
    override bool register(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        epoll_event ev = buildEvent(fd, et);

        if (epoll_ctl(_eventHandle, EPOLL_CTL_ADD, fd, &ev) != 0)
        {
            // An already-registered descriptor is treated as success.
            return errno == EEXIST;
        }

        return true;
    }

    /**
     * Replaces the event set of an already registered `fd` with the one
     * implied by `et`.
     *
     * Returns: `true` if epoll_ctl(EPOLL_CTL_MOD) succeeded, `false` for a
     *          negative descriptor or on failure.
     */
    override bool reregister(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        epoll_event ev = buildEvent(fd, et);

        return epoll_ctl(_eventHandle, EPOLL_CTL_MOD, fd, &ev) == 0;
    }

    /**
     * Removes `fd` from the epoll interest list.
     *
     * Returns: `true` if epoll_ctl(EPOLL_CTL_DEL) succeeded, `false` for a
     *          negative descriptor or on failure.
     */
    override bool unregister(const int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        return epoll_ctl(_eventHandle, EPOLL_CTL_DEL, fd, null) == 0;
    }

    /**
     * Blocks in epoll_wait (no timeout) and dispatches one batch of up to
     * 64 ready events to accept / read / write / removeClient.
     */
    override protected void handleEvent()
    {
        epoll_event[64] events;
        const int len = epoll_wait(_eventHandle, events.ptr, cast(int) events.length, -1);

        // A negative result (e.g. EINTR) carries no events to dispatch;
        // return and let the caller invoke us again.
        if (len <= 0)
        {
            return;
        }

        foreach (ref event; events[0 .. len])
        {
            const int fd = event.data.fd;
            const uint flags = event.events;

            // NOTE(review): EPOLLRDHUP is tested here but is not part of the
            // mask built by buildEvent, so in this file only EPOLLHUP and
            // EPOLLERR can trigger this branch - confirm whether peer
            // half-close should be subscribed as well.
            if ((flags & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0)
            {
                if (fd == _listener.fd)
                {
                    debug writeln("Listener event error.", fd);
                }
                else
                {
                    int err;
                    socklen_t errlen = err.sizeof;

                    // If the pending socket error cannot be fetched, report
                    // the reason the query itself failed rather than 0.
                    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) != 0)
                    {
                        err = errno;
                    }

                    removeClient(fd, err);

                    debug writeln("Close event: ", fd);
                }

                continue;
            }

            // NOTE(review): readability takes priority; when EPOLLIN and
            // EPOLLOUT are reported together only read() is invoked. With
            // edge-triggered registration that may delay write handling -
            // verify against Selector.read/write before changing.
            if (fd == _listener.fd)
            {
                accept();
            }
            else if (flags & EPOLLIN)
            {
                read(fd);
            }
            else if (flags & EPOLLOUT)
            {
                write(fd);
            }
        }
    }

    /// Builds the epoll_event shared by register and reregister for `fd`/`et`.
    private static epoll_event buildEvent(const int fd, EventType et)
    {
        epoll_event ev;
        ev.events = EPOLLHUP | EPOLLERR;
        ev.data.fd = fd;

        // Only the accepting socket stays level-triggered.
        if (et != EventType.ACCEPT)
        {
            ev.events |= EPOLLET;
        }

        if (et == EventType.ACCEPT || et == EventType.READ || et == EventType.READWRITE)
        {
            ev.events |= EPOLLIN;
        }

        if (et == EventType.WRITE || et == EventType.READWRITE)
        {
            ev.events |= EPOLLOUT;
        }

        return ev;
    }
}