module async.event.iocp;

debug import std.stdio;

version (Windows):

import core.memory : GC;
import core.stdc.errno;

import core.sys.windows.windows;
import core.sys.windows.winsock2;
import core.sys.windows.mswsock;

import std.socket;

import async.event.selector;
import async.net.tcpstream;
import async.net.tcplistener;
import async.net.tcpclient;
import async.codec;

alias LoopSelector = Iocp;

/**
 * Selector implementation backed by a Windows I/O completion port.
 *
 * Every overlapped read/write is described by a heap-allocated `IocpContext`
 * whose address is handed to the kernel as the `OVERLAPPED*`. Because the
 * kernel is not scanned by the D garbage collector, each context is pinned
 * with `GC.addRoot` before the operation is posted and released with
 * `GC.removeRoot` once its completion has been handled (or the post failed
 * synchronously).
 */
class Iocp : Selector
{
    /**
     * Forwards all arguments to the base `Selector` constructor and then
     * creates the completion port, stored in `_eventHandle`.
     *
     * `_workerThreadNum` is passed as the port's concurrency value; it is
     * presumably set by the base constructor from `workerThreadNum`
     * (NOTE(review): confirm against `Selector`).
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec, const int workerThreadNum)
    {
        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);

        _eventHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, _workerThreadNum);
    }

    /**
     * Associates the socket `fd` with the completion port, using the fd
     * itself as the completion key.
     *
     * `et` is not used by this implementation.
     *
     * Returns: `false` for a negative fd or when the association fails,
     *          `true` otherwise.
     */
    override bool register(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        return (CreateIoCompletionPort(cast(HANDLE)fd, _eventHandle, cast(size_t)(cast(void*) fd), 0) != null);
    }

    /**
     * Only validates `fd`; no other work is done here.
     *
     * Returns: `false` for a negative fd, `true` otherwise.
     */
    override bool reregister(const int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        return true;
    }

    /**
     * Only validates `fd`; no other work is done here.
     *
     * Returns: `false` for a negative fd, `true` otherwise.
     */
    override bool unregister(const int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        return true;
    }

    /**
     * Completion loop: dequeues completion packets from the selector's port
     * while `selector._runing` is set and dispatches them.
     *
     * - Failed completions and zero-byte completions remove the client.
     * - A completed read is delivered through `selector.read` and a new
     *   receive is posted.
     * - A partially completed write is re-posted for the remaining bytes;
     *   a fully completed write posts a new receive.
     *
     * Packets that carry no `OVERLAPPED*` (nothing dequeued, or a packet
     * posted without a context) are skipped.
     */
    static void handleEvent(Selector selector)
    {
        OVERLAPPED* overlapped;
        IocpContext* context;
        WSABUF buffSend;
        uint dwSendNumBytes;
        uint dwFlags;
        DWORD bytes;
        int ret;

        while (selector._runing)
        {
            ULONG_PTR key;
            bytes = 0;
            overlapped = null;
            ret = GetQueuedCompletionStatus(selector._eventHandle, &bytes, &key, &overlapped, INFINITE);
            context = cast(IocpContext*) overlapped;

            // No context means there is nothing to dispatch: either no packet
            // was dequeued (timeout / port error) or the packet was posted
            // without an OVERLAPPED. Dereferencing it would crash.
            if (context is null)
            {
                continue;
            }

            // The context was pinned when its operation was posted. Release
            // the pin when this iteration ends, unless the same context has
            // been handed back to the kernel for the rest of a partial write.
            bool reposted = false;
            scope (exit)
            {
                if (!reposted)
                {
                    GC.removeRoot(context);
                }
            }

            if (ret == 0)
            {
                immutable err = GetLastError();

                if (err != WAIT_TIMEOUT)
                {
                    selector.removeClient(context.fd, err);
                    debug writeln("Close event: ", context.fd);
                }

                continue;
            }

            if (bytes == 0)
            {
                selector.removeClient(context.fd, 0);
                debug writeln("Close event: ", context.fd);

                continue;
            }

            if (context.operation == IocpOperation.read) // A read operation complete.
            {
                selector.read(context.fd, cast(ubyte[]) context.wsabuf.buf[0..bytes]);

                // Read operation completed, so post Read operation for remainder (if exists).
                selector.iocp_receive(context.fd);
            }
            else if (context.operation == IocpOperation.write) // A write operation complete.
            {
                context.nSentBytes += bytes;
                dwFlags = 0;

                if (context.nSentBytes < context.nTotalBytes)
                {
                    // A Write operation has not completed yet, so post another
                    // Write operation to send the remaining data.
                    context.operation = IocpOperation.write;
                    buffSend.buf = context.buffer.ptr + context.nSentBytes;
                    buffSend.len = context.nTotalBytes - context.nSentBytes;
                    ret = WSASend(cast(SOCKET) context.fd, &buffSend, 1, &dwSendNumBytes, dwFlags, &(context.overlapped), null);

                    if (ret == SOCKET_ERROR)
                    {
                        immutable err = WSAGetLastError();

                        if (err != ERROR_IO_PENDING)
                        {
                            // Nothing was queued, so the context is finished
                            // and is released by the scope guard above.
                            selector.removeClient(context.fd, err);
                            debug writeln("Close event: ", context.fd);

                            continue;
                        }
                    }

                    // The kernel owns the context again; keep it pinned.
                    reposted = true;
                }
                else
                {
                    // Write operation completed, so post Read operation.
                    selector.iocp_receive(context.fd);
                }
            }
        }
    }

    /**
     * Posts one overlapped receive on `fd` into a freshly allocated context
     * buffer of `BUFFERSIZE` bytes.
     *
     * If the receive cannot be queued (an error other than
     * `ERROR_IO_PENDING`), the client is removed.
     */
    override void iocp_receive(const int fd)
    {
        IocpContext* context = new IocpContext();
        // Pin before posting: after this function returns, only the kernel
        // holds the address, which the GC cannot see.
        GC.addRoot(context);

        context.operation      = IocpOperation.read;
        context.nTotalBytes    = 0;
        context.nSentBytes     = 0;
        context.wsabuf.buf     = context.buffer.ptr;
        context.wsabuf.len     = context.buffer.sizeof;
        context.fd             = fd;
        uint dwRecvNumBytes    = 0;
        uint dwFlags           = 0;
        int ret = WSARecv(cast(SOCKET) fd, &context.wsabuf, 1, &dwRecvNumBytes, &dwFlags, &context.overlapped, null);

        if (ret == SOCKET_ERROR)
        {
            immutable err = WSAGetLastError();

            if (err != ERROR_IO_PENDING)
            {
                // No completion will be delivered for this context.
                GC.removeRoot(context);

                removeClient(fd, err);
                debug writeln("Close event: ", fd);
            }
        }
    }

    /**
     * Sends `data` on `fd` by copying it into context buffers in chunks of at
     * most `BUFFERSIZE` bytes and posting one overlapped send per chunk.
     *
     * If a send cannot be queued (an error other than `ERROR_IO_PENDING`),
     * the client is removed and the remaining chunks are not posted.
     */
    override void iocp_send(const int fd, const scope ubyte[] data)
    {
        size_t pos;
        while (pos < data.length)
        {
            size_t len = data.length - pos;
            len = ((len > BUFFERSIZE) ? BUFFERSIZE : len);

            IocpContext* context = new IocpContext();
            // Pin before posting: after this iteration, only the kernel
            // holds the address, which the GC cannot see.
            GC.addRoot(context);

            context.operation      = IocpOperation.write;
            context.buffer[0..len] = cast(char[]) data[pos..pos + len];
            context.nTotalBytes    = cast(int) len;
            context.nSentBytes     = 0;
            context.wsabuf.buf     = context.buffer.ptr;
            context.wsabuf.len     = cast(int) len;
            context.fd             = fd;
            uint dwSendNumBytes    = 0;
            immutable uint dwFlags = 0;
            int ret = WSASend(cast(SOCKET) fd, &context.wsabuf, 1, &dwSendNumBytes, dwFlags, &context.overlapped, null);

            if (ret == SOCKET_ERROR)
            {
                immutable err = WSAGetLastError();

                if (err != ERROR_IO_PENDING)
                {
                    // No completion will be delivered for this context.
                    GC.removeRoot(context);

                    removeClient(fd, err);
                    debug writeln("Close event: ", fd);

                    return;
                }
            }

            pos += len;
        }
    }
}

private:

/// Kind of overlapped operation a context describes. Only `read` and `write`
/// are used in this module.
enum IocpOperation
{
    accept,
    connect,
    read,
    write,
    event,
    close
}

/// Size in bytes of each context's I/O buffer, and the maximum chunk size
/// used by `iocp_send`.
immutable BUFFERSIZE = 4096 * 2;

/// Per-operation state. `overlapped` must stay the first field: the
/// completion loop casts the dequeued `OVERLAPPED*` back to `IocpContext*`.
struct IocpContext
{
    OVERLAPPED       overlapped;
    char[BUFFERSIZE] buffer;      // data received, or a copy of the data to send
    WSABUF           wsabuf;      // points into `buffer`
    int              nTotalBytes; // bytes to send (0 for reads)
    int              nSentBytes;  // bytes sent so far
    IocpOperation    operation;
    int              fd;          // socket this operation belongs to
}


extern (Windows):

alias POVERLAPPED_COMPLETION_ROUTINE = void function(DWORD, DWORD, OVERLAPPED*, DWORD);
int WSASend(SOCKET, WSABUF*, DWORD, LPDWORD, DWORD,   OVERLAPPED*, POVERLAPPED_COMPLETION_ROUTINE);
int WSARecv(SOCKET, WSABUF*, DWORD, LPDWORD, LPDWORD, OVERLAPPED*, POVERLAPPED_COMPLETION_ROUTINE);