分享一个自制的 .net线程池2

        /// <summary>
        /// 该方法必须在 locked 下执行
        /// </summary>
        /// <param name="workerThread"></param>
        /// <param name="workItem"></param>
        /// <param name="workerThreadCall">是否是当前池内的线程调用该方法</param>
        /// <returns></returns>
        bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall)
        {
            workerThread = null;
            workItem = null;


            if (this._workQueue.Count > 0)
            {
                if (this._freeTreads.Count > 0)
                {
                    workerThread = this._freeTreads.Dequeue();
                    workItem = this._workQueue.Dequeue();
                    this._workingTreads.Add(workerThread);


                    return true;
                }
                else
                {
                    if (this._allThreads.Count < this._threads)
                    {
                        workerThread = new WorkerThread();
                        workItem = this._workQueue.Dequeue();
                        this._allThreads.Add(workerThread);
                        this._workingTreads.Add(workerThread);


                        return true;
                    }
                    return false;
                }
            }
            else
            {
                if (!workerThreadCall)
                    return false;


                double t = this._keepAliveTime;
                if (t < 0)
                {
                    this._workQueue.TrimExcess();
                    return false;
                }


                //此代码块只有当前池内的线程完成工作了以后访问到,从 QueueWorkItem 方法调用该方法是不会执行此代码块的,因为 this.workQueue.Count > 0
                if (this._freeTreads.Count == this._allThreads.Count && this._workingTreads.Count == 0 && this._freeTreads.Count > 0)
                {
                    /*
                     *能执行到这,说明池内没有了任何任务,并且是最后一个活动线程执行完毕
                     *此时从池中取出一个线程来执行 Tick 方法
                     */
                    DateTime now = DateTime.Now;
                    int threadId = Thread.CurrentThread.ManagedThreadId;
                    if (this._allThreads.Any(a => a.ThreadId == threadId))//既然只有当前池内的线程能访问到这,这句判断是不是有点多余了- -
                    {
                        workerThread = this._freeTreads.Dequeue();//弹出一个 WorkerThread 对象,此时不需将弹出的 WorkerThread 对象放入 workingTreads 队列中,因为该对象是供池内自身计时用,相对外界是不知道的,保证外界调用 GetAvailableThreads 方法能得到一个合理的结果
                        workItem = new WorkItem((state) =>
                       {
                           this.Tick((WorkerThread)state);
                       }, workerThread);


                        this._spin = true;
                        try
                        {
                            this._releaseTime = now.AddMilliseconds(t);//设置待释放线程的时间点
                        }
                        catch (ArgumentOutOfRangeException)
                        {
                            this._releaseTime = DateTime.MaxValue;
                        }


                        return true;
                    }
                }


                return false;
            }
        }


        void ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem)
        {
            workerThread.SetWork(workItem.Execute);
            workerThread.Complete += this.WorkComplete;
            workerThread.Activate();
        }
        void WorkComplete(WorkerThread workerThread)
        {
            //避免无法调用终结器,务必将  this.WorkComplete 从 workerThread.Complete 中移除,取出 workerThread 的时候再加上
            workerThread.Complete -= this.WorkComplete;
            if (this._disposed)
                return;


            WorkerThread nextWorkerThread = null;
            WorkItem nextWorkItem = null;


            lock (this._lockObject)
            {
                if (this._disposed)
                    return;


                this._workingTreads.Remove(workerThread);
                this._freeTreads.Enqueue(workerThread);
                this.AdjustPool();


                if (!this.TryGetWorkerThreadAndWorkItem(out  nextWorkerThread, out  nextWorkItem, true))
                {
                    return;
                }
            }


            this.ActivateWorkerThread(nextWorkerThread, nextWorkItem);
        }


        /// <summary>
        /// 该方法必须在 locked 下执行
        /// </summary>
        void AdjustPool()
        {
            while (this._allThreads.Count > this._threads && this._freeTreads.Count > 0)
            {
                WorkerThread workerThread = this._freeTreads.Dequeue();
                this._allThreads.Remove(workerThread);
                workerThread.Dispose();
            }
        }

这个类代码还是挺多的(貌似有点占篇幅- - ),虽然代码里都加上了注释,但我还是想给大家简单说说其实现思路以及内部一些核心相关成员,方便大家更快的理解。

  • _threads:一个类型为 int 的字段。表示当前池的大小
  • _allThreads:一个类型为 List<WorkerThread> 的字段。用于存储池内创建的所有 WorkerThread
  • _workingTreads:一个类型为 List<WorkerThread> 的字段。用于存储正在执行任务的 WorkerThread
  • _freeTreads:一个类型为 Queue<WorkerThread> 的字段。用于存储处于空闲状态的 WorkerThread
  • _workQueue:一个类型为 Queue<WorkItem> 的字段。用于存储用户往当前池里塞的所有任务
  • SetPoolSize(int threads):设置线程池大小。这个方法主要做了两件事:1.设置线程池大小,也就是字段 _threads 的值。2.调整线程池内线程。当设置的值小于当前池内的大小时,则释放掉多出的空闲线程;当设置的值大于当前池大小时,如果 _workQueue 队列有待处理的任务的话,会尝试着创建新的 WorkerThread 去执行 _workQueue 队列里的任务,目的就是为了使当前池一直处于满负荷状态。
  • bool QueueWorkItem(WaitCallback callback, object state):向线程池中添加任务。每次调用这个方法,都会将 callback 和 state 封装成一个 WorkItem,然后将封装的 WorkItem 对象放入 _workQueue 队列。然后尝试调用 TryGetWorkerThreadAndWorkItem 方法获取可用的 WorkerThread 以及 _workQueue 队列第一个任务,如果获取成功(即有可用的 WorkerThread 和待处理的 WorkItem),就会将取出的 WorkItem 分配给取出的 WorkerThread 去执行。
  • bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall):尝试从池内取出一个处于空闲的 WorkerThread 和待处理的 WorkItem。这个方法的实现不是很复杂,如果池内有空闲的 WorkerThread 和待处理的 WorkItem,则返回 true,否则返回 false。目前我们这个线程池内 WorkerThread 的创建不是伴随线程池创建而创建,而是真正需要用到的时候才会去创建。即当有任务往池里塞的时候,首先会判断 _freeTreads 集合内是否有空闲的 WorkerThread,如果有,则弹出一个空闲的 WorkerThread 去执行任务,同时将弹出的 WorkerThread 添加到 _workingTreads 集合中,没有的话才会去创建新的 WorkerThread 去执行任务,同时也会将新建的 WorkerThread 添加到 _workingTreads 集合中。
  • ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem):这个方法体内的实现很简单,就是将 workItem 分配给 workerThread,同时调用 workerThread.Activate() 激活线程执行任务,调用 workerThread.Activate()会将当前池内的方法 WorkComplete(WorkerThread workerThread) 绑定到 workerThread 定义的 Complete 事件上,每当 workerThread 执行完任务以后,都会触发 workerThread.Complete 事件,以通知其所在的线程池。
  • WorkComplete(WorkerThread workerThread):每当 workerThread 执行完任务以后,都会调用该方法。该方法参数是一个 WorkerThread 对象,也就是说每个 workerThread 执行完任务后都会将自己作为参数调用这个方法。在这个方法内主要是做三件事:1.将执行完任务的 workerThread 从 _workingTreads 集合中移除,然后将 workerThread 添加到空闲线程队列 _freeTreads 中。2.调整线程池线程(如果有必要的话),为什么在这要进行调整线程池呢?因为会出现这种情况,比如当前线程池大小是 10,正在工作的线程为 6 个,空闲线程也就是 4 个,这时候我们调用 SetPoolSize(5),也就是将线程池大小设置为 5,减少了线程池的容量,虽然在 SetPoolSize 方法内会调整了一遍线程池大小,但 SetPoolSize 方法内只会销毁掉空闲的线程,也就是 4 个空闲线程会被销毁,这时候池内其实还是存在 6 个线程。所以还需要销毁一个,这时候怎么办呢?不可能在 SetPoolSize 方法内把正在执行任务的线程给终止掉吧?因此,workerThread 每次执行完任务后都要执行一次调整线程池的操作,以保证池内的线程数量是正确的。3.调用 TryGetWorkerThreadAndWorkItem 方法,如果有待处理的任务的话,则继续处理下一个任务,这样就达到了持续处理 _workQueue 队列内任务的目的。

上面就是 WorkerThreadPool 的一些核心字段以及方法,至于其它的成员就不做详细说明了。 为了方便管理,池内用了 _freeTreads 和 _workingTreads 两个集合来维护池内线程状态。所以每次从空闲线程 _freeTreads 取出 workerThread 执行任务的时候,都必须将 workerThread 添加到 _workingTreads 集合中;每个 workerThread 执行完任务都会将自己从 _workingTreads 移除,同时将自己置为空闲线程添加到 _freeTreads 集合中等待接受下一个任务来临,所以 WorkComplete 方法体内最后都要调用 TryGetWorkerThreadAndWorkItem 方法获取可用的 WorkerThread 以及一个待处理的任务,然后执行,这样就形成了一个循环,只要有任务,池内就会一直处于满负荷状态。

开篇提到一个需求:没有爬取任务的时候,需要减少甚至清空池内的所有线程,以免池内线程一直挂着占用系统资源。因此我给 IThreadPool 加了一个属性:KeepAliveTime。通过这个属性,可以给线程池设定一个时间,即线程池在指定的时间内都没有接收到任何任务,则会自行将池内的线程给销毁。在 WorkerThreadPool 中这个功能的实现很简单,在最后一个任务被执行完了以后,会自动从池内取出一个空闲的 workerThread 执行计时操作,也就是 WorkerThreadPool.Tick 方法,其实现也就是自旋计时,如果过了指定时间后都没有接受到任务,则自动将池内的线程给销毁。这个计时实现很简陋- - ,技术有限,想不到其它好办法了。

我们的这个线程池设计简单,功能不是很强,但很适合我们现在的程序,至少让我用的安心。目前已经在服务器上跑了一年半,一切都很正常。小程进入园子已有3年,在这么好的平台上小程一直都只知道汲取,却从未想过回报。因此,我想给大家分享点东西,虽然这个 WorkerThreadPool 简单,没什么高深的技术,但也算是个小结晶。如果大家有好建议,小程将万分感谢!

原文发布于微信公众号 - 我为Net狂(dotNetCrazy)

原文发表时间:2016-09-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏wannshan(javaer,RPC)

dubbo路由机制代码分析1

这回说说,dubbo路由特性,dubbo的路由干的事,就是一个请求过来, dubbo依据配置的路由规则,计算出哪些提供者可以提供这次的请求服务。 所以,它的...

53170
来自专栏安恒网络空间安全讲武堂

从零基础到成功解题之0ctf-ezdoor

22340
来自专栏coolblog.xyz技术专栏

Dubbo 源码分析 - 集群容错之 Cluster

为了避免单点故障,现在的应用至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多台服务器。这样,同一环境下的服务提供者数量会大于1。对于服务消费者来说...

13320
来自专栏java思维导图

Java中高级面试题

技术文章第一时间送达! 本文作者是CyanQueen,欢迎点击阅读原文 一.基础知识: 1)集合类:List和Set比较,各自的子类比较(ArrayList,V...

39850
来自专栏大闲人柴毛毛

轻量级线程池的实现

写在前面 最近因为项目需要,自己写了个单生产者-多消费者的消息队列模型。多线程真的不是等闲之辈能玩儿的,我花了两个小时进行设计与编码,却花了两天的时间调试与运...

54640
来自专栏开发技术

shiro源码篇 - shiro的session的查询、刷新、过期与删除,你值得拥有

    老公酷爱网络游戏,老婆无奈,只得告诫他:你玩就玩了,但是千万不可以在游戏里找老婆,不然,哼哼。。。     老公嘴角露出了微笑:放心吧亲爱的,我绝对不会...

39920
来自专栏微信公众号:Java团长

从并发编程到分布式系统——如何处理海量数据(上)

在这里想写写自己在学习并发处理的学习思路,也会聊聊自己遇到的那些坑,以此为记,希望鞭策自己不断学习、永不放弃!

10110
来自专栏叔叔的博客

Java调用Groovy

12830
来自专栏梧雨北辰的开发录

iOS面试知识总结之基本概念总结

凡经历过iOS面试的我们总会发觉,即使实际开发中做过许多项目,也难免为一个普通的面试题受挫。这也许不是因为我们技术不过关,而是因为在平时我们忽略了怎样将用到的知...

42470
来自专栏H2Cloud

C++中消息自动派发之一 About JSON

1. 闲序   游戏服务器之间通信大多采用异步消息通信。而消息打包常用格式有:google protobuff,facebook thrift, 千千万万种自定...

26830

扫码关注云+社区

领取腾讯云代金券