首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将异步websockets作为类实现后接收流数据?

将异步websockets作为类实现后接收流数据?
EN

Stack Overflow用户
提问于 2020-04-14 19:51:09
回答 2查看 1.5K关注 0票数 3

我的问题与Stackoverflow上的以下问题和文档这里密切相关。我将websockets-connection定义为一个类。接下来,我创建了一个新的类,其中我将前面定义的websocket类称为self.ws,并告诉您要用self.request发送哪些数据到websocket。我的问题是当前脚本只运行一次,而我想要的输出是连续数据。

第二个链接显示,我可以使用

代码语言:javascript
运行
复制
asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))

我在代码中包含了上述所有代码(由于希望将call_api作为类编写,所以定义不同)。下面是我的代码:

代码语言:javascript
运行
复制
import sys, json
import asyncio
from websockets import connect

class EchoWebsocket:
   def __init__(self, URL, CLIENT_ID=None, CLIENT_SECRET=None):
      self.url = URL
      self.client_id = CLIENT_ID
      self.client_secret = CLIENT_SECRET

   async def __aenter__(self):
      self._conn = connect(self.url)
      self.websocket = await self._conn.__aenter__()
      return self

   async def __aexit__(self, *args, **kwargs):
      await self._conn.__aexit__(*args, **kwargs)

   async def send(self, message):
      await self.websocket.send(message)

   async def receive(self):
      return await self.websocket.recv()

class DERIBIT:
   def __init__(self):
      self.ws = EchoWebsocket(URL='wss://test.deribit.com/ws/api/v2')
      self.loop = asyncio.get_event_loop()
      self.request = \
                   {"jsonrpc": "2.0",
                    "method": "public/subscribe",
                    "id": 42,
                    "params": {
                        "channels": ["deribit_price_index.btc_usd"]}
                   }

   def get_ticks(self):
      return self.loop.run_until_complete(self.__async__get_ticks())

   async def __async__get_ticks(self):
      async with self.ws as echo:
         await echo.send(json.dumps(self.request))
         response = await echo.receive()
         print(response)


if __name__ == "__main__":
   deribit = DERIBIT()
   deribit.get_ticks()

此脚本提供以下输出:

{"jsonrpc":"2.0",“方法”:“公共/订阅”,"id":42,"params":{“信道”:“deribit_price_index.btc_usd”}

而我想看到

请指点。

EN

回答 2

Stack Overflow用户

发布于 2020-04-19 12:30:36

我只使用了“旋风”的websockets,但它们运行得很好,“旋风”有许多处理异步代码的助手:

代码语言:javascript
运行
复制
import json
import tornado
from tornado.ioloop import PeriodicCallback
from tornado.websocket import websocket_connect


class EchoWebsocket:

    def __init__(self, url, client_id=None, client_secret=None):
        self.url = url
        self.client_id = client_id
        self.client_secret = client_secret
        self.websocket = None

    async def connect(self):
        if not self.websocket:
            self.websocket = await websocket_connect(self.url)

    async def close(self):
        await self.websocket.close()
        self.websocket = None

    async def read(self):
        return await self.websocket.read_message()

    async def write(self, message):
        await self.websocket.write_message(message)


class DERIBIT:

    def __init__(self):
        self.ws = EchoWebsocket(url='wss://test.deribit.com/ws/api/v2')
        self.request = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "id": 42,
            "params": {
                "channels": ["deribit_price_index.btc_usd"]}
        }
        self.callback = PeriodicCallback(self.get_ticks, 1000)
        self.callback.start()

    async def get_ticks(self):
        if not self.ws.websocket:
            await self.ws.connect()
        await self.ws.write(json.dumps(self.request))
        response = await self.ws.read()
        print(response)


if __name__ == "__main__":
    deribit = DERIBIT()
    tornado.ioloop.IOLoop.current().start()

输出:

代码语言:javascript
运行
复制
{"jsonrpc":"2.0","id":42,"result":["deribit_price_index.btc_usd"],"usIn":1587298852138977,"usOut":1587298852139023,"usDiff":46,"testnet":true}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587298851526,"price":7173.46,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587298852533,"price":7173.53,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","id":42,"result":["deribit_price_index.btc_usd"],"usIn":1587298852932540,"usOut":1587298852932580,"usDiff":40,"testnet":true}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587298852533,"price":7173.53,"index_name":"btc_usd"}}}

如果将websocket集成到DERIBIT类中,而不是为其创建单独的类,那么上面的示例可以简化很多。

票数 1
EN

Stack Overflow用户

发布于 2020-04-23 15:12:02

问题就在功能上

第一次运行loop.run_until_complete,直到将来完成完成

这意味着您的函数将只运行一个响应。run_until_complete不是callback函数!

所以在你的例子中,main

deribit.get_ticks() ->运行未来的实例__async__get_ticks

所以__async__get_ticks就是任务:让我们看看任务是做什么的:

代码语言:javascript
运行
复制
 1.open ws connection:
代码语言:javascript
运行
复制
 2.send request
代码语言:javascript
运行
复制
 3.wait the response of the ws
代码语言:javascript
运行
复制
 4. print(response)

here the task is done,这就是为什么你只看到一行

代码语言:javascript
运行
复制
   async def __async__get_ticks(self):
      async with self.ws as echo:
         await echo.send(json.dumps(self.request))
         response = await echo.receive()
         print(response)

解释之后:解决方案将很简单:需要用时间包装行response

代码语言:javascript
运行
复制
async def __async__get_ticks(self):
      async with self.ws as echo:
         await echo.send(json.dumps(self.request))
         while True:
                response = await echo.receive()
                print(response)

输出

代码语言:javascript
运行
复制
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654476817,"price":7540.54,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654477824,"price":7540.52,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654478831,"price":7540.15,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654479838,"price":7539.83,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654480845,"price":7539.2,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654481852,"price":7538.96,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654482859,"price":7538.9,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654483866,"price":7538.89,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654484873,"price":7538.47,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654485880,"price":7537.15,"index_name":"btc_usd"}}}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61216022

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档