我有两套套路。一个处理到websocket服务器的连接。另一个做了些武断的事。当异常发生时,我想终止程序。但是,处理到websocket服务器的连接的共同例程继续运行。
请参阅此示例(不需要任何websocket服务器,只需运行此代码并尝试重新连接到服务器):
import asyncio
import websockets
async def task1():
""" A dummy co-routine
After five iterations. raise an exception"""
i = 1
我试图从异步队列中提取任务,并在出现异常时调用给定的错误处理程序。排队的项作为字典(由enqueue_task排队)提供,其中包含任务、可能的错误处理程序以及错误处理程序可能需要的任何args/kwargs。由于我希望在任务完成时处理任何错误,所以我将每个任务映射到已退出队列的字典,并在出现异常时尝试访问它。
async def _check_tasks(self):
try:
while self._check_tasks_task or not self._check_task_queue.empty():
tasks = []
更新:异步只需执行它被告知的操作,您就可以很好地处理这些异常--请参阅我标记为问题解决方案的后续答案。下面是最初的问题,并以稍加修改的例子来澄清这个问题及其解决方案。
我一直在尝试调试一个在很大程度上依赖于异步的库。在编写一些示例代码时,我意识到有时执行键盘中断(CTRL)(很少!)引发了可怕的..。
Task exception was never retrieved
我一直在努力确保我派生的所有任务都能优雅地处理asyncio.CancelledError,在花了太多时间调试它之后,我意识到只有当异步任务之一被阻塞操作卡住时,我才会收到这条错误消息。
封锁?您真的不应该在任务中执行阻塞工作
import asyncio
import random
async def producer(q: asyncio.Queue):
for _ in range(100):
await q.put(random.randint(1, 10))
async def consumer(q: asyncio.Queue):
while True:
num = await q.get()
print("Got From Queue: ", num)
q.task_done()
async de
我不知道我是否正确地理解了asyncio,或者我是否做错了什么。但是,在调用循环中的下一个app之前,我希望运行每个app的所有函数。
在进入app中的下一个cycle之前,每个函数都必须按照特定的顺序运行。
我的剧本:
# Cycle through the bots in one loop rather than restarting the loop in an infinite loop
for app in cycle(applications):
print('Going to bot: ' + str(app[1]))
app[2].set_foc
我正在尝试两种方法来阻止无限循环的运行:
supervisor_1:任务按程序取消
supervisor_2:任务用Ctrl+C停止
虽然supervisor_2不会在中断时抛出任何错误,但我不能从获取Task was destroyed but it is pending!中获得supervisor_1。知道为什么吗?
以下是代码:
import asyncio
import aioredis
from functools import partial
class Listener:
def __init__(self, redis_conn):
我第一次与httpx.AsyncClient协同使用异步,并试图找出当其中一些任务可能失败时如何完成任务列表。我正在使用我在几个地方找到的模式,在这些地方,我用coroutine函数填充异步队列,并在asyncio.gather内部有一组处理该队列的工作人员。通常,如果执行该工作的函数引发异常,则会看到整个脚本在处理过程中失败,并与RuntimeWarning: coroutine foo was never awaited一起报告异常,表明您从未完成列表。
我找到了asyncio.gather的asyncio.gather选项,这是有帮助的,但不是完全的。当我得到异常的次数与调用gather
我试图通过list和aiohttp异步地从URL的list中获取HTML源代码的asyncio,但我得到了两个例外:
TypeError('An asyncio.Future, a coroutine or an awaitable is ' TypeError: An asyncio.Future, a coroutine or an awaitable is required Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001C92141F310>raise
如果任何人能帮助我使用Python和async/await,任何帮助都将不胜感激! 我需要侦听websocket以获取消息,因此我设置了以下代码: import websockets
import asyncio
my_socket = "ws://......."
# I set a "while True" here to reconnect websocket if it stop for any reason
while True:
try:
async with websockets.connect(my_socket)
我有一个父函数,它应该在一个数据集上运行两个测试。如果这些测试中的任何一个失败,父函数应该返回fail。我想用异步异步运行这两个测试,一旦其中一个测试失败,父函数应该返回fail并取消另一个测试。
我对异步通信很陌生,我用条件阅读了一些示例,但不知道如何用条件编写异步。
到目前为止,我可以通过在任何失败的测试中抛出异常来处理它。
以下是我的基本代码:
async def test1(data):
# run some test on data and return true on pass and throw exception on fail
async def test2(dat
我正在尝试将asyncio功能集成到我的kafka主题侦听器中,并且遇到了一些问题(在python中异步编程非常新)。
我创建了一个confluent-kafka consumer,用于收听一个主题。主题经常有消息,性能是最重要的(因此引入了异步io)。
main()函数如下所示:
def main(self):
while True:
try:
msg = consumer.poll(timeout=5.0)
if msg is None:
continue
as