2
3
4
5
10#include "Poller/Poller.h"
17
18
19
29 m_ThreadId = GetCurrentThreadId();
30 m_Poller = Poller::DefaultPoller(
this);
34 m_WakeupFd.Connect(address);
36 m_WeakupChannel = std::make_unique<Channel>(m_WakeupFd.Fd(),
this);
38 m_WeakupChannel->SetReadCallback([=]() { HandleWakeUp(); });
39 m_WeakupChannel->EnableReading();
44 m_WeakupChannel->DisableAll();
45 m_WeakupChannel->Remove();
55 m_ActiveChannels.clear();
56 m_Poller->Poll(pollTimeoutMs, &m_ActiveChannels);
58 for (Channel* channel : m_ActiveChannels)
60 channel->HandleEvent();
94 std::unique_lock<std::mutex> lock(m_Mutex);
95 m_PendingFunctors.emplace_back(cb);
98 if (!IsInLoopThread() || m_IsCallingPendingFunctors)
107 int n = ::send(m_WakeupFd.Fd(), &one,
sizeof(one), 0);
110 std::stringstream ss;
111 ss <<
"EventLoop::WakeUp error, Error: " << WSAGetLastError();
113 SPICES_CORE_ERROR(ss.str())
119 m_Poller->UpdateChannel(channel);
124 m_Poller->RemoveChannel(channel);
129 return m_Poller->HasChannel(channel);
135 int n = ::recv(m_WakeupFd.Fd(), &one,
sizeof(one), 0);
138 std::stringstream ss;
139 ss <<
"EventLoop::HandleWakeUp error, Error: " << WSAGetLastError();
141 SPICES_CORE_ERROR(ss.str())
147 std::vector<Functor> functors;
148 m_IsCallingPendingFunctors =
true;
151 std::unique_lock<std::mutex> lock(m_Mutex);
152 functors.swap(m_PendingFunctors);
155 for (
const Functor& functor : functors)
160 m_IsCallingPendingFunctors =
false;
178
179
180 static _declspec(thread) EventLoopThreadWrapper pTLSEventLoop;
#define SPICES_PROFILE_ZONE
Wrapper of SOCKET'S Event.
virtual ~EventLoopThreadWrapper()
Destructor Function.
static EventLoop *& GetInst(InetAddress *address=nullptr)
Get EventLoop Instance. @return Returns EventLoop Instance.
EventLoop * instance
This thread EventLoop instance.
Wrapper of Instance/Delete ThreadCache in thread.
void HandleWakeUp()
Handle the WakeUp notification by reading one byte of data.
void DoPendingFunctors()
Execute all pending functors.
bool HasChannel(Channel *channel) const
Determine if channel is inside poller.
EventLoop(InetAddress *address=nullptr)
Constructor Function.
void WakeUp()
Wake up a thread that is waiting on recv by sending one byte of data.
void Loop()
Start Event Loop.
bool IsInLoopThread() const
Determine whether the current thread is the EventLoop thread.
void RunInLoop(Functor cb)
Execute functor in EventLoop thread.
~EventLoop()
Destructor Function.
void QueueInLoop(Functor cb)
Push functor to pending functors.
void UpdateChannel(Channel *channel) const
Update channel state with poller.
void Quit()
Quit from Event Loop.
void RemoveChannel(Channel *channel) const
Remove channel from poller.
Wrapper of Poller and wakeup socket to acceptor(SubLoop).
This class is a wrapper of the current socket address.
constexpr uint32_t pollTimeoutMs
Poller timeout. Default is 10s.