module async.event.selector;

import core.sync.mutex;
import core.thread;

import std.socket;
import std.parallelism : totalCPUs;

import async.net.tcplistener;
import async.net.tcpclient;
import async.container.map;
import async.thread;
import async.codec;

/// Callback invoked by `Selector.beginAccept` once an accepted client has been
/// stored in the client map (and before it is registered for READ events).
alias OnConnected = void function(TcpClient) nothrow @trusted;

/// Callback invoked by `Selector.removeClient` with the descriptor and the
/// client's `_remoteAddress`, just before the client is closed.
alias OnDisConnected = void function(const int, string) nothrow @trusted;

/// Callback receiving a client and a chunk of bytes. In this module it is
/// only invoked from the Windows variant of `Selector.read`.
alias OnReceive = void function(TcpClient, const scope ubyte[]) nothrow @trusted;

/// Callback stored by the selector as `onSendCompleted`; it is not invoked in
/// this module. NOTE(review): parameters are presumably (fd, remote address,
/// data, bytes sent) - confirm against the caller.
alias OnSendCompleted = void function(const int, string, const scope ubyte[], const size_t) nothrow @trusted;

/// Callback invoked by `Selector.removeClient` when it is given `err > 0`:
/// receives the descriptor, the client's `_remoteAddress` and the text
/// produced by `formatSocketError(err)`.
alias OnSocketError = void function(const int, string, string) nothrow @trusted;

/// Event kinds passed to `register` / `reregister` and to `TcpClient.weakup`.
enum EventType
{
    ACCEPT, READ, WRITE, READWRITE
}

/**
 * Base class of the event selector.
 *
 * Owns the listener, the map of connected clients (keyed by descriptor), the
 * user callbacks and the thread pools. `register`, `reregister`,
 * `unregister` (and, off Windows, `handleEvent`) have no body here;
 * presumably they are supplied by a platform-specific subclass.
 */
abstract class Selector
{
    /**
     * Stores the callbacks and codec, creates the client map and the pools.
     *
     * Params:
     *   listener        = listener whose `accept()` yields new connections
     *   onConnected     = see `OnConnected` (may be null)
     *   onDisConnected  = see `OnDisConnected` (may be null)
     *   onReceive       = see `OnReceive` (may be null)
     *   onSendCompleted = see `OnSendCompleted`
     *   onSocketError   = see `OnSocketError` (may be null)
     *   codec           = codec exposed through the `codec` property
     *   workerThreadNum = size of `workerPool`; any value `<= 0` selects
     *                     `totalCPUs * 2 + 2`
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        this._onConnected    = onConnected;
        this._onDisConnected = onDisConnected;
        this.onReceive       = onReceive;
        this.onSendCompleted = onSendCompleted;
        this._onSocketError  = onSocketError;
        this._codec          = codec;

        _clients         = new Map!(int, TcpClient);
        _listener        = listener;
        _workerThreadNum = (workerThreadNum <= 0) ? totalCPUs * 2 + 2 : workerThreadNum;

        // The single-thread accept pool is only created off Windows; the
        // Windows loop calls beginAccept() directly from startLoop().
        version (Windows) { } else _acceptPool = new ThreadPool(1);
        workerPool = new ThreadPool(_workerThreadNum);
    }

    /// Calls `dispose()`.
    /// NOTE(review): when this runs as a GC finalizer, D does not guarantee
    /// that GC-managed members such as `_clients` / `_listener` are still
    /// valid; prefer calling `dispose()` explicitly - confirm with callers.
    ~this()
    {
        dispose();
    }

    /// Starts watching `fd` for events of kind `et`. No body in this class.
    bool register  (const int fd, EventType et);
    /// Changes the watched event kind of `fd` to `et`. No body in this class.
    bool reregister(const int fd, EventType et);
    /// Stops watching `fd`. No body in this class.
    bool unregister(const int fd);

    /**
     * Runs the event loop until `stop()` clears the running flag.
     *
     * On Windows, `_workerThreadNum` `handleEvent` tasks are first submitted
     * to `workerPool`, then the calling thread repeatedly calls
     * `beginAccept(this)`. Elsewhere the calling thread repeatedly calls
     * `handleEvent()`.
     */
    void startLoop()
    {
        _runing = true;

        version (Windows)
        {
            foreach (i; 0 .. _workerThreadNum)
                workerPool.run!handleEvent(this);
        }

        while (_runing)
        {
            version (Windows)
            {
                beginAccept(this);
            }
            else
            {
                handleEvent();
            }
        }
    }

    /// Clears the running flag tested by the `startLoop()` loop condition.
    void stop()
    {
        _runing = false;
    }

    /**
     * Releases everything the selector holds. Only the first call does any
     * work; later calls return immediately.
     *
     * Every client is unregistered and, if still alive, closed; the client
     * map is cleared; the listener is unregistered and closed; off Windows
     * the event handle is closed with `core.sys.posix.unistd.close`.
     */
    void dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        {
            _clients.lock();
            // unregister() and close() are not nothrow: guarantee the map is
            // unlocked even if one of them throws. The nested scope keeps the
            // unlock ahead of clear(), as on the normal path before.
            scope (exit) _clients.unlock();

            foreach (ref c; _clients)
            {
                unregister(c.fd);

                if (c.isAlive)
                {
                    c.close();
                }
            }
        }

        _clients.clear();

        unregister(_listener.fd);
        _listener.close();

        version (Windows) { } else
        {
            static import core.sys.posix.unistd;
            core.sys.posix.unistd.close(_eventHandle);
        }
    }

    /**
     * Unregisters `fd`, notifies the callbacks, closes the client and drops
     * it from the client map.
     *
     * Params:
     *   fd  = descriptor of the client to remove
     *   err = socket error code; when `> 0` (and a client is found) the
     *         socket-error callback is invoked before the disconnect callback
     */
    void removeClient(const int fd, const int err = 0)
    {
        unregister(fd);

        TcpClient client = _clients[fd];

        if (client !is null)
        {
            if ((err > 0) && (_onSocketError !is null))
            {
                _onSocketError(fd, client._remoteAddress, formatSocketError(err));
            }

            if (_onDisConnected !is null)
            {
                _onDisConnected(fd, client._remoteAddress);
            }

            client.close();
        }

        _clients.remove(fd);
    }

    version (Windows)
    {
        /// Windows only; no body in this class.
        void iocp_send(const int fd, const scope ubyte[] data);
        /// Windows only; no body in this class. Called by `beginAccept` for
        /// each newly registered client.
        void iocp_receive(const int fd);
    }

    /// The codec passed to the constructor.
    @property Codec codec()
    {
        return this._codec;
    }

protected:

    /// Non-Windows only: submits `beginAccept` for this selector to the
    /// accept pool.
    version (Windows) { } else void accept()
    {
        _acceptPool.run!beginAccept(this);
    }

    /**
     * Accepts one connection from the selector's listener and wires it up.
     *
     * If `accept()` throws, the function returns without doing anything.
     * Otherwise a `TcpClient` is created, keep-alive is requested
     * (failures are ignored), the client is stored in the client map, the
     * connected callback (if any) is invoked and the descriptor is registered
     * for `EventType.READ`. On Windows `iocp_receive` is then called for it.
     */
    static void beginAccept(Selector selector)
    {
        Socket socket;

        try
        {
            socket = selector._listener.accept();
        }
        catch (Exception)
        {
            return;
        }

        TcpClient client = new TcpClient(selector, socket);

        try
        {
            client.setKeepAlive(600, 10);
        }
        catch (Exception)
        {
            // Best effort: a failure to set keep-alive does not reject the
            // connection.
        }

        selector._clients[client.fd] = client;

        if (selector._onConnected !is null)
        {
            selector._onConnected(client);
        }

        selector.register(client.fd, EventType.READ);

        version (Windows) selector.iocp_receive(client.fd);
    }

    version (Windows)
    {
        /// Windows variant: hands `data` to the `onReceive` callback for the
        /// client mapped to `fd`, if both the client and the callback exist.
        void read(const int fd, const scope ubyte[] data)
        {
            TcpClient client = _clients[fd];

            if ((client !is null) && (onReceive !is null))
            {
                onReceive(client, data);
            }
        }
    }
    else
    {
        /// Non-Windows variant: calls `weakup(EventType.READ)` on the client
        /// mapped to `fd`, if there is one.
        void read(const int fd)
        {
            TcpClient client = _clients[fd];

            if (client !is null)
            {
                client.weakup(EventType.READ);
            }
        }
    }

    /// Non-Windows only: calls `weakup(EventType.WRITE)` on the client mapped
    /// to `fd`, if there is one.
    version (Windows) { } else void write(const int fd)
    {
        TcpClient client = _clients[fd];

        if (client !is null)
        {
            client.weakup(EventType.WRITE);
        }
    }

    bool        _isDisposed = false; /// Set by the first `dispose()` call.
    TcpListener _listener;           /// Listener passed to the constructor.
    bool        _runing;             /// Loop flag: set by `startLoop()`, cleared by `stop()`.
    int         _workerThreadNum;    /// Effective size of `workerPool`.

    version (Windows)
    {
        import core.sys.windows.basetsd : HANDLE;
        HANDLE _eventHandle;

        /// Task body submitted to `workerPool` by `startLoop()`; forwards to
        /// `Iocp.handleEvent`.
        static void handleEvent(Selector selector)
        {
            import async.event.iocp : Iocp;
            Iocp.handleEvent(selector);
        }
    }
    else
    {
        /// Descriptor closed by `dispose()` via `core.sys.posix.unistd.close`.
        int _eventHandle;

        /// One iteration of the event loop; no body in this class.
        void handleEvent();
    }

private:

    ThreadPool           _acceptPool; /// Created (size 1) only off Windows.
    Map!(int, TcpClient) _clients;    /// Connected clients keyed by descriptor.

    OnConnected    _onConnected;
    OnDisConnected _onDisConnected;
    OnSocketError  _onSocketError;

    Codec          _codec;

public:

    ThreadPool      workerPool; /// Pool of `_workerThreadNum` workers.

    OnReceive       onReceive;
    OnSendCompleted onSendCompleted;
}