前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python 自带异步队列的大坑

Python 自带异步队列的大坑

作者头像
青南
发布2020-06-04 16:55:25
5.9K0
发布2020-06-04 16:55:25
举报
文章被收录于专栏:未闻Code

我们在使用 Python 的 asyncio 写异步程序的时候,可能会使用asyncio.Queue来实现一个异步队列,通过它来让生产者和消费者进行通信。

但如果你的异步队列没有填写maxsize参数,那么可能会产生让你意料之外的结果。我们来看一段代码:

代码语言:javascript
复制
import asyncio
import random
import aiohttp


async def producer(queue):
    for _ in range(10):
        sleep_time = random.randint(1, 2)
        await queue.put(sleep_time)


async def consumer(queue):
    while True:
        sleep_time = await queue.get()
        size = queue.qsize()
        print(f'当前队列有:{size} 个元素')
        url = f'http://httpbin.org/delay/{sleep_time}'
        async with aiohttp.ClientSession() as client:
            resp = await client.get(url)
            print(await resp.json())

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(producer(queue))
    con = asyncio.create_task(consumer(queue))
    await con


asyncio.run(main())

这段代码把 producerconsumer分别创建成异步任务,期望实现的效果是生产者不停生产数据放进异步队列,消费者不停从队列读取数据,然后发起网络请求。生产者与消费者利用 IO 等待时间实现并行。

但如果你运行一下这段代码,你会发现一件很奇怪的事情,如下图所示:

当我们的消费者开始消费的时候,队列里面实际上已经有10条数据了!由于图中代码第19行是先读取了一条数据再打印剩余的数量,所以打印的是当前队列有:9 个元素

所以,生产者与消费者根本没有并行。是生产者里面的循环完全运行完成了,才开始运行的消费者!

如果在实际代码中,你的生产者生产了几百万条数据,那么此时所有数据全部都堆放在异步队列里面,很容易就把你的内存撑爆了!

那么这个问题要如何解决呢?实际上非常简单,使用maxsize参数指定异步队列的大小:

代码语言:javascript
复制
queue = asyncio.Queue(maxsize=3)

我们这里设定为3,再运行看看效果:

看到这里,可能有人会说,这仅仅是生产者先把异步队列堆满,才能进行消费,并没有什么本质区别啊,本质上还是先只有生产者运行,等他跑不动了(队列满了),消费者才能运行,还是没有实现并行啊。

这是由于,在上面的例子中,生产者的速度远远超过消费者的速度,所以才会出现生产者总是堆满队列的问题。

为了说明生产者和消费者能真正利用 IO 等待时间进行并行,我们改一下代码:

代码语言:javascript
复制
import asyncio
import random
import aiohttp


async def producer(queue):
    for i in range(10):
        await queue.put(i)
        await asyncio.sleep(random.randint(1, 3))


async def consumer(queue):
    while True:
        sleep_time = await queue.get()
        size = queue.qsize()
        print(f'当前队列有:{size} 个元素')
        url = 'http://httpbin.org/delay/2'
        async with aiohttp.ClientSession() as client:
            resp = await client.get(url)
            print(await resp.json())

async def main():
    queue = asyncio.Queue(maxsize=3)
    asyncio.create_task(producer(queue))
    con = asyncio.create_task(consumer(queue))
    await con


asyncio.run(main())

生产者生产数据后,随机休眠1-3秒。而消费者请求的网址总是2秒返回数据。这样一来,有时候生产者快,有时候消费者快。我们来看看运行效果:

可以看到,当生产者快的时候,异步队列里面的数据就会堆积,当消费者快的时候,异步队列里面的数据就会变少。说明生产者与消费者实现了利用 IO 等待时间进行并行操作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 未闻Code 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档