首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >与Pandas实时地将滴答数据转换为OHLC?

与Pandas实时地将滴答数据转换为OHLC?
EN

Stack Overflow用户
提问于 2021-12-23 21:13:12
回答 1查看 1.7K关注 0票数 1

我知道如何使用Python的Pandas重采样将静态刻度数据转换为OHLC烛台数据。

,但是我如何在实时中做到这一点(就像Websocket发送它一样)?

假设我正在使用这个while循环代码来准备滴答数据

代码语言:javascript
运行
复制
import time, requests

ohlc []
while True:
    r = requests.get('https://api1.binance.com/api/v3/ticker/price?symbol=ETHUSDT')
    resp_dict = r.json()
    time.time()
    print({'time' : time.time(), 'price' : resp_dict["price"]})

现在,我如何使用Pandas实时重采样这些数据(就像一个Websocket每秒都会向我们发送这个OHLC数据,这使得在实时中绘制烛台数据成为可能)?

提前谢谢

EN

回答 1

Stack Overflow用户

发布于 2022-02-02 17:58:21

我的意思是,除非你想要部署一个数据提供商业务,否则做一个独立的自主机器人是很酷的。实时的,你必须实际聚集OHLC酒吧现场。

  1. 通过WebSocket (或其他流API)进行的交易;->
  2. 排队等待dem滴答声;->
  3. 逻辑将em从队列中取下来,将em发送到聚合器逻辑中,然后将其从队列中弹出,这样我们就不会意外地多次处理一个滴答;->
  4. 逻辑聚合OHLCV现场;
  5. 判断何时开始构建新条的逻辑。

FTX交换& Python,15秒图表聚合。FTX有一个“正式”WebSocket客户,我对它进行了修改,以正确处理所有的滴答;

代码语言:javascript
运行
复制
def _handle_trades_message(self, message: Dict) -> None:
    self._trades[message['market']].extend(reversed(message['data']))

在那之后,我得到的是:

代码语言:javascript
运行
复制
import ftx
import time
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datetime                          import datetime

def process_tick(tick):
    global data
    global flag
    global frontier
    
    if (flag == True) and (tick['time'] >= frontier):
        start_time = datetime.utcnow().isoformat() #"almost"
        time_      = time.time() * 1000 # with higher precision
        op         = tick['price']
        hi         = tick['price']
        lo         = tick['price']
        cl         = tick['price']
        vol        = tick['size' ]
        row        = {'startTime' : start_time, 
                      'time'      : time_     , 
                      'open'      : op        , 
                      'high'      : hi        , 
                      'low'       : lo        , 
                      'close'     : cl        ,
                      'volume'    : vol        }
        data       = data.append(row, True)
        flag = False
        print('Opened')
        print(tick)
    else:
        
        if   (tick['price'] > data['high'].iloc[-1]):
            data['high'].iloc[-1] = tick['price']
        elif (tick['price'] < data['low' ].iloc[-1]):
            data['low' ].iloc[-1] = tick['price']

        data['close' ].iloc[-1]  = tick['price']
        data['volume'].iloc[-1] += tick['size' ]
        print(tick)
     
def on_tick():
    
    while True:

        try:

            try:
                process_tick(trades[-1])
                trades.pop()
            except IndexError:
                pass

            time.sleep(0.001)
        except KeyboardInterrupt:
            client_ws._reset_data()
            scheduler.remove_job('onClose')
            scheduler.shutdown()
            print('Shutdown')
            break
    
def on_close():
    global flag
    global frontier
    flag     = True
    frontier = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
    print('Closed')

ASSET = 'LTC-PERP'
RES   = '15'

flag      = True
frontier  = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
cols      = ['startTime', 'time', 'open', 'high', 'low', 'close', 'volume']
data      = pd.DataFrame(columns=cols)
client_ws = ftx.FtxClientWs()
client_ws._subscribe({'channel': 'trades', 'market': ASSET})
trades    = client_ws._trades[ASSET]
scheduler = BackgroundScheduler()
scheduler.configure(timezone='utc')
scheduler.add_job(on_close, trigger='cron', second='0, 15, 30, 45', id='onClose')
scheduler.start()    
on_tick()

在我的文章中,只要让on_tick()在自己的线程中运行,data变量就会在后台不断更新并保存OHLCV,这样您就可以在同一个程序中使用这些数据了。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70467422

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档