专栏首页python3用 Python 实现的线程池

用 Python 实现的线程池

为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。

下面是用Python实现的通用的线程池代码:

view plainprint?

  1. import Queuethreadingsys
  2. from threading import Thread  
  3. import time,urllib
  4. # working thread
  5. class Worker(Thread):  
  6.     worker_count = 0  
  7. def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):  
  8.         Thread.__init__( self, **kwds )  
  9. self.id = Worker.worker_count  
  10.         Worker.worker_count += 1  
  11. self.setDaemon( True )  
  12. self.workQueue = workQueue  
  13. self.resultQueue = resultQueue  
  14. self.timeout = timeout  
  15. def run( self ):  
  16. ''' the get-some-work, do-some-work main loop of worker threads '''
  17. while True:  
  18. try:  
  19. callable, args, kwds = self.workQueue.get(timeout=self.timeout)  
  20.                 res = callable(*args, **kwds)  
  21. print "worker[%2d]: %s" % (self.id, str(res) )  
  22. self.resultQueue.put( res )  
  23. except Queue.Empty:  
  24. break
  25. except :  
  26. print 'worker[%2d]' % self.id, sys.exc_info()[:2]  
  27. class WorkerManager:  
  28. def __init__( self, num_of_workers=10, timeout = 1):  
  29. self.workQueue = Queue.Queue()  
  30. self.resultQueue = Queue.Queue()  
  31. self.workers = []  
  32. self.timeout = timeout  
  33. self._recruitThreads( num_of_workers )  
  34. def _recruitThreads( self, num_of_workers ):  
  35. for i in range( num_of_workers ):  
  36.             worker = Worker( self.workQueue, self.resultQueue, self.timeout )  
  37. self.workers.append(worker)  
  38. def start(self):  
  39. for w in self.workers:  
  40.             w.start()  
  41. def wait_for_complete( self):  
  42. # ...then, wait for each of them to terminate:
  43. while len(self.workers):  
  44.             worker = self.workers.pop()  
  45.             worker.join( )  
  46. if worker.isAlive() and not self.workQueue.empty():  
  47. self.workers.append( worker )  
  48. print "All jobs are are completed."  
  49. def add_job( self, callable, *args, **kwds ):  
  50. self.workQueue.put( (callable, args, kwds) )  
  51. def get_result( self, *args, **kwds ):  
  52. return self.resultQueue.get( *args, **kwds )  

Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。

WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。

一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。

view plainprint?

  1. def test_job(id, sleep = 0.001 ):  
  2. try:  
  3. urllib.urlopen('https://www.gmail.com/').read()  
  4. except:  
  5. print '[%4d]' % id, sys.exc_info()[:2]  
  6. return id
  7. def test():  
  8. import socket
  9. socket.setdefaulttimeout(10)  
  10. print 'start testing'  
  11.     wm = WorkerManager(10)  
  12. for i in range(500):  
  13.         wm.add_job( test_job, i, i*0.001 )  
  14.     wm.start()  
  15.     wm.wait_for_complete()  
  16. print 'end testing' 

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • PyQt5--QFontDiaglog

    py3study
  • 数据结构(三):队列

      队列通常用来实现消息(任务)的快速读写,即消息队列。消息队列的常用来解决如下问题:

    py3study
  • Python写自动化之SVN更新

    在远程机器上执行脚本时,为了能够保证脚本的实时性,我们一般会将脚本存放到SVN上,远程机器通过SVN的操作去更新脚本;

    py3study
  • 用Python从零开始设计数字图片识别神经网络--搭建基本架构

    望月从良
  • PyQt5--QFontDiaglog

    py3study
  • 数据结构(三):队列

      队列通常用来实现消息(任务)的快速读写,即消息队列。消息队列的常用来解决如下问题:

    py3study
  • Python包装授权

        包装和授权往往使用在定制某种类,其实现的多样性,只要你能想的到,就可以出现千变万化的授权、包装实现方式,上述仅仅提供参考。

    py3study
  • Python开发植物大战僵尸游戏,详细教程

    一墨编程学习
  • Objectiv-c - UICollectionViewLayout自定义布局-瀑布流Demo地址

    公开变量以及代理. 公开的变量是可以进行调用时设置,一般就为这些,delegate用来实现动态的高度设置

    gwk_iOS
  • 日常 Python 编程优雅之道

    Python 提供了一组独特的工具和语言特性来使你的代码更加优雅、可读和直观。为正确的问题选择合适的工具,你的代码将更易于维护。在本文中,我们将研究其中的三个工...

    前端博客 : alili.tech

扫码关注云+社区

领取腾讯云代金券