前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >码农技术炒股之路——实时交易信息、主力动向信息分库备份

码农技术炒股之路——实时交易信息、主力动向信息分库备份

作者头像
方亮
发布2019-01-16 15:40:09
4470
发布2019-01-16 15:40:09
举报
文章被收录于专栏:方亮方亮

        一般来说,一个股票信息应该保存在一张表中。但是由于我机器资源限制,且我希望尽快频率的抓取数据。所以每天我将所有股票的实时交易信息放在daily_temp库中的一个以日期命名的表中。主力动向信息也是如此。但是盘后分析股票时,我们会以单只股票进行分析。这样就需要跨越很多天,而这样的设计将导致需要查询若干个表,且随着日期增加,读取的表也将增加。我觉得这样是不合适的。(转载请指明出于breaksoftware的csdn博客)

        目前我们系统繁忙的时间和交易时间同步。为了最大幅度的利用资源,我决定在盘后对每日的数据按照股票代码进行拆分备份。这样我们就可以查询一张表得到该股票所有历史数据。

拆分备份实时交易信息

        首先我们要从股票基本信息表中读取所有股票代码

代码语言:javascript
复制
    def _get_all_share_ids(self):
        date_info = time.strftime('%Y_%m_%d')
        trade_table_name = "trade_info_%s" % (date_info)
        share_ids = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, trade_table_name, ["share_id"],{}, pre = "distinct"))
        return share_ids

        fetch_data是我用于封装获取数据的接口。其中包含通过网络获取数据,通过数据库获取数据和通过正则拆分数据

代码语言:javascript
复制
class select_db:
    def __init__(self, conn_name, table_name, select_columns, conditions, pre="", extend=""):
        self._conn_name = conn_name
        self._table_name = table_name
        self._select_columns = select_columns
        self._conditions = conditions
        self._pre = pre
        self._extend = extend

    def get_data(self):
        db_manager = mysql_manager()
        conn = db_manager.get_mysql_conn(self._conn_name)
        result = conn.select(self._table_name, self._select_columns, self._conditions, self._pre, self._extend)
        return result

class query_http:
    def __init__(self, url):
        self._url = url

    def get_data(self):
        res = ""
        tried = False
        while True:
            try:
                socket.setdefaulttimeout(15)
                req = urllib2.Request(self._url)
                res_data = urllib2.urlopen(req)
                res = res_data.read()
                break
            except Exception as e:
                LOG_ERROR("request error: %s %s"  % (self._url ,e))
                if tried:
                    break
                else:
                    tried = True
        return res

class regular_split:
    def __init__(self, regular_name, data):
        self._regular_name = regular_name
        self._data = data

    def get_data(self):
        regular_split_mgr = regular_split_manager()
        ret_array = regular_split_mgr.get_split_data(self._data, self._regular_name)
        return ret_array

def get_data(query_item):
    if False == hasattr(query_item, "get_data"):
        return None
    result = query_item.get_data()
    return result

        下一步通过股票代码查询当天所有数据

代码语言:javascript
复制
    def _bak_trade_info(self, share_id):
        date_info = time.strftime('%Y_%m_%d')
        table_name = "trade_info_%s" % (date_info)
        db_manager = mysql_manager()
        conn = db_manager.get_mysql_conn(self._daily_temp_conn_name)
        
        fields_array = ["today_open","yesterday_close","cur","today_high","today_low","compete_buy_price","compete_sale_price",
                "trade_num","trade_price","buy_1_num","buy_1_price","buy_2_num","buy_2_price","buy_3_num","buy_3_price","buy_4_num","buy_4_price",
                "buy_5_num","buy_5_price","sale_1_num","sale_1_price","sale_2_num","sale_2_price","sale_3_num","sale_3_price","sale_4_num","sale_4_price",
                "sale_5_num","sale_5_price","time_date_str","time_str"]
        daily_data = conn.select(table_name, fields_array, {"share_id":[share_id, "="]})
        self._bak_single_market_maker_info(share_id, daily_data)

        由于抓取时间和数据源时间存在差异,所以我们可能会抓取到交易时间之外的数据。于是我们要对这些数据进行归一化处理。比如我们有11.29、11:31和11:32三个数据,则对交易时间之外的数据11:31和11:32数据归一为11:30的数据并保存。

代码语言:javascript
复制
    def _bak_single_market_maker_info(self, share_id, daily_data):
        daily_data_list = []
        has_between_11_30_and_13_00 = False
        after_15_00 = False
        keys_list = []
        for item in daily_data:
            item_list = list(item)
            date_str = item[-2] + " " + item[-1]
            
            today_11_30 = date_str[:date_str.find(" ")] + " 11:30:00" 
            today_13_00 = date_str[:date_str.find(" ")] + " 13:00:00"
            today_15_00 = date_str[:date_str.find(" ")] + " 15:00:00"
            today_11_30_int = time.mktime(time.strptime(today_11_30,'%Y-%m-%d %H:%M:%S'))
            today_13_00_int = time.mktime(time.strptime(today_13_00,'%Y-%m-%d %H:%M:%S'))
            today_15_00_int = time.mktime(time.strptime(today_15_00,'%Y-%m-%d %H:%M:%S'))

            date_int = time.mktime(time.strptime(date_str,'%Y-%m-%d %H:%M:%S'))
            if date_int >= today_11_30_int and date_int < today_13_00_int:
                if has_between_11_30_and_13_00:
                    continue
                else:
                    has_between_11_30_and_13_00 = True

            if date_int >= today_15_00_int:
                if after_15_00:
                    continue
                else:
                    after_15_00 = True

            if date_int in keys_list:
                continue
            else:
                keys_list.append(date_int)

            item_list.insert(0, date_int)
            del item_list[-1]
            del item_list[-1]
            daily_data_list.append(item_list)

        keys_array = ["time","today_open","yesterday_close","cur","today_high","today_low","compete_buy_price","compete_sale_price",
                "trade_num","trade_price","buy_1_num","buy_1_price","buy_2_num","buy_2_price","buy_3_num","buy_3_price","buy_4_num","buy_4_price",
                "buy_5_num","buy_5_price","sale_1_num","sale_1_price","sale_2_num","sale_2_price","sale_3_num","sale_3_price","sale_4_num","sale_4_price",
                "sale_5_num","sale_5_price"]

        share_trade_info_table_name = "trade_info_detail_" +share_id
        self._create_table_if_not_exist(share_id, share_trade_info_table_name)

        stock_conn_manager_obj = stock_conn_manager()
        conn = stock_conn_manager_obj.get_conn(share_id)
        conn.insert_data(share_trade_info_table_name, keys_array, daily_data_list)

        此处我们并没有使用直接检查并创建表的方式,而是使用了_create_table_if_not_exist方法

代码语言:javascript
复制
    def _create_table_if_not_exist(self, share_id, table_name):
        stock_conn_manager_obj = stock_conn_manager()
        conn_name = stock_conn_manager_obj.get_conn_name(share_id)
        prepare_table_obj = prepare_table(conn_name, "trade_info")
        prepare_table_obj.prepare(table_name)

        为什么要这么用?因为我们要将三千多支股票信息保存分片到300个不同的数据库中。那么当前这支股票在哪个库中,则需要一个中间层去代理管理。

代码语言:javascript
复制
@singleton
class stock_conn_manager():
    def __init__(self):
        pass

    def get_conn(self, share_id):
        conn_name = self.get_conn_name(share_id)
        db_manager = mysql_manager()
        conn = db_manager.get_mysql_conn(conn_name)
        return conn

    def get_conn_name(self, share_id):
        share_id_int = int(share_id)
        share_id_part_no = share_id_int % 300
        conn_name = "stock_part_%d" % (share_id_part_no)
        return conn_name

        stock_conn_manager类将股票代码和300取余数,得出分片ID。然后连接该ID对应的库。这层设计非常重要,因为不仅此处我们备份数据要用到,之后对全部股票进行分析时也要用到它。

拆分备份主力动向信息

        主要逻辑同实时交易信息。故只贴出代码

代码语言:javascript
复制
class bak_today_market_maker(job_base):
    def __init__(self):
        self._db_manager = mysql_manager()
        self._daily_temp_conn_name = "daily_temp"
    
    def run(self):
        share_ids = self._get_all_share_ids()
        for share_id  in share_ids:
            self._bak_market_maker_info(share_id[0])
        LOG_INFO("run bak_today_market_maker")
    
    def _bak_market_maker_info(self, share_id):
        date_info = time.strftime('%Y_%m_%d')
        table_name = "market_maker_%s" % (date_info)
        
        fields_array =["time_str", "price", "up_percent", "market_maker_net_inflow", "market_maker_net_inflow_per",
                "huge_inflow", "huge_inflow_per", "large_inflow", "large_inflow_per", "medium_inflow", "medium_inflow_per", "small_inflow", "small_inflow_per"]
        
        daily_data = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, table_name, fields_array, {"share_id":[share_id, "="]}))
        self._bak_single_market_maker_info(share_id, daily_data)

    def _bak_single_market_maker_info(self, share_id, daily_data):
        daily_data_list = []
        has_between_11_30_and_13_00 = False
        after_15_00 = False
        keys_list = []
        for item in daily_data:
            item_list = list(item)
            date_str = item[0]
            
            today_11_30 = date_str[:date_str.find(" ")] + " 11:30:00" 
            today_13_00 = date_str[:date_str.find(" ")] + " 13:00:00"
            today_15_00 = date_str[:date_str.find(" ")] + " 15:00:00"
            today_11_30_int = time.mktime(time.strptime(today_11_30,'%Y-%m-%d %H:%M:%S'))
            today_13_00_int = time.mktime(time.strptime(today_13_00,'%Y-%m-%d %H:%M:%S'))
            today_15_00_int = time.mktime(time.strptime(today_15_00,'%Y-%m-%d %H:%M:%S'))

            date_int = time.mktime(time.strptime(date_str,'%Y-%m-%d %H:%M:%S'))
            if date_int >= today_11_30_int and date_int < today_13_00_int:
                if has_between_11_30_and_13_00:
                    continue
                else:
                    has_between_11_30_and_13_00 = True

            if date_int >= today_15_00_int:
                if after_15_00:
                    continue
                else:
                    after_15_00 = True

            if date_int in keys_list:
                continue
            else:
                keys_list.append(date_int)

            item_list[0] = date_int
            daily_data_list.append(item_list)
        keys_array =["time", "price", "up_percent", "market_maker_net_inflow", "market_maker_net_inflow_per",
                "huge_inflow", "huge_inflow_per", "large_inflow", "large_inflow_per", "medium_inflow", "medium_inflow_per", "small_inflow", "small_inflow_per"]

        share_market_maker_table_name = "market_maker_detail_" + share_id
        self._create_table_if_not_exist(share_id, share_market_maker_table_name)

        stock_conn_manager_obj = stock_conn_manager()
        conn = stock_conn_manager_obj.get_conn(share_id)
        conn.insert_data(share_market_maker_table_name, keys_array, daily_data_list)

    def _get_all_share_ids(self):
        date_info = time.strftime('%Y_%m_%d')
        trade_table_name = "trade_info_%s" % (date_info)
        share_ids = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, trade_table_name, ["share_id"],{}, pre = "distinct"))
        return share_ids

    def _create_table_if_not_exist(self, share_id, table_name):
        stock_conn_manager_obj = stock_conn_manager()
        conn_name = stock_conn_manager_obj.get_conn_name(share_id)
        prepare_table_obj = prepare_table(conn_name, "market_maker")
        prepare_table_obj.prepare(table_name)

        实时交易和主力动向拆分备份的任务配置如下。因为这两个数据库比较大,我给每个任务留了一个小时的处理时间。

代码语言:javascript
复制
[bak_today_market_maker]
type=cron
class=bak_today_market_maker
day_of_week=1-5
hour=16
minute=50
timezone = Asia/Shanghai

[bak_today_trade]
type=cron
class=bak_today_trade
day_of_week=1-5
hour=15
minute=50
timezone = Asia/Shanghai
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年05月12日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 拆分备份实时交易信息
  • 拆分备份主力动向信息
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档