module async.event.selector;

import core.sync.mutex;
import core.thread;

import std.socket;
import std.parallelism : totalCPUs;

import async.net.tcplistener;
import async.net.tcpclient;
import async.container.map;
import async.thread;
import async.codec;

/// Invoked from `Selector.beginAccept` after a new client has been stored in the client map.
alias OnConnected     = void function(TcpClient) nothrow @trusted;
/// Invoked from `Selector.removeClient` with the client's fd and remote address.
alias OnDisConnected  = void function(const int, string) nothrow @trusted;
/// Invoked with the receiving client and the received bytes.
alias OnReceive       = void function(TcpClient, const scope ubyte[]) nothrow @trusted;
/// Send-completion callback; it is only stored by this class, never invoked here.
alias OnSendCompleted = void function(const int, string, const scope ubyte[], const size_t) nothrow @trusted;
/// Invoked from `Selector.removeClient` with fd, remote address and the formatted socket error.
alias OnSocketError   = void function(const int, string, string) nothrow @trusted;

/// Kind of event a descriptor is registered for / woken up with.
enum EventType
{
    ACCEPT, READ, WRITE, READWRITE
}

/**
 * Base class of the platform event selectors.
 *
 * It owns the listener, the fd -> TcpClient map, the user callbacks and the
 * thread pools. The platform specific parts (`register`, `reregister`,
 * `unregister`, and on non-Windows `handleEvent`) are only declared here.
 */
abstract class Selector
{
    /**
     * Stores the callbacks and codec and creates the client map and thread pools.
     *
     * Params:
     *   listener        = listening socket that connections are accepted from
     *   onConnected     = called after a client was accepted (may be null)
     *   onDisConnected  = called when a client is removed (may be null)
     *   onReceive       = called with received data (may be null)
     *   onSendCompleted = stored in the public `onSendCompleted` field
     *   onSocketError   = called when a client is removed with `err > 0` (may be null)
     *   codec           = codec exposed through the `codec` property
     *   workerThreadNum = size of the worker pool; a value <= 0 selects
     *                     `totalCPUs * 2 + 2`
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        this._onConnected    = onConnected;
        this._onDisConnected = onDisConnected;
        this.onReceive       = onReceive;
        this.onSendCompleted = onSendCompleted;
        this._onSocketError  = onSocketError;
        this._codec          = codec;

        _clients         = new Map!(int, TcpClient);
        _listener        = listener;
        _workerThreadNum = (workerThreadNum <= 0) ? totalCPUs * 2 + 2 : workerThreadNum;

        // Outside Windows accepting is dispatched on a dedicated one-thread pool (see accept()).
        version (Windows) { } else _acceptPool = new ThreadPool(1);
        workerPool = new ThreadPool(_workerThreadNum);
    }

    ~this()
    {
        dispose();
    }

    /// Platform specific: start watching `fd` for `et`.
    bool register  (const int fd, EventType et);
    /// Platform specific: change the event set watched for `fd`.
    bool reregister(const int fd, EventType et);
    /// Platform specific: stop watching `fd`.
    bool unregister(const int fd);

    /**
     * Windows only: queues `_workerThreadNum` `handleEvent` jobs on the worker
     * pool. Does nothing on other platforms.
     */
    void initialize()
    {
        version (Windows)
        {
            foreach (i; 0 .. _workerThreadNum)
                workerPool.run!handleEvent(this);
        }
    }

    /**
     * Performs a single loop iteration: one `beginAccept` on Windows, one
     * `handleEvent` elsewhere.
     */
    void runLoop()
    {
        version (Windows)
        {
            beginAccept(this);
        }
        else
        {
            handleEvent();
        }
    }

    /**
     * Runs the loop on the calling thread until `stop()` clears `_runing`.
     * On Windows the worker `handleEvent` jobs are queued first.
     */
    void startLoop()
    {
        _runing = true;

        version (Windows)
        {
            foreach (i; 0 .. _workerThreadNum)
                workerPool.run!handleEvent(this);
        }

        while (_runing)
        {
            version (Windows)
            {
                beginAccept(this);
            }
            else
            {
                handleEvent();
            }
        }
    }

    /// Asks `startLoop` to return after its current iteration.
    void stop()
    {
        _runing = false;
    }

    /**
     * Unregisters and closes every client and the listener, then (outside
     * Windows) closes the event handle. Calls after the first are no-ops.
     */
    void dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        {
            _clients.lock();
            // Release the map lock even when unregister()/close() throws;
            // otherwise the map would stay locked.
            scope (exit) _clients.unlock();

            foreach (ref c; _clients)
            {
                unregister(c.fd);

                if (c.isAlive)
                {
                    c.close();
                }
            }
        }

        _clients.clear();

        unregister(_listener.fd);
        _listener.close();

        version (Windows) { } else
        {
            static import core.sys.posix.unistd;
            core.sys.posix.unistd.close(_eventHandle);
        }
    }

    /**
     * Unregisters `fd`, fires the error / disconnect callbacks for its client
     * (if one is known), drops it from the client map and closes it.
     *
     * Params:
     *   fd  = descriptor of the client to remove
     *   err = socket error code; when > 0 the socket-error callback is invoked
     *         with `formatSocketError(err)`
     */
    void removeClient(const int fd, const int err = 0)
    {
        unregister(fd);

        TcpClient client = _clients[fd];

        if (client is null)
        {
            _clients.remove(fd);
            return;
        }

        if ((err > 0) && (_onSocketError !is null))
        {
            _onSocketError(fd, client._remoteAddress, formatSocketError(err));
        }

        if (_onDisConnected !is null)
        {
            _onDisConnected(fd, client._remoteAddress);
        }

        // Remove the entry BEFORE closing the socket: once the descriptor is
        // closed the OS may hand the same number to a connection accepted on
        // another thread, and a late remove(fd) would then evict that new client.
        _clients.remove(fd);
        client.close();
    }

    version (Windows)
    {
        void iocp_send(const int fd, const scope ubyte[] data);
        void iocp_receive(const int fd);
    }

    /// The codec passed to the constructor.
    @property Codec codec()
    {
        return this._codec;
    }

protected:

    /// Non-Windows: queues one `beginAccept` job on the accept pool.
    version (Windows) { } else void accept()
    {
        _acceptPool.run!beginAccept(this);
    }

    /**
     * Accepts one connection from the selector's listener, wraps it in a
     * `TcpClient`, stores it in the client map, notifies `_onConnected` and
     * registers the fd for READ. Returns silently when `accept()` throws.
     */
    static void beginAccept(Selector selector)
    {
        Socket socket;

        try
        {
            socket = selector._listener.accept();
        }
        catch (Exception e)
        {
            return;
        }

        TcpClient client = new TcpClient(selector, socket);

        try
        {
            client.setKeepAlive(600, 10);
        }
        catch (Exception e)
        {
            // Best effort: a failure to set keep-alive does not reject the connection.
        }

        selector._clients[client.fd] = client;

        if (selector._onConnected !is null)
        {
            selector._onConnected(client);
        }

        selector.register(client.fd, EventType.READ);

        version (Windows) selector.iocp_receive(client.fd);
    }

    version (Windows)
    {
        /// Hands `data` received on `fd` to the `onReceive` callback, if the client is known.
        void read(const int fd, const scope ubyte[] data)
        {
            TcpClient client = _clients[fd];

            if ((client !is null) && (onReceive !is null))
            {
                onReceive(client, data);
            }
        }
    }
    else
    {
        /// Wakes the client owning `fd` with a READ event, if the client is known.
        void read(const int fd)
        {
            TcpClient client = _clients[fd];

            if (client !is null)
            {
                client.weakup(EventType.READ);
            }
        }
    }

    /// Non-Windows: wakes the client owning `fd` with a WRITE event, if the client is known.
    version (Windows) { } else void write(const int fd)
    {
        TcpClient client = _clients[fd];

        if (client !is null)
        {
            client.weakup(EventType.WRITE);
        }
    }

    bool        _isDisposed = false;
    TcpListener _listener;
    bool        _runing;           // loop flag: set by startLoop(), cleared by stop()
    int         _workerThreadNum;

    version (Windows)
    {
        import core.sys.windows.basetsd : HANDLE;
        HANDLE _eventHandle;

        /// Worker job: forwards to `Iocp.handleEvent`.
        static void handleEvent(Selector selector)
        {
            import async.event.iocp : Iocp;
            Iocp.handleEvent(selector);
        }
    }
    else
    {
        int _eventHandle;          // closed with POSIX close() in dispose()

        /// Platform specific event processing; invoked by runLoop()/startLoop().
        void handleEvent();
    }

private:

    ThreadPool           _acceptPool;   // only created outside Windows
    Map!(int, TcpClient) _clients;      // fd -> client

    OnConnected          _onConnected;
    OnDisConnected       _onDisConnected;
    OnSocketError        _onSocketError;

    Codec                _codec;

public:

    ThreadPool           workerPool;

    OnReceive            onReceive;
    OnSendCompleted      onSendCompleted;
}