首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在websocket流停止接收数据后重新启动coroutine?

如何在websocket流停止接收数据后重新启动coroutine?
EN

Stack Overflow用户
提问于 2022-06-26 07:06:05
回答 2查看 173关注 0票数 0

我正在编写一个异步应用程序来监视密码市场和交易/订单事件的价格,但是由于一个未知的原因,一些流在几个小时后停止接收数据。我不熟悉asyncio包,我希望能帮助您找到解决方案。

基本上,下面的代码通过密码交换建立websocket连接,以侦听6个符号流(ETH/美元、BTC/美元、BNB/美元、.)并通过两个帐户(user1、user2)进行交易。应用程序使用库ccxtpro。公共方法watch_ohlcv获得价格流,而私有方法watchMyTradeswatchOrders在帐户级别获得新的订单和交易事件。

问题是一个或多个流在几个小时后被中断,而对象response变为空或None。我想检测和重新启动这些流后,他们停止工作,我如何做到这一点?

代码语言:javascript
运行
复制
# tasks.py
@app.task(bind=True, name='Start websocket loops')
def start_ws_loops(self):
    ws_loops()

# methods.py
def ws_loops():

    async def method_loop(client, exid, wallet, method, private, args):

        exchange = Exchange.objects.get(exid=exid)

        if private:
            account = args['account']
        else:
            symbol = args['symbol']

        while True:
            try:

                if private:
                    response = await getattr(client, method)()
                    if method == 'watchMyTrades':
                        do_stuff(response)

                    elif method == 'watchOrders':
                        do_stuff(response)

                else:
                    response = await getattr(client, method)(**args)
                    if method == 'watch_ohlcv':
                        do_stuff(response)

                # await asyncio.sleep(3)

            except Exception as e:
                print(str(e))
                break
        
        await client.close()

    async def clients_loop(loop, dic):

        exid = dic['exid']
        wallet = dic['wallet']
        method = dic['method']
        private = dic['private']
        args = dic['args']

        exchange = Exchange.objects.get(exid=exid)
        parameters = {'enableRateLimit': True, 'asyncio_loop': loop, 'newUpdates': True}

        if private:
            log.info('Initialize private instance')
            account = args['account']
            client = exchange.get_ccxt_client_pro(parameters, wallet=wallet, account=account)

        else:
            log.info('Initialize public instance')
            client = exchange.get_ccxt_client_pro(parameters, wallet=wallet)

        mloop = method_loop(client, exid, wallet, method, private, args)
        await gather(mloop)
        await client.close()

    async def main(loop):

        lst = []
        private = ['watchMyTrades', 'watchOrders']
        public = ['watch_ohlcv']

        for exid in ['binance']:
            for wallet in ['spot', 'future']:
                
                # Private
                for method in private:
                    for account in ['user1', 'user2']:
                        lst.append(dict(exid=exid,
                                        wallet=wallet,
                                        method=method,
                                        private=True,
                                        args=dict(account=account)
                                        ))
                
                # Public
                for method in public:
                    for symbol in ['ETH/USD', 'BTC/USD', 'BNB/USD']:
                        lst.append(dict(exid=exid,
                                        wallet=wallet,
                                        method=method,
                                        private=False,
                                        args=dict(symbol=symbol,
                                                  timeframe='5m',
                                                  limit=1
                                                  )
                                        ))

        loops = [clients_loop(loop, dic) for dic in lst]
        await gather(*loops)

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main(loop))
EN

回答 2

Stack Overflow用户

发布于 2022-06-30 16:12:05

让我与大家分享我的经验,因为我正在处理同样的问题。

预计CCXT在运行一段时间后不会停止运行。

不幸的是,实践和理论是不同的,错误1006经常发生。我使用Binance,OKX,Bitmex和BTSE ( BTSE不受CCXT支持),我的代码运行在AWS服务器上,所以我应该没有任何连接问题。就错误1006而言,Binance和OKX是最差的。老实说,在谷歌上研究过之后,我只知道1006是一个NetworkError,我知道CCXT会自动重新订阅频道。我在网上找到的所有其他解释都无法说服我。如果有人能给我更多关于这个错误的信息,我会很感激的。

在任何情况下,每次引发异常时,我都将其作为包含mls、方法、交换、描述ecc中的时间等信息的字典放在exception_list中。然后将exception_list传递给handle_exception方法。在这种情况下,如果列表包含两个1006异常在X时间handle_exception返回,我们不是同步的市场数据和交易必须停止。我取消了所有的限价订单,然后发出一声“蜂鸣声”(呼叫人为干预)。

至于你的第二个问题:

在这些流停止工作后重新启动,我如何做到这一点?

记住你是并发运行任务

如果return_exceptions为False (默认),则第一个引发的异常将立即传播到等待gather()的任务。aws序列中的其他等待程序不会被取消,并将继续运行。

您可以在这里()中找到关于重新启动单个任务的信息

在您的示例中,由于您使用的是单个exchange (Binance),并且没有在CCXT中实现取消订阅,因此您必须关闭连接并重新启动所有任务。您仍然可以在链接中使用上面的示例来实现自动化。如果您使用的是多个exchange,则可以设计代码,使您能够关闭并仅重新启动失败的Exchange。

您的另一个选项是定义主要粒度更大的任务,这样每个任务都与单个和定义良好的exchange/user/方法/符号相关,而每个任务都订阅了单个通道。这将导致代码更详细、更不优雅,但它将帮助您捕获异常,并最终只重新启动特定的协同过程。

显然,我假设在错误1006之后,信道状态是取消订阅的。

最后的想法:

永远不要让机器人无人看管

拥有一支在伦敦工作的工程师团队的专业做市商不去酒吧,而他们的商标(通常位于交易所内)执行数千项交易。

我希望这能帮助您,或者至少帮助您找到正确的方向来处理异常和重新启动任务。

票数 1
EN

Stack Overflow用户

发布于 2022-07-05 19:25:13

你需要使用回调。

例如:

代码语言:javascript
运行
复制
ws = self.ws = await websockets.connect(END_POINTS, compression=None) # step 1

await self.ws.send(SEND_YOUR_SUBSCRIPTION_MESSAGES) # step 2
while True:
     response = await self.ws.recv()
        if response:
            await handler(response)

最后,与await handler(response)一样,您将向handler()发送响应。

这个handler()是回调,它是实际使用从交换服务器接收到的数据的函数。在这个handler()中,您可以做的是检查响应是您想要的数据(出价/询问价格等)还是抛出类似于ConnectionClosedError的异常,在这种情况下,您可以通过在处理程序中执行步骤1和步骤2重新启动websocket。

因此,基本上在回调方法中,您需要处理数据或重新启动websocket并再次将处理程序传递给它以接收响应。

希望这能有所帮助。我无法共享完整的代码,因为我需要为敏感的业务逻辑清理它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72759546

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档