1 module async.event.selector;
2 
3 import core.sync.mutex;
4 import core.thread;
5 
6 import std.socket;
7 import std.parallelism : totalCPUs;
8 
9 import async.net.tcplistener;
10 import async.net.tcpclient;
11 import async.container.map;
12 import async.thread;
13 import async.codec;
14 
15 alias OnConnected     = void function(TcpClient)                                            nothrow @trusted;
16 alias OnDisConnected  = void function(const int, string)                                    nothrow @trusted;
17 alias OnReceive       = void function(TcpClient, const scope ubyte[])                       nothrow @trusted;
18 alias OnSendCompleted = void function(const int, string, const scope ubyte[], const size_t) nothrow @trusted;
19 alias OnSocketError   = void function(const int, string, string)                            nothrow @trusted;
20 
21 enum EventType
22 {
23     ACCEPT, READ, WRITE, READWRITE
24 }
25 
26 abstract class Selector
27 {
28     this(TcpListener listener,
29         OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
30         OnSocketError onSocketError, Codec codec, const int workerThreadNum)
31     {
32         this._onConnected    = onConnected;
33         this._onDisConnected = onDisConnected;
34         this.onReceive       = onReceive;
35         this.onSendCompleted = onSendCompleted;
36         this._onSocketError  = onSocketError;
37         this._codec          = codec;
38 
39         _clients  = new Map!(int, TcpClient);
40         _listener = listener;
41         _workerThreadNum = (workerThreadNum <= 0) ? totalCPUs * 2 + 2 : workerThreadNum;
42 
43         version (Windows) { } else _acceptPool = new ThreadPool(1);
44         workerPool = new ThreadPool(_workerThreadNum);
45     }
46 
47     ~this()
48     {
49         dispose();
50     }
51 
52     bool register  (const int fd, EventType et);
53     bool reregister(const int fd, EventType et);
54     bool unregister(const int fd);
55 
56     void initialize()
57     {
58         version (Windows)
59         {
60             foreach (i; 0 .. _workerThreadNum)
61                 workerPool.run!handleEvent(this);
62         }
63     }
64     
65     void runLoop()
66     {
67         version (Windows)
68         {
69             beginAccept(this);
70         }
71         else
72         {
73             handleEvent();
74         }
75     }
76     
77     void startLoop()
78     {
79         _runing = true;
80 
81         version (Windows)
82         {
83             foreach (i; 0 .. _workerThreadNum)
84                 workerPool.run!handleEvent(this);
85         }
86 
87         while (_runing)
88         {
89             version (Windows)
90             {
91                 beginAccept(this);
92             }
93             else
94             {
95                 handleEvent();
96             }
97         }
98     }
99 
100     void stop()
101     {
102         _runing = false;
103     }
104 
105     void dispose()
106     {
107         if (_isDisposed)
108         {
109             return;
110         }
111 
112         _isDisposed = true;
113 
114         _clients.lock();
115         foreach (ref c; _clients)
116         {
117             unregister(c.fd);
118 
119             if (c.isAlive)
120             {
121                 c.close();
122             }
123         }
124         _clients.unlock();
125 
126         _clients.clear();
127 
128         unregister(_listener.fd);
129         _listener.close();
130 
131         version (Windows) { } else
132         {
133             static import core.sys.posix.unistd;
134             core.sys.posix.unistd.close(_eventHandle);
135         }
136     }
137 
138     void removeClient(const int fd, const int err = 0)
139     {
140         unregister(fd);
141 
142         TcpClient client = _clients[fd];
143 
144         if (client !is null)
145         {
146             if ((err > 0) && (_onSocketError !is null))
147             {
148                 _onSocketError(fd, client._remoteAddress, formatSocketError(err));
149             }
150 
151             if (_onDisConnected !is null)
152             {
153                 _onDisConnected(fd, client._remoteAddress);
154             }
155 
156             client.close();
157         }
158 
159         _clients.remove(fd);
160     }
161 
162     version (Windows)
163     {
164         void iocp_send(const int fd, const scope ubyte[] data);
165         void iocp_receive(const int fd);
166     }
167 
168     @property Codec codec()
169     {
170         return this._codec;
171     }
172 
173 protected:
174 
175     version (Windows) { } else void accept()
176     {
177         _acceptPool.run!beginAccept(this);
178     }
179 
180     static void beginAccept(Selector selector)
181     {
182         Socket socket;
183 
184         try
185         {
186             socket = selector._listener.accept();
187         }
188         catch (Exception e)
189         {
190             return;
191         }
192 
193         TcpClient client = new TcpClient(selector, socket);
194 
195         try
196         {
197             client.setKeepAlive(600, 10);
198         }
199         catch (Exception e)
200         {
201         }
202 
203         selector._clients[client.fd] = client;
204 
205         if (selector._onConnected !is null)
206         {
207             selector._onConnected(client);
208         }
209 
210         selector.register(client.fd, EventType.READ);
211 
212         version (Windows) selector.iocp_receive(client.fd);
213     }
214 
215     version (Windows)
216     {
217         void read(const int fd, const scope ubyte[] data)
218         {
219             TcpClient client = _clients[fd];
220 
221             if ((client !is null) && (onReceive !is null))
222             {
223                 onReceive(client, data);
224             }
225         }
226     }
227     else
228     {
229         void read(const int fd)
230         {
231             TcpClient client = _clients[fd];
232 
233             if (client !is null)
234             {
235                 client.weakup(EventType.READ);
236             }
237         }
238     }
239 
240     version (Windows) { } else void write(const int fd)
241     {
242         TcpClient client = _clients[fd];
243 
244         if (client !is null)
245         {
246             client.weakup(EventType.WRITE);
247         }
248     }
249 
250     bool                 _isDisposed = false;
251     TcpListener          _listener;
252     bool                 _runing;
253     int                  _workerThreadNum;
254 
255     version (Windows)
256     {
257         import core.sys.windows.basetsd : HANDLE;
258         HANDLE _eventHandle;
259 
260         static void handleEvent(Selector selector)
261         {
262             import async.event.iocp : Iocp;
263             Iocp.handleEvent(selector);
264         }
265     }
266     else
267     {
268         int _eventHandle;
269 
270         void handleEvent();
271     }
272 
273 private:
274 
275     ThreadPool           _acceptPool;
276     Map!(int, TcpClient) _clients;
277 
278     OnConnected          _onConnected;
279     OnDisConnected       _onDisConnected;
280     OnSocketError        _onSocketError;
281 
282     Codec                _codec;
283 
284 public:
285 
286     ThreadPool           workerPool;
287 
288     OnReceive            onReceive;
289     OnSendCompleted      onSendCompleted;
290 }