我需要一个返回数据的协同线。如果返回数据,则不再可用。如果协同线被取消,数据将在下一次呼叫中可用。我需要包装协同线有相同的行为,但有时它被取消,而包装的协同线已经完成。
我可以用下面的代码来再现这种行为。
import asyncio
loop = asyncio.get_event_loop()
fut = asyncio.Future()
async def wait():
return await fut
task = asyncio.ensure_future(wait())
async def test():
await asyncio.sleep(0.1)
fut.set_result('data')
print ('fut', fut)
print ('task', task)
task.cancel()
await asyncio.sleep(0.1)
print ('fut', fut)
print ('task', task)
loop.run_until_complete(test())
输出清楚地显示,在协同线完成后,包装协同线被取消,这意味着数据将永远丢失。我不能屏蔽这两个电话,因为如果我被取消了,我就没有任何数据可以返回。
fut <Future finished result='data'>
task <Task pending coro=<wait() running at <ipython-input-8-6d115ded09c6>:7> wait_for=<Future finished result='data'>>
fut <Future finished result='data'>
task <Task cancelled coro=<wait() done, defined at <ipython-input-8-6d115ded09c6>:6>>
在我的例子中,这是因为有两个期货,一个是验证包装的协同线,另一个是取消包装协同线,有时是一起验证的。我可能会选择推迟取消(通过asyncio.sleep(0)
),但我确定它不会发生意外吗?
对于一个任务来说,这个问题更有意义:
import asyncio
loop = asyncio.get_event_loop()
data = []
fut_data = asyncio.Future()
async def get_data():
while not data:
await asyncio.shield(fut_data)
return data.pop()
fut_wapper = asyncio.Future()
async def wrapper_data():
task = asyncio.ensure_future(get_data())
return await task
async def test():
task = asyncio.ensure_future(wrapper_data())
await asyncio.sleep(0)
data.append('data')
fut_data.set_result(None)
await asyncio.sleep(0)
print ('wrapper_data', task)
task.cancel()
await asyncio.sleep(0)
print ('wrapper_data', task)
print ('data', data)
loop.run_until_complete(test())
task <Task cancelled coro=<wrapper_data() done, defined at <ipython-input-2-93645b78e9f7>:16>>
data []
数据已被消耗,但任务已被取消,因此无法检索数据。直接等待get_data()
将有效,但不能取消。
发布于 2019-07-28 09:23:40
我认为你需要首先保护等待的未来不被取消,然后发现你自己的取消。如果未来尚未完成,则将取消信息传播到其中(有效地取消shield()
)并退出。如果未来已经完成,忽略取消并返回数据。
代码将如下所示,并且为了避免全局vars并使用asyncio.run()
(如果使用Python3.6,则可以转到run_until_complete()
):
import asyncio
async def wait(fut):
try:
return await asyncio.shield(fut)
except asyncio.CancelledError:
if fut.done():
# we've been canceled, but we have the data - ignore the
# cancel request
return fut.result()
# otherwise, propagate the cancellation into the future
fut.cancel()
# ...and to the caller
raise
async def test():
loop = asyncio.get_event_loop()
fut = loop.create_future()
task = asyncio.create_task(wait(fut))
await asyncio.sleep(0.1)
fut.set_result('data')
print ('fut', fut)
print ('task', task)
task.cancel()
await asyncio.sleep(0.1)
print ('fut', fut)
print ('task', task)
asyncio.run(test())
请注意,忽略取消请求可能被认为是滥用取消机制。但是,如果知道任务是在完成之后(最好是立即完成),在您的情况下,这可能是正确的事情。请谨慎行事。
https://stackoverflow.com/questions/57178928
复制相似问题