SpiecsEngine
 
Loading...
Searching...
No Matches
TcpConnection.cpp
Go to the documentation of this file.
1/**
2* @file TcpConnection.cpp.
3* @brief The TcpConnection Class Implementation.
4* @author Spices & Muduo.
5*/
6
7#include "Pchheader.h"
9#include "Socket.h"
10#include "Channel.h"
11#include "EventLoop.h"
12
13namespace Spices {
14
15namespace Net {
16
17 constexpr size_t HighWaterMark = 64 * 1024 * 1024;
18
20 EventLoop* ioLoop ,
21 const std::string& name ,
22 SOCKET socketFd ,
23 const InetAddress& localAddress ,
24 const InetAddress& peerAddress
25 )
26 : m_IoLoop(ioLoop)
27 , m_Name(name)
31 {
33
34 m_Socket = std::make_unique<Socket>(socketFd);
35 m_Socket->SetKeepAlive(true);
36
37 m_Channel = std::make_unique<Channel>(socketFd, m_IoLoop);
38 m_Channel->SetReadCallback ([=]() { HandleRead(); });
39 m_Channel->SetWriteCallback([=]() { HandleWrite(); });
40 m_Channel->SetCloseCallback([=]() { HandleClose(); });
41 m_Channel->SetErrorCallback([=]() { HandleError(); });
42 }
43
45 {
46 SPICES_CORE_INFO("TcpConnection disconnection")
47 }
48
49 void TcpConnection::Send(const std::string& buffer)
50 {
51 if (m_State.load() == State::Connected)
52 {
54 {
55 SendInLoop(buffer.c_str(), buffer.size());
56 }
57 else
58 {
59 m_IoLoop->RunInLoop([=]() { SendInLoop(buffer.c_str(), buffer.size()); });
60 }
61 }
62 }
63
65 {
66 if (m_State == State::Connected)
67 {
69
70 m_IoLoop->RunInLoop([=]() { ShutDownInLoop(); });
71 }
72 }
73
75 {
77 m_Channel->Tie(shared_from_this());
78 m_Channel->EnableReading();
79
80 m_ConnectionCallback.Broadcast(shared_from_this());
81 }
82
84 {
85 if (m_State.load() == State::Connected)
86 {
88
89 m_Channel->DisableAll();
90 m_CloseCallback.Broadcast(shared_from_this());
91 }
92 m_Channel->Remove();
93 }
94
96 {
97 int saveErrno = 0;
98 size_t n = m_InputBuffer.ReadFd(m_Channel->Fd(), &saveErrno);
99 if (n > 0)
100 {
101 m_MessageCallback.Broadcast(shared_from_this(), &m_InputBuffer);
102 }
103 else if (n == 0)
104 {
106 }
107 else
108 {
109 SPICES_CORE_ERROR("TcpConnection::HandleRead Error")
111 }
112 }
113
115 {
116 if (m_Channel->IsWriting())
117 {
118 int saveErrno = 0;
119 size_t n = m_OutputBuffer.WriteFd(m_Channel->Fd(), &saveErrno);
120 if (n > 0)
121 {
122 m_OutputBuffer.Retrieve(n);
123 if (m_OutputBuffer.ReadableBytes() == 0)
124 {
125 m_Channel->DisableWriting();
126 if (!m_WriteCompleteCallback.empty())
127 {
128 m_IoLoop->QueueInLoop([=]() { m_WriteCompleteCallback.Broadcast(shared_from_this()); });
129 }
130 if (m_State.load() == State::Disconnecting)
131 {
133 }
134 }
135 }
136 else
137 {
138 SPICES_CORE_ERROR("TcpConnection::HandleWrite Error")
139 }
140 }
141 else
142 {
143 SPICES_CORE_ERROR("TcpConnection::HandleWrite Error")
144 }
145 }
146
148 {
150 m_Channel->DisableAll();
151
152 const TcpConnectionPtr connectionPtr = shared_from_this();
153 m_ConnectionCallback.Broadcast(connectionPtr);
154 m_CloseCallback.Broadcast(connectionPtr);
155 }
156
158 {
159 char optVal;
160 socklen_t optLen = sizeof(optVal);
161 int err = 0;
162 if (::getsockopt(m_Channel->Fd(), SOL_SOCKET, SO_ERROR, &optVal, &optLen) < 0)
163 {
164 err = WSAGetLastError();
165 }
166 else
167 {
168 err = optVal;
169 }
170
171 std::stringstream ss;
172 ss << "TcpConnection::HandleError SO_ERROR: " << err;
173
174 SPICES_CORE_ERROR(ss.str())
175 }
176
177 void TcpConnection::SendInLoop(const char* message, size_t len)
178 {
179 int nWrote = 0;
180 size_t remaining = len;
181 bool faultError = false;
182
183 /**
184 * @brief ShutDown has been called before.
185 */
186 if (m_State.load() == State::Disconnected)
187 {
188 SPICES_CORE_ERROR("Disconnected, Can bot send message")
189 return;
190 }
191
192 /**
193 * @brief First writing.
194 */
195 if (!m_Channel->IsWriting() && m_OutputBuffer.ReadableBytes() == 0)
196 {
197 nWrote = ::send(m_Channel->Fd(), message, len, 0);
198 if (nWrote >= 0)
199 {
200 remaining = len - nWrote;
201 if (remaining == 0 && m_WriteCompleteCallback.size() > 0)
202 {
203 m_IoLoop->QueueInLoop([=]() { m_WriteCompleteCallback.Broadcast(shared_from_this()); });
204 }
205 }
206 else
207 {
208 nWrote = 0;
209 const int err = WSAGetLastError();
210
211 if (err != WSAEWOULDBLOCK)
212 {
213 SPICES_CORE_ERROR(" TcpConnection::SendInLoop")
214
215 if (err == WSAECONNRESET || err == WSAECONNRESET)
216 {
217 faultError = true;
218 }
219 }
220 }
221 }
222
223 /**
224 * @brief Send message separatly.
225 */
226 if (!faultError && remaining > 0)
227 {
228 const size_t oldLen = m_OutputBuffer.ReadableBytes();
229 if (oldLen + remaining >= HighWaterMark && oldLen < HighWaterMark && m_HighWaterMarkCallback.size() > 0)
230 {
231 m_IoLoop->QueueInLoop([=]() { m_HighWaterMarkCallback.Broadcast(shared_from_this(), oldLen + remaining); });
232 }
233 m_OutputBuffer.Append((char*)message + nWrote, remaining);
234 if (!m_Channel->IsWriting())
235 {
236 m_Channel->EnableWriting();
237 }
238 }
239 }
240
242 {
243 if (!m_Channel->IsWriting())
244 {
245 m_Socket->ShutDownWrite();
246 }
247 }
248}
249
250}
#define SPICES_PROFILE_ZONE
bool IsInLoopThread() const
Determine if current thread is in eventloop thread.
Definition EventLoop.h:109
Wrapper of Poller and wakeup socket to acceptor(SubLoop).
Definition EventLoop.h:28
This class is Wrapper of current socket address.
Definition InetAddress.h:22
void HandleWrite()
Handle Write event.
std::string m_Name
TcpConnection name.
void HandleClose()
Handle Close event.
TcpConnection(EventLoop *ioLoop, const std::string &name, SOCKET socketFd, const InetAddress &localAddress, const InetAddress &peerAddress)
Constructor Function.
void HandleRead()
Handle Read event.
void HandleError() const
Handle Error event.
void SendInLoop(const char *message, size_t len)
Send message to socket.
void SetState(State state)
void Send(const std::string &buffer)
Send message to socket.
void ShutDown()
ShutDown socket.
EventLoop * m_IoLoop
io Loop from TcpServer::NewConnection.
virtual ~TcpConnection()
Destructor Function.
void ShutDownInLoop() const
ShutDown socket.
Combine of Socket Connection data.
constexpr size_t HighWaterMark