先附上可用于学习的开源代码:Base库
喜欢可以帮忙Star一下
编译:参考Base库即可
环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17
另:本章节需要提前了解ThreadPool
这个类是管理线程池中线程的核心类,包含线程的创建、观察、阻塞、回收等事件,ThreadGroup类提供接口,ThreadGroupImpl类提供具体实现。
线程组的接口和基本实现。线程组是线程池中的一部分线程的子集(有关在发布任务和创建任务运行器时选择线程组的逻辑,请参阅 GetThreadGroupForTraits())。
这个Delegate类对于ThreadGroup意味着它提供了一种机制来确定任务应该被重新入队的线程组。当ThreadGroup运行完一个任务后,如果任务源(TaskSource)仍然非空,就会调用Delegate的GetThreadGroupForTraits方法来确定应该将任务源重新入队到哪个线程组中。
Delegate类是一个接口类,它定义了一个纯虚函数GetThreadGroupForTraits,需要由具体的实现类来实现。通过实现这个方法,可以根据任务的特性(TaskTraits)来决定任务源应该重新入队到哪个线程组中。
这种设计可以让ThreadGroup在处理任务时,根据任务的特性选择合适的线程组来处理任务,从而实现任务的分配和调度。通过Delegate类的实现,可以根据具体的需求和策略来决定任务应该被重新入队到哪个线程组中,以实现更灵活和高效的任务处理。
class BASE_EXPORT Delegate {
public:
virtual ~Delegate() = default;
// 当 ThreadGroup 在从非空的 TaskSource 中运行任务后,调用此方法。实现必须返回应该重新入队的 TaskSource 所在的线程组。
virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0;
};
这个枚举类WorkerEnvironment对于ThreadGroup意味着它定义了不同的工作环境选项,用于指定在工作线程中初始化的特殊环境。
具体来说,WorkerEnvironment枚举类包含以下选项:
通过使用WorkerEnvironment枚举类,可以在创建ThreadGroup时指定工作线程的特殊环境要求。这样可以确保工作线程在运行任务时具备所需的环境,以满足特定的需求和使用场景。例如,在使用COM组件的情况下,可以选择适当的工作环境来确保COM组件的正确初始化和使用。
enum class WorkerEnvironment {
// No special worker environment required.
NONE,
#if defined(OS_WIN)
// Initialize a COM MTA on the worker.
COM_MTA,
// Initialize a COM STA on the worker.
COM_STA,
#endif // defined(OS_WIN)
};
将ThreadGroup与当前线程绑定有以下好处:
总的来说,将ThreadGroup与当前线程绑定可以提供更好的任务分配、上下文共享、线程安全性和代码简化。这对于多线程编程中的任务调度和协作非常有用,可以提高系统的性能、可维护性和可扩展性。
// Registers the thread group in TLS.
void BindToCurrentThread();
// Resets the thread group in TLS.
void UnbindFromCurrentThread();
// Returns true if the thread group is registered in TLS.
bool IsBoundToCurrentThread() const;
在ThreadGroup类中,这些方法的含义如下:
这些方法提供了一种管理ThreadGroup与线程之间关联的机制。通过绑定和解绑ThreadGroup与线程,可以确保在正确的线程上执行任务,并在需要时进行相应的操作和处理。
这些方法在ThreadGroup类中具有以下含义:
附上源码:
// Removes |task_source| from |priority_queue_|. Returns a
// RegisteredTaskSource that evaluats to true if successful, or false if
// |task_source| is not currently in |priority_queue_|, such as when a worker
// is running a task from it.
RegisteredTaskSource RemoveTaskSource(scoped_refptr<TaskSource> task_source);
// Updates the position of the TaskSource in |transaction_with_task_source| in
// this ThreadGroup's PriorityQueue based on the TaskSource's current traits.
//
// Implementations should instantiate a concrete ScopedWorkersExecutor and
// invoke UpdateSortKeyImpl().
virtual void UpdateSortKey(TransactionWithOwnedTaskSource transaction_with_task_source) = 0;
// Pushes the TaskSource in |transaction_with_task_source| into this
// ThreadGroup's PriorityQueue and wakes up workers as appropriate.
//
// Implementations should instantiate a concrete ScopedWorkersExecutor and
// invoke PushTaskSourceAndWakeUpWorkersImpl().
virtual void PushTaskSourceAndWakeUpWorkers(TransactionWithRegisteredTaskSource transaction_with_task_source) = 0;
// Removes all task sources from this ThreadGroup's PriorityQueue and enqueues
// them in another |destination_thread_group|. After this method is called,
// any task sources posted to this ThreadGroup will be forwarded to
// |destination_thread_group|.
//
// TODO(crbug.com/756547): Remove this method once the UseNativeThreadPool
// experiment is complete.
void InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(ThreadGroup* destination_thread_group);
// Returns true if a task with |priority| running in this thread group should
// return ASAP, either because this priority is not allowed to run or because
// work of higher priority is pending. Thread-safe but may return an outdated
// result (if a task unnecessarily yields due to this, it will simply be
// re-scheduled).
bool ShouldYield(TaskPriority priority) const;
// Prevents new tasks from starting to run and waits for currently running
// tasks to complete their execution. It is guaranteed that no thread will do
// work on behalf of this ThreadGroup after this returns. It is
// invalid to post a task once this is called. TaskTracker::Flush() can be
// called before this to complete existing tasks, which might otherwise post a
// task during JoinForTesting(). This can only be called once.
virtual void JoinForTesting() = 0;
// Returns the maximum number of non-blocked tasks that can run concurrently
// in this ThreadGroup.
//
// TODO(fdoray): Remove this method. https://crbug.com/687264
virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0;
// Reports relevant metrics per implementation.
virtual void ReportHeartbeatMetrics() const = 0;
// Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
// called after an update to CanRunPolicy in TaskTracker.
virtual void DidUpdateCanRunPolicy() = 0;
这是一个基类BaseScopedWorkersExecutor的定义,派生类必须从这个基类派生出一个ScopedWorkersExecutor类来执行在作用域结束时对工作线程的操作,当所有锁都已释放。
BaseScopedWorkersExecutor类具有以下成员函数和成员变量:
派生类应该从BaseScopedWorkersExecutor派生出一个具体的ScopedWorkersExecutor类,并实现特定的操作,以在作用域结束时对工作线程执行相应的操作。这个基类提供了一个框架和基础设施,用于管理待释放的任务源,并在适当的时候执行相应的操作。
class BaseScopedWorkersExecutor {
public:
void ScheduleReleaseTaskSource(RegisteredTaskSource task_source);
protected:
BaseScopedWorkersExecutor();
~BaseScopedWorkersExecutor();
private:
std::vector<RegisteredTaskSource> task_sources_to_release_;
DISALLOW_COPY_AND_ASSIGN(BaseScopedWorkersExecutor);
};
这是一个ScopedReenqueueExecutor类的定义,它允许在作用域结束时,当所有锁都已释放时,将任务源推送到ThreadGroup的优先级队列中。
ScopedReenqueueExecutor类具有以下成员函数和成员变量:
class ScopedReenqueueExecutor {
public:
ScopedReenqueueExecutor();
~ScopedReenqueueExecutor();
// A TransactionWithRegisteredTaskSource and the ThreadGroup in which it
// should be enqueued.
void SchedulePushTaskSourceAndWakeUpWorkers(
TransactionWithRegisteredTaskSource transaction_with_task_source,
ThreadGroup* destination_thread_group);
private:
// A TransactionWithRegisteredTaskSource and the thread group in which it
// should be enqueued.
Optional<TransactionWithRegisteredTaskSource> transaction_with_task_source_;
ThreadGroup* destination_thread_group_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(ScopedReenqueueExecutor);
};
了解了以上ThreadGroup相关类后,再来了解一下ThreadGroup本身的源码实现
ThreadGroup(TrackedRef<TaskTracker> task_tracker,
TrackedRef<Delegate> delegate,
ThreadGroup* predecessor_thread_group = nullptr);
const TrackedRef<TaskTracker> task_tracker_;
const TrackedRef<Delegate> delegate_;
这是ThreadGroup类的构造函数的定义。它接受一个TaskTracker对象的TrackedRef,一个Delegate对象的TrackedRef,以及一个可选的predecessor_thread_group参数。
构造函数的参数含义如下:
这个构造函数用于创建一个ThreadGroup对象,并初始化相关的成员变量和状态。通过传递TaskTracker和Delegate对象,可以将ThreadGroup与任务跟踪器和委托操作关联起来。predecessor_thread_group参数用于在构建的ThreadGroup的锁之前获取前任ThreadGroup的锁,以便在任务源转移过程中保持同步。
需要注意的是,predecessor_thread_group参数是一个可选参数,并且在UseNativeThreadPool实验完成后可能会被移除。
ThreadGroup* replacement_thread_group_ = nullptr;
这是ThreadGroup类的一个成员变量replacementthread_group的定义。如果replacementthread_group非空,则表示当前ThreadGroup无效,所有的任务源应该被调度到replacementthread_group上。这个成员变量用于支持UseNativeThreadPool实验。
当replacementthread_group非空时,表示当前ThreadGroup已经被替换,不再有效。所有的任务源应该被重新调度到replacementthread_group上,以确保任务的正确执行。这个机制可以用于实现线程池的替换和迁移,以支持系统的动态调整和优化。
size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
virtual void EnsureEnoughWorkersLockRequired(
BaseScopedWorkersExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;
void ReEnqueueTaskSourceLockRequired(
BaseScopedWorkersExecutor* workers_executor,
ScopedReenqueueExecutor* reenqueue_executor,
TransactionWithRegisteredTaskSource transaction_with_task_source)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
RunIntentWithRegisteredTaskSource TakeRunIntentWithRegisteredTaskSource(
BaseScopedWorkersExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Must be invoked by implementations of the corresponding non-Impl() methods.
void UpdateSortKeyImpl(
BaseScopedWorkersExecutor* executor,
TransactionWithOwnedTaskSource transaction_with_task_source);
void PushTaskSourceAndWakeUpWorkersImpl(
BaseScopedWorkersExecutor* executor,
TransactionWithRegisteredTaskSource transaction_with_task_source);
mutable CheckedLock lock_;
PriorityQueue priority_queue_ GUARDED_BY(lock_);
std::atomic<TaskPriority> min_allowed_priority_ GUARDED_BY(lock_) {
TaskPriority::BEST_EFFORT
};
这段代码展示了ThreadGroup类中一些与线程调度和任务源管理相关的函数和成员变量。
ThreadGroup类是一组运行任务的工作线程。在调用Start()之前,线程组不会创建线程。任务可以在任何时候发布,但直到调用Start()之后才会运行。
这个类是线程安全的,这意味着它可以在多个线程中同时使用而不会导致竞争条件或数据损坏。多个线程可以同时调用ThreadGroup的成员函数,而不需要额外的同步机制。这是通过在关键的代码段上使用锁来实现的,以确保对共享数据的访问是互斥的。
线程安全性对于多线程环境中的任务调度和执行非常重要。ThreadGroup类的线程安全性确保了任务的正确执行和数据的一致性,同时避免了竞争条件和数据损坏的问题。这使得开发人员可以在多线程环境中使用ThreadGroup类,而不必担心并发问题。
ThreadGroupImpl(StringPiece histogram_label,
StringPiece thread_group_label,
ThreadPriority priority_hint,
TrackedRef<TaskTracker> task_tracker,
TrackedRef<Delegate> delegate);
这是ThreadGroupImpl类的构造函数的定义。它接受以下参数:
这个构造函数用于创建一个ThreadGroupImpl对象,并初始化相关的成员变量和状态。通过传递histogram_label、thread_group_label、priority_hint、task_tracker和delegate,可以将ThreadGroupImpl与直方图、线程标签、优先级提示、任务跟踪器和委托操作关联起来。这些参数提供了构建和管理ThreadGroupImpl对象所需的基本信息和功能。
void Start(int max_tasks,
int max_best_effort_tasks,
TimeDelta suggested_reclaim_time,
scoped_refptr<TaskRunner> service_thread_task_runner,
WorkerThreadObserver* worker_thread_observer,
WorkerEnvironment worker_environment,
Optional<TimeDelta> may_block_threshold = Optional<TimeDelta>());
这是ThreadGroupImpl类的Start函数的定义。它接受以下参数:
这个函数用于创建线程,并允许现有和未来的任务运行。线程组最多同时运行max_tasks / max_best_effort_tasks个任意/BEST_EFFORT优先级的未阻塞任务。在suggested_reclaim_time之后,它会回收未使用的线程。它使用service_thread_task_runner来监视阻塞任务。如果指定了worker_thread_observer,它会在工作线程进入和退出其主函数时通知观察者。worker_environment参数指定任务执行的环境。may_block_threshold参数是一个可选的超时时间,用于指定MAY_BLOCK ScopedBlockingCall中的任务被视为阻塞。这个函数只能调用一次。
函数内部会根据传入的参数进行一系列的初始化和配置操作,包括:
通过这些配置和初始化操作,ThreadGroupImpl开始运行,并允许任务在工作线程中执行。工作线程的数量和行为受到max_tasks、max_best_effort_tasks和其他配置参数的限制和控制。
需要注意的是,这个函数只能被调用一次。如果调用多次,会触发CHECK失败。这是为了确保ThreadGroupImpl只能被启动一次,避免重复创建和启动工作线程。
~ThreadGroupImpl() override;
这是ThreadGroupImpl类的析构函数的定义。它是一个虚析构函数,并且有一个override修饰符。
这段注释说明了ThreadGroupImpl对象的销毁规则:
void WaitForWorkersIdleForTesting(size_t n);
// Waits until at least |n| workers are idle.
void WaitForWorkersIdleLockRequiredForTesting(size_t n)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Waits until all workers are idle.
void WaitForAllWorkersIdleForTesting();
// Waits until |n| workers have cleaned up (since the last call to
// WaitForWorkersCleanedUpForTesting() or Start() if it wasn't called yet).
void WaitForWorkersCleanedUpForTesting(size_t n);
// Returns the number of workers in this thread group.
size_t NumberOfWorkersForTesting() const;
// Returns |max_tasks_|.
size_t GetMaxTasksForTesting() const;
// Returns the number of workers that are idle (i.e. not running tasks).
size_t NumberOfIdleWorkersForTesting() const;
这些函数是ThreadGroupImpl类中用于测试的函数,用于等待和获取关于工作线程状态的信息。
这些函数提供了一些用于测试目的的功能,可以用于等待工作线程的特定状态或获取线程组的相关信息。这对于编写测试用例和验证线程组的行为非常有用。例如,可以使用WaitForWorkersIdleForTesting()等待工作线程空闲,然后执行某些操作来验证任务的执行情况。或者使用WaitForWorkersCleanedUpForTesting()等待工作线程完成清理操作,以确保资源的正确释放。
// Values set at Start() and never modified afterwards.
struct InitializedInStart {
InitializedInStart();
~InitializedInStart();
#if DCHECK_IS_ON()
// Set after all members of this struct are set.
bool initialized = false;
#endif
// Initial value of |max_tasks_|.
size_t initial_max_tasks = 0;
// Suggested reclaim time for workers.
TimeDelta suggested_reclaim_time;
// Environment to be initialized per worker.
WorkerEnvironment worker_environment = WorkerEnvironment::NONE;
scoped_refptr<TaskRunner> service_thread_task_runner;
// Optional observer notified when a worker enters and exits its main.
WorkerThreadObserver* worker_thread_observer = nullptr;
bool may_block_without_delay;
// Threshold after which the max tasks is increased to compensate for a
// worker that is within a MAY_BLOCK ScopedBlockingCall.
TimeDelta may_block_threshold;
// The period between calls to AdjustMaxTasks() when the thread group is at
// capacity.
TimeDelta blocked_workers_poll_period;
} initialized_in_start_;
这是ThreadGroupImpl类中的一个嵌套结构体InitializedInStart的定义。这个结构体包含了在Start()函数中设置并在之后不再修改的一些值。
结构体成员包括:
这些成员变量在Start()函数中被设置,并在之后的运行中保持不变。它们存储了一些线程组的初始配置和参数,用于控制线程组的行为和性能。
配套函数如下:
InitializedInStart& in_start() {
#if DCHECK_IS_ON()
DCHECK(!initialized_in_start_.initialized);
#endif
return initialized_in_start_;
}
const InitializedInStart& after_start() const {
#if DCHECK_IS_ON()
DCHECK(initialized_in_start_.initialized);
#endif
return initialized_in_start_;
}
const std::string thread_group_label_;
const ThreadPriority priority_hint_;
// All workers owned by this thread group.
std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);
// Maximum number of tasks of any priority / BEST_EFFORT priority that can run
// concurrently in this thread group.
size_t max_tasks_ GUARDED_BY(lock_) = 0;
size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;
// Number of tasks of any priority / BEST_EFFORT priority that are currently
// running in this thread group.
size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;
// Number of workers running a task of any priority / BEST_EFFORT priority
// that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
// caused a max tasks increase yet.
int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;
// Stack of idle workers. Initially, all workers are on this stack. A worker
// is removed from the stack before its WakeUp() function is called and when
// it receives work from GetWork() (a worker calls GetWork() when its sleep
// timeout expires, even if its WakeUp() method hasn't been called). A worker
// is pushed on this stack when it receives nullptr from GetWork().
WorkerThreadStack idle_workers_stack_ GUARDED_BY(lock_);
// Signaled when a worker is added to the idle workers stack.
std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_
GUARDED_BY(lock_);
// Stack that contains the timestamps of when workers get cleaned up.
// Timestamps get popped off the stack as new workers are added.
base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_
GUARDED_BY(lock_);
// Whether an AdjustMaxTasks() task was posted to the service thread.
bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;
// Indicates to the delegates that workers are not permitted to cleanup.
bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;
// Counts the number of workers cleaned up since the last call to
// WaitForWorkersCleanedUpForTesting() (or Start() if it wasn't called yet).
// |some_workers_cleaned_up_for_testing_| is true if this was ever
// incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
// specific number of workers being cleaned up via
// WaitForWorkersCleanedUpForTesting().
size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
#if DCHECK_IS_ON()
bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
#endif
// Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
// incremented.
std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
GUARDED_BY(lock_);
#if DCHECK_IS_ON()
// Set at the start of JoinForTesting().
AtomicFlag join_for_testing_started_;
#endif
这些成员变量是ThreadGroupImpl类中的一些状态变量和数据结构,用于管理和跟踪线程组的状态和工作线程的情况。
// ThreadPool.DetachDuration.[thread group name] histogram. Intentionally
// leaked.
HistogramBase* const detach_duration_histogram_;
// ThreadPool.NumTasksBeforeDetach.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_tasks_before_detach_histogram_;
// ThreadPool.NumTasksBetweenWaits.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_tasks_between_waits_histogram_;
// ThreadPool.NumWorkers.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_workers_histogram_;
// ThreadPool.NumActiveWorkers.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_active_workers_histogram_;
// Ensures recently cleaned up workers (ref.
// WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as
// they have a raw reference to |this| (and to TaskTracker) which can
// otherwise result in racy use-after-frees per no longer being part of
// |workers_| and hence not being explicitly joined in JoinForTesting():
// https://crbug.com/810464. Uses AtomicRefCount to make its only public
// method thread-safe.
TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
这些成员变量是ThreadGroupImpl类中的一些标识和引用计数相关的变量。
谢谢各位看到这里,如果有感兴趣的模块或者代码需要攻略,也可以留言,会不定时更新。喜欢可以去github点点赞,再次感谢🙏
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。