前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Chromium】Base库的SimpleThread

【Chromium】Base库的SimpleThread

原创
作者头像
lealc
发布2024-03-13 16:47:32
1560
发布2024-03-13 16:47:32
举报
文章被收录于专栏:Chromium学习Chromium学习

源码

先附上可用于学习的开源代码:Base库

喜欢可以帮忙Star一下

前言

编译:参考Base库即可

环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17

SimpleThread

首先需要说明的是,既然有了base::Thread,为什么还要有base::SimpleThread?

官方解释:你应该使用 Thread(thread.h)替代。Thread 是 Chrome 基于消息循环的线程抽象,如果你是在浏览器中运行的线程,那么很可能有假设你的线程将具有关联的消息循环。

这是一个简单的线程接口,它与本地操作系统线程相关联。只有在不需要关联消息循环的线程时才应使用该接口,最好的例子是单元测试。

使用最简单的接口是 DelegateSimpleThread,它将创建一个新线程,并在该新线程中执行 Delegate 的虚拟 Run() 方法,直到完成并退出线程。例如:

代码语言:C++
复制
// 在这里面实现自定义的线程运行逻辑
 class MyThreadRunner : public DelegateSimpleThread::Delegate { ... };

 MyThreadRunner runner;
 DelegateSimpleThread thread(&runner, "good\_name\_here");

 // Start 方法将在成功启动和初始化线程后返回。
 // 新创建的线程将调用 runner->Run(),并一直运行直到返回。
 thread.Start();

 // thread.Join() 方法将等待线程退出。必须 调用 Join!
 thread.Join();
 // SimpleThread 对象仍然有效,但是你不能再次调用 Join 或 Start 方法。

可以理解为base::Thread的一个简化版、轻量版

SimpleThread和base::Thread相同,也具有线程选项、线程管控基本功能,不同的是

SimpleThread新增了两个模块:DelegateSimpleThread和DelegateSimpleThreadPool

由于SimpleThread不包含线程循环,所以必须要定义其线程做的事情,也就是源码所给出的DelegateSimpleThread这个类,向我们展示了如何正确使用SimpleThread来操作线程,如果没有扩展的需要,可以直接使用DelegateSimpleThread以及包含的DelegateSimpleThread::Delegate来分别定义线程以及对应的逻辑,Delegate实际就是线程将自己的执行逻辑委托出去,抽象了一下。

类图一览

SimpleThread类图
SimpleThread类图

SimpleThread::Option

SimpleThread的Option简单很多,仅提供了优先级、堆栈大小、是否可join这些设置

代码语言:C++
复制
struct BASE_EXPORT Options {
  public:
  Options() = default;
  explicit Options(ThreadPriority priority_in) : priority(priority_in) {}
  ~Options() = default;

  // Allow copies.
  Options(const Options& other) = default;
  Options& operator=(const Options& other) = default;

  // A custom stack size, or 0 for the system default.
  size_t stack_size = 0;

  ThreadPriority priority = ThreadPriority::NORMAL;

  // If false, the underlying thread's PlatformThreadHandle will not be kept
  // around and as such the SimpleThread instance will not be Join()able and
  // must not be deleted before Run() is invoked. After that, it's up to
  // the subclass to determine when it is safe to delete itself.
  bool joinable = true;
};

其中优先级定义如下:

代码语言:C++
复制
// Valid values for priority of Thread::Options and SimpleThread::Options, and
// SetCurrentThreadPriority(), listed in increasing order of importance.
enum class ThreadPriority : int {
  // 适用于不应中断高优先级工作的线程。   
  BACKGROUND,     
  // 默认优先级级别。    
  NORMAL,     
  // 适用于为显示生成数据的线程(大约 60Hz)。
  DISPLAY,     
  // 适用于低延迟、抗干扰的音频。
  REALTIME_AUDIO,
};

SimpleThread源码

接下来对SimpleThread源码进行详细解读

成员函数

对外接口提供

代码语言:C++
复制
// 创建一个 SimpleThread。|options| 应该用于管理涉及线程创建和管理的特定配置。
// 每个线程都有一个名称,它是一个显示字符串,用于标识线程。
// 直到调用 Start() 方法之前,线程不会被创建。
explicit SimpleThread(const std::string& name);
SimpleThread(const std::string& name, const Options& options);

~SimpleThread() override;

// 启动线程,并在线程启动和初始化后(即调用 ThreadMain() 后)才返回。
void Start();

// 加入线程。如果使用 StartAsync() 启动线程,则首先等待线程干净地启动,然后加入线程。
void Join();

// 启动线程,但立即返回,而不等待线程首先进行初始化(即不等待 ThreadMain() 被运行)。
void StartAsync();

// 子类应该重写 Run 方法。
virtual void Run() = 0;

// 返回线程 ID,在线程启动后才有效。如果使用 Start() 启动线程,则在调用 Start() 后它将有效。
// 如果使用 StartAsync() 启动线程,则在 HasBeenStarted() 返回 True 之前不能调用此方法。
PlatformThreadId tid();

// 如果线程已启动并初始化(即 ThreadMain() 已运行),则返回 True。
// 如果使用 StartAsync() 启动线程,但尚未初始化(即 ThreadMain() 尚未运行),则返回 False。
bool HasBeenStarted();

// 如果曾经调用过 Join(),则返回 True。
bool HasBeenJoined() const { return joined_; }

// 如果调用过 Start() 或 StartAsync(),则返回 true。
bool HasStartBeenAttempted() { return start_called_; }

// 从 PlatformThread::Delegate 中重写:
void ThreadMain() override;

成员变量

代码语言:C++
复制
const std::string name_;
const Options options_;
PlatformThreadHandle thread_;  // PlatformThread handle, reset after Join.
WaitableEvent event_;          // Signaled if Start() was ever called.
PlatformThreadId tid_ = kInvalidThreadId;  // The backing thread's id.
bool joined_ = false;                      // True if Join has been called.
// Set to true when the platform-thread creation has started.
bool start_called_ = false;

部分源码解析

Start和StartAsync

同步启动会进行等待启动线程成功后才会返回,异步则会直接返回,线程是否启动成功由调用方决策

代码语言:C++
复制
void SimpleThread::Start() {
  StartAsync();
  ScopedAllowBaseSyncPrimitives allow_wait;
  event_.Wait();  // Wait for the thread to complete initialization.
}

void SimpleThread::StartAsync() {
  DCHECK(!HasStartBeenAttempted()) << "Tried to Start a thread multiple times.";
  start_called_ = true;
  BeforeStart();
  bool success =
      options_.joinable
          ? PlatformThread::CreateWithPriority(options_.stack_size, this,
                                               &thread_, options_.priority)
          : PlatformThread::CreateNonJoinableWithPriority(
                options_.stack_size, this, options_.priority);
  CHECK(success);
}
ThreadMain

线程实际执行的主要函数,可以看到委托出去的Delegate定义的Run会最后执行,前面加上了BeforeRun的hook以及Start函数同步等待事件的触发

代码语言:C++
复制
void SimpleThread::ThreadMain() {
  tid_ = PlatformThread::CurrentId();
  PlatformThread::SetName(name_);

  // We've initialized our new thread, signal that we're done to Start().
  event_.Signal();

  BeforeRun();
  Run();
}
Join

join函数也加入了BeforeJoin的hook,方便继承子类自定义的需求使用

代码语言:C++
复制
void SimpleThread::Join() {
  DCHECK(options_.joinable) << "A non-joinable thread can't be joined.";
  DCHECK(HasStartBeenAttempted()) << "Tried to Join a never-started thread.";
  DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
  BeforeJoin();
  PlatformThread::Join(thread_);
  thread_ = PlatformThreadHandle();
  joined_ = true;
}
Run

SimpleThread提供了Run这个纯虚函数,这也是为什么不能直接使用SimpleThread的原因,需要子类继承后定义Run函数再使用

代码语言:C++
复制
  // Subclasses should override the Run method.
  virtual void Run() = 0;

DelegateSimpleThread

DelegateSimpleThread 是一个简单的线程,它将 Run() 委托给其 Delegate。非可加入(non-joinable)的 DelegateSimpleThread 在调用了 Run() 后可以安全地删除,从这个类的角度来看,它们的 Delegate 也可以在此时安全地删除(尽管实现必须确保在删除后,Run() 不会再使用 Delegate 的成员状态)。

源码解析

委托Run函数给DelegateSimpleThread::Delegate

代码语言:C++
复制
class BASE_EXPORT DelegateSimpleThread : public SimpleThread {
 public:
  class BASE_EXPORT Delegate {
   public:
    virtual ~Delegate() = default;
    virtual void Run() = 0;
  };

  DelegateSimpleThread(Delegate* delegate,
                       const std::string& name_prefix);
  DelegateSimpleThread(Delegate* delegate,
                       const std::string& name_prefix,
                       const Options& options);

  ~DelegateSimpleThread() override;
  void Run() override;

 private:
  Delegate* delegate_;

  DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThread);
};
Run

可以看到delegate是一次性使用,使用完后会置为空指针

代码语言:C++
复制
void DelegateSimpleThread::Run() {
  DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";

  // Non-joinable DelegateSimpleThreads are allowed to be deleted during Run().
  // Member state must not be accessed after invoking Run().
  Delegate* delegate = delegate_;
  delegate_ = nullptr;
  delegate->Run();
}

DelegateSimpleThreadPool

DelegateSimpleThreadPool 允许您启动固定数量的线程,然后将作业分派给这些线程。当有大量需要以多线程方式完成的小任务,但又不想为每个小任务启动一个线程时,这非常方便。

只需调用 AddWork() 将委托添加到待处理的作业列表中。JoinAll() 将确保处理所有未完成的作业,并等待所有任务完成。您可以重用线程池,因此在调用 JoinAll() 后可以再次调用 Start()。

这和线程池PostTask有啥区别呢? DelegateSimpleThreadPool是启动固定数目一系列DelegateSimpleThread线程,然后将一系列DelegateSimpleThread::Delegate线程的委托作业丢给这些线程进行处理。(统一执行的都是里面的Run函数)

而线程池则是一系列工作线程,执行的是各种各样自定义的函数逻辑。

头文件一览

Public继承自DelegateSimpleThread::Delegate,表明每个里面的线程都会执行同样的Run函数

代码语言:C++
复制
class BASE_EXPORT DelegateSimpleThreadPool
    : public DelegateSimpleThread::Delegate {
 public:
  typedef DelegateSimpleThread::Delegate Delegate;

  DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads);
  ~DelegateSimpleThreadPool() override;

  // Start up all of the underlying threads, and start processing work if we
  // have any.
  void Start();

  // Make sure all outstanding work is finished, and wait for and destroy all
  // of the underlying threads in the pool.
  void JoinAll();

  // It is safe to AddWork() any time, before or after Start().
  // Delegate* should always be a valid pointer, NULL is reserved internally.
  void AddWork(Delegate* work, int repeat_count);
  void AddWork(Delegate* work) {
    AddWork(work, 1);
  }

  // We implement the Delegate interface, for running our internal threads.
  void Run() override;

 private:
  const std::string name_prefix_;
  int num_threads_;
  std::vector<DelegateSimpleThread*> threads_;
  base::queue<Delegate*> delegates_;
  base::Lock lock_;            // Locks delegates_
  WaitableEvent dry_;    // Not signaled when there is no work to do.

  DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThreadPool);
};

部分源码解析

构造函数

池子线程数目不可变更

代码语言:C++
复制
DelegateSimpleThreadPool::DelegateSimpleThreadPool(
    const std::string& name_prefix,
    int num_threads)
    : name_prefix_(name_prefix),
      num_threads_(num_threads),
      dry_(WaitableEvent::ResetPolicy::MANUAL,
           WaitableEvent::InitialState::NOT_SIGNALED) {}
Start

启动一系列DelegateSimpleThread,然后运行

代码语言:C++
复制
void DelegateSimpleThreadPool::Start() {
  DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
  for (int i = 0; i < num_threads_; ++i) {
    std::string name(name_prefix_);
    name.push_back('/');
    name.append(NumberToString(i));
    DelegateSimpleThread* thread = new DelegateSimpleThread(this, name);
    thread->Start();
    threads_.push_back(thread);
  }
}
Run

无休止的执行队列中的任务,直到队列传入空指针任务进来

注意这里我们初始化的是WaitableEvent::ResetPolicy::MANUAL,表明这个信号不会自动重置,每个线程都会不停的被激活去清空任务队列,直到某个线程拿到了空任务,才会调用Reset叫大家停下来

代码语言:C++
复制
void DelegateSimpleThreadPool::Run() {
  Delegate* work = nullptr;

  while (true) {
    dry_.Wait();
    {
      AutoLock locked(lock_);
      if (!dry_.IsSignaled())
        continue;

      DCHECK(!delegates_.empty());
      work = delegates_.front();
      delegates_.pop();

      // Signal to any other threads that we're currently out of work.
      if (delegates_.empty())
        dry_.Reset();
    }

    // A NULL delegate pointer signals us to quit.
    if (!work)
      break;

    work->Run();
  }
}
JoinAll

给任务队列塞线程数目的空指针任务,然后等待每个线程的退出,最后反初始化所有线程

代码语言:C++
复制
void DelegateSimpleThreadPool::JoinAll() {
  DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";

  // Tell all our threads to quit their worker loop.
  AddWork(nullptr, num_threads_);

  // Join and destroy all the worker threads.
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i]->Join();
    delete threads_[i];
  }
  threads_.clear();
  DCHECK(delegates_.empty());
}
AddWork

新建一个任务,可以指定这个任务重复执行多少次

代码语言:C++
复制
void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
  AutoLock locked(lock_);
  for (int i = 0; i < repeat_count; ++i)
    delegates_.push(delegate);
  // If we were empty, signal that we have work now.
  if (!dry_.IsSignaled())
    dry_.Signal();
}

代码示例

简单功能

赋值stack_int为7,创建线程进行,Start之前不会进行赋值,Join后判断是否赋值成功,这也是常规线程使用逻辑,最好不要在Start和Join之间做一些逻辑

代码语言:C++
复制
class SetIntRunner : public DelegateSimpleThread::Delegate {
public:
    SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) {}
    ~SetIntRunner() override = default;

private:
    void Run() override { *ptr_ = val_; }

    int* ptr_;
    int val_;

    DISALLOW_COPY_AND_ASSIGN(SetIntRunner);
};

int stack_int = 0;

SetIntRunner runner(&stack_int, 7);
EXPECT_EQ(0, stack_int);

DelegateSimpleThread thread(&runner, "int_setter");
EXPECT_FALSE(thread.HasBeenStarted());
EXPECT_FALSE(thread.HasBeenJoined());
EXPECT_EQ(0, stack_int);

thread.Start();
EXPECT_TRUE(thread.HasBeenStarted());
EXPECT_FALSE(thread.HasBeenJoined());

thread.Join();
EXPECT_TRUE(thread.HasBeenStarted());
EXPECT_TRUE(thread.HasBeenJoined());
EXPECT_EQ(7, stack_int);

事件传递

创建线程处理一些耗时任务,当前线程阻塞等待执行结果,常适用于一些IO、文件等耗时任务

代码语言:C++
复制
class WaitEventRunner : public DelegateSimpleThread::Delegate {
public:
    explicit WaitEventRunner(WaitableEvent* event) : event_(event) {}
    ~WaitEventRunner() override = default;

private:
    void Run() override {
        EXPECT_FALSE(event_->IsSignaled());
      	// 做一些异步耗时操作,做完后触发信号告诉任务已经完成
        event_->Signal();
        EXPECT_TRUE(event_->IsSignaled());
    }

    WaitableEvent* event_;

    DISALLOW_COPY_AND_ASSIGN(WaitEventRunner);
};

// Create a thread, and wait for it to signal us.
WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
    WaitableEvent::InitialState::NOT_SIGNALED);

WaitEventRunner runner(&event);
DelegateSimpleThread thread(&runner, "event_waiter");

EXPECT_FALSE(event.IsSignaled());
thread.Start();
event.Wait();
EXPECT_TRUE(event.IsSignaled());
thread.Join();

顺序执行

定义一些顺序执行原子任务放入到线程池中执行,Start线程池会启动执行,并且Start前也可以AddWork。

代码语言:C++
复制
// AtomicSequenceNumber is a thread safe increasing sequence number generator.
// Its constructor doesn't emit a static initializer, so it's safe to use as a
// global variable or static member.
class AtomicSequenceNumber {
 public:
  constexpr AtomicSequenceNumber() = default;

  // Returns an increasing sequence number starts from 0 for each call.
  // This function can be called from any thread without data race.
  inline int GetNext() { return seq_.fetch_add(1, std::memory_order_relaxed); }

 private:
  std::atomic_int seq_{0};

  DISALLOW_COPY_AND_ASSIGN(AtomicSequenceNumber);
};

class SeqRunner : public DelegateSimpleThread::Delegate {
public:
    explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) {}

private:
    void Run() override { seq_->GetNext(); }

    AtomicSequenceNumber* seq_;

    DISALLOW_COPY_AND_ASSIGN(SeqRunner);
};

AtomicSequenceNumber seq;
SeqRunner runner(&seq);
DelegateSimpleThreadPool pool("seq_runner", 10);

// Add work before we're running.
pool.AddWork(&runner, 300);

EXPECT_EQ(seq.GetNext(), 0);
pool.Start();

// Add work while we're running.
pool.AddWork(&runner, 300);

pool.JoinAll();

EXPECT_EQ(seq.GetNext(), 601);

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码
  • 前言
  • SimpleThread
  • 类图一览
  • SimpleThread::Option
  • SimpleThread源码
    • 成员函数
      • 成员变量
        • 部分源码解析
          • Start和StartAsync
          • ThreadMain
          • Join
          • Run
      • DelegateSimpleThread
        • 源码解析
          • Run
      • DelegateSimpleThreadPool
        • 头文件一览
          • 部分源码解析
            • 构造函数
            • Start
            • Run
            • JoinAll
            • AddWork
        • 代码示例
          • 简单功能
            • 事件传递
              • 顺序执行
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档