首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >python Websockets (asyncio ver)强制关闭连接

python Websockets (asyncio ver)强制关闭连接
EN

Stack Overflow用户
提问于 2018-06-30 04:57:13
回答 1查看 2.9K关注 0票数 1

我正在为python >3.5编写代码。我使用的是Websockets 6.0库,这里是:https://github.com/aaugustin/websockets,我一直叫它们asyncio Websockets,因为它们是基于异步的。在我的搜索中,有很多“丢失的连接”,但我正在研究如何取消当前的ws.recv()。对.start()的调用将创建一个帮助器线程来启动异步事件循环。然后接收函数启动并调用连接函数,websocket ws被实例化。然后,接收函数工作秋季消息。当我准备停止时,会调用一个.stop()。我期望stop函数能停止等待的ws.recv()。然后,在keep_running标志设置为false并运行ws.close()的情况下,我期望ws.recv()结束,when keep_running循环结束。这不是正在发生的事情。我看到了所有的三个停止,但从来没有receive stop

代码语言:javascript
复制
command is: stop
Do command is stopped
Stop 1
Stop 2
Stop 3
^CException ignored in: <module 'threading' from '/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py'>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1294, in _shutdown
    t.join()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
(pyalmondplus) Pauls-MBP:pyalmondplus paulenright$ 

参考代码:

代码语言:javascript
复制
import threading
import asyncio
import websockets
import json

class PyAlmondPlus:
    def __init__(self, api_url, event_callback=None):
        self.api_url = api_url
        self.ws = None
        self.loop = asyncio.get_event_loop()
        self.receive_task = None
        self.event_callback = event_callback
        self.keep_running = False

    async def connect(self):
        print("connecting")
        if self.ws is None:
            print("opening socket")
            self.ws = await websockets.connect(self.api_url)
        print(self.ws)

    async def disconnect(self):
        pass

    async def send(self, message):
        pass

    async def receive(self):
        print("receive started")
        while self.keep_running:
            if self.ws is None:
                await self.connect()
            recv_data = await self.ws.recv()
            print(recv_data)
        print("receive ended")

    def start(self):
        self.keep_running = True
        print("Start 1")
        print("Start 2")
        t = threading.Thread(target=self.start_loop, args=())
        print("Start 3")
        t.start()
        print("Receiver running")

    def start_loop(self):
        print("Loop helper 1")
        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop(policy.new_event_loop())
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(True)
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.receive())
        print("Loop helper 2")

    def stop(self):
        print("Stop 1")
        self.keep_running = False
        print("Stop 2")
        self.ws.close()
        print("Stop 3")
EN

回答 1

Stack Overflow用户

发布于 2018-06-30 07:01:07

我正在研究如何取消当前的ws.recv() ...我看到了所有三个停止,但从来没有接收停止。

您的receive协程可能会挂起,等待一些数据到达,因此它无法检查keep_running标志。

停止正在运行的协程的简单而强大的方法是cancel驱动它的异步Task。这将立即取消暂停协程,并生成它正在等待的CancelledError。当使用cancel时,你根本不需要keep_running标志,异常将自动终止循环。

对.start()的调用创建了一个帮助线程来启动异步事件循环。

这是可行的,但是您并不真的需要为每个PyAlmondPlus实例创建一个新的线程和一个全新的事件循环。Asyncio被设计为在单个线程内运行,因此一个事件循环实例可以承载任意数量的协程。

这是一个可能的设计,实现了这两个想法(没有用实际的web套接字测试):

代码语言:javascript
复制
# pre-start a single thread that runs the asyncio event loop
bgloop = asyncio.new_event_loop()
_thread = threading.Thread(target=bgloop.run_forever)
_thread.daemon = True
_thread.start()

class PyAlmondPlus:
    def __init__(self, api_url):
        self.api_url = api_url
        self.ws = None

    async def connect(self):
        if self.ws is None:
            self.ws = await websockets.connect(self.api_url)

    async def receive(self):
        # keep_running is not needed - cancel the task instead
        while True:
            if self.ws is None:
                await self.connect()
            recv_data = await self.ws.recv()

    async def init_receive_task(self):
        self.receive_task = bgloop.create_task(self.receive())

    def start(self):
        # use run_coroutine_threadsafe to safely submit a coroutine
        # to the event loop running in a different thread
        init_done = asyncio.run_coroutine_threadsafe(
            self.init_receive_task(), bgloop)
        # wait for the init coroutine to actually finish
        init_done.result()

    def stop(self):
        # Cancel the running task. Since the event loop is in a
        # background thread, request cancellation with
        # call_soon_threadsafe.
        bgloop.call_soon_threadsafe(self.receive_task.cancel)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51109305

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档