前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Chromium】Base库的ThreadPool

【Chromium】Base库的ThreadPool

原创
作者头像
lealc
修改2024-03-25 10:37:43
870
修改2024-03-25 10:37:43
举报
文章被收录于专栏:Chromium学习Chromium学习

源码

先附上可用于学习的开源代码:Base库

喜欢可以帮忙Star一下

前言

编译:参考Base库即可

环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17

介绍

Base库的ThreadPool分为几部分,这里简单介绍一下基本用法

  • ThreadPoolInstance:定义大部分通用的线程池接口,并不能直接使用,某些接口(例如Start)为纯虚接口
  • ThreadPoolImpl:官方定义的真正线程池对外接口类,线程安全
  • ThreadPoolInstance::InitParams:线程启动参数,包括环境、数量以及空闲退出时长等
  • WorkerThreadObserver:工作线程观察者,提供针对工作线程的一些观测能力
  • TaskTraits:提供对执行的异步任务细节补充的能力,例如线程优先级等
  • ThreadGroup:线程组,线程组的接口和基本实现。线程组是线程池中一部分线程的子集(有关在发布任务和创建任务运行器时选择线程组的逻辑,请参阅 GetThreadGroupForTraits())
  • ThreadGroupImpl:一个运行任务的工作线程组。在调用Start()之前,线程组不会创建线程。任务可以随时发布,但在调用Start()之后才会运行。这个类是线程安全的。
  • TaskExecutor:TaskExecutor可以执行具有特定TaskTraits扩展ID的任务。为了处理通过//base/task/post_task.h API发布的任务,应该调用RegisterTaskExecutor()来注册TaskExecutor。
  • PooledTaskRunnerDelegate:PooledParallelTaskRunner和* PooledSequencedTaskRunner的委托接口。

本次仅介绍与线程池紧密相关的几个实现,ThreadGroup相关内容可以移步这篇ThreadPool的ThreadGroup

ThreadPoolInstance::InitParams

线程池启动环境配置,必须要有。

源码及相关说明如下:

代码语言:C++
复制
struct BASE_EXPORT InitParams{
	enum class CommonThreadPoolEnvironment {
      // 使用默认环境(无环境)。
      DEFAULT,
      #if defined(OS_WIN)
      // 将线程池的工作线程放置在 COM MTA 中。
      COM_MTA,
      // 将线程池的前台工作线程放置在 COM STA 中。这是为了模拟SequencedWorkerPool和BrowserThreadImpl的行为,而ThreadPool已经取代了它们。需要 COM STA 的任务应该使用CreateCOMSTATaskRunner()而不是Create(Sequenced)TaskRunnerWithTraits() + 这个初始化参数。
      DEPRECATED_COM_STA_IN_FOREGROUND_GROUP,
      #endif  // defined(OS_WIN)
    };

    InitParams(int max_num_foreground_threads_in);
    ~InitParams();

    // 前台线程组中可以同时运行的未阻塞任务的最大数量。
    int max_num_foreground_threads;

    // 在运行序列化和并行任务时,是否初始化 COM。
    CommonThreadPoolEnvironment common_thread_pool_environment = CommonThreadPoolEnvironment::DEFAULT;

    // 建议的未使用线程可以回收的时间间隔。
    TimeDelta suggested_reclaim_time = TimeDelta::FromSeconds(30);
};

WorkerThreadObserver

直接上源码,说明附在里面

代码语言:C++
复制
// 用于观察 ThreadPool 工作线程的主函数入口和退出的接口。

class WorkerThreadObserver {
public:
    virtual ~WorkerThreadObserver() = default;

    // 在 ThreadPool 工作线程的主函数开始之前调用,任何任务运行之前。
    virtual void OnWorkerThreadMainEntry() = 0;

    // 在 ThreadPool 工作线程的主函数结束时调用,无法再运行任务时。
    virtual void OnWorkerThreadMainExit() = 0;
};

ThreadPoolInstance

存放于\Chromium\Base\task\thread_pool\thread_pool.h

这是线程池的接口和一些静态方法定义,主要是用于抛任务使用的线程池示例,因有纯虚接口,所以不能直接使用,需要继承后再使用。

注意

  • 线程池的接口和静态方法,用于管理 post_task.h API 使用的实例。
  • 线程池在调用 Start() 之前不会创建线程。任务可以随时发布,但直到调用 Start() 后才会运行。
  • 该类的实例方法是线程安全的。
  • 除了进程的一个调用点管理进程的实例之外,所有线程池的用户都应该通过 base/task/post_task.h 而不是通过这个接口。

ThreadPool中相关源码解析

本节主要讲解一下ThreadPool部分核心能力的源码实现及背后的理解。

ScopedExecutionFence

ScopedExecutionFence用于在其作用域内阻止在ThreadPoolInstance中调度新的任务。它创建一个执行隔离的范围,防止新任务被调度执行。当ScopedExecutionFence对象被销毁时,之前被阻止的任务将被释放,允许它们继续执行。

这个机制可以用来控制任务的执行时机,例如在某些情况下需要暂时暂停任务的执行,或者在执行一些关键操作时需要确保没有其他任务被调度。通过创建ScopedExecutionFence对象并在需要的范围内使用它,可以实现这种控制和隔离的效果。

代码语言:C++
复制
// ScopedExecutionFence(最佳尝试)防止在其作用域内在ThreadPoolInstance中调度任何/BEST_EFFORT优先级的新任务。在其销毁时,被抢占的任务将被释放。
// 注意:ScopedExecutionFence的构造函数不会等待当前正在运行的任务(因为它们在进入此作用域之前发布,并且不违反契约;其中一些可能是CONTINUE_ON_SHUTDOWN,并且等待它们完成是不明智的)。

class BASE_EXPORT ScopedExecutionFence {
public:
    ScopedExecutionFence();
    ~ScopedExecutionFence();

private:
    DISALLOW_COPY_AND_ASSIGN(ScopedExecutionFence);
};

class BASE_EXPORT ScopedBestEffortExecutionFence {
public:
    ScopedBestEffortExecutionFence();
    ~ScopedBestEffortExecutionFence();

private:
    DISALLOW_COPY_AND_ASSIGN(ScopedBestEffortExecutionFence);
};

析构函数

代码语言:C++
复制
// 在线上环境中不允许销毁 ThreadPoolInstance,它将始终被泄漏。
// 在测试中,只有在 JoinForTesting() 返回之后才应该销毁 ThreadPoolInstance。
virtual ~ThreadPoolInstance() = default;

创建

代码语言:C++
复制
// 创建一个准备就绪的线程池。参数 |name| 用于标记,它不能为空。它应该标识创建 ThreadPoolInstance 的组件。
// 线程池在调用 Start() 之前不会创建线程。任务可以随时发布,但直到调用 Start() 后才会运行。在测试中,建议使用 base::test::ScopedTaskEnvironment(确保隔离性)。

static void Create(StringPiece name);

// 使用默认参数创建并启动一个线程池。参数 |name| 用于标记,它不能为空。它应该标识调用此方法的组件。
// 此方法会调用 Start(),之后再次调用它是无效的。如果失败,会触发 CHECK。
// 在测试中,建议使用 base::test::ScopedTaskEnvironment(确保隔离性)。
static void CreateAndStartWithDefaultParams(StringPiece name);

启动

代码语言:C++
复制
// 允许线程池根据 |init_params| 规范创建线程并运行任务。 
// // 如果指定了 |worker_thread_observer|,当一个工作线程进入和退出其主函数时,它将被通知。
// // 在 JoinForTesting() 返回之前,它不能被销毁(在生产环境中不能被销毁)。 
// // 如果失败,会触发 CHECK。 
virtual void Start( const InitParams& init_params, WorkerThreadObserver* worker_thread_observer = nullptr) = 0;

// 与 CreateAndStartWithDefaultParams() 相同,但允许调用者将 Create() 和 StartWithDefaultParams() 分开调用。
void StartWithDefaultParams();

等待

代码语言:C++
复制
// 等待直到没有待处理的非延迟任务。可以在测试中调用此方法,在所有非延迟任务运行后验证条件是否满足。
// 不等待延迟任务。在调用期间等待从其他线程发布的非延迟任务。当关闭完成时立即返回。
virtual void FlushForTesting() = 0;

// 当没有未完成的非延迟任务时,返回并调用 |flush_callback|。|flush_callback| 可以在任何线程上被调用,不应执行大量的工作。
// 当需要在刷新期间执行当前线程上的其他工作时,可以使用此方法。一次只能有一个 FlushAsyncForTesting() 在等待。
virtual void FlushAsyncForTesting(OnceClosure flush_callback) = 0;

// 加入所有线程。已经运行的任务被允许完成执行。此方法只能调用一次。在此调用期间或之后,不允许使用此线程池实例创建任务运行器或发布任务。
virtual void JoinForTesting() = 0;

终止

代码语言:C++
复制
// Shutdown() 是一个同步关闭线程池的方法。一旦调用此方法,只有使用 BLOCK_SHUTDOWN 行为发布的任务将会被运行。当该方法返回时:

// 所有已经运行的 SKIP_ON_SHUTDOWN 任务已经完成执行。
// 所有已发布的 BLOCK_SHUTDOWN 任务已经完成执行。CONTINUE_ON_SHUTDOWN 任务可能仍在运行。
// 需要注意的是,实现可以保持线程和其他资源的活动状态,以便在 Shutdown() 返回后继续运行 CONTINUE_ON_SHUTDOWN 任务。此方法只能调用一次。
virtual void Shutdown() = 0;

全局实例

支持设置自定义的线程池来替代默认线程池

代码语言:C++
复制
// 将 |thread_pool| 注册为处理通过 post_task.h API 发布的任务的线程池。在测试中,推荐使用 base::test::ScopedTaskEnvironment(确保隔离)。
static void Set(std::unique_ptr<ThreadPoolInstance> thread_pool);


// 获取通过 SetInstance() 或 Create() 设置的 ThreadPoolInstance。
// 应该非常少使用此方法;大多数线程池的使用者应该使用 post_task.h API。
// 特别是,不要这样做: if (!ThreadPoolInstance::Get()) { ThreadPoolInstance::Set(...); base::PostTask(...); } 
// 而是确保在进程初始化阶段的一个确定的位置尽早调用 SetInstance()。
static ThreadPoolInstance* Get();

其他

代码语言:C++
复制
  // Sets whether a fence prevents execution of tasks of any / BEST_EFFORT
  // priority.
  virtual void SetHasFence(bool can_run) = 0;
  virtual void SetHasBestEffortFence(bool can_run) = 0;


// 返回在此线程池中可以同时运行的具有 |traits| 的非单线程非阻塞任务的最大数量。|traits| 不能包含 TaskPriority::BEST_EFFORT。// 请不要使用此方法。
// 要处理 n 个项,请发布 n 个任务,每个任务处理一个项,而不是使用 GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated() 任务,每个任务处理 n/GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated() 个项。
int GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated( const TaskTraits& traits) const = 0;

ThreadPoolImpl

这是Base库默认实现的线程池类,同样是线程安全,继承关系,有需要的话同样模仿ThreadPoolImpl来实现自定义线程池

代码语言:C++
复制
class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
                                   public TaskExecutor,
                                   public ThreadGroup::Delegate,
                                   public PooledTaskRunnerDelegate

源码解析

针对ThreadPoolImpl的源码实现进行解析,方便我们自己继承使用

构造
代码语言:C++
复制
// 使用生产 TaskTracker 创建 ThreadPoolImpl。 
// |histogram_label| 用于标记,不能为空。 
explicit ThreadPoolImpl(StringPiece histogram_label);
// 仅用于测试。使用自定义 TaskTracker 和 TickClock 创建 ThreadPoolImpl。 
ThreadPoolImpl(StringPiece histogram_label, 
               std::unique_ptr<TaskTrackerImpl> task_tracker,
               const TickClock* tick_clock);
~ThreadPoolImpl() override;
继承(均为Public属性)

以下函数均为Public继承而来的能力:

代码语言:C++
复制
  // ThreadPoolInstance:
  void Start(const ThreadPoolInstance::InitParams& init_params,
             WorkerThreadObserver* worker_thread_observer) override;
  int GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
      const TaskTraits& traits) const override;
  void Shutdown() override;
  void FlushForTesting() override;
  void FlushAsyncForTesting(OnceClosure flush_callback) override;
  void JoinForTesting() override;
  void SetHasFence(bool has_fence) override;
  void SetHasBestEffortFence(bool has_best_effort_fence) override;

  // TaskExecutor:
  bool PostDelayedTask(const Location& from_here,
                       const TaskTraits& traits,
                       OnceClosure task,
                       TimeDelta delay) override;
  scoped_refptr<TaskRunner> CreateTaskRunner(const TaskTraits& traits) override;
  scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunner(const TaskTraits& traits) override;
  scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunner(const TaskTraits& traits,SingleThreadTaskRunnerThreadMode thread_mode) override;
#if defined(OS_WIN)
  scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner(const TaskTraits& traits,SingleThreadTaskRunnerThreadMode thread_mode) override;
#endif  // defined(OS_WIN)
  scoped_refptr<UpdateableSequencedTaskRunner> CreateUpdateableSequencedTaskRunner(const TaskTraits& traits);
Public函数
代码语言:C++
复制
// 返回 ThreadPool 上调度的下一个任务的 TimeTicks(如果是立即执行,则返回 Now();如果没有任务,则返回 nullopt)。
// 这是线程安全的,即使在调用此方法时并行发布任务,也是安全的,但这种情况下可能存在竞争,无法确定此调用是否及时看到新任务。
Optional<TimeTicks> NextScheduledRunTimeForTesting() const;

// 强制发布成熟的延迟任务(例如,当时间被模拟并且比 ServiceThread 上的实时延迟更快时)。
void ProcessRipeDelayedTasksForTesting();
Private函数

以下均为Private函数,不对外提供,仅用来进行对线程池的管控

代码语言:C++
复制
// 在更新 |has_fence_| 或 |has_best_effort_fence_| 后调用。设置 TaskTracker 中的 CanRunPolicy,并根据需要唤醒工作线程。
void UpdateCanRunPolicy();

// 如果设置了 |all_tasks_user_blocking_|,则将 |traits| 的优先级设置为 TaskPriority::USER_BLOCKING。
TaskTraits SetUserBlockingPriorityIfNeeded(TaskTraits traits) const;

// 报告心跳指标。
void ReportHeartbeatMetrics() const;

// 获取适用于给定 TaskTraits 的 ThreadGroup。
const ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) const;

// ThreadGroup::Delegate:
// 获取适用于给定 TaskTraits 的 ThreadGroup。
ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override;

// 将 |task| 作为 |sequence| 的一部分发布以由适当的线程组执行。在 |task| 经过 TaskTracker::WillPostTask() 和 |task| 的延迟运行时间之后才能调用此方法。
bool PostTaskWithSequenceNow(Task task, scoped_refptr<Sequence> sequence);

// PooledTaskRunnerDelegate:
bool PostTaskWithSequence(Task task, scoped_refptr<Sequence> sequence) override;
bool EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
bool IsRunningPoolWithTraits(const TaskTraits& traits) const override;
void UpdatePriority(scoped_refptr<TaskSource> task_source, TaskPriority priority) override;
成员变量
代码语言:C++
复制
// base/task/thread_pool 中所有类使用的时钟实例。必须比其他所有内容存活时间更长,以确保 
Now() 的一致性。
ThreadPoolClock thread_pool_clock_;

const std::unique_ptr<TaskTrackerImpl> task_tracker_;
std::unique_ptr<Thread> service_thread_;
DelayedTaskManager delayed_task_manager_;
PooledSingleThreadTaskRunnerManager single_thread_task_runner_manager_;

// 表示所有任务都被视为以 TaskPriority::USER_BLOCKING 发布。由于此标志在 Start() 中设置,因此不适用于在 Start() 之前发布的任务或在 Start() 之前创建的 TaskRunner。
// TODO(fdoray): 在实验后删除。https://crbug.com/757022
AtomicFlag all_tasks_user_blocking_;

std::unique_ptr<ThreadGroup> foreground_thread_group_;
std::unique_ptr<ThreadGroupImpl> background_thread_group_;

// 指示此 TaskScheduler 是否已启动。访问受 |sequence_checker_| 控制。
bool started_ = false;

// 是否 --disable-best-effort-tasks 开关阻止执行 BEST_EFFORT 任务直到关闭。
const bool has_disable_best_effort_switch_;

// 是否有一个阻止执行任何/ BEST_EFFORT 优先级任务的 fence。访问受 |sequence_checker_| 控制。
bool has_fence_ = false;
bool has_best_effort_fence_ = false;

#if DCHECK_IS_ON()
// 一旦 JoinForTesting() 返回,就会设置。
AtomicFlag join_for_testing_returned_;
#endif

#if defined(OS_WIN) && defined(COM_INIT_CHECK_HOOK_ENABLED)
// 为支持的构建提供 COM 初始化验证。
base::win::ComInitCheckHook com_init_check_hook_;
#endif

// 断言操作与 Start() 顺序一致。
SEQUENCE_CHECKER(sequence_checker_);

TrackedRefFactoryThreadGroup::Delegate tracked_ref_factory_;

// 不允许拷贝
DISALLOW_COPY_AND_ASSIGN(ThreadPoolImpl);

使用示例

老规矩,直接上代码,里面附有对应注释说明:

代码语言:C++
复制
#define kMaxNumForegroundThreads 4;

class MyObserver : public base::WorkerThreadObserver {
public:
    // Invoked at the beginning of the main function of a ThreadPool worker,
    // before any task runs.
    void OnWorkerThreadMainEntry() override {
        L_TRACE(L"%s", __FUNCTIONW__);
    }

    // Invoked at the end of the main function of a ThreadPool worker, when it
    // can no longer run tasks.
    void OnWorkerThreadMainExit() override {
        L_TRACE(L"%s", __FUNCTIONW__);
    };
};


int main() {
    // 定义最大线程池后台线程个数
    int max_num_foreground_threads = kMaxNumForegroundThreads;
    // 初始化线程池参数
    base::ThreadPoolInstance::InitParams init_params(max_num_foreground_threads);

  	// 创建线程池实例,此时线程未启动
    std::unique_ptr<base::internal::ThreadPoolImpl> thread_pool_ = std::make_unique<base::internal::ThreadPoolImpl>("Test Thread Pool");

    // 定义线程池工作线程观察者
    MyObserver observer;

    // 线程池线程启动,不会一下子到最大线程数
    thread_pool_->Start(init_params, &observer);
  
    // 定义线程任务细节
    base::TaskTraits traits;
    for (int i = 0; i < 10; ++i) {
        // 抛延时异步任务
        thread_pool_->PostDelayedTask(FROM_HERE, traits, base::BindOnce(static_cast<void (*)(base::TimeDelta)>(
            &base::PlatformThread::Sleep),
            base::TimeDelta::FromMilliseconds(20)), base::TimeDelta::FromMilliseconds(20));
    }
    L_TRACE(L"%s", __FUNCTIONW__);
    
    // 等待线程池所有线程任务做完
    thread_pool_->FlushForTesting();
    // 等待线程池所有线程退出
    thread_pool_->JoinForTesting();

    L_TRACE(L"%s", __FUNCTIONW__);
}

谢谢各位看到这里,如果有感兴趣的模块或者代码需要攻略,也可以留言,会不定时更新。喜欢可以去github点点赞,再次感谢🙏

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码
  • 前言
  • 介绍
  • ThreadPoolInstance::InitParams
  • WorkerThreadObserver
  • ThreadPoolInstance
  • ThreadPool中相关源码解析
    • ScopedExecutionFence
      • 析构函数
        • 创建
          • 启动
            • 等待
              • 终止
                • 全局实例
                  • 其他
                  • ThreadPoolImpl
                    • 源码解析
                      • 构造
                      • 继承(均为Public属性)
                      • Public函数
                      • Private函数
                      • 成员变量
                  • 使用示例
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档