前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis流计算

redis流计算

作者头像
spark
发布2018-12-20 11:53:02
1.4K0
发布2018-12-20 11:53:02
举报
文章被收录于专栏:数据科学数据科学

设计概要:

  • 把数据流形象话的比作水流
  • 使用redis流和流的存储功能做水库,分别设计进水和出水系统
  • 使用tornado可以同时支持多个进出水水管并行运行,互不干扰
  • 使用streamz库灵活实现加在进出水管上的算法,可以实现限速rate_limit、过滤filter、批处理map,合并zip,缓冲buffer等特性

使用类库

使用了tornado的异步和streamz的流处理两个库,需要redis 5.0以上版本

In [20]:

代码语言:javascript
复制
import os
import weakref

import time
import tornado.ioloop
from tornado import gen

from streamz.core import Stream, convert_interval
import pandas as pd

class Source(Stream):
    _graphviz_shape = 'doubleoctagon'

进水口设计

异步定时循环任务,不听生成数据,并将数据push到redis

In [21]:

代码语言:javascript
复制
@Stream.register_api(staticmethod)
class engine(Source):
  
    def __init__(self, topic, 
                 push_interval=1, 
                 start=False,
                 func=lambda:{'time':time.time()},
                 asyncflag=False,
                 threadcount=5,
                 **kwargs):
        
        self.producer = None
        self.topic = topic
        self.push_interval = push_interval
        self.func=func
        self.asyncflag = asyncflag
        if self.asyncflag:
            from concurrent.futures import ThreadPoolExecutor
            self.thread_pool = ThreadPoolExecutor(threadcount)
            
        super(engine, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_push(self):
        if self.producer is not None:
            msg = self.producer.add(self.func())
            if msg :
                return msg

    @gen.coroutine
    def push_redis(self):
        while True:
            if self.asyncflag:
                val = self.thread_pool.submit(self.do_push)
            else:
                val = self.do_push()
            yield gen.sleep(self.push_interval)
            if self.stopped:
                break

    def start(self):
        from walrus import Database
        import distributed
        
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.producer = self.db.Stream(self.topic)
            self.stopped = False
            self.loop.add_callback(self.push_redis)
           

    def stop(self):
        if self.producer is not None:
            self.producer = None
            self.stopped = True
        self.finalize(self, self.stop, weakref.ref(self))

出水口设计

从redis读取流数据生成stream对象

In [22]:

代码语言:javascript
复制
@Stream.register_api(staticmethod)
class from_redis(Source):
  
    def __init__(self, topics, poll_interval=0.1, start=False,group='test',
                 **kwargs):
        self.consumer = None
        self.topics = topics
        self.group=group
        self.poll_interval = poll_interval
        super(from_redis, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_poll(self):
        if self.consumer is not None:
            msg = self.consumer.read()
            if msg:
                return msg

    @gen.coroutine
    def poll_redis(self):
        while True:
            val = self.do_poll()
            if val:
                yield self._emit(val)
            else:
                yield gen.sleep(self.poll_interval)
            if self.stopped:
                break

    def start(self):
        import confluent_kafka as ck
        from walrus import Database
        import distributed
        
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.consumer = self.db.consumer_group(self.group, self.topics)
            self.consumer.create()  # Create the consumer group.
#             self.consumer.set_id('$')#不会从头读
            self.stopped = False
            self.loop.add_callback(self.poll_redis)
           

    def stop(self):
        if self.consumer is not None:
            self.consumer.destroy() 
            self.consumer = None
            self.stopped = True
        self.finalize(self, self.stop, weakref.ref(self))

出水管算法

In [50]:

代码语言:javascript
复制
def parse(meta_msg):
    topic,msg = meta_msg[0],meta_msg[1][0]
    msg_id,msg_body = msg
    return msg_body

def ismykey(byte_dict):
    return byte_dict.get(b'quant_df') != None

def isotherkey(byte_dict):
    return byte_dict.get(b'quant_df') == None
    
def get_quant(byte_dict):
    return byte_dict.get(b'quant_df')

def isdf(msg):
    return type(msg)==pd.core.frame.DataFrame

def to_my_df(df):
    df['code']=df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change']=(df.buy-df.now)/df.now
    return df

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    #随便写的,没有意义
    df['jiange'] = df.now-df.now.shift(1)
    return df[['code','name','jiange']]

def get_index(df):
    return df[df.code.str.startswith('150')]

def my_sort(df):
    return df.sort_values('jiange').tail(10)

In [51]:

代码语言:javascript
复制
# source.stop()
source = Stream.from_redis(['stream-a'],group='new16', start=False)
mysource = source.flatten().map(parse)

from fn import F
otherkey = mysource.filter(isotherkey)
l = otherkey.sink_to_list()
squant = mysource.filter(ismykey).map(get_quant).map(pd.read_msgpack).filter(isdf)
sdf = squant.map(to_my_df).map(getmydf).map(get_index).map(lambda df:df.sample(15)).map(my_sort)
sdf

var element = $('#dcaf87a9-d898-43d6-a469-529c03163feb'); {"model_id": "9a9dc555feec40d78d394d2e13cd0429", "version_major": 2, "version_minor": 0}

In [52]:

代码语言:javascript
复制
source.start()

In [49]:

代码语言:javascript
复制
source.stop()

In [7]:

代码语言:javascript
复制
source.visualize()

Out[7]:

水泵设计

生成数据到原函数

In [26]:

代码语言:javascript
复制
def gen_quant():
    import easyquotation
    quotation_engine = easyquotation.use('sina')
    q1 = quotation_engine.all
    df = pd.DataFrame(q1).T
    return {'quant_df':df.to_msgpack()}

def gen_test():
    import moment as mm
    return {'test':mm.now().seconds}


def gen_block_test():
    import moment as mm
    import time
    time.sleep(6)
    return {'block_test':mm.now().seconds}

In [9]:

代码语言:javascript
复制
engine2 = engine(topic='stream-a',func=gen_test,push_interval=1)
engine2.start()

In [18]:

代码语言:javascript
复制
engine2.stop()

In [27]:

代码语言:javascript
复制
engine1 = Stream.engine(topic='stream-a',func=gen_quant,push_interval=5,asyncflag=True)
engine1.start()

In [53]:

代码语言:javascript
复制
engine1.stop()

In [ ]:

代码语言:javascript
复制
engine3 = Stream.engine(topic='stream-a',func=gen_block_test,push_interval=1,asyncflag=True,threadcount=10)
engine3.start()

In [ ]:

代码语言:javascript
复制
engine3.stop()

In [ ]:

代码语言:javascript
复制
engine3.stop()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年12月14日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用类库¶
  • 进水口设计¶
  • 出水口设计¶
  • 出水管算法¶
  • 水泵设计¶
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档