我正在编写一个异步应用程序来监视密码市场和交易/订单事件的价格,但是由于一个未知的原因,一些流在几个小时后停止接收数据。我不熟悉asyncio
包,我希望能帮助您找到解决方案。
基本上,下面的代码通过密码交换建立websocket连接,以侦听6个符号流(ETH/美元、BTC/美元、BNB/美元、.)并通过两个帐户(user1、user2)进行交易。应用程序使用库ccxtpro
。公共方法watch_ohlcv
获得价格流,而私有方法watchMyTrades
和watchOrders
在帐户级别获得新的订单和交易事件。
问题是一个或多个流在几个小时后被中断,而对象response
变为空或None
。我想检测和重新启动这些流后,他们停止工作,我如何做到这一点?
# tasks.py
@app.task(bind=True, name='Start websocket loops')
def start_ws_loops(self):
ws_loops()
# methods.py
def ws_loops():
async def method_loop(client, exid, wallet, method, private, args):
exchange = Exchange.objects.get(exid=exid)
if private:
account = args['account']
else:
symbol = args['symbol']
while True:
try:
if private:
response = await getattr(client, method)()
if method == 'watchMyTrades':
do_stuff(response)
elif method == 'watchOrders':
do_stuff(response)
else:
response = await getattr(client, method)(**args)
if method == 'watch_ohlcv':
do_stuff(response)
# await asyncio.sleep(3)
except Exception as e:
print(str(e))
break
await client.close()
async def clients_loop(loop, dic):
exid = dic['exid']
wallet = dic['wallet']
method = dic['method']
private = dic['private']
args = dic['args']
exchange = Exchange.objects.get(exid=exid)
parameters = {'enableRateLimit': True, 'asyncio_loop': loop, 'newUpdates': True}
if private:
log.info('Initialize private instance')
account = args['account']
client = exchange.get_ccxt_client_pro(parameters, wallet=wallet, account=account)
else:
log.info('Initialize public instance')
client = exchange.get_ccxt_client_pro(parameters, wallet=wallet)
mloop = method_loop(client, exid, wallet, method, private, args)
await gather(mloop)
await client.close()
async def main(loop):
lst = []
private = ['watchMyTrades', 'watchOrders']
public = ['watch_ohlcv']
for exid in ['binance']:
for wallet in ['spot', 'future']:
# Private
for method in private:
for account in ['user1', 'user2']:
lst.append(dict(exid=exid,
wallet=wallet,
method=method,
private=True,
args=dict(account=account)
))
# Public
for method in public:
for symbol in ['ETH/USD', 'BTC/USD', 'BNB/USD']:
lst.append(dict(exid=exid,
wallet=wallet,
method=method,
private=False,
args=dict(symbol=symbol,
timeframe='5m',
limit=1
)
))
loops = [clients_loop(loop, dic) for dic in lst]
await gather(*loops)
loop = asyncio.new_event_loop()
loop.run_until_complete(main(loop))
发布于 2022-06-30 16:12:05
让我与大家分享我的经验,因为我正在处理同样的问题。
预计CCXT在运行一段时间后不会停止运行。
不幸的是,实践和理论是不同的,错误1006经常发生。我使用Binance,OKX,Bitmex和BTSE ( BTSE不受CCXT支持),我的代码运行在AWS服务器上,所以我应该没有任何连接问题。就错误1006而言,Binance和OKX是最差的。老实说,在谷歌上研究过之后,我只知道1006是一个NetworkError,我知道CCXT会自动重新订阅频道。我在网上找到的所有其他解释都无法说服我。如果有人能给我更多关于这个错误的信息,我会很感激的。
在任何情况下,每次引发异常时,我都将其作为包含mls、方法、交换、描述ecc中的时间等信息的字典放在exception_list中。然后将exception_list传递给handle_exception方法。在这种情况下,如果列表包含两个1006异常在X时间handle_exception返回,我们不是同步的市场数据和交易必须停止。我取消了所有的限价订单,然后发出一声“蜂鸣声”(呼叫人为干预)。
至于你的第二个问题:
在这些流停止工作后重新启动,我如何做到这一点?
记住你是并发运行任务
如果return_exceptions为False (默认),则第一个引发的异常将立即传播到等待gather()的任务。aws序列中的其他等待程序不会被取消,并将继续运行。
您可以在这里()中找到关于重新启动单个任务的信息
在您的示例中,由于您使用的是单个exchange (Binance),并且没有在CCXT中实现取消订阅,因此您必须关闭连接并重新启动所有任务。您仍然可以在链接中使用上面的示例来实现自动化。如果您使用的是多个exchange,则可以设计代码,使您能够关闭并仅重新启动失败的Exchange。
您的另一个选项是定义主要粒度更大的任务,这样每个任务都与单个和定义良好的exchange/user/方法/符号相关,而每个任务都订阅了单个通道。这将导致代码更详细、更不优雅,但它将帮助您捕获异常,并最终只重新启动特定的协同过程。
显然,我假设在错误1006之后,信道状态是取消订阅的。
最后的想法:
永远不要让机器人无人看管
拥有一支在伦敦工作的工程师团队的专业做市商不去酒吧,而他们的商标(通常位于交易所内)执行数千项交易。
我希望这能帮助您,或者至少帮助您找到正确的方向来处理异常和重新启动任务。
发布于 2022-07-05 19:25:13
你需要使用回调。
例如:
ws = self.ws = await websockets.connect(END_POINTS, compression=None) # step 1
await self.ws.send(SEND_YOUR_SUBSCRIPTION_MESSAGES) # step 2
while True:
response = await self.ws.recv()
if response:
await handler(response)
最后,与await handler(response)
一样,您将向handler()
发送响应。
这个handler()
是回调,它是实际使用从交换服务器接收到的数据的函数。在这个handler()
中,您可以做的是检查响应是您想要的数据(出价/询问价格等)还是抛出类似于ConnectionClosedError的异常,在这种情况下,您可以通过在处理程序中执行步骤1和步骤2重新启动websocket。
因此,基本上在回调方法中,您需要处理数据或重新启动websocket并再次将处理程序传递给它以接收响应。
希望这能有所帮助。我无法共享完整的代码,因为我需要为敏感的业务逻辑清理它。
https://stackoverflow.com/questions/72759546
复制相似问题