1 module async.event.selector;
2 
3 import core.sync.mutex;
4 import core.thread;
5 version (Windows)
6 {
7 }
8 else
9 {
10     import core.sys.posix.unistd;
11 }
12 
13 import std.socket;
14 import std.parallelism;
15 
16 import async.net.tcplistener;
17 import async.net.tcpclient;
18 import async.container.map;
19 import async.thread;
20 
// Callback signatures accepted by Selector.
//
// `nothrow @trusted` must appear before the terminating semicolon so that it
// is part of the function-pointer type itself; written after the semicolon it
// would attach to the following declaration instead.

/// Invoked with the newly created client after a connection has been accepted.
alias OnConnected     = void function(TcpClient)                       nothrow @trusted;
/// Invoked with the client's fd and remote address when the client is removed.
alias OnDisConnected  = void function(int, string)                     nothrow @trusted;
/// Receive callback; presumably gets the client and the bytes read (confirm against TcpClient).
alias OnReceive       = void function(TcpClient, in ubyte[])           nothrow @trusted;
/// Send-completion callback; argument meaning is not visible here (confirm against TcpClient).
alias OnSendCompleted = void function(int, string, in ubyte[], size_t) nothrow @trusted;
/// Invoked with the client's fd, remote address and a formatted socket error message.
alias OnSocketError   = void function(int, string, string)             nothrow @trusted;
26 
/// Kinds of socket events a Selector can register interest in or dispatch.
enum EventType
{
    /// Accept event (presumably for the listening socket; confirm in the concrete selectors).
    ACCEPT,
    /// Read event; newly accepted clients are registered with this type.
    READ,
    /// Write event.
    WRITE,
    /// Combined read and write event.
    READWRITE
}
31 
/**
 * Base class of the event selectors.
 *
 * It owns the listener, the map of connected clients (keyed by file
 * descriptor), the user callbacks and two thread pools. Concrete selectors
 * supply `register`, `reregister`, `unregister` and `handleEvent`.
 */
abstract class Selector
{
    /**
     * Stores the listener and callbacks and creates the client map and the
     * thread pools.
     *
     * Params:
     *   listener        = listening socket wrapper used by `beginAccept`
     *   onConnected     = called after a client has been accepted (may be null)
     *   onDisConnected  = called when a client is removed (may be null)
     *   onReceive       = stored in the public `onReceive` field
     *   onSendCompleted = stored in the public `onSendCompleted` field
     *   onSocketError   = called by `removeClient` when `err > 0` (may be null)
     *   workerThreadNum = size passed to the worker ThreadPool; a value
     *                     `<= 0` is replaced by `totalCPUs`
     */
    this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum)
    {
        _listener = listener;
        _clients  = new Map!(int, TcpClient);

        this._onConnected    = onConnected;
        this._onDisConnected = onDisConnected;
        this._onSocketError  = onSocketError;
        this.onReceive       = onReceive;
        this.onSendCompleted = onSendCompleted;

        // Non-positive counts fall back to the CPU count.
        if (workerThreadNum <= 0)
        {
            workerThreadNum = totalCPUs;
        }

        _acceptPool = new ThreadPool(1);
        workerPool  = new ThreadPool(workerThreadNum);
    }

    /// Releases the selector's resources via `dispose`.
    // NOTE(review): when run by the GC, referenced objects such as `_clients`
    // and `_listener` may already be finalized; prefer calling `dispose`
    // explicitly.
    ~this()
    {
        dispose();
    }

    /// Starts watching `fd` for events of type `et`; implemented by concrete selectors.
    bool register  (int fd, EventType et);
    /// Changes the watched event type of `fd`; implemented by concrete selectors.
    bool reregister(int fd, EventType et);
    /// Stops watching `fd`; implemented by concrete selectors.
    bool unregister(int fd);

    /// Calls `handleEvent` repeatedly until `stop` clears the running flag.
    void startLoop()
    {
        _runing = true;

        while (_runing)
        {
            handleEvent();
        }
    }

    /// Clears the running flag so that `startLoop` exits after the current `handleEvent` call returns.
    void stop()
    {
        _runing = false;
    }

    /**
     * Unregisters and closes every client and the listener, clears the client
     * map and, on non-Windows platforms, closes the event handle.
     *
     * Only the first call has any effect.
     */
    void dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        {
            _clients.lock();
            // Release the map lock even if unregister()/close() throws,
            // otherwise the map would stay locked.
            scope (exit) _clients.unlock();

            foreach (ref c; _clients)
            {
                unregister(c.fd);

                if (c.isAlive)
                {
                    c.close();
                }
            }
        }

        _clients.clear();

        unregister(_listener.fd);
        _listener.close();

        version (Windows)
        {
        }
        else
        {
            core.sys.posix.unistd.close(_eventHandle);
        }
    }

    /**
     * Unregisters `fd`, notifies the callbacks, closes the client and removes
     * it from the client map.
     *
     * Params:
     *   fd  = file descriptor of the client to remove
     *   err = socket error code; when `> 0` the error callback is invoked
     *         with the formatted error message before the disconnect callback
     */
    void removeClient(int fd, int err = 0)
    {
        unregister(fd);

        TcpClient client = _clients[fd];

        if (client !is null)
        {
            const bool reportError = (err > 0) && (_onSocketError !is null);

            if (reportError)
            {
                _onSocketError(fd, client._remoteAddress, formatSocketError(err));
            }

            if (_onDisConnected !is null)
            {
                _onDisConnected(fd, client._remoteAddress);
            }

            client.close();
        }

        // The map entry is removed whether or not a client was found.
        _clients.remove(fd);
    }

protected:

    /// Hands `beginAccept` for this selector to the accept pool.
    void accept()
    {
        _acceptPool.run!beginAccept(this);
    }

    /**
     * Accepts one connection on the selector's listener, wraps it in a
     * `TcpClient`, stores it in the client map, fires the connected callback
     * and registers the client for READ events.
     *
     * If `accept` throws an `Exception` the call returns without doing
     * anything else.
     */
    static void beginAccept(Selector selector)
    {
        Socket socket;

        try
        {
            socket = selector._listener.accept();
        }
        catch (Exception e)
        {
            // Deliberately best-effort: a failed accept is ignored.
            return;
        }

        TcpClient client = new TcpClient(selector, socket);
        client.setKeepAlive(600, 10);
        selector._clients[client.fd] = client;

        if (selector._onConnected !is null)
        {
            selector._onConnected(client);
        }

        selector.register(client.fd, EventType.READ);
    }

    /// Wakes the client owning `fd` with a READ event, if it is in the client map.
    void read(int fd)
    {
        wakeClient(fd, EventType.READ);
    }

    /// Wakes the client owning `fd` with a WRITE event, if it is in the client map.
    void write(int fd)
    {
        wakeClient(fd, EventType.WRITE);
    }

protected:

    bool                 _isDisposed = false;
    TcpListener          _listener;
    int                  _eventHandle;

    /// Processes pending events once; called in a loop by `startLoop`.
    void handleEvent();

private:

    /// Looks up `fd` in the client map and forwards `et` to the client when found.
    void wakeClient(int fd, EventType et)
    {
        TcpClient client = _clients[fd];

        if (client !is null)
        {
            client.weakup(et);
        }
    }

    ThreadPool           _acceptPool;
    bool                 _runing;
    Map!(int, TcpClient) _clients;

    OnConnected          _onConnected;
    OnDisConnected       _onDisConnected;
    OnSocketError        _onSocketError;

public:

    ThreadPool           workerPool;

    OnReceive            onReceive;
    OnSendCompleted      onSendCompleted;
}