1 module async.eventloop;
2 
3 import std.stdio;
4 import std.socket;
5 import std.parallelism : totalCPUs;
6 
7 import async.event.selector;
8 import async.net.tcplistener;
9 import async.codec;
10 
11 version (Posix)
12 {
13     import core.sys.posix.signal;
14 }
15 
16 version (linux)
17 {
18     import async.event.epoll;
19 }
20 else version (OSX)
21 {
22     import async.event.kqueue;
23 }
24 else version (iOS)
25 {
26     import async.event.kqueue;
27 }
28 else version (TVOS)
29 {
30     import async.event.kqueue;
31 }
32 else version (WatchOS)
33 {
34     import async.event.kqueue;
35 }
36 else version (FreeBSD)
37 {
38     import async.event.kqueue;
39 }
40 else version (OpenBSD)
41 {
42     import async.event.kqueue;
43 }
44 else version (DragonFlyBSD)
45 {
46     import async.event.kqueue;
47 }
48 else version (Windows)
49 {
50     import async.event.iocp;
51 }
52 else
53 {
54     static assert(false, "Unsupported platform.");
55 }
56 
/**
 * Event loop bound to a single TcpListener.
 *
 * The class adds two things on top of LoopSelector: POSIX signal setup in the
 * constructor, and a run() entry point that announces the listening address
 * before starting the loop.
 */
class EventLoop : LoopSelector
{
    /**
     * Prepares signal handling (POSIX only) and forwards every argument,
     * unchanged, to the LoopSelector constructor.
     *
     * Params:
     *   listener        = listening socket handed to the base class.
     *   onConnected     = callback forwarded to the base class.
     *   onDisConnected  = callback forwarded to the base class.
     *   onReceive       = callback forwarded to the base class.
     *   onSendCompleted = callback forwarded to the base class.
     *   onSocketError   = callback forwarded to the base class.
     *   codec           = optional codec forwarded to the base class; defaults to null.
     *   workerThreadNum = worker thread count forwarded to the base class;
     *                     defaults to `totalCPUs * 2 + 2`.
     */
    this(TcpListener listener,
        OnConnected onConnected, OnDisConnected onDisConnected, OnReceive onReceive, OnSendCompleted onSendCompleted,
        OnSocketError onSocketError, Codec codec = null, const int workerThreadNum = totalCPUs * 2 + 2)
    {
        version (Posix)
        {
            // Signal dispositions are process-wide: with SIGPIPE ignored, a write
            // to a connection closed by the peer reports an error instead of
            // terminating the process.
            signal(SIGPIPE, SIG_IGN);

            // Block the signals in the calling thread. Threads created after this
            // point inherit the creating thread's mask.
            // NOTE(review): blocking SIGILL is unusual (POSIX leaves a synchronously
            // generated SIGILL undefined while blocked) - kept to preserve existing
            // behaviour; confirm it is intentional.
            sigset_t blockedSignals;
            sigemptyset(&blockedSignals);
            sigaddset(&blockedSignals, SIGPIPE);
            sigaddset(&blockedSignals, SIGILL);

            // pthread_sigmask rather than sigprocmask: POSIX leaves sigprocmask
            // unspecified in a multi-threaded process, while pthread_sigmask is
            // defined to change the calling thread's mask.
            pthread_sigmask(SIG_BLOCK, &blockedSignals, null);
        }

        super(listener, onConnected, onDisConnected, onReceive, onSendCompleted, onSocketError, codec, workerThreadNum);
    }

    /**
     * Prints the listener's local address to stdout, then calls startLoop().
     *
     * `_listener` and `startLoop` are not declared in this class; presumably
     * both are inherited from LoopSelector.
     */
    void run()
    {
        writefln("Start listening to %s...", _listener.localAddress().toString());

        startLoop();
    }
}