python的生产者消费者模型,看这篇就够了

首先先来解释下,什么是「生产者消费者模型」生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。以上是wiki百科对于模型的解释,用我的话总结一下就是:一个愿打,一个愿挨

从上图中可以看到生产者消费者之间用中间类似一个队列一样的东西串起来。这个队列可以想像成一个存放产品的“仓库”,生产者只需要关心这个“仓库”,并不需要关心具体的消费者,对于生产者而言甚至都不知道有这些消费者存在。对于消费者而言他也不需要关心具体的生产者,到底有多少生产者也不是他关心的事情,他只要关心这个“仓库”中还有没有东西。这种模型是一种松耦合模型

那么接下来就用代码来演示一下

class Consumer(threading.Thread):
   def __init__(self, queue):
       # 从python3 开始,继承有了极大的改善
       super().__init__()
       # 我喜欢用单下划线去标识私有变量
       self._queue = queue

   # 继承了Thread类后,需要重写run方法来自定义事件
   def run(self):
       while True:
           # msg即是我们说的消息,也是仓库中的货物
           msg = self._queue.get()
           # 我会在生产者中加入quit关键字,保证程序能够自动退出
           if isinstance(msg, str) and msg == 'quit':
               break
           print(f"I'm a thread, and I received {msg}!!")
       print('Bye byes!')


def producer():
   # 生产者生产一个新的队列,用于存放消息
   queue = Queue.Queue()
   # 初始化一个消费者实例
   worker = Consumer(queue)
   # 开启消费者线程
   worker.start()  
   start_time = time.time()
   # 退出条件
   while time.time() - start_time < 5:
       queue.put('something at %s' % time.time())
       time.sleep(1)
   queue.put('quit')
   worker.join()


if __name__ == '__main__':
   producer()

那么如果在爬虫的时候,我们就可以把仓库中的货物想像成是一个个的url,生产者产生url链接,消费者获取url连接并从中得到数据,在队列的帮助下可以使用多线程加快爬虫速度。

import time, threading, Queue
import requests

class Consumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self._queue = queue

    def run(self):
        while True:
            _content = self._queue.get()
            print(_content)
            if isinstance(_content, str) and _content == 'quit':
                break
            _res = requests.get(content)
        print('Bye byes!')

def Producer():
    _urls = ['url1', 'url2', 'url3', 'url4']
    _q = Queue.Queue()
    # 一次打开4个队列
    _workers = build_worker_pool(_q, 4)
    _start_time = time.time()
    for _url in _urls:
        _q.put(_url)
    for _w in _workers:
        _q.put('quit')
    for _w in _workers:
        # join的作用就是为了保证所有的子线程都结束了,再结束父线程
        _w.join()
    _t = time.time() - start_time
    print(f"Done! Time taken: {_t}")

def build_worker_pool(queue, size):
    _workers = []
    for _ in range(size):
        _worker = Consumer(queue)
        _worker.start()
        _workers.append(_worker)
    return workers

if __name__ == '__main__':
    Producer()

好啦~今天的内容就到这里差不多咯~

如果你对今天的内容还感兴趣的话,何不点个赞再走呢?如果感兴趣到想赞赏我,就不要犹豫啦~


原文发布于微信公众号 - 猿媛牧场(xpchuiit)

原文发表时间:2018-06-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏魏琼东

一步一步教你使用AgileEAS.NET基础类库进行应用开发-基础篇-ORM访问器及其配置

系列回顾          本系列前面有三篇文章介绍和演示了AgileEAS.NET平台ORM组件的开发流程及其常见的使用方式,通过前面的三篇文章,大家都可以正...

1939
来自专栏谈补锅

汇编语言学习

   7、1Byte = 8bit ;    1KB = 1024B ;  1MB = 1024KB ;   1GB = 1024MB

2313
来自专栏SDNLAB

深度数据包检测DPI开发解析

深度数据包检测(DPI) 深度数据包检测(Deep packet inspection,缩写为 DPI)是一种特殊的网络技术,一般网络设备只会查看以太网头部、I...

4347
来自专栏MessageQueue

读Kafka Consumer源码

这是OpenMessaging-Java项目GitHub上的一段介绍,大致是说OpenMessaging项目致力于建立MQ领域的标准。

1042
来自专栏技术笔记

使用ESLint+Prettier来统一前端代码风格

想起自己刚入行的时候,从svn上把代码checkout下来,看到同事写的代码,大括号居然换行了。心中暗骂,这个人是不是个**,大括号为什么要换行?年轻气盛的我,...

4322
来自专栏技术墨客

Nodejs学习笔记(1)——安装nodejs

    关于大名鼎鼎的Nodejs是什么就不用再介绍了,他的牛逼之处数都数不完——让javascript称霸全宇宙、将一个只用于前端的编程语言同时可以制霸前后端...

762
来自专栏向治洪

xmpp即时通讯四

     TLS协商(5节)后,如果需要SASL协商(6节)与资源绑定(7节),XML节可通过流来发送。定义了三种XML节用于 'jabber:client'与...

1935
来自专栏技术总结

iOS进阶之消息转发机制

2706
来自专栏深度学习之tensorflow实战篇

mongodb11天之屠龙宝刀(三)基本操作:增删改查与mysql对比

mongodb11天之屠龙宝刀(三)基本操作:增删改查与mysql对比 基本概念_id和ObjectId: 1._id   MongoDB 中存储的文档必有...

2993
来自专栏java思维导图

深入源码分析Java线程池的实现原理

程序的运行,其本质上,是对系统资源(CPU、内存、磁盘、网络等等)的使用。如何高效的使用这些资源是我们编程优化演进的一个方向。今天说的线程池就是一种对CPU利用...

902

扫码关注云+社区