我使用的是一个网络库,它提供了一个包装器,用于在asyncio
中使用协程函数。当我写了一个随机关闭连接的测试(看看我的程序在坏的情况下是否有弹性)时,我发现它无限期地挂起。
这看起来像是我正在使用的库提供的包装器中的一个错误,因为程序在等待来自loop.add_reader()
或loop.add_writer()
的回调时挂起,但随后我找不到如何在套接字关闭时收到通知。
这是一个最小的程序,它显示了我的程序发生了什么:
import asyncio
import socket
async def kill_later(c):
await asyncio.sleep(0.1)
c.close()
async def main():
loop = asyncio.get_running_loop()
c = socket.create_connection(('www.google.com', 80))
c.setblocking(0)
ev = asyncio.Event()
loop.add_reader(c, ev.set)
# Closes the socket after 0.1 ms:
asyncio.create_task(kill_later(c))
print("waiting...")
#### ↓ THIS WAITS FOREVER ↓ ####
await ev.wait()
asyncio.run(main())
我的问题是:如何收到asyncio
循环关闭套接字的通知?
编辑:由于流行的需求,使套接字是非阻塞的,但这没有区别,因为add_reader()
不会尝试在套接字上执行任何IO,只是观察它何时准备好。
发布于 2019-05-30 19:25:34
你的测试程序有缺陷。对c.close()
的调用不会模拟被另一端关闭的套接字,它会关闭您自己的文件描述符,并使其不可访问。您可以认为close(fd)
打破了数字fd和底层操作系统资源之间的联系。在此之后,读取和轮询fd变得毫无意义,因为这个数字不再表示任何东西。因此,epoll()
不能也不会将关闭的文件描述符报告为“可读”。
测试您想要测试的条件的方法是让other end关闭连接。要做到这一点,最简单的方法是生成另一个进程或线程作为模拟服务器。例如:
import asyncio, threading, socket, time
def start_mock_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 10000))
s.listen(1)
def serve():
conn, addr = s.accept()
time.sleep(1)
conn.close()
s.close()
threading.Thread(target=serve).start()
async def main():
loop = asyncio.get_running_loop()
start_mock_server()
c = socket.create_connection(('localhost', 10000))
c.setblocking(0)
ev = asyncio.Event()
loop.add_reader(c.fileno(), ev.set)
print("waiting...")
await ev.wait()
print("done")
asyncio.run(main())
https://stackoverflow.com/questions/56367271
复制相似问题