2
3
4
5
21 const std::string& name ,
34 m_Socket = std::make_unique<Socket>(socketFd);
35 m_Socket->SetKeepAlive(
true);
37 m_Channel = std::make_unique<Channel>(socketFd, m_IoLoop);
38 m_Channel->SetReadCallback ([=]() { HandleRead(); });
39 m_Channel->SetWriteCallback([=]() { HandleWrite(); });
40 m_Channel->SetCloseCallback([=]() { HandleClose(); });
41 m_Channel->SetErrorCallback([=]() { HandleError(); });
46 SPICES_CORE_INFO(
"TcpConnection disconnection")
51 if (m_State.load() == State::Connected)
55 SendInLoop(buffer.c_str(), buffer.size());
59 m_IoLoop->RunInLoop([=]() { SendInLoop(buffer.c_str(), buffer.size()); });
66 if (m_State == State::Connected)
77 m_Channel->Tie(shared_from_this());
78 m_Channel->EnableReading();
80 m_ConnectionCallback.Broadcast(shared_from_this());
85 if (m_State.load() == State::Connected)
89 m_Channel->DisableAll();
90 m_CloseCallback.Broadcast(shared_from_this());
98 size_t n = m_InputBuffer.ReadFd(m_Channel->Fd(), &saveErrno);
101 m_MessageCallback.Broadcast(shared_from_this(), &m_InputBuffer);
109 SPICES_CORE_ERROR(
"TcpConnection::HandleRead Error")
116 if (m_Channel->IsWriting())
119 size_t n = m_OutputBuffer.WriteFd(m_Channel->Fd(), &saveErrno);
122 m_OutputBuffer.Retrieve(n);
123 if (m_OutputBuffer.ReadableBytes() == 0)
125 m_Channel->DisableWriting();
126 if (!m_WriteCompleteCallback.empty())
128 m_IoLoop->QueueInLoop([=]() { m_WriteCompleteCallback.Broadcast(shared_from_this()); });
130 if (m_State.load() == State::Disconnecting)
138 SPICES_CORE_ERROR(
"TcpConnection::HandleWrite Error")
143 SPICES_CORE_ERROR(
"TcpConnection::HandleWrite Error")
150 m_Channel->DisableAll();
152 const TcpConnectionPtr connectionPtr = shared_from_this();
153 m_ConnectionCallback.Broadcast(connectionPtr);
154 m_CloseCallback.Broadcast(connectionPtr);
160 socklen_t optLen =
sizeof(optVal);
162 if (::getsockopt(m_Channel->Fd(), SOL_SOCKET, SO_ERROR, &optVal, &optLen) < 0)
164 err = WSAGetLastError();
171 std::stringstream ss;
172 ss <<
"TcpConnection::HandleError SO_ERROR: " << err;
174 SPICES_CORE_ERROR(ss.str())
180 size_t remaining = len;
181 bool faultError =
false;
184
185
186 if (m_State.load() == State::Disconnected)
188 SPICES_CORE_ERROR(
"Disconnected, Can bot send message")
193
194
195 if (!m_Channel->IsWriting() && m_OutputBuffer.ReadableBytes() == 0)
197 nWrote = ::send(m_Channel->Fd(), message, len, 0);
200 remaining = len - nWrote;
201 if (remaining == 0 && m_WriteCompleteCallback.size() > 0)
203 m_IoLoop->QueueInLoop([=]() { m_WriteCompleteCallback.Broadcast(shared_from_this()); });
209 const int err = WSAGetLastError();
211 if (err != WSAEWOULDBLOCK)
213 SPICES_CORE_ERROR(
" TcpConnection::SendInLoop")
215 if (err == WSAECONNRESET || err == WSAECONNRESET)
224
225
226 if (!faultError && remaining > 0)
228 const size_t oldLen = m_OutputBuffer.ReadableBytes();
229 if (oldLen + remaining >= HighWaterMark && oldLen < HighWaterMark && m_HighWaterMarkCallback.size() > 0)
231 m_IoLoop->QueueInLoop([=]() { m_HighWaterMarkCallback.Broadcast(shared_from_this(), oldLen + remaining); });
233 m_OutputBuffer.Append((
char*)message + nWrote, remaining);
234 if (!m_Channel->IsWriting())
236 m_Channel->EnableWriting();
243 if (!m_Channel->IsWriting())
245 m_Socket->ShutDownWrite();
#define SPICES_PROFILE_ZONE
bool IsInLoopThread() const
Determine if current thread is in eventloop thread.
Wrapper of Poller and wakeup socket to acceptor(SubLoop).
This class is Wrapper of current socket address.
void HandleWrite()
Handle Write event.
std::string m_Name
TcpConnection name.
void HandleClose()
Handle Close event.
void ConnectEstablished()
TcpConnection(EventLoop *ioLoop, const std::string &name, SOCKET socketFd, const InetAddress &localAddress, const InetAddress &peerAddress)
Constructor Function.
void HandleRead()
Handle Read event.
void HandleError() const
Handle Error event.
void SendInLoop(const char *message, size_t len)
Send message to socket.
void SetState(State state)
void Send(const std::string &buffer)
Send message to socket.
void ShutDown()
ShutDown socket.
EventLoop * m_IoLoop
io Loop from TcpServer::NewConnection.
virtual ~TcpConnection()
Destructor Function.
void ShutDownInLoop() const
ShutDown socket.
Combine of Socket Connection data.
constexpr size_t HighWaterMark