1 module async.event.iocp;
2 
3 debug import std.stdio;
4 
5 version (Windows):
6 
7 import core.stdc.errno;
8 
9 import core.sys.windows.windows;
10 import core.sys.windows.winsock2;
11 import core.sys.windows.mswsock;
12 
13 import std.socket;
14 
15 import async.event.selector;
16 import async.net.tcpstream;
17 import async.net.tcplistener;
18 import async.net.tcpclient;
19 import async.codec;
20 
/// Exposes the IOCP-backed selector under the name `LoopSelector`
/// (this module is only compiled on Windows, see `version (Windows):` above).
alias LoopSelector = Iocp;
22 
/**
 * Selector implementation built on a Windows I/O completion port (IOCP).
 *
 * Every overlapped receive or send is described by a heap-allocated
 * `IocpContext`. While such an operation is in flight the only reference to
 * the context is the `OVERLAPPED*` handed to the operating system, which the
 * D garbage collector cannot see. Each context is therefore registered as a
 * GC root when it is posted and unregistered once its completion has been
 * consumed (or when posting fails).
 */
class Iocp : Selector
{
    /**
     * Forwards all arguments to the `Selector` constructor and then creates
     * the completion port that sockets are attached to by `register`.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);

        // NOTE(review): the returned handle is not validated here; a null
        // handle would make every later `register` call fail.
        _eventHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, _workerThreadNum);
    }

    /**
     * Associates the socket `fd` with the completion port, using the
     * descriptor value itself as the completion key.
     *
     * Params:
     *     fd = socket descriptor; negative values are rejected.
     *     et = unused by this implementation.
     *
     * Returns: `true` when the association succeeded, `false` otherwise.
     */
    override bool register(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        return CreateIoCompletionPort(cast(HANDLE) fd, _eventHandle, cast(size_t)(cast(void*) fd), 0) !is null;
    }

    /**
     * Performs no work beyond validating the descriptor.
     *
     * Returns: `false` for a negative `fd`, `true` otherwise.
     */
    override bool reregister(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        return true;
    }

    /**
     * Performs no work beyond validating the descriptor.
     *
     * Returns: `false` for a negative `fd`, `true` otherwise.
     */
    override bool unregister(const int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        return true;
    }

    /**
     * Completion loop: dequeues packets from the selector's completion port
     * and dispatches them until `selector._runing` becomes false or the port
     * handle is no longer usable.
     *
     * - A failed or zero-byte completion removes the client.
     * - A completed read is passed to `selector.read`, then a new receive is posted.
     * - A completed write either posts the unsent remainder or, once all
     *   bytes are out, posts a receive.
     */
    static void handleEvent(Selector selector)
    {
        import core.memory : GC;

        // ERROR_ABANDONED_WAIT_0: reported when the port handle is closed
        // while a thread is blocked in GetQueuedCompletionStatus.
        enum DWORD portAbandoned = 735;

        while (selector._runing)
        {
            ULONG_PTR key;
            DWORD bytes = 0;
            OVERLAPPED* overlapped = null;

            immutable ret = GetQueuedCompletionStatus(selector._eventHandle, &bytes, &key, &overlapped, INFINITE);
            // Capture the error code immediately, before any other call can overwrite it.
            immutable err = (ret == 0) ? GetLastError() : 0;

            // `overlapped` is the first member of IocpContext, so the pointer
            // returned by the port is also the pointer to the context.
            IocpContext* context = cast(IocpContext*) overlapped;

            // Set when the same context is handed back to the OS for a
            // follow-up send; in that case it must stay pinned.
            bool reposted = false;

            scope (exit)
            {
                // The operation described by `context` is finished: let the GC reclaim it.
                if (context !is null && !reposted)
                    GC.removeRoot(context);
            }

            if (ret == 0)
            {
                if (err == WAIT_TIMEOUT)
                    continue;

                if (context is null)
                {
                    // Nothing was dequeued. If the port itself is gone, every
                    // further wait would fail immediately, so stop instead of spinning.
                    if (err == ERROR_INVALID_HANDLE || err == portAbandoned)
                        break;

                    continue;
                }

                selector.removeClient(context.fd, err);
                debug writeln("Close event: ", context.fd);

                continue;
            }

            if (context is null)
            {
                // A packet without an OVERLAPPED carries no I/O context to act on.
                continue;
            }

            if (bytes == 0)
            {
                selector.removeClient(context.fd, 0);
                debug writeln("Close event: ", context.fd);

                continue;
            }

            if (context.operation == IocpOperation.read) // A read operation complete.
            {
                selector.read(context.fd, cast(ubyte[]) context.wsabuf.buf[0..bytes]);

                // Read operation completed, so post Read operation for remainder (if exists).
                selector.iocp_receive(context.fd);
            }
            else if (context.operation == IocpOperation.write) // A write operation complete.
            {
                context.nSentBytes += bytes;

                if (context.nSentBytes < context.nTotalBytes)
                {
                    // The write has not finished yet, so post another send
                    // for the remaining data, reusing the same context.
                    context.operation = IocpOperation.write;
                    context.overlapped = OVERLAPPED.init; // must be zeroed before reuse

                    WSABUF remaining;
                    remaining.buf = context.buffer.ptr + context.nSentBytes;
                    remaining.len = context.nTotalBytes - context.nSentBytes;

                    uint sentNow;
                    immutable uint sendFlags = 0;
                    immutable rc = WSASend(cast(SOCKET) context.fd, &remaining, 1, &sentNow, sendFlags, &(context.overlapped), null);

                    if (rc == SOCKET_ERROR)
                    {
                        immutable sendErr = WSAGetLastError();

                        if (sendErr != ERROR_IO_PENDING)
                        {
                            selector.removeClient(context.fd, sendErr);
                            debug writeln("Close event: ", context.fd);

                            continue;
                        }
                    }

                    // The context is owned by the OS again until the next completion.
                    reposted = true;
                }
                else
                {
                    // Write operation completed, so post Read operation.
                    selector.iocp_receive(context.fd);
                }
            }
        }
    }

    /**
     * Posts one overlapped receive on `fd` into a freshly allocated context.
     *
     * If the receive cannot be started (any error other than "I/O pending"),
     * the client is removed.
     */
    override void iocp_receive(const int fd)
    {
        import core.memory : GC;

        IocpContext* context = new IocpContext();
        context.operation   = IocpOperation.read;
        context.nTotalBytes = 0;
        context.nSentBytes  = 0;
        context.wsabuf.buf  = context.buffer.ptr;
        context.wsabuf.len  = context.buffer.sizeof;
        context.fd          = fd;

        // Keep the context alive while only the OS holds a pointer to it.
        GC.addRoot(context);

        uint dwRecvNumBytes = 0;
        uint dwFlags = 0;
        int ret = WSARecv(cast(SOCKET) fd, &context.wsabuf, 1, &dwRecvNumBytes, &dwFlags, &context.overlapped, null);

        if (ret == SOCKET_ERROR)
        {
            immutable err = WSAGetLastError();

            if (err != ERROR_IO_PENDING)
            {
                // The operation never started, so no completion will arrive to release the context.
                GC.removeRoot(context);

                removeClient(fd, err);
                debug writeln("Close event: ", fd);
            }
        }
    }

    /**
     * Sends `data` on `fd` as a sequence of overlapped writes, each carrying
     * at most `BUFFERSIZE` bytes copied into its own context.
     *
     * If a write cannot be started (any error other than "I/O pending"), the
     * client is removed and the remaining data is not sent.
     */
    override void iocp_send(const int fd, const scope ubyte[] data)
    {
        import core.memory : GC;

        size_t pos;
        while (pos < data.length)
        {
            size_t len = data.length - pos;
            len = ((len > BUFFERSIZE) ? BUFFERSIZE : len);

            IocpContext* context = new IocpContext();
            context.operation  = IocpOperation.write;
            context.buffer[0..len] = cast(char[]) data[pos..pos + len];
            context.nTotalBytes = cast(int) len;
            context.nSentBytes  = 0;
            context.wsabuf.buf  = context.buffer.ptr;
            context.wsabuf.len  = cast(int) len;
            context.fd          = fd;

            // Keep the context alive while only the OS holds a pointer to it.
            GC.addRoot(context);

            uint dwSendNumBytes = 0;
            immutable uint dwFlags = 0;
            int ret = WSASend(cast(SOCKET) fd, &context.wsabuf, 1, &dwSendNumBytes, dwFlags, &context.overlapped, null);

            if (ret == SOCKET_ERROR)
            {
                immutable err = WSAGetLastError();

                if (err != ERROR_IO_PENDING)
                {
                    // The operation never started, so no completion will arrive to release the context.
                    GC.removeRoot(context);

                    removeClient(fd, err);
                    debug writeln("Close event: ", fd);

                    return;
                }
            }

            pos += len;
        }
    }
}
209 
210 private:
211 
/// Kind of asynchronous operation an `IocpContext` describes.
/// Only `read` and `write` are used by the code visible in this module.
enum IocpOperation
{
    accept  = 0,
    connect = 1,
    read    = 2,
    write   = 3,
    event   = 4,
    close   = 5
}
221 
/// Capacity, in bytes, of the data buffer embedded in every `IocpContext`.
immutable BUFFERSIZE = 4096 * 2;

/**
 * State of a single overlapped socket operation.
 *
 * A pointer to `overlapped` is what gets passed to `WSARecv`/`WSASend`; the
 * completion loop converts the `OVERLAPPED*` it gets back into an
 * `IocpContext*` with a plain cast, so `overlapped` has to remain the first
 * member of this struct.
 */
struct IocpContext
{
    OVERLAPPED       overlapped;  /// Must stay first (see the struct description).
    char[BUFFERSIZE] buffer;      /// Storage for received data or for the chunk being sent.
    WSABUF           wsabuf;      /// Buffer descriptor passed to Winsock; points into `buffer`.
    int              nTotalBytes; /// Number of bytes a write has to send in total (0 for reads).
    int              nSentBytes;  /// Number of bytes of a write already confirmed as sent.
    IocpOperation    operation;   /// Which kind of operation this context describes.
    int              fd;          /// Socket descriptor the operation was posted on.
}

// Fail the build if someone reorders the fields and breaks the pointer cast.
static assert(IocpContext.overlapped.offsetof == 0,
    "IocpContext.overlapped must be the first member: OVERLAPPED* is cast to IocpContext*");
234 
235 
// Local bindings for the Winsock overlapped send/receive entry points,
// declared with the Windows calling convention.
extern (Windows):

/// Signature of the optional completion routine accepted by `WSASend`/`WSARecv`.
alias POVERLAPPED_COMPLETION_ROUTINE = void function(DWORD, DWORD, OVERLAPPED*, DWORD);

/// Overlapped-capable send on socket `s`.
int WSASend(SOCKET s, WSABUF* lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesSent,
    DWORD dwFlags, OVERLAPPED* lpOverlapped, POVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);

/// Overlapped-capable receive on socket `s`.
int WSARecv(SOCKET s, WSABUF* lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesRecvd,
    LPDWORD lpFlags, OVERLAPPED* lpOverlapped, POVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);