首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >对asyncio.Queue在复杂项目结构上实现异步原语的几点看法

对asyncio.Queue在复杂项目结构上实现异步原语的几点看法
EN

Stack Overflow用户
提问于 2019-07-20 03:56:10
回答 1查看 114关注 0票数 0

我目前正在研究一个大型异步Python项目的一个特定领域,我想知道如何在不进行一些荒谬的盗用的情况下实现程序的某种await queue.is_no_longer_full()功能。我认为使用异步的原语可能是我最好的选择,但坦率地说,我对这些概念并不熟悉,不足以合理地解决这个特定的问题。

为了举例,我对这个问题做了简单的解释,但现实是,虽然这个项目有类似的风格,但它要复杂得多--请不要理会其中一些类的简单性。

为了让任何愿意帮助(谢谢!)的人更容易,这里是完整代码的要点。出于StackOverflow的目的,下面是我所拥有的:

设置

  • Obj:在被访问之前需要进行一些异步处理的对象。
代码语言:javascript
复制
class Obj(object):
    async def process(self):
        print("Processing our obj {}.".format(id(self)))
        await asyncio.sleep(random.randint(1,5))
        self.items = random.sample(range(1000), 1000)
  • Pool:一个管理asyncio.Queue对象集成的类。
代码语言:javascript
复制
class Pool(object):
    def __init__(self):
        self.queue = asyncio.LifoQueue(maxsize=10)

    async def put(self, number):
        if self.queue.full():
            print("Queue is currently full.")

        await self.queue.put(number)

    async def get(self):
        # REFERENCE POINT #1.1
        return await self.queue.get()

   async def report_on_number(self, number, good):
        # REFERENCE POINT #1.2
        print("Number {} has been reported.")
        await asyncio.sleep(random.randint(1, 101))

        if good:
            await self.put(number)
  • Filter:用于过滤对象的类。
代码语言:javascript
复制
class Filter(object):
    async def filter(self, number):
        print("Filtering our number {}.".format(number))
        await asyncio.sleep(random.randint(1, 101))
        return True
  • Main:程序的主要逻辑。
代码语言:javascript
复制
class Main(object):
    def __init__(self):
        self.objects = [Obj() for objects in range(3)]
        self.pool = Pool()
        self.filter = Filter()

    async def setup(self):
        async def pipeline(object):
            await object.process()
            await asyncio.wait([process(number) for number in object.items])

        async def process(number):
            # REFERENCE POINT #2
            if await self.filter.filter(number):
                print("Number {} was filtered.".format(number))
                await self.pool.put(number)

        await asyncio.wait([pipeline(object) for object in self.objects])
  • 运行代码。
代码语言:javascript
复制
asyncio.run(Main().setup())

问题

我正试图完成两件事,我相信这两个问题都可以通过一个或多个异步原语的集成来解决--我不太确定。

# REFERENCE POINT #1.1 & #1.2:当用户从Pool类中提取数据时,他/她有能力将该数字重新插入到队列中,如果它在用户任务期间达到了其目的。如果该数字没有正确地用于其目的,则用户将报告该号码,并且该号码将不会被添加回队列中。这是由某种report_on_number功能完成的。

2. # REFERENCE POINT #2:在这里,这是我不确定如何正确解决的设计缺陷。具体来说,问题是我只希望在self.filter.filter(number)对象中有一个新的点来保存asyncio.loop中的资源(这是我前面提到的await queue.is_no_longer_full()“魔术”方法)时才调用它。对于当前的实现,所有的数字都会被过滤,然后没有插入到池中的数字就会在Pool.put功能中停滞。这不是我真正想要的。

修复尝试

在来到这里之前,我尝试了几种不同的方法来解决这个问题。更多的引导方法是在while中添加一个Main.process循环。

代码语言:javascript
复制
async def process(number):
    while self.filter.queue.full():
        asyncio.sleep(100)

    if await self.filter.filter(number):
        print("Number {} was filtered.".format(number))
        await self.pool.put(number)

当然,这完全是错误的做法。这似乎达到了一个标准,但它并不保证水平的“罚款”,我坦率地寻找。

第二次尝试是使用在asyncio.Condition()类内初始化的Main,并在初始化时传递给PoolFilter类。然而,由于我认为核心异步原语缺乏舒适感,所以我无法成功地实现。我尝试了几种不同的实现,但都没有成功--通常情况下,事情会被锁在外面,而且我还没能想出一种有效的方法来确保事物被适当地解锁。

希望我明白了我的观点,希望有人愿意帮助我!提前谢谢你。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-20 23:23:34

在用我为这个问题编写的演示完成问题之后,我自己解决了自己的问题。如果有人在找这个,这就是我最终想出的:

代码语言:javascript
复制
class Pool(object):
    def __init__(self, condition):
        self.condition = condition
        self.queue = asyncio.LifoQueue(maxsize=10)

    async def put(self, number):
        if self.queue.full():
            print("-> Queue is currently full.")

            async with self.condition:
                await self.condition.wait()

        print("Adding {} to queue.".format(number))
        await self.queue.put(number)

    async def get(self):
        return await self.queue.get()

    async def report_on_number(self, number, good):
        print("   -> Number {} has been reported.".format(number))
        await asyncio.sleep(random.randint(2, 4))
        if good:
            print("   -> Adding number {} back into the queue.".format(number))
            await self.queue.put(number)
            print("   -> Added successfully!")
        else:
            print("   -> Releasing lock.")
            self.condition.release()
            print("   -> Lock released completed.")
代码语言:javascript
复制
class Main(object):
    def __init__(self):
        self.objects = [Obj() for objects in range(3)]
        self.condition = asyncio.Condition()
        self.pool = Pool(self.condition)
        self.filter = Filter()

    async def setup(self):
        async def pipeline(object):
            await object.process()
            await asyncio.wait([process(number) for number in object.items])

        async def process(number):
            async with self.condition:
                print("Processing number ", number)
                if await self.filter.filter(number):
                    await self.pool.put(number)

        await asyncio.wait([pipeline(object) for object in self.objects])
代码语言:javascript
复制
async def test(main):

    await asyncio.sleep(5)

    print("\n\n-------- First run ---------- \n\n")
    print("\n -> Getting some number.")
    number = await main.pool.get()
    print(" -> Got number", number)
    await main.pool.report_on_number(number, False)

    await asyncio.sleep(5)
    print("\n\n-------- Second run ---------- \n\n")
    print("\n -> Getting some number.")
    number = await main.pool.get()
    print(" -> Got number", number)
    await main.pool.report_on_number(number, True)

    await asyncio.sleep(5)
    print("\n\n-------- Third run ---------- \n\n")
    print("\n -> Getting some number.")
    number = await main.pool.get()
    print(" -> Got number", number)
    await main.pool.report_on_number(number, False)

main = Main()
loop = asyncio.get_event_loop()
loop.create_task(main.setup())
loop.create_task(test(main))
loop.run_forever()

其中产出:

代码语言:javascript
复制
... A bunch of processing/filtering printouts above.
...

Processing number  864
Filtering our number 864.
-> Queue is currently full.

-------- First run ---------- 

 -> Getting some number.
 -> Got number 34
   -> Number 34 has been reported.
   -> Releasing lock.
   -> Lock released completed.
Processing number  866
Filtering our number 866.
Adding 866 to queue.
Processing number  121
Filtering our number 121.
-> Queue is currently full.

-------- Second run ---------- 

 -> Getting some number.
 -> Got number 866
   -> Number 866 has been reported.
   -> Adding number 866 back into the queue.
   -> Added successfully!

-------- Third run ---------- 

 -> Getting some number.
 -> Got number 866
   -> Number 866 has been reported.
   -> Releasing lock.
   -> Lock released completed.
Processing number  55
Filtering our number 55.
Adding 55 to queue.
Processing number  19
Filtering our number 19.
-> Queue is currently full.

它工作得很结实!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57121674

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档