redis流计算

设计概要:

  • 把数据流形象话的比作水流
  • 使用redis流和流的存储功能做水库,分别设计进水和出水系统
  • 使用tornado可以同时支持多个进出水水管并行运行,互不干扰
  • 使用streamz库灵活实现加在进出水管上的算法,可以实现限速rate_limit、过滤filter、批处理map,合并zip,缓冲buffer等特性

使用类库

使用了tornado的异步和streamz的流处理两个库,需要redis 5.0以上版本

In [20]:

import os
import weakref

import time
import tornado.ioloop
from tornado import gen

from streamz.core import Stream, convert_interval
import pandas as pd

class Source(Stream):
    _graphviz_shape = 'doubleoctagon'

进水口设计

异步定时循环任务,不听生成数据,并将数据push到redis

In [21]:

@Stream.register_api(staticmethod)
class engine(Source):
  
    def __init__(self, topic, 
                 push_interval=1, 
                 start=False,
                 func=lambda:{'time':time.time()},
                 asyncflag=False,
                 threadcount=5,
                 **kwargs):
        
        self.producer = None
        self.topic = topic
        self.push_interval = push_interval
        self.func=func
        self.asyncflag = asyncflag
        if self.asyncflag:
            from concurrent.futures import ThreadPoolExecutor
            self.thread_pool = ThreadPoolExecutor(threadcount)
            
        super(engine, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_push(self):
        if self.producer is not None:
            msg = self.producer.add(self.func())
            if msg :
                return msg

    @gen.coroutine
    def push_redis(self):
        while True:
            if self.asyncflag:
                val = self.thread_pool.submit(self.do_push)
            else:
                val = self.do_push()
            yield gen.sleep(self.push_interval)
            if self.stopped:
                break

    def start(self):
        from walrus import Database
        import distributed
        
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.producer = self.db.Stream(self.topic)
            self.stopped = False
            self.loop.add_callback(self.push_redis)
           

    def stop(self):
        if self.producer is not None:
            self.producer = None
            self.stopped = True
        self.finalize(self, self.stop, weakref.ref(self))

出水口设计

从redis读取流数据生成stream对象

In [22]:

@Stream.register_api(staticmethod)
class from_redis(Source):
  
    def __init__(self, topics, poll_interval=0.1, start=False,group='test',
                 **kwargs):
        self.consumer = None
        self.topics = topics
        self.group=group
        self.poll_interval = poll_interval
        super(from_redis, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_poll(self):
        if self.consumer is not None:
            msg = self.consumer.read()
            if msg:
                return msg

    @gen.coroutine
    def poll_redis(self):
        while True:
            val = self.do_poll()
            if val:
                yield self._emit(val)
            else:
                yield gen.sleep(self.poll_interval)
            if self.stopped:
                break

    def start(self):
        import confluent_kafka as ck
        from walrus import Database
        import distributed
        
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.consumer = self.db.consumer_group(self.group, self.topics)
            self.consumer.create()  # Create the consumer group.
#             self.consumer.set_id('$')#不会从头读
            self.stopped = False
            self.loop.add_callback(self.poll_redis)
           

    def stop(self):
        if self.consumer is not None:
            self.consumer.destroy() 
            self.consumer = None
            self.stopped = True
        self.finalize(self, self.stop, weakref.ref(self))

出水管算法

In [50]:

def parse(meta_msg):
    topic,msg = meta_msg[0],meta_msg[1][0]
    msg_id,msg_body = msg
    return msg_body

def ismykey(byte_dict):
    return byte_dict.get(b'quant_df') != None

def isotherkey(byte_dict):
    return byte_dict.get(b'quant_df') == None
    
def get_quant(byte_dict):
    return byte_dict.get(b'quant_df')

def isdf(msg):
    return type(msg)==pd.core.frame.DataFrame

def to_my_df(df):
    df['code']=df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change']=(df.buy-df.now)/df.now
    return df

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    #随便写的,没有意义
    df['jiange'] = df.now-df.now.shift(1)
    return df[['code','name','jiange']]

def get_index(df):
    return df[df.code.str.startswith('150')]

def my_sort(df):
    return df.sort_values('jiange').tail(10)

In [51]:

# source.stop()
source = Stream.from_redis(['stream-a'],group='new16', start=False)
mysource = source.flatten().map(parse)

from fn import F
otherkey = mysource.filter(isotherkey)
l = otherkey.sink_to_list()
squant = mysource.filter(ismykey).map(get_quant).map(pd.read_msgpack).filter(isdf)
sdf = squant.map(to_my_df).map(getmydf).map(get_index).map(lambda df:df.sample(15)).map(my_sort)
sdf

var element = $('#dcaf87a9-d898-43d6-a469-529c03163feb'); {"model_id": "9a9dc555feec40d78d394d2e13cd0429", "version_major": 2, "version_minor": 0}

In [52]:

source.start()

In [49]:

source.stop()

In [7]:

source.visualize()

Out[7]:

水泵设计

生成数据到原函数

In [26]:

def gen_quant():
    import easyquotation
    quotation_engine = easyquotation.use('sina')
    q1 = quotation_engine.all
    df = pd.DataFrame(q1).T
    return {'quant_df':df.to_msgpack()}

def gen_test():
    import moment as mm
    return {'test':mm.now().seconds}


def gen_block_test():
    import moment as mm
    import time
    time.sleep(6)
    return {'block_test':mm.now().seconds}

In [9]:

engine2 = engine(topic='stream-a',func=gen_test,push_interval=1)
engine2.start()

In [18]:

engine2.stop()

In [27]:

engine1 = Stream.engine(topic='stream-a',func=gen_quant,push_interval=5,asyncflag=True)
engine1.start()

In [53]:

engine1.stop()

In [ ]:

engine3 = Stream.engine(topic='stream-a',func=gen_block_test,push_interval=1,asyncflag=True,threadcount=10)
engine3.start()

In [ ]:

engine3.stop()

In [ ]:

engine3.stop()

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 旅行搜索词权重分析

    .dataframe tbody tr th:only-of-type { vertical-align: middle; ...

    spark
  • 股票实时流计算

    var element = $('#61901593-c697-4e0e-ad17-c8f2c3fae6ae'); {"model_id": "8629...

    spark
  • python流数据动态可视化

    “流数据”是连续生成的数据,通常由某些外部源(如远程网站,测量设备或模拟器)生成。这种数据在金融时间序列,Web服务器日志,科学应用程序和许多其他情况下很常见。...

    spark
  • python pyqt5 卡通人物形状窗体

    import sys from PyQt5.QtWidgets import QApplication, QWidget from PyQt5.QtGui ...

    用户5760343
  • Day21.Python项目案例

    1、批量修改文件名 2、发送邮件 --- 打卡领取 可配置。 3、批量合成头像图片 4、股票查询程序开发 --- 打...

    DataScience
  • 12 | Tornado源码分析:BaseIOStream 对象(上)

    hello 大家好 通过前几期我们已经聊了 Tornado 中协程的创建、运行,本期我们开始聊聊 tornado 中 网络读写数据处理相关的内容,这部分还是比较...

    还是牛6504957
  • 福利来啦,送给大家一个小游戏的源码,不要错过哟(复制粘贴运行即可玩)

    从小到大玩过很多的游戏,在我小时候,能玩游戏的地方不多,那时玩游戏都是偷摸玩的,只要是个游戏就觉得非常有趣,比较经典的有魂斗罗,拳皇,超级玛丽,贪吃蛇,俄罗斯方...

    松鼠爱吃饼干
  • 喜欢的歌曲不在一个平台怎么办?你需要一个自己专属的音乐播放器

    网易云音乐,QQ音乐,酷狗音乐,是我们经常会用到的音乐软件,当然有时候我们因为一首歌,需要在各大音乐平台上跳转,那么我们完全可以使用python自己打造一款音乐...

    松鼠爱吃饼干
  • 500 行代码写一个俄罗斯方块游戏

    俄罗斯方块游戏是世界上最流行的游戏之一。是由一名叫Alexey Pajitnov的俄罗斯程序员在1985年制作的,从那时起,这个游戏就风靡了各个游戏平台。

    程序猿DD
  • RL实践3——为Agent添加Policy、记忆功能

    在实践2中,介绍了gym环境的定义和使用方法。 在实践1中,介绍了 动态规划DP 求解 价值函数 并没有形成一个策略Policy\(\pi\)来指导agen...

    列夫托尔斯昊

扫码关注云+社区

领取腾讯云代金券