/// <summary>
/// 该方法必须在 locked 下执行
/// </summary>
/// <param name="workerThread"></param>
/// <param name="workItem"></param>
/// <param name="workerThreadCall">是否是当前池内的线程调用该方法</param>
/// <returns></returns>
bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall)
{
workerThread = null;
workItem = null;
if (this._workQueue.Count > 0)
{
if (this._freeTreads.Count > 0)
{
workerThread = this._freeTreads.Dequeue();
workItem = this._workQueue.Dequeue();
this._workingTreads.Add(workerThread);
return true;
}
else
{
if (this._allThreads.Count < this._threads)
{
workerThread = new WorkerThread();
workItem = this._workQueue.Dequeue();
this._allThreads.Add(workerThread);
this._workingTreads.Add(workerThread);
return true;
}
return false;
}
}
else
{
if (!workerThreadCall)
return false;
double t = this._keepAliveTime;
if (t < 0)
{
this._workQueue.TrimExcess();
return false;
}
//此代码块只有当前池内的线程完成工作了以后访问到,从 QueueWorkItem 方法调用该方法是不会执行此代码块的,因为 this.workQueue.Count > 0
if (this._freeTreads.Count == this._allThreads.Count && this._workingTreads.Count == 0 && this._freeTreads.Count > 0)
{
/*
*能执行到这,说明池内没有了任何任务,并且是最后一个活动线程执行完毕
*此时从池中取出一个线程来执行 Tick 方法
*/
DateTime now = DateTime.Now;
int threadId = Thread.CurrentThread.ManagedThreadId;
if (this._allThreads.Any(a => a.ThreadId == threadId))//既然只有当前池内的线程能访问到这,这句判断是不是有点多余了- -
{
workerThread = this._freeTreads.Dequeue();//弹出一个 WorkerThread 对象,此时不需将弹出的 WorkerThread 对象放入 workingTreads 队列中,因为该对象是供池内自身计时用,相对外界是不知道的,保证外界调用 GetAvailableThreads 方法能得到一个合理的结果
workItem = new WorkItem((state) =>
{
this.Tick((WorkerThread)state);
}, workerThread);
this._spin = true;
try
{
this._releaseTime = now.AddMilliseconds(t);//设置待释放线程的时间点
}
catch (ArgumentOutOfRangeException)
{
this._releaseTime = DateTime.MaxValue;
}
return true;
}
}
return false;
}
}
void ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem)
{
workerThread.SetWork(workItem.Execute);
workerThread.Complete += this.WorkComplete;
workerThread.Activate();
}
void WorkComplete(WorkerThread workerThread)
{
//避免无法调用终结器,务必将 this.WorkComplete 从 workerThread.Complete 中移除,取出 workerThread 的时候再加上
workerThread.Complete -= this.WorkComplete;
if (this._disposed)
return;
WorkerThread nextWorkerThread = null;
WorkItem nextWorkItem = null;
lock (this._lockObject)
{
if (this._disposed)
return;
this._workingTreads.Remove(workerThread);
this._freeTreads.Enqueue(workerThread);
this.AdjustPool();
if (!this.TryGetWorkerThreadAndWorkItem(out nextWorkerThread, out nextWorkItem, true))
{
return;
}
}
this.ActivateWorkerThread(nextWorkerThread, nextWorkItem);
}
/// <summary>
/// 该方法必须在 locked 下执行
/// </summary>
void AdjustPool()
{
while (this._allThreads.Count > this._threads && this._freeTreads.Count > 0)
{
WorkerThread workerThread = this._freeTreads.Dequeue();
this._allThreads.Remove(workerThread);
workerThread.Dispose();
}
}
这个类代码还是挺多的(貌似有点占篇幅- - ),虽然代码里都加上了注释,但我还是想给大家简单说说其实现思路以及内部一些核心相关成员,方便大家更快的理解。
上面就是 WorkerThreadPool 的一些核心字段以及方法,至于其它的成员就不做详细说明了。 为了方便管理,池内用了 _freeTreads 和 _workingTreads 两个集合来维护池内线程状态。所以每次从空闲线程 _freeTreads 取出 workerThread 执行任务的时候,都必须将 workerThread 添加到 _workingTreads 集合中;每个 workerThread 执行完任务都会将自己从 _workingTreads 移除,同时将自己置为空闲线程添加到 _freeTreads 集合中等待接受下一个任务来临,所以 WorkComplete 方法体内最后都要调用 TryGetWorkerThreadAndWorkItem 方法获取可用的 WorkerThread 以及一个待处理的任务,然后执行,这样就形成了一个循环,只要有任务,池内就会一直处于满负荷状态。
开篇提到一个需求:没有爬取任务的时候,需要减少甚至清空池内的所有线程,以免池内线程一直挂着占用系统资源。因此我给 IThreadPool 加了一个属性:KeepAliveTime。通过这个属性,可以给线程池设定一个时间,即线程池在指定的时间内都没有接收到任何任务,则会自行将池内的线程给销毁。在 WorkerThreadPool 中这个功能的实现很简单,在最后一个任务被执行完了以后,会自动从池内取出一个空闲的 workerThread 执行计时操作,也就是 WorkerThreadPool.Tick 方法,其实现也就是自旋计时,如果过了指定时间后都没有接受到任务,则自动将池内的线程给销毁。这个计时实现很简陋- - ,技术有限,想不到其它好办法了。
我们的这个线程池设计简单,功能不是很强,但很适合我们现在的程序,至少让我用的安心。目前已经在服务器上跑了一年半,一切都很正常。小程进入园子已有3年,在这么好的平台上小程一直都只知道汲取,却从未想过回报。因此,我想给大家分享点东西,虽然这个 WorkerThreadPool 简单,没什么高深的技术,但也算是个小结晶。如果大家有好建议,小程将万分感谢!