前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >缩略muduo库(4):事件循环 EventLoop

缩略muduo库(4):事件循环 EventLoop

作者头像
看、未来
发布2021-10-09 15:33:33
2390
发布2021-10-09 15:33:33
举报

文章目录

获取线程ID

每一个线程都有一个EventLoop,每个loop里面都会有很多的channel,每个channel的任务都要在自己的线程中完成。 为了管理这些线程,设置了一份获取线程ID的代码,辅助管理。

代码语言:javascript
复制
#pragma once

#include<unistd.h>
#include<sys/syscall.h>

namespace CurrentThread{
    //__thread:这个变量虽然是一个全局变量,但是加上这个修饰的话,其在每一个线程中都会保有一份缓存,互不干扰
    extern __thread int t_CachedTid;   
    extern __thread char t_TidString[32]; 
    extern __thread int t_TidStringLength; 
    extern __thread const char* t_ThreadNum;

    void cacheTid();    //做一份缓存

    inline int tid(){
        if (__builtin_expect(t_CachedTid == 0, 0)){ //底层优化,不再赘述
        cacheTid();
        }
        return t_CachedTid;
    }


    // for logging
    inline const char* tidString() {
        return t_TidString;
    }

    // for logging
    inline int tidStringLength() {
        return t_TidStringLength;
    }

    inline const char* name(){
        return t_ThreadNum;
    }
}
代码语言:javascript
复制
#include "currenthread.hpp"

#include<stdio.h>

namespace CurrentThread{
    __thread int t_cachedTid = 0;
    __thread char t_tidString[32];
    __thread int t_tidStringLength = 6;
    __thread const char* t_threadName = "unknown";

    void cacheTid(){
        if (t_CachedTid == 0){
            t_CachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
            t_TidStringLength = snprintf(t_tidString, sizeof t_tidString, "%5d ", t_cachedTid);
        }
    }
}

事件循环 EventLoop

太恐怖了,cc文件莫名丢失,赶紧来备份一下。。。

代码语言:javascript
复制
#pragma once

#include "nocopyable.hpp"
#include "timestamp.hpp"
#include "currenthread.hpp"

#include <atomic>
#include <functional>
#include <memory>
#include <sys/eventfd.h> //去了解一下
#include <fcntl.h>
#include <mutex>
#include <vector>

class Channel;
class Poller;

class EventLoop:public nocpoyable{
public:
    //typedef std::function<void()> Functor;
    using Functor = std::function<void()>;

    EventLoop();
    ~EventLoop();  
 
    void loop();
    void quit();

    timestamp pollReturnTime() const { return pollReturnTime_; }

    void runInLoop(Functor cb); //在当前loop中运行
    void queueInLoop(Functor cb);//放到队列中,唤醒loop所在的线程去执行

    void wakeup();  //唤醒loop所在的线程

    //Poller的方法
    void updateChannel(Channel* channel);
    void removeChannel(Channel* channel);
    bool hasChannel(Channel* channel);

    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
    
private:

    void handleRead();  // waked up
    void doPendingFunctors();   //执行回调

    using ChannelList = std::vector<Channel*>;
    std::atomic_bool looping_;  //原子操作的bool值
    std::atomic_bool quit_; //退出loop循环
    std::atomic_bool callingPendingFunctors_; //是否有需要回调的操作
    const pid_t threadId_;
    timestamp pollReturnTime_;  //返回发生事件的时间点

    std::unique_ptr<Poller> poller_;
  
    int wakeupFd_; 
    //当mainloop获取一个新用户的channel时,通过轮询算法选择一个subloop,
    //通过该成员唤醒subloop

    std::unique_ptr<Channel> wakeupChannel_;

    ChannelList activeChannels_;
    Channel* currentActiveChannel_;

    std::mutex mutex_;
    std::vector<Functor> pendingFunctors_;   //存储loop需要的所有回调操作
};
代码语言:javascript
复制
#include "eventloop.hpp"
#include "logger.hpp"
#include "poller.hpp"
#include "channel.hpp"

#include <mutex>

//防止一个线程创建多个EventLoop
__thread EventLoop *t_loopInThisThread = nullptr;

//定义默认的poller超时时间
const int kPollTimers = 10000;

//通过轮询的方式唤醒channel
int createEventfd(){
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0){
        LOG_ERROR("Failed in eventfd%d\n", errno);
    }
    return evtfd;
}

EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      callingPendingFunctors_(false),
      threadId_(CurrentThread::tid()),
      poller_(Poller::newDefaultPoller(this)),
      wakeupFd_(createEventfd()),
      wakeupChannel_(new Channel(this, wakeupFd_)),
      currentActiveChannel_(nullptr)
{
    LOG_DEBUG("EventLoop created %p in thread\n", threadId_);
    if (t_loopInThisThread){
        LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadId_);
    }
    else{
        t_loopInThisThread = this;
    }

    //设置wakeup事件类型及发生事件后的回调操作
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));

    //每一个EventLoop都将监听wakeupchannel的EPOLL读事件
    wakeupChannel_->enableReading();
}

EventLoop::~EventLoop(){
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

void EventLoop::loop(){
    looping_ = true;
    quit_ = false; // FIXME: what if someone calls quit() before loop() ?
    LOG_INFO("EventLoop %p start looping \n",this);

    while (!quit_){
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimers, &activeChannels_);

        for (Channel *channel : activeChannels_){
            currentActiveChannel_ = channel;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;

        //执行当前EventLoop事件循环需要的事件操作
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop looping\n",this);
    looping_ = false;
}

void EventLoop::quit(){
    quit_ = true;
   
    if (!isInLoopThread()){
        wakeup();
    }
}

void EventLoop::runInLoop(Functor cb){
    if (isInLoopThread()){
        cb();
    }
    else{
        queueInLoop(std::move(cb));
    }
}

void EventLoop::queueInLoop(Functor cb){
    
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_){
        wakeup();
    }
}

void EventLoop::updateChannel(Channel *channel){
    poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel){
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel){
    return poller_->hasChannel(channel);
}

void EventLoop::wakeup(){
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one){
        LOG_ERROR("EventLoop::wakeup() writes %d bytes instead of 8 \n",n);
    }
}

void EventLoop::handleRead(){
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one){
        LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8\n", n);
    }
}

void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (const Functor &functor : functors)
    {
        functor();
    }
    callingPendingFunctors_ = false;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/09/04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 获取线程ID
  • 事件循环 EventLoop
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档