前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >缩略muduo库(8):TcpConnection

缩略muduo库(8):TcpConnection

作者头像
看、未来
发布2021-10-09 15:33:51
2680
发布2021-10-09 15:33:51
举报

最近VScode坏了,莫名其妙连不上虚拟机了,很难受。 已经判定不是Linux的问题,因为用cmd可以远程连接上。 所以这份就用VS先顶一下了,报了一堆的错也看不清楚。


文章目录

CallBack.hpp

存放一些回调声明。

代码语言:javascript
复制
#pragma once

#include <functional>

class Buffer;
class TcpConnection;
class timestamp;

using TcpConnectionptr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void (const TcpConnectionptr&)>;
using CloseCallback = std::function<void (const TcpConnectionptr&)>;
using WriteCompleteCallback = std::function<void (const TcpConnectionptr&)>;
using MessageCallback =  std::function<void (const TcpConnectionptr&,Buffer*,timestamp)>;
using ThreadInitCallback = std::function<void(EventLoop*)>;

TcpConnection.hpp

代码语言:javascript
复制
#pragma once

#include "nocopyable.hpp"
#include "InetAddr.hpp"
#include "callback.hpp"
#include "buffer.hpp"
#include "timestamp.hpp"

#include <memory>
#include <string>
#include <atomic>

class Channel;
class EventLoop;
class Socket;

//TcpServer 通过 Acceptor,当有一个新的accept函数拿到connfd,TCPConnection设置回调,转channel,进poller,调用channel的回调操作
class TcpConnection :public nocpoyable, public std::enable_shared_from_this<TcpConnection>
{ //得到当前对象的智能指针
public:
    using TcpConnectionPtr = std::shared_ptr<TcpConnection>;

    TcpConnection(EventLoop* loop,
        const std::string& name,
        int sockfd,
        const InetAddress& localaddr,
        const InetAddress& peeraddr);

    ~TcpConnection();

    EventLoop* getLoop() const { return loop_; }
    const std::string& name() const { return name_; }
    const InetAddress& localAddress() const { return localaddr_; }
    const InetAddress& peerAddress() const { return peeraddr_; }

    bool connected() const { return state_ == kConnected; }

    void send(const std::string &buf);
    void shutdown();

    void setConnectionCallback(const ConnectionCallback& cb)
    {
        connectionCallback_ = cb;
    }

    void setMessageCallback(const MessageCallback& cb)
    {
        messageCallback_ = cb;
    }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {
        writeCompleteCallback_ = cb;
    }

    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
    {
        highWaterMarkCallback_ = cb;
        highWaterMark_ = highWaterMark;
    }

    void connectEstablished();
    void connectDestroyed();


private:
    enum State
    {
        kDisconnected,
        kConnecting,
        kConnected,
        kDisconnecting
    };

    void setState(State s) { state_ = s; }

    void handleRead(timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();

    void sendInLoop(const void* message, size_t len);
    void shutdownInLoop();

    EventLoop* loop_; //存在于subloop中
    const std::string name_;
    std::atomic_int state_;
    bool reading_;

    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localaddr_;
    const InetAddress peeraddr_;

    Buffer inputBuffer_;
    Buffer outputBuffer_;
    size_t highWaterMark_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    ThreadInitCallback threadInitCallback_;
    HighWaterMarkCallback highWaterMarkCallback_;
    CloseCallback closeCallback_;
};

TcpConnection.cc

代码语言:javascript
复制
#include "logger.hpp"
#include "tcpconnection.hpp"
#include "eventloop.hpp"
#include "channel.hpp"
#include "socket.hpp"
#include "eventloop.hpp"

#include <functional>
#include <errno.h>

static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d mainloop is null!\n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

TcpConnection::TcpConnection(EventLoop* loop,
    const std::string& nameArg,
    int sockfd,
    const InetAddress& localaddr,
    const InetAddress& peeraddr)
    : loop_(CheckLoopNotNull(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localaddr_(localaddr),
    peeraddr_(peeraddr),
    highWaterMark_(64 * 1024 * 1024)
{
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite, this));
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose, this));
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError, this));
    LOG_DEBUG("TcpConnection::ctor[%s] at %p fd= %d \n", name_, this, sockfd);

    //socket_->setKeepAlive(true);
}

TcpConnection::~TcpConnection()
{}


void TcpConnection::handleRead(timestamp receiveTime)
{
    int saveerrno = 0;
    ssize_t n = inputBuffer_.readFD(channel_->fd(), &saveerrno);

    if (n > 0)
    {
        messageCallback_(std::shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        errno = saveerrno;
        LOG_ERROR("TcpConnection::handleRead\n");
        handleError();
    }
}

void TcpConnection::handleWrite()
{
    if (channel_->isWriting())
    {
        int saveErrno = 0;
        ssize_t n = outputBuffer_.writeFD(channel_->fd(), &saveErrno);

        if (n > 0)
        {
            outputBuffer_.retrieve(n);

            if (outputBuffer_.readablebuffer() == 0)
            {
                channel_->disableWriting();
                if (writeCompleteCallback_)
                {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, std::shared_from_this()));
                }
                if (state_ == kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleRead\n");
        }
    }
    else
    {
        LOG_ERROR("Connection fd = %d is done\n", channel_->fd());
    }
}

void TcpConnection::handleClose()
{
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr connptr(std::shared_from_this());
    connectionCallback_(connptr);

    closeCallback_(connptr);
}

void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen = sizeof optval;

    int err;

    if (::getsockopt(channel_->fd(), SOL_SOCKET, optval, &optlen) < 0) {
        err = errno;

    }
    else {
        err = 0;
    }

    LOG_ERROR("TcpConnection::handleError [%s] - SO_ERROR = %d\n", name_.c_str(), err);
}


void TcpConnection::send(const std::string &buf) {
    if (state_ == kConnected) {
        if(loop_->isInLoopThread()) {
            sendInLoop(buf.c_str(),buf.size());
        }
        else {
            loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));
        }
    }
}

//应用写的快,内核写得慢,所以需要缓冲区,且设置水位线,防止写入过快
void TcpConnection::sendInLoop(const void* message, size_t len)
{
    ssize_t wrote = 0;
    ssize_t remaining = len;
    bool faulterr = false;

    if (state_ == kDisconnected) {
        LOG_ERROR("disconnected,give up writing! \n");
        return;
    }

    //channel第一次开始写数据,而且缓冲区没有待发送数据
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
        wrote = ::write(channel_->fd(), message, len);

        if (wrote >= 0) {
            remaining = len - wrote;
            
            //数据一次性发送完成,不用再对poller设置epollout事件
            if (remaining == 0 && WriteCompleteCallback_) {
                loop_->queueInLoop(std::bind(WriteCompleteCallback_,shared_from_this()));
            }
        }
        else {
            wrote = 0;

            if (errno != EWOULDBLOCK) {
                LOG_ERROR("TcpConnect::sendInLoop\n");

                if (errno == EPIPE || errno == ECONNRESET) {
                    faulterr = true;
                }
            }
            
        }
    }

    //说明这次write没有把数据全部发送,剩余数据需要保存到缓冲区中,
    //给channel注册epollout事件,poller会通知相应sock->channel调用相应回调方法
    if (!faulterr && wrote > 0) {
        size_t oldLen = outputBuffer_.readableBytes();
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)
        {
            loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();
        }
    }
}

void TcpConnection::connectEstablished()
{
    setState(kConnected);
    channel_->tie(shared_from_this());
    channel_->enableReading();

    connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed()
{
    if (state_ == kConnected) {
        setState(kDisconnected);
        channel_->disableAll();
        ConnectionCallback_(shared_from_this());
    }
    channel_->remove();
}

void TcpConnection::shutdown()
{
    if (state_ == kConnected){
        setState(kDisconnecting);
        // FIXME: shared_from_this()?
        loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
    }
}

void TcpConnection::shutdownInLoop()
{
    if (!channel_->isWriting())
    {
        // we are not writing
        socket_->shutdownWrite();
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/09/09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • CallBack.hpp
  • TcpConnection.hpp
  • TcpConnection.cc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档