1 module async.event.selector;
2 
3 import core.sync.mutex;
4 import core.thread;
5 
6 import std.socket;
7 import std.parallelism : totalCPUs;
8 
9 import async.net.tcplistener;
10 import async.net.tcpclient;
11 import async.container.map;
12 import async.thread;
13 import async.codec;
14 
/// Callback fired by `Selector.beginAccept` with each newly accepted client.
alias OnConnected =
    void function(TcpClient) nothrow @trusted;

/// Callback fired by `Selector.removeClient` with the client's fd and remote address.
alias OnDisConnected =
    void function(const int, string) nothrow @trusted;

/// Callback receiving a client together with a chunk of bytes read from it.
alias OnReceive =
    void function(TcpClient, const scope ubyte[]) nothrow @trusted;

/// Send-completion callback. It is only stored by `Selector`; presumably the
/// arguments are (fd, remote address, data, bytes sent) — confirm at the call site.
alias OnSendCompleted =
    void function(const int, string, const scope ubyte[], const size_t) nothrow @trusted;

/// Callback fired by `Selector.removeClient` with (fd, remote address, error text)
/// when it is given a positive error code.
alias OnSocketError =
    void function(const int, string, string) nothrow @trusted;
20 
/// Kinds of event a file descriptor can be registered for with a `Selector`.
enum EventType
{
    /// Accept event (not referenced elsewhere in this module).
    ACCEPT,
    /// Read event; accepted clients are registered with this type.
    READ,
    /// Write event.
    WRITE,
    /// Combined read and write event (not referenced elsewhere in this module).
    READWRITE
}
25 
/**
 * Abstract base of the event selectors.
 *
 * Owns the listener, the fd -> `TcpClient` map and the worker thread pool.
 * Concrete subclasses supply `register` / `reregister` / `unregister`
 * (plus `handleEvent` on non-Windows builds and `iocp_send` / `iocp_receive`
 * on Windows builds).
 */
abstract class Selector
{
    /**
     * Stores the callbacks and codec, creates the client map and the thread pools.
     *
     * Params:
     *   listener        = listening socket whose connections are accepted.
     *   onConnected     = called after a client has been accepted (may be null).
     *   onDisConnected  = called when a client is removed (may be null).
     *   onReceive       = stored in the public `onReceive` field.
     *   onSendCompleted = stored in the public `onSendCompleted` field.
     *   onSocketError   = called by `removeClient` for a positive error code (may be null).
     *   codec           = exposed unchanged through the `codec` property.
     *   workerThreadNum = size of the worker pool; a value <= 0 selects
     *                     `totalCPUs * 2 + 2`.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        this._onConnected    = onConnected;
        this._onDisConnected = onDisConnected;
        this.onReceive       = onReceive;
        this.onSendCompleted = onSendCompleted;
        this._onSocketError  = onSocketError;
        this._codec          = codec;

        _clients  = new Map!(int, TcpClient);
        _listener = listener;
        _workerThreadNum = (workerThreadNum <= 0) ? totalCPUs * 2 + 2 : workerThreadNum;

        // The dedicated accept pool is only used by the non-Windows accept() path.
        version (Windows) { } else _acceptPool = new ThreadPool(1);
        workerPool = new ThreadPool(_workerThreadNum);
    }

    // NOTE(review): dispose() dereferences GC-managed members (_clients, _listener).
    // When this destructor runs as a GC finalizer those objects may already have
    // been finalized; prefer calling dispose() explicitly before dropping the selector.
    ~this()
    {
        dispose();
    }

    /// Starts watching `fd` for events of type `et`; returns whether it succeeded.
    bool register  (const int fd, EventType et);
    /// Changes the event type watched for an already registered `fd`.
    bool reregister(const int fd, EventType et);
    /// Stops watching `fd`.
    bool unregister(const int fd);

    /**
     * Runs the event loop on the calling thread until `stop()` or `dispose()`
     * clears the running flag.
     *
     * On Windows, `_workerThreadNum` `handleEvent` tasks are queued on
     * `workerPool` and the calling thread accepts connections in a loop.
     * Elsewhere the calling thread repeatedly invokes `handleEvent()`.
     */
    void startLoop()
    {
        _runing = true;

        version (Windows)
        {
            foreach (i; 0 .. _workerThreadNum)
                workerPool.run!handleEvent(this);
        }

        while (_runing)
        {
            version (Windows)
            {
                beginAccept(this);
            }
            else
            {
                handleEvent();
            }
        }
    }

    /// Asks `startLoop()` to return after its current iteration.
    void stop()
    {
        _runing = false;
    }

    /**
     * Releases everything the selector holds: unregisters and closes every
     * client, empties the client map, unregisters and closes the listener and,
     * on non-Windows builds, closes the event handle.
     *
     * Safe to call more than once; only the first call does any work.
     */
    void dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        // Every handle is about to be closed, so a loop that is still running
        // must not keep spinning on them.
        _runing = false;

        {
            _clients.lock();
            // Release the map lock even if unregister()/close() throws.
            scope (exit) _clients.unlock();

            foreach (ref c; _clients)
            {
                unregister(c.fd);

                if (c.isAlive)
                {
                    c.close();
                }
            }
        }

        _clients.clear();

        unregister(_listener.fd);
        _listener.close();

        version (Windows) { } else
        {
            static import core.sys.posix.unistd;
            core.sys.posix.unistd.close(_eventHandle);
        }
    }

    /**
     * Unregisters `fd`, notifies the callbacks, closes the client and removes
     * it from the client map.
     *
     * Params:
     *   fd  = descriptor of the client to drop.
     *   err = socket error code; when positive, `_onSocketError` is invoked
     *         with its formatted message before `_onDisConnected`.
     */
    void removeClient(const int fd, const int err = 0)
    {
        unregister(fd);

        TcpClient client = _clients[fd];

        if (client !is null)
        {
            if ((err > 0) && (_onSocketError !is null))
            {
                _onSocketError(fd, client._remoteAddress, formatSocketError(err));
            }

            if (_onDisConnected !is null)
            {
                _onDisConnected(fd, client._remoteAddress);
            }

            client.close();
        }

        _clients.remove(fd);
    }

    version (Windows)
    {
        /// Sends `data` on `fd` (implemented by the Windows subclass).
        void iocp_send(const int fd, const scope ubyte[] data);
        /// Starts a receive on `fd` (implemented by the Windows subclass).
        void iocp_receive(const int fd);
    }

    /// The codec passed to the constructor.
    @property Codec codec()
    {
        return this._codec;
    }

protected:

    /// Queues one `beginAccept` call on the accept pool.
    version (Windows) { } else void accept()
    {
        _acceptPool.run!beginAccept(this);
    }

    /**
     * Accepts one connection from the selector's listener, wraps it in a
     * `TcpClient`, stores it in the client map, fires `_onConnected` and
     * registers the client for READ events.
     *
     * Returns silently when `accept()` throws. A client whose registration
     * fails is dropped again through `removeClient`.
     */
    static void beginAccept(Selector selector)
    {
        Socket socket;

        try
        {
            socket = selector._listener.accept();
        }
        catch (Exception e)
        {
            return;
        }

        TcpClient client = new TcpClient(selector, socket);

        try
        {
            client.setKeepAlive(600, 10);
        }
        catch (Exception e)
        {
            // Best effort: a connection without keep-alive is still usable.
        }

        selector._clients[client.fd] = client;

        if (selector._onConnected !is null)
        {
            selector._onConnected(client);
        }

        if (!selector.register(client.fd, EventType.READ))
        {
            // An unregistered client would never receive events and would sit
            // in the map with an open socket forever, so drop it right away.
            selector.removeClient(client.fd);
            return;
        }

        version (Windows) selector.iocp_receive(client.fd);
    }

    version (Windows)
    {
        /// Hands `data` received on `fd` to `onReceive`, if the client is still known.
        void read(const int fd, const scope ubyte[] data)
        {
            TcpClient client = _clients[fd];

            if ((client !is null) && (onReceive !is null))
            {
                onReceive(client, data);
            }
        }
    }
    else
    {
        /// Notifies the client owning `fd`, if still known, of a READ event.
        void read(const int fd)
        {
            TcpClient client = _clients[fd];

            if (client !is null)
            {
                client.weakup(EventType.READ);
            }
        }
    }

    /// Notifies the client owning `fd`, if still known, of a WRITE event.
    version (Windows) { } else void write(const int fd)
    {
        TcpClient client = _clients[fd];

        if (client !is null)
        {
            client.weakup(EventType.WRITE);
        }
    }

    bool                 _isDisposed = false; // set by the first dispose() call
    TcpListener          _listener;
    bool                 _runing;             // loop flag: set by startLoop(), cleared by stop()/dispose()
    int                  _workerThreadNum;

    version (Windows)
    {
        import core.sys.windows.basetsd : HANDLE;
        HANDLE _eventHandle;

        /// Worker-pool entry point; forwards to `Iocp.handleEvent`.
        static void handleEvent(Selector selector)
        {
            import async.event.iocp : Iocp;
            Iocp.handleEvent(selector);
        }
    }
    else
    {
        int _eventHandle; // closed by dispose()

        /// One iteration of event processing, implemented by the subclass.
        void handleEvent();
    }

private:

    ThreadPool           _acceptPool;         // only created on non-Windows builds
    Map!(int, TcpClient) _clients;            // fd -> client

    OnConnected          _onConnected;
    OnDisConnected       _onDisConnected;
    OnSocketError        _onSocketError;

    Codec                _codec;

public:

    ThreadPool           workerPool;

    OnReceive            onReceive;
    OnSendCompleted      onSendCompleted;
}