前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >缩略muduo库(9):TcpServer

缩略muduo库(9):TcpServer

作者头像
看、未来
发布2021-10-09 14:15:54
3120
发布2021-10-09 14:15:54
举报

准备讲解了,这里就直接放代码吧。

代码语言:cpp
复制
#pragma once

#include "EventLoop.hpp"
#include "Accept.hpp"
#include "EventLoopThreadPool.hpp"
#include "InetAddr.hpp"
#include "nocopyable.hpp"
#include "callback.hpp"

#include <string>
#include <functional>
#include <atomic>
#include <unordered_map>
#include <memory>


class TcpServer : nocpoyable
{
public:
    //using ThreadInitCallback = std::function<void(EventLoop *)>;

    enum Option
    {
        kNoReusePort,
        kReusePort,
    };

    TcpServer(EventLoop* loop,
        const InetAddress& listenAddr,
        const std::string& nameArg,
        Option option = kNoReusePort);
    ~TcpServer(); // force out-line dtor, for std::unique_ptr members.

    const std::string& ipPort() const { return ipport_; }
    const std::string& name() const { return name_; }
    EventLoop* getLoop() const { return loop_; }

    //设置底层subloop个数
    void setThreadNum(int numThreads);

    //开启服务器监听
    void start();

    void setThreadInitCallback(const ThreadInitCallback& cb)
    {
        threadInitCallback_ = cb;
    }

    void setConnectionCallback(const ConnectionCallback& cb)
    {
        connectionCallback_ = cb;
    }

    void setMessageCallback(const MessageCallback& cb)
    {
        messageCallback_ = cb;
    }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {
        writeCompleteCallback_ = cb;
    }

private:
    /// Not thread safe, but in loop
    void newConnection(int sockfd, const InetAddress& peerAddr);
    /// Thread safe.
    void removeConnection(const TcpConnectionptr& conn);
    /// Not thread safe, but in loop
    void removeConnectionInLoop(const TcpConnectionptr& conn);

    using ConnectionMap = std::unordered_map<std::string, TcpConnectionptr>;

    EventLoop* loop_; //baseloop
    const std::string name_;
    const std::string ipport_;
    std::unique_ptr<Accept> acceptor_;
    std::shared_ptr<EventLoopThreadPool> threadpool_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    ThreadInitCallback threadInitCallback_;

    std::atomic_int started_;

    int nextConnId_;
    ConnectionMap connections_;
};
代码语言:cpp
复制
#include "TcpServer.hpp"
#include "Logger.hpp"
#include "TcpConnection.hpp"

#include <netinet/in.h>
#include <sys/socket.h>

#include <cerrno>
#include <cstdio>
#include <functional>
#include <string.h>

// Validates the loop pointer handed to the server: a non-null pointer is
// returned as-is; a null pointer is reported through LOG_FATAL first.
EventLoop* CheckLoopNotNull(EventLoop* loop) {
    if (loop != nullptr) {
        return loop;
    }

    LOG_FATAL("%s:%s:%d mainloop is null \n", __FILE__, __FUNCTION__, __LINE__);
    return loop;
}

/**
 * Builds the server on the given base loop.
 *
 * @param loop       Base loop; passed through CheckLoopNotNull.
 * @param listenAddr Address given to the acceptor; its ip:port string is stored.
 * @param nameArg    Server name, also given to the thread pool.
 * @param option     kReusePort is forwarded to the acceptor as `true`,
 *                   anything else as `false`. The default value lives on the
 *                   in-class declaration only; repeating it here is ill-formed.
 */
TcpServer::TcpServer(EventLoop* loop,
    const InetAddress& listenAddr,
    const std::string& nameArg,
    Option option)
    // Initializers follow the member declaration order, which is the order in
    // which they actually run (name_ is needed by threadpool_ below).
    : loop_(CheckLoopNotNull(loop)),
    name_(nameArg),
    ipport_(listenAddr.toIpPort()),
    acceptor_(new Accept(loop, listenAddr, option == kReusePort)),
    threadpool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(),
    messageCallback_(),
    started_(0),   // start() tests this against 0, so it must be initialized
    nextConnId_(1)
{
    // The acceptor invokes newConnection(sockfd, peerAddr) for every accepted
    // connection.
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}


/**
 * New-connection callback registered on the acceptor.
 *
 * Names the connection "<server name>-<ip:port>#<id>", wraps the accepted
 * socket in a TcpConnection bound to a loop taken from the thread pool,
 * records it in connections_, wires up the user callbacks and finally
 * schedules TcpConnection::connectEstablished on that loop.
 *
 * @param sockfd   Descriptor of the accepted socket.
 * @param peerAddr Address of the remote end.
 */
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
    // Ask the pool for the loop that will own this connection (the original
    // comment describes the selection as round-robin).
    EventLoop* ioloop = threadpool_->GetNextLoop();

    char buf[64] = { 0 };
    snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);
    ++nextConnId_;
    const std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] -new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    // Query the local address bound to the accepted socket.
    sockaddr_in local;
    ::memset(&local, 0, sizeof local);
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, reinterpret_cast<sockaddr*>(&local), &addrlen) < 0) {
        // Keep going with the zeroed address, but do not fail silently.
        LOG_INFO("TcpServer::newConnection getsockname failed, errno=%d \n", errno);
    }
    // NOTE(review): assumes InetAddress is constructible from sockaddr_in —
    // confirm against InetAddr.hpp.
    InetAddress localAddr(local);

    // Create the connection object for the accepted fd and remember it by name.
    TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)); // FIXME: unsafe

    // Finish setting the connection up on the loop that owns it.
    ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

/**
 * Tears down every connection still recorded in connections_.
 *
 * Each map entry is released and TcpConnection::connectDestroyed is scheduled
 * on the connection's own loop, mirroring what removeConnectionInLoop does
 * for a single connection. The local copy keeps the connection alive until
 * the scheduled call has run.
 */
TcpServer::~TcpServer() {
    for (auto& item : connections_) {
        TcpConnectionptr conn(item.second);
        item.second.reset();
        conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
    }
}


//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {
    threadpool_->setThreadNum(numThreads);
}

/**
 * Starts the server: starts the thread pool with the thread-init callback and
 * schedules Accept::listen on the base loop.
 *
 * Only the first call has any effect. The flag is tested and set in a single
 * atomic exchange, so two concurrent callers cannot both see "not started"
 * (a separate `== 0` check followed by `++` would allow that).
 */
void TcpServer::start() {
    if (started_.exchange(1) == 0) {
        threadpool_->start(threadInitCallback_);
        loop_->runInLoop(std::bind(&Accept::listen, acceptor_.get()));
    }
}

void TcpServer::removeConnection(const TcpConnectionptr& conn) {
    loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

/// Not thread safe, but in loop
void TcpServer::removeConnectionInLoop(const TcpConnectionptr& conn) {
    connections_.erase(conn->name());
   
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档