专栏首页数据科学股票实时流计算

股票实时流计算

获取行情数据并发消息到kafka

In [1]:

import easyquotation
import pandas as pd
import numpy as np
import time
from confluent_kafka import Producer,Consumer


quanta_list = []
quotation_engine = easyquotation.use('sina')
def foo():
    while True:
        q1 = quotation_engine.all
        df = pd.DataFrame(q1).T
        p.produce('test-quant',df.to_msgpack())
        time.sleep(10)

In [2]:

q1 = quotation_engine.all
df = pd.DataFrame(q1).T

定义数据流

In [3]:

from streamz.dataframe import DataFrame
from streamz import Stream
conf = {'bootstrap.servers': 'localhost:9092',
        'message.max.bytes': 524288000,
        'group.id': 'hi', 'session.timeout.ms': 6000,
#         'on_commit': lambda x:x,
#         'on_assign':lambda x:x,
        'default.topic.config': {'auto.offset.reset': 'smallest'},#smallest,earliest
       }

def isdf(x):
    return type(x)==pd.core.frame.DataFrame

source = Stream.from_kafka(['test-quant'],consumer_params=conf,start=True)
quant = source.map(pd.read_msgpack).filter(isdf)
quant.map(lambda df:df.head()).map(lambda df:df[['now','open']]).sink(display)

var element = $('#61901593-c697-4e0e-ad17-c8f2c3fae6ae'); {"model_id": "8629bab4ae2a42fe908a3fe8b82354c0", "version_major": 2, "version_minor": 0}

定义流算法

In [4]:

def to_my_df(df):
    df['code']=df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change']=(df.buy-df.now)/df.now
    return df

def getmycode(df):
    return df[df.code.str.startswith('150')]

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    #随便写的,没有意义
    df['jiange'] = df.now-df.now.shift(1)
    return df[['code','name','jiange']]

def mygroup(df):
    return df.pivot_table(index=['code','name'],values=['jiange'],aggfunc=np.average).sort_values('jiange').tail(10)

quant.map(to_my_df).map(getmydf).map(getmycode).sliding_window(1).map(pd.concat).map(mygroup).sink(display)

var element = $('#505e5b67-4fc6-4bed-a0d8-b1c3d9addda1'); {"model_id": "90191a8811c34609a599fa1b8d6af22d", "version_major": 2, "version_minor": 0}

启动行情数据查看结果

In [5]:

p = Producer( {'bootstrap.servers': 'localhost:9092','message.max.bytes': 5242880})
p.produce('test-quant',df.to_msgpack())

流计算过程的可视化

In [6]:

source.visualize()

.dataframe tbody tr th:only-of-type { vertical-align: middle; } .dataframe tbody tr th { vertical-align: top; } .dataframe thead th { text-align: right; }

now

open

000001

10.47

10.49

000002

25.36

24.81

000003

0

0

000004

16.78

16.7

000005

3.01

3

.dataframe tbody tr th:only-of-type { vertical-align: middle; } .dataframe tbody tr th { vertical-align: top; } .dataframe thead th { text-align: right; }

jiange

code

name

150130

"医药A

0.508

150344

"证券B基

0.533

150172

"证券B

0.549

150265

"一带A

0.552

150241

"银行A级

0.553

150209

"国企改A

0.566

150198

"食品A

0.572

150051

"沪深300A

0.583

150221

"中航军A

0.585

150028

"中证500A

0.821

Out[6]:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 朴素贝叶斯做文本分类

    .dataframe tbody tr th:only-of-type { vertical-align: middle; ...

    spark
  • redis流计算

    使用了tornado的异步和streamz的流处理两个库,需要redis 5.0以上版本

    spark
  • 旅行搜索词权重分析

    .dataframe tbody tr th:only-of-type { vertical-align: middle; ...

    spark
  • 用 Pandas 进行数据处理系列 二

    获取行操作df.loc[3:6]获取列操作df['rowname']取两列df[['a_name','bname']] ,里面需要是一个 list 不然会报错增...

    zucchiniy
  • Python中字段抽取、字段拆分、记录抽取

    1、字段抽取 字段抽取是根据已知列数据的开始和结束位置,抽取出新的列 字段截取函数:slice(start,stop) 注意:和数据结构的访问方式一样,开始位置...

    Erin
  • 【MathorCup】2020年 A题 无车承运人平台线路定价问题,特征间的相关性分析

    问题 1:通过定量分析的方法,研究影响无车承运人平台进行货运线路定价的主要因素有哪些,并说明理由。 问题 2:根据附件 1 数据,通过建立数学模型,对已经成交...

    不太灵光的程序员
  • Pandas进阶修炼120题,给你深度和广度的船新体验

    本文为你介绍Pandas基础、Pandas数据处理、金融数据处理等方面的一些习题。

    数据派THU
  • Day05| 第四期-电商数据分析

    疫情期间,想必我们会增加网上购物,人们的生活越来越数字化。当我们消费时,无论是线上和线下都会产生大量的交易数据,对于商家来说数字化的运营方式非常必要,从大量的交...

    DataScience
  • Pandas 数据分析: 3 种方法实现一个实用小功能

    与时间相关,自然第一感觉便是转化为datetime格式,这里需要注意:需要首先将两列转化为 str 类型。

    double
  • 使用pandas筛选出指定列值所对应的行

    该方法其实就是找出每一行中符合条件的真值(true value),如找出列A中所有值等于foo

    星星在线

扫码关注云+社区

领取腾讯云代金券