前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【系列教程】多线程实现都需要注意什么?

【系列教程】多线程实现都需要注意什么?

原创
作者头像
会玩code
发布2022-08-31 08:57:47
4080
发布2022-08-31 08:57:47
举报
文章被收录于专栏:会玩code会玩code

首发地址

day05 多线程实现都需要注意什么?

项目仓库地址

https://github.com/lzs123/CProxy,欢迎fork and star!

往期教程

day01-从一个基础的socket服务说起

day02 真正的高并发还得看IO多路复用

day03 C++项目开发配置最佳实践(vscode远程开发配置、格式化、代码检查、cmake管理配置)

day04 高性能服务设计思路


工作线程如何初始化?

在我们的设计中,工作线程本身是一个事件循环,启动后会陷入阻塞,等待事件发生。为了达到这个效果,线程启动时需要做一些初始化工作。 我们定义了EventLoopThread类,类的定义如下

代码语言:javascript
复制
class EventLoopThread {
    public:
    EventLoopThread()
    : thread_(std::bind(&EventLoopThread::ThreadFunc, this)),
      mutex_(),
      cond_(){};
    ~EventLoopThread();
    void StartLoop();
    void ThreadFunc();
    void AddChannel(SP_Channel);
    void AddConn(SP_Conn);
    SP_EventLoop GetLoop() { return loop_; }
    
    private:
    SP_EventLoop loop_;
    bool started_;
    std::thread thread_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

SP开头的代表对应类的shared_ptr智能指针类型;比如SP_EventLoop =std::shared_ptr<EventLoop>

EventLoopThread构造函数中,创建了一个std::thread对象,并以EventLoopThread::ThreadFunc作为线程执行函数。

代码语言:javascript
复制
void EventLoopThread::ThreadFunc() try {
  if (loop_) {
    throw "loop_ is not null";
  }
  loop_ = SP_EventLoop(new EventLoop());
  {
    std::unique_lock<std::mutex> lock(mutex_);
    started_ = true;
    cond_.notify_all();
  }
  loop_->Loop();
} catch (std::exception& e) {
  SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what());
  abort();
}

我们先看看第5行:loop_ = SP_EventLoop(new EventLoop());初始化了一个EventLoop并赋值到EventLoopThread成员变量loop_上,我们先看看EventLoop的定义

代码语言:javascript
复制
class EventLoop {
 public:
  EventLoop() : poller_(SP_Epoll(new Epoll())){};
  void Loop();
  void AddToPoller(SP_Channel channel);
  void UpdateToPoller(SP_Channel channel);
  void RemoveFromPoller(SP_Channel channel);

 private:
  SP_EventDispatcher poller_;
};

可以看到,在EventLoop的构造函数中,初始化了一个Epoll对象赋值到变量poller_上。poller_本身是个EventDispacther对象,Epoll继承了EventDispatcher,表示基于epoll实现的事件分发; 这样做的好处是如果要新增一种事件分发机制,比如项目要支持mac环境,我们需要用kqueue代替epoll实现事件分发。这个时候我们只需要修改EventLoop的构造函数,将新的事件分发对象Kqueue赋值给poller_即可。 我们再看看Epoll在初始化时做了什么。

代码语言:javascript
复制
Epoll::Epoll() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)), epoll_events_(EVENTSNUM) {
  assert(epoll_fd_ > 0);
}

调用了epoll_create1,创建了一个epoll实例,也就是说,一个Epoll对象初始化时,就已经在内核准备好了一个eventpoll对象,我们可以添加套接字并监听相关事件了。 看回EventLoopThread::ThreadFunc, 在初始化一个EventLoop对象后,EventLoopThread::ThreadFunc会再调用loop_->Loop(),这个就是我们之前说的事件循环了。

代码语言:javascript
复制
void EventLoop::Loop() {
  std::vector<SP_Channel> ready_channels;
  for (;;) {
    ready_channels.clear();
    ready_channels = poller_->WaitForReadyChannels();
    for (SP_Channel chan : ready_channels) {
      chan->HandleEvents();
    }
  }
}

EventLoop::Loop本身是个死循环,大概逻辑也比较简单,就是不断从poller_中获取触发事件的channel列表,再遍历该列表,调用对应的HandleEvent事件处理函数。 当没有事件时,该循环会阻塞在WaitForReadyChannels处,底层其实是阻塞在epoll_wait。(阻塞过程中线程是挂起状态,并不会占用cpu)。 我们简单回顾下线程的初始化。

  1. 创建了一个EventLoop对象,底层通过调用epoll_create1创建了epoll实例,可以通过该epoll实例添加事件监听。
  2. 之后调用EventLoop::Loop,在没有事件时,线程会陷入阻塞;当有事件发生时,会调用注册时对应的handleEvents方法进行处理。

如何控制线程启动的顺序?

上面我们讲了线程的初始化,但初始化后,EventLoopThread还需要调用StartLoop才能开始工作。这其实是为了让主线程等待线程池中的工作线程完成初始化。

为什么要控制?

首先讲讲主线程为什么要等待工作线程完成初始化。 在我们的线程模型设计中,主线程负责监听接收新连接请求,然后选择线程池中的一个工作线程,将新连接套接字交给工作线程处理。 假设工作线程不需要StartLoop,在工作线程初始化后直接加入到线程池。

代码语言:javascript
复制
void EventLoopThreadPool::start() {
  for (int i = 0; i < thread_num_; i++) {
    SP_EventLoopThread t(new EventLoopThread());
    // t->StartLoop();
    threads_.emplace_back(t);
  }
}

当有新连接时,主线程通过从线程池中获取一个工作线程。但这个时候,我们是没法保证选出的工作线程是已经初始化了loop_的。因为EventLoopThread::ThreadFunc的执行是异步的,执行顺序可能如下。

在主线程选中将新连接添加到工作线程中时,工作线程的loop_此时还未初始化,可能会导致程序直接coredump。 所以,我们必须想办法让工作线程的EventLoop初始化在主线程开始接收新连接请求之前。

如何控制?

这实际上是一个多线程的通知问题,我们主要采用的是mutexcondition这两个武器,通过条件变量来完成通知。

在C++中,我们通常使用condition_variable搭配互斥量mutex来处理线程间同步问题。主要用到的是condition_variable::notify_xxcondition_variable::wait函数。 顾名思义,wait就是一个等待的作用,假设我们在线程A进行wait,流程如下:

lock获取到锁,

调用wait,会先自动unlock释放锁,然后将线程阻塞住;

被其他线程唤醒后,会自动lock获取锁,继续执行wait的下一行代码

唤醒线程的函数是notify_allnotify_one,两者的区别在于 notify_one()每次只唤醒一个线程,那么notify_all()函数会唤醒所有等待中的线程(当最终能抢到锁的只有一个线程)。调用流程如下:

lock获取到锁

调用notify_all/notify_one,唤醒等待中的线程

释放锁

我们为EventLoopThread引入了StartLoop方法,大概效果如下

为了阅读方便,在这里再贴一遍EventLoopThread相关的代码

代码语言:javascript
复制
// lib/event_loop_thread.h
class EventLoopThread {
    public:
    ...
    void StartLoop();
    void ThreadFunc();
    ...
    private:
    ...
    bool started_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// lib/event_loop_thread.cpp
void EventLoopThread::ThreadFunc() try {
  if (loop_) {
    throw "loop_ is not null";
  }
  loop_ = SP_EventLoop(new EventLoop());
  {
    std::unique_lock<std::mutex> lock(mutex_);
    started_ = true;
    cond_.notify_all();
  }
  loop_->Loop();
} catch (std::exception& e) {
  SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what());
  abort();
}

void EventLoopThread::StartLoop() {
  std::unique_lock<std::mutex> lock(mutex_);
  while (!started_) cond_.wait(lock);
}

首先,我们需要明确,在工作线程初始化loop_后,就代表该线程已经准备完成,可以接收处理套接字了。所以我们在完成loop_的初始化后,将started_置为true,然后就发送notify通知唤醒等待线程。 而在StartLoop函数中,我们先检查started_是否为false,如果是true,代表工作线程已经初始化完loop_了,这种情况StartLoop不再需要wait,直接返回即可;如果started_false,则陷入等待,直到工作线程完成loop_初始化后唤醒。

如何将套接字添加到工作线程?

最后,我们仔细聊聊新连接套接字是如何添加到工作线程中的。 没有请求时,主线程会阻塞在accept调用,当有新连接请求时,accept会返回新连接套接字accept_fd。 主线程会先将accept_fd封装成一个Conn对象,上一节《day04 高性能服务设计思路》讲到项目中有多种连接,这些连接有一个共同的基类Conn, Conn主要是将套接字封装成一个Channel,并设置该Channel各种事件回调处理逻辑。不同类型的Conn有各自的回调处理逻辑。

接下来,主线程通过EventLoopThreadPool::PickRandThread获取一个工作线程。

代码语言:javascript
复制
SP_EventLoopThread EventLoopThreadPool::PickRandThread() {
  SP_EventLoopThread t;
  {
    std::unique_lock<std::mutex> lock(thread_mutex_);
    t = threads_[next_work_thread_Idx_];
    next_work_thread_Idx_ = (next_work_thread_Idx_ + 1) % thread_num_;
  }
  return t;
}

这里我们直接采用轮训策略选取线程池中的线程。 获取到工作线程后,我们直接调用EventLoopThread::AddConn,将该连接交由工作线程处理。

代码语言:javascript
复制
// lib/event_loop_thread.cpp
void EventLoopThread::AddConn(SP_Conn conn) { 
    loop_->AddToPoller(conn->GetChannel()); 
}

// lib/event_loop.cpp
void EventLoop::AddToPoller(SP_Channel channel) {
    poller_->PollAdd(channel); 
}

// lib/epoll.cpp
void Epoll::PollAdd(SP_Channel channel) {
  int fd = channel->getFd();
  epoll_event event;
  event.data.fd = fd;
  event.events = channel->GetEvents();
  if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) {
    SPDLOG_CRITICAL("epoll_ctl fd: {} err: {}", fd, strerror(errno));
  } else {
    fd2chan_[fd] = channel;
  }
}

可以发现,底层是调用epoll_ctl将套接字fd加到对应工作线程的epoll实例上。 这里值得注意的是,【套接字添加到工作线程的epoll实例】这个动作是在主线程上完成,由于epoll是线程安全的,所以在主线程直接操作工作线程的epoll实例是没有问题的。

继续思考

有没有办法将【套接字添加到工作线程的epoll实例】这个动作放到工作线程上完成呢?其实这种做法更为普遍,比如有些时候为了避免加锁,提高操作效率,某些操作需要由主线程触发,由工作线程执行。 这里的难点在于工作线程是本身是个无限循环,在没有事件发生时,会一直阻塞在epoll_wait上,这种情况下,主线程如何通知工作线程执行操作呢? 这里介绍一种思路,我们可以在EventLoop初始化时,通过eventfd()调用创建一个套接字event_fdEventLoop添加监听event_fd的读事件。 在EventLoop::Loop函数中,每次处理完一轮读写后,都会再执行一个函数doPendingFns(), 伪代码如下

代码语言:javascript
复制
void EventLoop::Loop() {
  std::vector<SP_Channel> ready_channels;
  for (;;) {
    ready_channels.clear();
    ready_channels = poller_->WaitForReadyChannels();
    for (SP_Channel chan : ready_channels) {
      chan->HandleEvents();
    }
    doPendingFns();
  }
}

void doPendingFns() {
  std::vector<Functor> fns;
  {
      MutexLockGuard lock(mutex_);
      fns.swap(pendingFns_);
  }
  for (auto fn : fns) {
    fn();
  }
}

主线程在需要工作线程执行某个函数时,只需要往工作线程的pendingFns列表添加对应的函数,再往event_fd随便写一些数据,让工作线程退出阻塞,工作线程最终会在doPendingFns遍历执行pendingFns列表中的全部函数。


如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 首发地址
  • 项目仓库地址
  • 往期教程
  • 工作线程如何初始化?
  • 如何控制线程启动的顺序?
    • 为什么要控制?
      • 如何控制?
      • 如何将套接字添加到工作线程?
        • 继续思考
        相关产品与服务
        云服务器
        云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档