首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >股票实时流计算

股票实时流计算

作者头像
spark
发布2018-12-20 11:45:25
2K0
发布2018-12-20 11:45:25
举报
文章被收录于专栏:数据科学数据科学

获取行情数据并发消息到kafka

In [1]:

import easyquotation
import pandas as pd
import numpy as np
import time
from confluent_kafka import Producer,Consumer


quanta_list = []
quotation_engine = easyquotation.use('sina')
def foo():
    while True:
        q1 = quotation_engine.all
        df = pd.DataFrame(q1).T
        p.produce('test-quant',df.to_msgpack())
        time.sleep(10)

In [2]:

q1 = quotation_engine.all
df = pd.DataFrame(q1).T

定义数据流

In [3]:

from streamz.dataframe import DataFrame
from streamz import Stream
conf = {'bootstrap.servers': 'localhost:9092',
        'message.max.bytes': 524288000,
        'group.id': 'hi', 'session.timeout.ms': 6000,
#         'on_commit': lambda x:x,
#         'on_assign':lambda x:x,
        'default.topic.config': {'auto.offset.reset': 'smallest'},#smallest,earliest
       }

def isdf(x):
    return type(x)==pd.core.frame.DataFrame

source = Stream.from_kafka(['test-quant'],consumer_params=conf,start=True)
quant = source.map(pd.read_msgpack).filter(isdf)
quant.map(lambda df:df.head()).map(lambda df:df[['now','open']]).sink(display)

var element = $('#61901593-c697-4e0e-ad17-c8f2c3fae6ae'); {"model_id": "8629bab4ae2a42fe908a3fe8b82354c0", "version_major": 2, "version_minor": 0}

定义流算法

In [4]:

def to_my_df(df):
    df['code']=df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change']=(df.buy-df.now)/df.now
    return df

def getmycode(df):
    return df[df.code.str.startswith('150')]

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    #随便写的,没有意义
    df['jiange'] = df.now-df.now.shift(1)
    return df[['code','name','jiange']]

def mygroup(df):
    return df.pivot_table(index=['code','name'],values=['jiange'],aggfunc=np.average).sort_values('jiange').tail(10)

quant.map(to_my_df).map(getmydf).map(getmycode).sliding_window(1).map(pd.concat).map(mygroup).sink(display)

var element = $('#505e5b67-4fc6-4bed-a0d8-b1c3d9addda1'); {"model_id": "90191a8811c34609a599fa1b8d6af22d", "version_major": 2, "version_minor": 0}

启动行情数据查看结果

In [5]:

p = Producer( {'bootstrap.servers': 'localhost:9092','message.max.bytes': 5242880})
p.produce('test-quant',df.to_msgpack())

流计算过程的可视化

In [6]:

source.visualize()

.dataframe tbody tr th:only-of-type { vertical-align: middle; } .dataframe tbody tr th { vertical-align: top; } .dataframe thead th { text-align: right; }

now

open

000001

10.47

10.49

000002

25.36

24.81

000003

0

0

000004

16.78

16.7

000005

3.01

3

.dataframe tbody tr th:only-of-type { vertical-align: middle; } .dataframe tbody tr th { vertical-align: top; } .dataframe thead th { text-align: right; }

jiange

code

name

150130

"医药A

0.508

150344

"证券B基

0.533

150172

"证券B

0.549

150265

"一带A

0.552

150241

"银行A级

0.553

150209

"国企改A

0.566

150198

"食品A

0.572

150051

"沪深300A

0.583

150221

"中航军A

0.585

150028

"中证500A

0.821

Out[6]:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年11月19日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 获取行情数据并发消息到kafka¶
  • 定义数据流¶
  • 定义流算法¶
  • 启动行情数据查看结果¶
  • 流计算过程的可视化¶
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档