我目前正在研究一个大型异步Python项目的一个特定领域,我想知道如何在不进行一些荒谬的盗用的情况下实现程序的某种await queue.is_no_longer_full()功能。我认为使用异步的原语可能是我最好的选择,但坦率地说,我对这些概念并不熟悉,不足以合理地解决这个特定的问题。
为了举例,我对这个问题做了简单的解释,但现实是,虽然这个项目有类似的风格,但它要复杂得多--请不要理会其中一些类的简单性。
为了让任何愿意帮助(谢谢!)的人更容易,这里是完整代码的要点。出于StackOverflow的目的,下面是我所拥有的:
设置
Obj:在被访问之前需要进行一些异步处理的对象。class Obj(object):
async def process(self):
print("Processing our obj {}.".format(id(self)))
await asyncio.sleep(random.randint(1,5))
self.items = random.sample(range(1000), 1000)Pool:一个管理asyncio.Queue对象集成的类。class Pool(object):
def __init__(self):
self.queue = asyncio.LifoQueue(maxsize=10)
async def put(self, number):
if self.queue.full():
print("Queue is currently full.")
await self.queue.put(number)
async def get(self):
# REFERENCE POINT #1.1
return await self.queue.get()
async def report_on_number(self, number, good):
# REFERENCE POINT #1.2
print("Number {} has been reported.")
await asyncio.sleep(random.randint(1, 101))
if good:
await self.put(number)Filter:用于过滤对象的类。class Filter(object):
async def filter(self, number):
print("Filtering our number {}.".format(number))
await asyncio.sleep(random.randint(1, 101))
return TrueMain:程序的主要逻辑。class Main(object):
def __init__(self):
self.objects = [Obj() for objects in range(3)]
self.pool = Pool()
self.filter = Filter()
async def setup(self):
async def pipeline(object):
await object.process()
await asyncio.wait([process(number) for number in object.items])
async def process(number):
# REFERENCE POINT #2
if await self.filter.filter(number):
print("Number {} was filtered.".format(number))
await self.pool.put(number)
await asyncio.wait([pipeline(object) for object in self.objects])asyncio.run(Main().setup())问题
我正试图完成两件事,我相信这两个问题都可以通过一个或多个异步原语的集成来解决--我不太确定。
# REFERENCE POINT #1.1 & #1.2:当用户从Pool类中提取数据时,他/她有能力将该数字重新插入到队列中,如果它在用户任务期间达到了其目的。如果该数字没有正确地用于其目的,则用户将报告该号码,并且该号码将不会被添加回队列中。这是由某种report_on_number功能完成的。
2. # REFERENCE POINT #2:在这里,这是我不确定如何正确解决的设计缺陷。具体来说,问题是我只希望在self.filter.filter(number)对象中有一个新的点来保存asyncio.loop中的资源(这是我前面提到的await queue.is_no_longer_full()“魔术”方法)时才调用它。对于当前的实现,所有的数字都会被过滤,然后没有插入到池中的数字就会在Pool.put功能中停滞。这不是我真正想要的。
修复尝试
在来到这里之前,我尝试了几种不同的方法来解决这个问题。更多的引导方法是在while中添加一个Main.process循环。
async def process(number):
while self.filter.queue.full():
asyncio.sleep(100)
if await self.filter.filter(number):
print("Number {} was filtered.".format(number))
await self.pool.put(number)当然,这完全是错误的做法。这似乎达到了一个标准,但它并不保证水平的“罚款”,我坦率地寻找。
第二次尝试是使用在asyncio.Condition()类内初始化的Main,并在初始化时传递给Pool和Filter类。然而,由于我认为核心异步原语缺乏舒适感,所以我无法成功地实现。我尝试了几种不同的实现,但都没有成功--通常情况下,事情会被锁在外面,而且我还没能想出一种有效的方法来确保事物被适当地解锁。
希望我明白了我的观点,希望有人愿意帮助我!提前谢谢你。
发布于 2019-07-20 23:23:34
在用我为这个问题编写的演示完成问题之后,我自己解决了自己的问题。如果有人在找这个,这就是我最终想出的:
class Pool(object):
def __init__(self, condition):
self.condition = condition
self.queue = asyncio.LifoQueue(maxsize=10)
async def put(self, number):
if self.queue.full():
print("-> Queue is currently full.")
async with self.condition:
await self.condition.wait()
print("Adding {} to queue.".format(number))
await self.queue.put(number)
async def get(self):
return await self.queue.get()
async def report_on_number(self, number, good):
print(" -> Number {} has been reported.".format(number))
await asyncio.sleep(random.randint(2, 4))
if good:
print(" -> Adding number {} back into the queue.".format(number))
await self.queue.put(number)
print(" -> Added successfully!")
else:
print(" -> Releasing lock.")
self.condition.release()
print(" -> Lock released completed.")class Main(object):
def __init__(self):
self.objects = [Obj() for objects in range(3)]
self.condition = asyncio.Condition()
self.pool = Pool(self.condition)
self.filter = Filter()
async def setup(self):
async def pipeline(object):
await object.process()
await asyncio.wait([process(number) for number in object.items])
async def process(number):
async with self.condition:
print("Processing number ", number)
if await self.filter.filter(number):
await self.pool.put(number)
await asyncio.wait([pipeline(object) for object in self.objects])async def test(main):
await asyncio.sleep(5)
print("\n\n-------- First run ---------- \n\n")
print("\n -> Getting some number.")
number = await main.pool.get()
print(" -> Got number", number)
await main.pool.report_on_number(number, False)
await asyncio.sleep(5)
print("\n\n-------- Second run ---------- \n\n")
print("\n -> Getting some number.")
number = await main.pool.get()
print(" -> Got number", number)
await main.pool.report_on_number(number, True)
await asyncio.sleep(5)
print("\n\n-------- Third run ---------- \n\n")
print("\n -> Getting some number.")
number = await main.pool.get()
print(" -> Got number", number)
await main.pool.report_on_number(number, False)
main = Main()
loop = asyncio.get_event_loop()
loop.create_task(main.setup())
loop.create_task(test(main))
loop.run_forever()其中产出:
... A bunch of processing/filtering printouts above.
...
Processing number 864
Filtering our number 864.
-> Queue is currently full.
-------- First run ----------
-> Getting some number.
-> Got number 34
-> Number 34 has been reported.
-> Releasing lock.
-> Lock released completed.
Processing number 866
Filtering our number 866.
Adding 866 to queue.
Processing number 121
Filtering our number 121.
-> Queue is currently full.
-------- Second run ----------
-> Getting some number.
-> Got number 866
-> Number 866 has been reported.
-> Adding number 866 back into the queue.
-> Added successfully!
-------- Third run ----------
-> Getting some number.
-> Got number 866
-> Number 866 has been reported.
-> Releasing lock.
-> Lock released completed.
Processing number 55
Filtering our number 55.
Adding 55 to queue.
Processing number 19
Filtering our number 19.
-> Queue is currently full.它工作得很结实!
https://stackoverflow.com/questions/57121674
复制相似问题