首先在EventLoop(五)基础上,在TcpConnection 构造函数中添加:
// 通道可写事件到来的时候,回调TcpConnection::handleWrite
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
多了两个应用层缓冲区成员:
Buffer inputBuffer_; // 应用层接收缓冲区
Buffer outputBuffer_; // 应用层发送缓冲区
在 TcpServer::newConnection() 中再添加:
conn->setWriteCompleteCallback(writeCompleteCallback_);
将TcpConnection::handleRead() 修改为:
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
现在当某个TcpConnection 发生可读事件,调用TcpConnection::handleRead() , 先调用inputBuffer_.readFd()
将内核接收缓冲区数据读取到inputBuffer_ 中,接着调用messageCallback_ , 用户代码可以按消息界限从
inputBuffer_ 中读取数据。
用户代码想要发送数据时,调用TcpConnection::send() ,重载了3个版本,都是线程安全的,内部最终都是调用TcpConnection::sendInLoop()(如果不是在当前IO线程调用send 时,sendInLoop 会在当前IO线程处理doPendingFunctors 时被调用)
void TcpConnection::sendInLoop(const void *data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool error = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
// 通道没有关注可写事件并且应用层发送缓冲区没有数据,直接write
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
// 写完了,回调writeCompleteCallback_
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE) // FIXME: any others?
{
error = true;
}
}
}
}
assert(remaining <= len);
// 没有错误,并且还有未写完的数据(说明内核发送缓冲区满,要将未写完的数据添加到output buffer中)
if (!error && remaining > 0)
{
LOG_TRACE << "I am going to write more data";
size_t oldLen = outputBuffer_.readableBytes();
// 如果超过highWaterMark_(高水位标),回调highWaterMarkCallback_
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char *>(data) + nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting(); // 关注POLLOUT事件
}
}
}
即首先尝试write入内核发送缓冲区,如果内核发送缓冲区满则将未写完的数据添加到outputBuffer_ 中(注意,只要第一次没写完,
下次调用send 也会将数据添加到outputBuffer_ 的末尾而不直接write),并关注POLLOUT 事件,当内核发送缓冲区不为满,即发生
可写事件,调用TcpConnection::handleWrite()
// 内核发送缓冲区有空间了,回调该函数
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) // 应用层发送缓冲区已清空
{
channel_->disableWriting(); // 停止关注POLLOUT事件,以免出现busy loop
if (writeCompleteCallback_) // 回调writeCompleteCallback_
{
// 应用层发送缓冲区被清空,就回调用writeCompleteCallback_
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting) // 应用层发送缓冲区已清空并且连接状态是kDisconnecting, 要关闭连接
{
shutdownInLoop(); // 关闭连接
}
}
else
{
LOG_TRACE << "I am going to write more data";
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
即从outputBuffer_ 中取出数据写入内核发送缓冲区,当然也许此次并不能完全写入,但只要应用层发送缓冲区不为空,就一直关注
POLLOUT事件,当内核发送缓冲区不为满时触发再次写入。
如果output buffer 里还有待发送的数据,而程序又想关闭连接(对程序而言,调用TcpConnection::send() 之后他就认为数据迟早会发出去),那么这时候网络库不能立刻关闭连接,而要等数据发送完毕,而Muduo TcpConnection 没有提供close,而只提供shutdown ,这么做是为了收发数据的完整性。如下所示
void DaytimeServer::onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
conn->send(Timestamp::now().toFormattedString() + ”\n”);
conn->shutdown(); // 调用TcpConnection::shutdown()
}
}
void TcpConnection::shutdown()
{
if (state_ == kConnected)
{
setState(kDisconnecting);
// 调用TcpConnection::shutdownInLoop()
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}
void TcpConnection::shutdownInLoop()
{
loop_->assertInLoopThread();
if (!channel_->isWriting())
{
// we are not writing
socket_->shutdownWrite(); // 调用Socket::shutdownWrite()
}
}
void Socket::shutdownWrite()
{
sockets::shutdownWrite(sockfd_);
}
void sockets::shutdownWrite(int sockfd)
{
int ret = ::shutdown(sockfd, SHUT_WR);
// 检查错误
}
此时如果应用层缓冲区数据还没发完,即还在关注POLLOUT事件,那么shutdown() 中只是先设置state_ = kDisconnecting; 而 shutdownInLoop() 中判断 isWriting() 为true, 故不会执行shutdownWrite(),回顾handleWrite() 函数,当应用层缓冲区数据发完,判断状态为kDisconnecting 而且已经disableWriting(),就继续调用
shutdownInLoop() ,此时就会真正关闭写的这一端。
用shutdown 而不用close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么muduo 不会漏收这些数据。我们发完了数据,于是shutdownWrite,发送TCP FIN 分节,对方会读到0 字节,然后对方通常会关闭连接(无论shutdownWrite() 还是close()),可读事件发生调用handleRead(),这样muduo 会读到0 字节,调用handleClose(),进而调用connectionCallback_, 这样客户代码就知道对方断开连接了(判断是否connected()),最后调用closeCallback_ (TcpServer::removeConnection())。
那么muduo 什么时候真正close socket 呢?在TcpConnection 对象析构的时候。TcpConnection 持有一个Socket 对象,Socket 是一个RAII handler,它的析构函数会close(sockfd_)。TcpConnection 对象生存期参考
测试代码:
客户端 nc 127.0.0.1 8888
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop *loop,
const InetAddress &listenAddr)
: loop_(loop),
server_(loop, listenAddr, "TestServer")
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
message1_.resize(100);
message2_.resize(200);
std::fill(message1_.begin(), message1_.end(), 'A');
std::fill(message2_.begin(), message2_.end(), 'B');
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
conn->send(message1_);
conn->send(message2_);
conn->shutdown();
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp receiveTime)
{
muduo::string msg(buf->retrieveAllAsString());
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
msg.size(),
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
conn->send(msg);
}
EventLoop *loop_;
TcpServer server_;
muduo::string message1_;
muduo::string message2_;
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr);
server.start();
loop.loop();
}
执行结果如下:
simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test12
20131110 04:47:24.913096Z 2330 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51
main(): pid = 2330
20131110 04:47:24.916700Z 2330 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131110 04:47:24.917170Z 2330 TRACE EventLoop EventLoop created 0xBFCB2CE4 in thread 2330 - EventLoop.cc:76
20131110 04:47:24.917487Z 2330 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131110 04:47:24.918344Z 2330 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131110 04:47:24.918942Z 2330 TRACE loop EventLoop 0xBFCB2CE4 start looping - EventLoop.cc:108
20131110 04:47:26.868111Z 2330 TRACE poll 1 events happended - EPollPoller.cc:65
20131110 04:47:26.868584Z 2330 TRACE printActiveChannels {6: IN } - EventLoop.cc:271
20131110 04:47:26.868688Z 2330 INFO TcpServer::newConnection TestServer - new connection TestServer:0.0.0.0:8888#1 from 127.0.0.1:54898 - TcpServer.cc:93
20131110 04:47:26.868831Z 2330 DEBUG TcpConnection TcpConnection::ctorTestServer:0.0.0.0:8888#1 at 0x84AE920 fd=8 - TcpConnection.cc:65
20131110 04:47:26.868847Z 2330 TRACE newConnection 1 usecount=1 - TcpServer.cc:111
20131110 04:47:26.868894Z 2330 TRACE newConnection 2 usecount=2 - TcpServer.cc:113
20131110 04:47:26.868931Z 2330 TRACE connectEstablished 3 usecount=6 - TcpConnection.cc:238
20131110 04:47:26.868941Z 2330 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection TestServer:0.0.0.0:8888#1 from 127.0.0.1:54898
20131110 04:47:26.869098Z 2330 TRACE connectEstablished 4 usecount=6 - TcpConnection.cc:243
20131110 04:47:26.869109Z 2330 TRACE newConnection 5 usecount=2 - TcpServer.cc:123
20131110 04:47:26.869800Z 2330 TRACE poll 1 events happended - EPollPoller.cc:65
20131110 04:47:26.869831Z 2330 TRACE printActiveChannels {8: IN HUP } - EventLoop.cc:271
20131110 04:47:26.869841Z 2330 TRACE handleEvent 6 usecount=2 - Channel.cc:67
20131110 04:47:26.869899Z 2330 TRACE handleClose fd = 8 state = 3 - TcpConnection.cc:369
20131110 04:47:26.869909Z 2330 TRACE updateChannel fd = 8 events = 0 - EPollPoller.cc:104
onConnection(): connection TestServer:0.0.0.0:8888#1 is down
20131110 04:47:26.869925Z 2330 TRACE handleClose 7 usecount=3 - TcpConnection.cc:377
20131110 04:47:26.869935Z 2330 INFO TcpServer::removeConnectionInLoop TestServer - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:154
20131110 04:47:26.869943Z 2330 TRACE removeConnectionInLoop 8 usecount=6 - TcpServer.cc:158
20131110 04:47:26.869978Z 2330 TRACE removeConnectionInLoop 9 usecount=5 - TcpServer.cc:160
20131110 04:47:26.869992Z 2330 TRACE removeConnectionInLoop 10 usecount=6 - TcpServer.cc:171
20131110 04:47:26.870000Z 2330 TRACE handleClose 11 usecount=3 - TcpConnection.cc:380
20131110 04:47:26.870007Z 2330 TRACE handleEvent 12 usecount=2 - Channel.cc:69
20131110 04:47:26.870015Z 2330 TRACE removeChannel fd = 8 - EPollPoller.cc:147
20131110 04:47:26.870053Z 2330 DEBUG ~TcpConnection TcpConnection::dtorTestServer:0.0.0.0:8888#1 at 0x84AE920 fd=8 - TcpConnection.cc:72
20131110 04:47:36.880508Z 2330 TRACE poll nothing happended - EPollPoller.cc:74
程序中一旦连接建立,调用onConnection(),send(message1), send(message2),然后立马shutdown()。由前面分析可知会一直等到outputBuffer_ 数据全部写到内核发送缓冲区才会真正关闭写端,客户端读到数据后最后read 返回0,客户端close导致服务端最终removeConnection。可以看到在handleEvent()处理完毕后TcpConnection 才会析构,对照 EventLoop(五)可以理解。
WriteCompleteCallback_ & highWaterMarkCallback_:
如果我们会向一个连接发送send()大流量的数据,发送频率不能太快,因为如果对等方接收不及时,则内核发送缓冲区会堆积数据,根据前面的分析,我们会将数据添加到outputBuffer_,导致outputBuffer_ 增长太快,对此可以关注WriteCompleteCallback_ ,当它被调用时表示outputBuffer_ 已经被清空,此时再次send(),否则outputBuffer_ 可能一直增长直到撑爆。
从这个角度看,可以把WriteCompleteCallback_ 当作是“低水位标”回调函数,相应地,highWaterMarkCallback_ 可以当作是”高水位标“ 回调函数,即如果对等方接收不及时,outputBuffer_ 会一直增大,当增长到highWaterMark_ (具体数值)时,回调highWaterMarkCallback_ 函数,很可能在函数内主动shutdown。
TcpConnection 中 boost::any context_; // 绑定一个未知类型的上下文对象比如HttpContext
可变类型解决方案
void*. 这种方法不是类型安全的 boost::any 任意类型的类型安全存储以及安全的取回 在标准库容器中存放不同类型的方法,比如说vector<boost::any>
下面的程序会不断地发送不同的字符数据,类似chargen 协议(DDos):
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop *loop,
const InetAddress &listenAddr)
: loop_(loop),
server_(loop, listenAddr, "TestServer")
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
server_.setWriteCompleteCallback(
boost::bind(&TestServer::onWriteComplete, this, _1));
// 生成数据
string line;
for (int i = 33; i < 127; ++i)
{
line.push_back(char(i));
}
line += line;
for (size_t i = 0; i < 127 - 33; ++i)
{
message_ += line.substr(i, 72) + '\n';
}
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
conn->setTcpNoDelay(true);
conn->send(message_);
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp receiveTime)
{
muduo::string msg(buf->retrieveAllAsString());
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
msg.size(),
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
conn->send(msg);
}
void onWriteComplete(const TcpConnectionPtr &conn)
{
conn->send(message_);
}
EventLoop *loop_;
TcpServer server_;
muduo::string message_;
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr);
server.start();
loop.loop();
}
程序中一旦连接建立就开始send,当outputBuffer_ 数据全部拷贝到内核发送缓冲区后,回调OnWriteComplete(), 继续send,类似大流量的ddos攻击。客户端 nc 127.0.0.1 8888 > aa 运行后立马ctrl+c 掉,但此时aa文件已经是很大的了,文件的内容部分如下:
simba@ubuntu:~$ ls -lh aa
-rw-rw-r-- 1 simba simba 28M Nov 9 21:01 aa
ABCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*
BCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+
CDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,
DEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-
EFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-.
FGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./
GHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0
HIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./01
IJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./012
JKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123
KLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./01234
LMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./012345
MNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456
NOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./01234567
OPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./012345678
PQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789
QRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:
RSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;
STUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<
TUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=
UVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>
VWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?
WXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@
XYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@A
YZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@AB
Z[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABC
[]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCD
]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDE
]^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEF
^_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFG
_`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGH
`abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHI
abcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ
bcdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJK
cdefghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKL
defghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLM
efghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMN
fghijklmnopqrstuvwxyz{|}~!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNO
参考:
《UNP》
muduo manual.pdf
《linux 多线程服务器编程:使用muduo c++网络库》