module async.event.iocp;

debug import std.stdio;

version (Windows):

pragma(lib, "Ws2_32");

import core.stdc.errno;
import core.sys.windows.windows;
import core.sys.windows.winsock2;
import core.sys.windows.mswsock;

import std.socket;

import async.event.selector;
import async.net.tcpstream;
import async.net.tcplistener;
import async.net.tcpclient;

/// Name under which the event loop selector for this platform is exposed.
alias LoopSelector = Iocp;

/**
 * Selector implementation built on a Windows I/O completion port.
 *
 * A completion port is created in the constructor, sockets are associated
 * with it through `register`, and `handleEvent` dequeues one completion
 * packet at a time and dispatches on the `IocpContext` it carries.
 */
class Iocp : Selector
{
    /**
     * Forwards all arguments to the `Selector` constructor, creates the
     * completion port and associates the listener socket with it.
     */
    this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum)
    {
        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, workerThreadNum);

        _eventHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        register(_listener.fd, EventType.ACCEPT);
    }

    /**
     * Associates `fd` with the completion port, using `fd` itself as the
     * completion key. `et` is not used by this implementation.
     *
     * Returns: `false` when `fd` is negative or the association fails,
     *          `true` otherwise.
     */
    override bool register(int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        auto ret = CreateIoCompletionPort(cast(HANDLE)fd, _eventHandle, cast(ULONG_PTR)fd, 0);
        return ret !is null;
    }

    /**
     * Only validates `fd`; no work is performed on the completion port.
     *
     * Returns: `false` when `fd` is negative, `true` otherwise.
     */
    override bool reregister(int fd, EventType et)
    {
        if (fd < 0)
        {
            return false;
        }

        return true;
    }

    /**
     * Only validates `fd`; no work is performed on the completion port.
     *
     * Returns: `false` when `fd` is negative, `true` otherwise.
     */
    override bool unregister(int fd)
    {
        if (fd < 0)
        {
            return false;
        }

        return true;
    }

//    override void startLoop()
//    {
//        _runing = true;
//
//        while (_runing)
//        {
////            Socket socket = _listener.accept();
////            TcpClient client = new TcpClient(this, socket);
////            register(client.fd, EventType.READ);
////            _clients[client.fd] = client;
//            handleEvent();
//        }
//    }

    /**
     * Waits up to one second for a single completion packet and dispatches
     * on the operation stored in its `IocpContext`.
     *
     * `accept`, `read`, `write` and `removeClient` are not defined in this
     * file; presumably they are inherited from `Selector`.
     */
    override protected void handleEvent()
    {
        DWORD bytes = 0;
        ULONG_PTR key = 0;
        OVERLAPPED* overlapped;
        auto timeout = 1000;

        const int ret = GetQueuedCompletionStatus(_eventHandle, &bytes, &key, &overlapped, timeout);

        // Capture the error code right away, before any other call can overwrite it.
        const DWORD error = (ret == 0) ? GetLastError() : 0;

        if (overlapped is null)
        {
            // No packet was dequeued. A timeout is the normal idle case and is
            // not worth logging; anything else is reported in debug builds.
            debug
            {
                if (ret != 0 || error != WAIT_TIMEOUT)
                {
                    writeln("Event is null.");
                }
            }

            return;
        }

        auto ev = cast(IocpContext*)overlapped;

        if (ret == 0)
        {
            if (error == WAIT_TIMEOUT) // || error == ERROR_OPERATION_ABORTED
            {
                return;
            }

            if (ev.fd)
            {
                // Read the descriptor once so the log line does not touch the
                // context again after the client has been removed.
                const fd = ev.fd;
                removeClient(fd);
                debug writeln("Close event: ", fd);

                return;
            }

            // NOTE(review): a failed completion whose context has fd == 0 falls
            // through to the normal dispatch below - confirm this is intended.
        }

        switch (ev.operation)
        {
        case IocpOperation.accept:
            accept();

            break;
        case IocpOperation.connect:
            read(ev.fd);

            break;
        case IocpOperation.read:
            write(ev.fd);

            break;
        case IocpOperation.write:

            break;
        case IocpOperation.event:

            break;
        case IocpOperation.close:
            removeClient(ev.fd);
            debug writeln("Close event: ", ev.fd);

            break;
        default:
            // writeln rather than writefln: the message has no format
            // specifier, so writefln would never print the operation value.
            debug writeln("Unsupported operation type: ", ev.operation);

            break;
        }
    }

    /// Starts Winsock 2.2 once per process.
    shared static this()
    {
        WSADATA wsaData;
        int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);

        // The temporary socket and GUID are only referenced by the disabled
        // mixins below; the socket is closed again when this scope exits.
        SOCKET ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        scope (exit)
            closesocket(ListenSocket);
        GUID guid;
        // mixin(GET_FUNC_POINTER("WSAID_ACCEPTEX", "AcceptEx"));
        // mixin(GET_FUNC_POINTER("WSAID_CONNECTEX", "ConnectEx"));
    }

    /// Releases Winsock at process shutdown.
    shared static ~this()
    {
        WSACleanup();
    }

private:

    /// Handle of the I/O completion port created in the constructor.
    HANDLE _eventHandle;
}

/// Kind of operation recorded in an `IocpContext`.
enum IocpOperation
{
    accept,
    connect,
    read,
    write,
    event,
    close
}

/**
 * Per-operation context. `overlapped` is the first field, so the
 * `OVERLAPPED*` returned with a completion packet can be cast back to an
 * `IocpContext*`, as `Iocp.handleEvent` does.
 */
struct IocpContext
{
    OVERLAPPED overlapped;
    IocpOperation operation;
    int fd;
}