1 module async.event.iocp;
2 
3 debug import std.stdio;
4 
5 version (Windows):
6 
7 pragma(lib, "Ws2_32");
8 
9 import core.stdc.errno;
10 import core.sys.windows.windows;
11 import core.sys.windows.winsock2;
12 import core.sys.windows.mswsock;
13 
14 import std.socket;
15 
16 import async.event.selector;
17 import async.net.tcpstream;
18 import async.net.tcplistener;
19 import async.net.tcpclient;
20 
21 alias LoopSelector = Iocp;
22 
23 class Iocp : Selector
24 {
25     this(TcpListener listener, OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted, OnSocketError onSocketError, int workerThreadNum)
26     {
27         super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, workerThreadNum);
28 
29         _eventHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
30         register(_listener.fd, EventType.ACCEPT);
31     }
32 
33     override bool register(int fd, EventType et)
34     {
35         if (fd < 0)
36         {
37             return false;
38         }
39 
40         auto ret = CreateIoCompletionPort(cast(HANDLE)fd, _eventHandle, cast(ULONG_PTR)fd, 0);
41         return !(!ret);
42     }
43 
44     override bool reregister(int fd, EventType et)
45     {
46         if (fd < 0)
47         {
48             return false;
49         }
50 
51         return true;
52     }
53 
54     override bool unregister(int fd)
55     {
56         if (fd < 0)
57         {
58             return false;
59         }
60 
61         return true;
62     }
63 
64 //    override void startLoop()
65 //    {
66 //        _runing = true;
67 //
68 //        while (_runing)
69 //        {
70 ////            Socket socket = _listener.accept();
71 ////            TcpClient client = new TcpClient(this, socket);
72 ////            register(client.fd, EventType.READ);
73 ////            _clients[client.fd] = client;
74 //            handleEvent();
75 //        }
76 //    }
77 
78     override protected void handleEvent()
79     {
80         DWORD bytes   = 0;
81         ULONG_PTR key = 0;
82         OVERLAPPED*   overlapped;
83         auto timeout  = 1000;
84 
85         const int ret = GetQueuedCompletionStatus(_eventHandle, &bytes, &key, &overlapped, timeout);
86 
87         if (overlapped is null)
88         {
89             debug writeln("Event is null.");
90 
91             return;
92         }
93 
94         if (ret == 0)
95         {
96             const auto error = GetLastError();
97             if (error == WAIT_TIMEOUT) // || error == ERROR_OPERATION_ABORTED
98             {
99                 return;
100             }
101 
102             auto ev = cast(IocpContext*)overlapped;
103 
104             if (ev && ev.fd)
105             {
106                 removeClient(ev.fd);
107                 debug writeln("Close event: ", ev.fd);
108 
109                 return;
110             }
111         }
112 
113         auto ev = cast(IocpContext*)overlapped;
114         switch (ev.operation)
115         {
116         case IocpOperation.accept:
117             accept();
118 
119             break;
120         case IocpOperation.connect:
121             read(ev.fd);
122 
123             break;
124         case IocpOperation.read:
125             write(ev.fd);
126 
127             break;
128         case IocpOperation.write:
129 
130             break;
131         case IocpOperation.event:
132 
133             break;
134         case IocpOperation.close:
135             removeClient(ev.fd);
136             debug writeln("Close event: ", ev.fd);
137 
138             break;
139         default:
140             debug writefln("Unsupported operation type: ", ev.operation);
141 
142             break;
143         }
144     }
145 
146     shared static this()
147     {
148         WSADATA wsaData;
149         int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
150 
151         SOCKET ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
152         scope (exit)
153             closesocket(ListenSocket);
154         GUID guid;
155 //        mixin(GET_FUNC_POINTER("WSAID_ACCEPTEX", "AcceptEx"));
156 //        mixin(GET_FUNC_POINTER("WSAID_CONNECTEX", "ConnectEx"));
157     }
158 
159     shared static ~this()
160     {
161         WSACleanup();
162     }
163 
164 private:
165 
166     HANDLE _eventHandle;
167 }
168 
169 enum IocpOperation
170 {
171     accept,
172     connect,
173     read,
174     write,
175     event,
176     close
177 }
178 
179 struct IocpContext
180 {
181     OVERLAPPED    overlapped;
182     IocpOperation operation;
183     int           fd;
184 }