module async.event.selector;

import core.sync.mutex;
import core.thread;
version (Windows)
{
}
else
{
    import core.sys.posix.unistd;
}

import std.socket;
import std.parallelism;

import async.net.tcplistener;
import async.net.tcpclient;
import async.container.map;
import async.thread;

// Callback signatures accepted by Selector.
//
//   OnConnected     - called from beginAccept() with the newly accepted client.
//   OnDisConnected  - called from removeClient() with (fd, remote address).
//   OnReceive       - only stored by Selector; not invoked in this module
//                     (presumably used by TcpClient - confirm).
//   OnSendCompleted - only stored by Selector; not invoked in this module
//                     (presumably used by TcpClient - confirm).
//   OnSocketError   - called from removeClient() with
//                     (fd, remote address, formatSocketError(err)).
//
// NOTE(review): each `nothrow @trusted` follows the `;` of the alias on its
// line, so syntactically it prefixes the NEXT declaration instead of being part
// of the function-pointer type written on the same line. Whether these
// attributes were meant to belong to the callback types is unclear; confirm
// before moving them, because doing so would change the public callback types.
alias OnConnected     = void function(TcpClient);                            nothrow @trusted
alias OnDisConnected  = void function(int, string);                          nothrow @trusted
alias OnReceive       = void function(TcpClient, in ubyte[]);                nothrow @trusted
alias OnSendCompleted = void function(int, string, in ubyte[], size_t);      nothrow @trusted
alias OnSocketError   = void function(int, string, string);                  nothrow @trusted

/// Kind of event a file descriptor is registered for with a Selector.
enum EventType
{
    ACCEPT, READ, WRITE, READWRITE
}

/**
 * Base class of the event selector.
 *
 * Holds the listener, the map from file descriptor to TcpClient, and two
 * thread pools (`_acceptPool` and `workerPool`). The event-mechanism specific
 * parts - register / reregister / unregister and handleEvent - are declared
 * here without bodies and are expected to be supplied by a concrete subclass.
 */
abstract class Selector
{
    /**
     * Stores the listener and callbacks and creates the client map and the
     * thread pools.
     *
     * Params:
     *   listener        = listener whose accept() is used by beginAccept().
     *   onConnected     = optional callback for a newly accepted client.
     *   onDisConnected  = optional callback used by removeClient().
     *   onReceive       = stored in the public `onReceive` field.
     *   onSendCompleted = stored in the public `onSendCompleted` field.
     *   onSocketError   = optional callback used by removeClient() when err > 0.
     *   workerThreadNum = argument for the worker ThreadPool; a value <= 0 is
     *                     replaced by `totalCPUs`.
     */
    this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum)
    {
        this._onConnected    = onConnected;
        this._onDisConnected = onDisConnected;
        this.onReceive       = onReceive;
        this.onSendCompleted = onSendCompleted;
        this._onSocketError  = onSocketError;

        _clients  = new Map!(int, TcpClient);
        _listener = listener;

        if (workerThreadNum <= 0)
        {
            workerThreadNum = totalCPUs;
        }

        _acceptPool = new ThreadPool(1);
        workerPool  = new ThreadPool(workerThreadNum);
    }

    // NOTE(review): dispose() dereferences _clients and _listener. When this
    // destructor is run by the GC rather than deterministically, the D spec
    // does not guarantee that referenced GC objects are still valid, so an
    // explicit dispose() call by the owner is the safer path - confirm usage.
    ~this()
    {
        dispose();
    }

    /// Adds `fd` to the underlying event mechanism for event type `et`.
    bool register  (int fd, EventType et);
    /// Changes the event type `fd` is registered for to `et`.
    bool reregister(int fd, EventType et);
    /// Removes `fd` from the underlying event mechanism.
    bool unregister(int fd);

    /// Calls handleEvent() repeatedly until stop() clears the running flag.
    void startLoop()
    {
        _runing = true;

        while (_runing)
        {
            handleEvent();
        }
    }

    /// Clears the running flag so that startLoop() leaves its loop.
    void stop()
    {
        _runing = false;
    }

    /**
     * Unregisters and closes every client and the listener, then (on
     * non-Windows builds) closes `_eventHandle`. Calls after the first one
     * return immediately.
     *
     * NOTE(review): this does not clear the running flag; if startLoop() may
     * still be executing when dispose() is called, confirm that stop() is
     * called first.
     */
    void dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        {
            _clients.lock();
            // Release the map lock even if unregister() or close() throws;
            // otherwise the map would stay locked.
            scope (exit) _clients.unlock();

            foreach (ref c; _clients)
            {
                unregister(c.fd);

                if (c.isAlive)
                {
                    c.close();
                }
            }
        }

        _clients.clear();

        unregister(_listener.fd);
        _listener.close();

        version (Windows)
        {
        }
        else
        {
            core.sys.posix.unistd.close(_eventHandle);
        }
    }

    /**
     * Unregisters `fd`, notifies the callbacks, closes the client and removes
     * it from the client map.
     *
     * Params:
     *   fd  = file descriptor of the client to remove.
     *   err = socket error code; when > 0 and an error callback is set, the
     *         callback receives formatSocketError(err) before the disconnect
     *         callback runs.
     */
    void removeClient(int fd, int err = 0)
    {
        unregister(fd);

        TcpClient client = _clients[fd];

        // Scope guards run in reverse order of declaration, so on exit the
        // client is closed first and the map entry removed second - the same
        // order as before - and both still happen when a callback throws.
        scope (exit) _clients.remove(fd);

        if (client is null)
        {
            return;
        }

        scope (exit) client.close();

        if ((err > 0) && (_onSocketError !is null))
        {
            _onSocketError(fd, client._remoteAddress, formatSocketError(err));
        }

        if (_onDisConnected !is null)
        {
            _onDisConnected(fd, client._remoteAddress);
        }
    }

protected:

    /// Hands beginAccept() for this selector to the accept pool.
    void accept()
    {
        _acceptPool.run!beginAccept(this);
    }

    /**
     * Accepts one connection from the selector's listener, wraps it in a
     * TcpClient, stores it in the client map, invokes the connected callback
     * (if any) and registers the client's fd for READ.
     */
    static void beginAccept(Selector selector)
    {
        Socket socket;

        try
        {
            socket = selector._listener.accept();
        }
        catch (Exception e)
        {
            // NOTE(review): an accept failure is dropped without any report;
            // confirm that this best-effort behaviour is intended.
            return;
        }

        TcpClient client = new TcpClient(selector, socket);
        client.setKeepAlive(600, 10);
        selector._clients[client.fd] = client;

        if (selector._onConnected !is null)
        {
            selector._onConnected(client);
        }

        selector.register(client.fd, EventType.READ);
    }

    /// Wakes the client mapped to `fd` (if any) with EventType.READ.
    void read(int fd)
    {
        TcpClient client = _clients[fd];

        if (client !is null)
        {
            client.weakup(EventType.READ);
        }
    }

    /// Wakes the client mapped to `fd` (if any) with EventType.WRITE.
    void write(int fd)
    {
        TcpClient client = _clients[fd];

        if (client !is null)
        {
            client.weakup(EventType.WRITE);
        }
    }

protected:

    bool        _isDisposed = false; // set by the first dispose() call
    TcpListener _listener;
    int         _eventHandle;        // closed by dispose() on non-Windows builds

    /// One iteration of the event loop; called repeatedly by startLoop().
    void handleEvent();

private:

    ThreadPool           _acceptPool;
    bool                 _runing;    // loop flag: set by startLoop(), cleared by stop()
    Map!(int, TcpClient) _clients;   // clients keyed by file descriptor

    OnConnected          _onConnected;
    OnDisConnected       _onDisConnected;
    OnSocketError        _onSocketError;

public:

    ThreadPool           workerPool;

    OnReceive            onReceive;
    OnSendCompleted      onSendCompleted;
}