muduo网络库学习之EventLoop(三):Socket、Acceptor、TcpServer、TcpConnection(连接建立,接收消息)

1、Socket 操作封装

Endian.h

封装了字节序转换函数(全局函数,位于muduo::net::sockets名称空间中)。

SocketsOps.h/ SocketsOps.cc

封装了socket相关系统调用(全局函数,位于muduo::net::sockets名称空间中)。

Socket.h/Socket.cc(Socket类)

用RAII方法封装socket file descriptor

InetAddress.h/InetAddress.cc(InetAddress类)

网际地址sockaddr_in封装

2、Acceptor

Acceptor用于accept(2)接受TCP连接

Acceptor的数据成员包括acceptSocket_、acceptChannel_,Acceptor的acceptSocket_是listening socket(即server socket)。

acceptChannel_用于观察acceptSocket_的readable事件,可读事件发生,Channel::handleEvent()中回调Acceptor::handleRead(),

后者调用accept(2)来接受新连接,并回调用户callback,注意callback 中传入的第一个参数是accept返回的connfd。

void Acceptor::handleRead()
{
    loop_->assertInLoopThread();
    InetAddress peerAddr(0);
    //FIXME loop until no more
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // string hostport = peerAddr.toIpPort();
        // LOG_TRACE << "Accepts of " << hostport;
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr);
        }
        else
        {
            sockets::close(connfd);
        }
    }
    else
    {
        // Read the section named "The special problem of
        // accept()ing when you can't" in libev's doc.
        // By Marc Lehmann, author of libev.
        if (errno == EMFILE)
        {
            ::close(idleFd_);
            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
            ::close(idleFd_);
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
        }
    }
}

在构造函数中:

acceptChannel_.setReadCallback(
    boost::bind(&Acceptor::handleRead, this));

设置用户回调函数:

// 传入connfd
typedef boost::function < void (int sockfd,
                                const InetAddress &) > NewConnectionCallback;

void setNewConnectionCallback(const NewConnectionCallback &cb)
{
    newConnectionCallback_ = cb;
}

开始监听:

void Acceptor::listen()
{
    loop_->assertInLoopThread();
    listenning_ = true;
    acceptSocket_.listen();
    acceptChannel_.enableReading();
}

测试代码:

simba@ubuntu:~$ telnet 127.0.0.1 8888

#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/SocketsOps.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void newConnection(int sockfd, const InetAddress &peerAddr)
{
    printf("newConnection(): accepted a new connection from %s\n",
           peerAddr.toIpPort().c_str());
    ::write(sockfd, "How are you?\n", 13);
    sockets::close(sockfd);
}

int main()
{
    printf("main(): pid = %d\n", getpid());

    InetAddress listenAddr(8888);
    EventLoop loop;

    Acceptor acceptor(&loop, listenAddr);
    acceptor.setNewConnectionCallback(newConnection);
    acceptor.listen();

    loop.loop();
}

使用telnet 连接服务器,服务器输出如下:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test07 20131108 07:22:30.560145Z  3960 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51 main(): pid = 3960 20131108 07:22:30.675116Z  3960 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 07:22:30.675684Z  3960 TRACE EventLoop EventLoop created 0xBFED7324 in thread 3960 - EventLoop.cc:76 20131108 07:22:30.676073Z  3960 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 20131108 07:22:30.676577Z  3960 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104 20131108 07:22:30.676988Z  3960 TRACE loop EventLoop 0xBFED7324 start looping - EventLoop.cc:108 20131108 07:22:40.687957Z  3960 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 07:22:41.606525Z  3960 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 07:22:41.607053Z  3960 TRACE printActiveChannels {6: IN}  - EventLoop.cc:271 newConnection(): accepted a new connection from 127.0.0.1:56409 20131108 07:22:51.617500Z  3960 TRACE poll  nothing happended - EPollPoller.cc:74

telnet 端输出如下:

simba@ubuntu:~$ telnet 127.0.0.1 8888 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. How are you? Connection closed by foreign host. simba@ubuntu:~$ 

从输出可以看出,acceptSocket_.sockfd_ = 6,客户端连接上来,监听套接字发生可读事件,调用accept() 接收连接后调用用户回调函数newConnection()。

3、TcpServer/TcpConnection

Acceptor类的主要功能是socket、bind、listen

一般来说,在上层应用程序中,我们不直接使用Acceptor,而是把它作为TcpServer的成员

boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor

TcpServer还包含了一个TcpConnection列表

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
ConnectionMap connections_; // 连接列表

此外,还有一个IO线程池对象和一个acceptor Eventloop*, 通过setThreadNum()设置IO线程池的线程个数(不包括main Reactor)

关于EventLoopThread, EventLoopThreadPool 类参见这里

boost::scoped_ptr<EventLoopThreadPool> threadPool_;
EventLoop* loop_;  // the acceptor loop
void TcpServer::setThreadNum(int numThreads)
{
    assert(0 <= numThreads);
    threadPool_->setThreadNum(numThreads);
}

TcpConnection与Acceptor类似,有两个重要的数据成员,Socket(connfd)与Channel

boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;

时序图分析:

在TcpServer 构造函数中先初始化acceptor_成员,acceptor_(new Acceptor(loop, listenAddr)),在构造函数体内:

// Acceptor::handleRead函数中会回调用TcpServer::newConnection
// _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
acceptor_->setNewConnectionCallback(
    boost::bind(&TcpServer::newConnection, this, _1, _2));

调用TcpServer::start(),开始Acceptor::listen(), 已连接队列不为空,TcpServer::acceptor_.acceptChannel_ 可读,poll返回,调用

Channel::handleEvent()处理活动通道,调用Acceptor::handleRead(),函数中调用accept(2)来接受新连接,并回调TcpServer::newConnection(), 函数中先创建一个TcpConnectionPtr 对象,在TcpConnection 构造函数体中:

// 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
    boost::bind(&TcpConnection::handleRead, this, _1));

添加进TcpServer::connections_, 设置连接回调函数和消息到来回调函数,如下:

//传入connfd
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    ......
    TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));

    connections_[connName] = conn; // conn 是TcpConnectionPtr 对象
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->connectEstablished();
}

最后调用TcpConnection::connectEstablished()

void TcpConnection::connectEstablished()
{
    channel_->enableReading();  // TcpConnection所对应的通道加入到Poller关注

    connectionCallback_(shared_from_this());
}

现在已经建立了一个新连接,对等方发送数据到connfd,内核接收缓冲区不为空,TcpConnection::channel_ 可读事件发生,poll返回,调用Channel::handleEvent()处理活动通道,调用TcpConnection::handleRead()

void TcpConnection::handleRead(Timestamp receiveTime)
{
    ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
    messageCallback_(shared_from_this(), buf, n);
}

shared_from_this()  会用当前对象的裸指针构造一个临时智能指针对象,引用计数加1,但马上会被析构,又减1,故无论调用多少次,对引用计数都没有影响。

测试程序:

simba@ubuntu:~$ telnet 127.0.0.1 8888

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void onConnection(const TcpConnectionPtr &conn)
{
    if (conn->connected())
    {
        printf("onConnection(): new connection [%s] from %s\n",
               conn->name().c_str(),
               conn->peerAddress().toIpPort().c_str());
    }
    else
    {
        printf("onConnection(): connection [%s] is down\n",
               conn->name().c_str());
    }
}

void onMessage(const TcpConnectionPtr &conn,
               const char *data,
               ssize_t len)
{
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
}

int main()
{
    printf("main(): pid = %d\n", getpid());

    InetAddress listenAddr(8888);
    EventLoop loop;

    TcpServer server(&loop, listenAddr, "TestServer");
    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);
    server.start();

    loop.loop();
}

同样地,使用telnet 去连接,服务器端输出如下:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test08 main(): pid = 7557 20131108 09:37:51.098888Z  7557 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 09:37:51.099825Z  7557 TRACE EventLoop EventLoop created 0xBFAD3D08 in thread 7557 - EventLoop.cc:62 20131108 09:37:51.100692Z  7557 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 20131108 09:37:51.101548Z  7557 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104 20131108 09:37:51.102063Z  7557 TRACE loop EventLoop 0xBFAD3D08 start looping - EventLoop.cc:94 20131108 09:38:01.116672Z  7557 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 09:38:10.616161Z  7557 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 09:38:10.616774Z  7557 TRACE printActiveChannels {6: IN}  - EventLoop.cc:257 20131108 09:38:10.616894Z  7557 INFO  TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56410 - TcpServer.cc:93 20131108 09:38:10.617007Z  7557 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x827D7F8 fd=8 - TcpConnection.cc:62 20131108 09:38:10.617103Z  7557 TRACE newConnection [1] usecount=1 - TcpServer.cc:111 20131108 09:38:10.617152Z  7557 TRACE newConnection [2] usecount=2 - TcpServer.cc:113 20131108 09:38:10.617166Z  7557 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:78 20131108 09:38:10.617174Z  7557 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104 onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56410 20131108 09:38:10.617266Z  7557 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:83 20131108 09:38:10.617275Z  7557 TRACE newConnection [5] usecount=2 - TcpServer.cc:122 20131108 09:38:20.627567Z  7557 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 09:38:30.638037Z  7557 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 09:38:40.648523Z  7557 TRACE poll  nothing happended - EPollPoller.cc:74 20131108 09:38:46.891543Z  7557 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 09:38:46.891599Z  7557 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257 20131108 09:38:46.891611Z  7557 TRACE handleEvent [6] usecount=2 - Channel.cc:67 onMessage(): received 6 bytes from connection [TestServer:0.0.0.0:8888#1] 20131108 09:38:46.891744Z  7557 TRACE handleEvent [12] usecount=2 - Channel.cc:69 20131108 09:38:56.901306Z  7557 TRACE poll  nothing happended - EPollPoller.cc:74

可以看到,fd = 6 是监听套接字,fd = 8是返回来的已连接套接字,那么fd = 7去哪了呢?其实是被acceptor的 idleFd_ 占据了。

连接建立的时候回调onConnection(),我们在telnet 上输入aaaa,服务器端消息到来,fd=8可读事件发生,回调onMessage(),加上\r\n 所以收到6个字节数据。

参考:

《UNP》

muduo manual.pdf

《linux 多线程服务器编程:使用muduo c++网络库》

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏钟绍威的专栏

内存屏障保证缓存一致性优化

 在前面内存系统重排序提到,“写缓存没有及时刷新到内存,导致不同处理器缓存的值不一样”,出现这种情况是糟糕的,所幸处理器遵循缓存一致性协议能够保证足够的可见性又...

2209
来自专栏How things work?

30分钟了解同步,异步,阻塞,非阻塞

同步,异步,阻塞,非阻塞,在一些场景下,它们是同一个概念的不同名字;在另一些场景下,它们是不同的概念。

1.2K15
来自专栏JAVA技术站

设计模式学习之-生产者与消费者模式 原

别人说的几句废话,拿来充充场面,哈哈,Java 5之前实现同步存取时,可以使用普通的一个集合,然后在使用线程的协作和线程同步可以实现生产者,消费者模式,主要的...

892
来自专栏java一日一条

支持生产阻塞的线程池

在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡。通常来说,生产任务的速度要大于消费的速度。一个细...

481
来自专栏开发与安全

muduo网络库学习之EventLoop(四):EventLoopThread 类、EventLoopThreadPool 类

1、EventLoopThread(IO线程类) 任何一个线程,只要创建并运行了EventLoop,都称之为IO线程 IO线程不一定是主线程 muduo并发模型...

2125
来自专栏linux驱动个人学习

Linux内核线程kernel thread详解--Linux进程的管理与调度(十)

Linux内核可以看作一个服务进程(管理软硬件资源,响应用户进程的种种合理以及不合理的请求)。

815
来自专栏大闲人柴毛毛

Java并发编程的艺术(一)——并发编程需要注意的问题

并发是为了提升程序的执行速度,但并不是多线程一定比单线程高效,而且并发编程容易出错。若要实现正确且高效的并发,就要在开发过程中时刻注意以下三个问题: 上下文切换...

2855
来自专栏北京马哥教育

最全服务器模型详解——从单线程阻塞到多线程非阻塞

前言的前言 服务器模型涉及到线程模式和IO模式,搞清楚这些就能针对各种场景有的放矢。该系列分成三部分: 单线程/多线程阻塞I/O模型 单线程非阻塞I/O模型...

3275
来自专栏C/C++基础

进程与线程的区别

在开发工作中,尤其是对负载较大的服务端程序的开发,为充分发挥处理器多核性能,提高硬件资源利用率,增加系统吞吐量,少不了并发编程。并发编程一般通过多进程和多线程的...

663
来自专栏微服务生态

支持生产阻塞的线程池

我们使用线程池的时候,经常使用默认的丢弃策略,那么我们也可以自定义策略,那么下面的文章可以看下。

401

扫码关注云+社区