由于项目需求,需要开发一些程序去爬取一些网站的信息,算是小爬虫程序吧。爬网页这东西是要经过网络传输,如果程序运行起来串行执行请求爬取,会很慢,我想没人会这样做。为了提高爬取效率,必须使用多线程并行爬取。这时候就需要线程池了。池的概念,我想做开发的都应该知道,目的就是对资源的合理运用。刚开始的时候,我首先想到的就是 .net 框架下的线程池 ThreadPool,毕竟是自带的,在性能、稳定性方面肯定没问题。但在琢磨了几天后,.net 框架下自带的这个 ThreadPool 让我很不放心。1.ThreadPool 是一个静态类!!也就是说,当程序运行起来以后,这个池是整个应用程序域共享的,.net 框架很大,程序运行了以后,除了咱们自己往这个共享池里塞任务,谁知道有没有其他的类、代码、任务也会往里塞 workItem 呢?也就是说,假如我设置这个共享池大小为 10,但实际为我们工作的线程会不到 10 个,这就会导致程序运行时达不到我们预期的效果。2.目前我们的爬虫程序设计是像一个服务一样挂着,只要程序启动了以后就会一直运行着,除非手动停止。因此,在没有爬取任务的时候,需要减少甚至清空池内的所有线程,以免池内线程一直挂着占用系统资源。由于 .net 自带的这个线程池是共享的,我还真不敢随意调整它的大小,对于我这种控制欲极强的程序员来说,这是万万接受不了的。虽然.net 自带的 ThreadPool 用法简单,功能强大,而且它还可以智能的调节池内线程池数量,但我还是决定抛弃它,因为,我需要一个可控的线程池!于是开始到网上到处查找有没有其它现成的线程池。百度、谷歌了好久,发现在.net界比较成熟的就 SmartThreadPool,对 SmartThreadPool 简单了解以后,还是觉得它不是我想要的,于是决定,自造一个。于是,借助强大的网络,又开始网上到处搜索有关线程池如何实现以及相关注意事项之类的信息,也拜读过一些网上开源的项目,如 .net 的 SmartThreadPool、java 里的 ThreadPoolExecutor 等,虽然没接触过 java,但 java 和 C# 犹如亲兄弟,大同小异,让我这 .net coder 读起来不是很费劲。基于前人的实现思路,再融入自己的思想,脑中自己的池也慢慢浮现…
根据需求,首先定义基本接口
public interface IThreadPool : IDisposable
{
/// <summary>
/// 线程池大小
/// </summary>
int Threads { get; set; }
/// <summary>
/// 一个以毫秒为单位的值,表示从最后一个活动的线程执行完任务后开始计时,在指定的时间内线程池都没有接收到任何任务,则释放掉池内的所有线程。若设置值小于 0,则不会释放池内线程。如未指定,默认为 -1。
/// </summary>
double KeepAliveTime { get; set; }
/// <summary>
/// 获取当前线程池内的空闲线程数量
/// </summary>
/// <returns></returns>
int GetAvailableThreads();
/// <summary>
/// 获取当前线程池内工作项总数
/// </summary>
/// <returns></returns>
int GetWorkCount();
/// <summary>
/// 向线程池中添加工作项
/// </summary>
/// <param name="callback"></param>
/// <param name="state"></param>
/// <returns></returns>
bool QueueWorkItem(WaitCallback callback, object state);
}
可以看到,我定义的线程池接口成员就几个(命名都是来自 .net 自带 ThreadPool,哈哈),它们的用途上面代码也都带有注释。接下来,我们看下核心实现。首先,介绍类 WorkerThread:
internal class WorkerThread : IDisposable
{
Thread _thread;
AutoResetEvent _waitEvent;
Action _action;
bool _disposed = false;
public WorkerThread()
{
this._waitEvent = new AutoResetEvent(false);
this._thread = new Thread(this.Run);
this._thread.IsBackground = true;
this._thread.Start();
}
//是否正在执行工作
public bool IsWorking
{
get;
private set;
}
public event Action<WorkerThread> Complete;
public int ThreadId
{
get
{
return this._thread.ManagedThreadId;
}
}
public ThreadState ThreadState
{
get
{
return this._thread.ThreadState;
}
}
public void SetWork(Action act)
{
this.CheckDisposed();
if (this.IsWorking)
throw new Exception("正在执行工作项");
this._action = act;
}
public void Activate()
{
this.CheckDisposed();
if (this.IsWorking)
throw new Exception("正在执行工作项");
if (this._action == null)
throw new Exception("未设置任何工作项");
this._waitEvent.Set();
}
void Run()
{
while (!this._disposed)
{
this._waitEvent.WaitOne();
if (this._disposed)
break;
try
{
this.IsWorking = true;
this._action();
}
catch (ThreadAbortException)
{
if (!this._disposed)
Thread.ResetAbort();
}
catch (Exception)
{
}
finally
{
this.IsWorking = false;
this._action = null;
var completeEvent = this.Complete;
if (completeEvent != null)
{
try
{
completeEvent(this);
}
catch
{
}
}
}
}
}
void CheckDisposed()
{
if (this._disposed)
throw new ObjectDisposedException(this.GetType().Name);
}
public void Dispose()
{
if (this._disposed)
return;
try
{
this._disposed = true;
//注意:只有调用 Dispose() 的不是当前对象维护的线程才调用 Abort
if (Thread.CurrentThread.ManagedThreadId != this._thread.ManagedThreadId)
{
this._thread.Abort();
}
}
finally
{
this._waitEvent.Dispose();
}
}
}
这个类主要是对线程的封装,我称之为 WorkerThread。它主要负责接收线程池分配的一个任务,然后执行,线程池内维护的就是这么一个类对象。介绍下它的几个核心成员:
WorkerThread 这个类代码也不是很多,百来行而已。总结来说它作用就是 SetWork(Action act) –> Activate() –> _action() –> WaitOne() –> SetWork(Action act) –> Activate()…无限循环…希望大家看得明白。
了解 WorkerThread 了以后,再来看下真正实现 IThreadPool 接口的类 WorkerThreadPool。前面提到,线程池的作用就是池内维护一定数量可重复利用的线程,WorkerThreadPool 负责 WorkerThread 的创建、接收用户任务并将任务分配给池内空闲的线程去执行用户任务,功能其实就这么简单。废话不多说,直接上代码: