前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python量化数据仓库搭建系列3:数据落库代码封装

Python量化数据仓库搭建系列3:数据落库代码封装

原创
作者头像
算法爱好者
修改2021-10-28 10:00:07
9490
修改2021-10-28 10:00:07
举报
文章被收录于专栏:Python量化

Python量化数据仓库搭建系列3:数据落库代码封装

本系列教程为量化开发者,提供本地量化金融数据仓库的搭建教程与全套源代码。我们以恒有数(UDATA)金融数据社区为数据源,将金融基础数据落到本地数据库。教程提供全套源代码,包括历史数据下载与增量数据更新,数据更新任务部署与日常监控等操作。

在上一节讲述中,我们封装了Python操作MySQL数据库的自定义类,存为MySQLOperation.py文件;本节内容操作数据库部分,将会调用MySQLOperation中的方法,以及pandas.to_sql和pandas.read_sql的操作。

一、恒有数(UDATA)操作简介

1、获取Token

A、在恒有数官网(https://udata.hs.net)注册并登录,在订阅页面,下单免费的体验套餐;

B、在右上角,头像下拉菜单中,进入总览页面,复制Token;

C、在数据页面,查看数据接口文档,获取接口名称、请求参数、返回参数和Python代码示例;

2、安装hs_udata
代码语言:javascript
复制
pip install hs_udata

使用示例如下:

代码语言:python
代码运行次数:0
复制
import hs_udata as hs
# 设置Token
hs.set_token(token = 'xxxxxxxxxxxxxxx')  # 总览页面获取个人Token
# 以分钟线行情为例,获取000001.SZ在2021-05-01至2021-06-01期间的分钟线数据
# 接口文档见:https://udata.hs.net/datas/342/
df = hs.stock_quote_minutes(en_prod_code="000001.SZ",begin_date="20210501",end_date="20210601")
# 增加股票代码列
df['hs_code']='000001.SZ'
df.head()

其余接口使用过程与之类似;

二、数据落库示例

以股票列表(stock_list)为例,讲解建表、落库、查询等操作;全套代码见本文第三章;

1、准备工作

(1)在MySQL数据库中,创建数据库udata,创建过程见第一讲《Python量化数据仓库搭建系列1:数据库安装与操作》;

(2)在MySQL数据库中,创建数据更新记录表udata.tb_update_records,表结构如下:

建表SQL如下:

代码语言:sql
复制
CREATE TABLE udata.tb_update_records (
                table_name CHAR(40),
                data_date CHAR(20),
                update_type CHAR(20),
                data_number INT(20),
                elapsed_time INT(20),
                updatetime CHAR(20)
                )

(3)将Token与数据库参数写入配置文件DB_MySQL.config,文件内容如下:

代码语言:javascript
复制
[udata]
token='你的Token'
host='127.0.0.1'
port=3306
user='root'
passwd='密码'
db='udata'

(4)读取配置文件中的参数

代码语言:python
代码运行次数:0
复制
import configparser
# 读取配置文件中,恒有数和数据库参数
configFilePath = 'DB_MySQL.config'
section = 'udata'
config = configparser.ConfigParser()
config.read(configFilePath)
# 读取 恒有数(UData) 的 token
token = eval(config.get(section=section, option='token'))
# MySQL连接参数读取
host = eval(config.get(section=section, option='host'))
port = int(config.get(section=section, option='port'))
db = eval(config.get(section=section, option='db'))
user = eval(config.get(section=section, option='user'))
passwd = eval(config.get(section=section, option='passwd'))
2、建表

在社区数据页面,查看数据表返回参数字段,以股票列表(stock_list,https://udata.hs.net/datas/202/)为例:

将建表、删除表格、清空数据的SQL,写入配置文件DB_Table.config,文件内容如下:

代码语言:python
代码运行次数:0
复制
[tb_stock_list]
DROP_TABLE='DROP TABLE IF EXISTS tb_stock_list'
CREATE_TABLE='''CREATE TABLE tb_stock_list (
                secu_code CHAR(20),
                hs_code CHAR(20),
                secu_abbr CHAR(20),
                chi_name CHAR(40),
                secu_market CHAR(20), 
                listed_state CHAR(20),
                listed_sector CHAR(20),
                updatetime CHAR(20)
                )'''
DELETE_DATA = 'truncate table tb_stock_list'

python建表代码如下,MySQLOperation为上一讲封装的方法。

代码语言:python
代码运行次数:0
复制
# MySQL操作实例化,pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据
MySQL = MySQLOperation(host, port, db, user, passwd)
# 读取配置文件中,恒有数和数据库参数
configFilePath = 'DB_Table.config'
section = 'tb_stock_list'
config = configparser.ConfigParser()
config.read(configFilePath)
DROP_TABLE = eval(config.get(section=section, option='DROP_TABLE'))
CREATE_TABLE = eval(config.get(section=section, option='CREATE_TABLE'))
DELETE_DATA = eval(config.get(section=section, option='DELETE_DATA'))
# 删除表结构
MySQL.Execute_Code(DROP_TABLE)
# 创建表结构
MySQL.Execute_Code(CREATE_TABLE)
3、数据落库
代码语言:python
代码运行次数:0
复制
import hs_udata as hs
from sqlalchemy import create_engine
from datetime import datetime
# 设置token
hs.set_token(token)
# 获取 股票列表 数据
df = hs.stock_list()
# 在最后一列增加系统时间戳
dt = datetime.now()
df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
# 由于股票列表数据为全量更新,数据插入之前,先清空表中数据
MySQL.Execute_Code(DELETE_DATA)
# sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql
engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user,passwd,host,port,db))
# 将数据写入到MySQL中的数据表
df.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')

在数据库中查看结果如下:

4、读取数据

代码语言:python
代码运行次数:0
复制
import pandas as pd
result = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)
print(result.head())
三、落库代码封装

将上述几步操作,封装到一起,定义并调用python中class类的属性和方法。代码中涉及主要技术点如下:

(1)使用pymysql、pandas.to_sql和pandas.read_sql操作MySQL数据库;

(2)使用class类的方法,集成建表、插入数据和查询数据的操作;

(3)使用配置文件的方式,从本地文件中,读取数据库参数与表操作的SQL代码;

(4)使用try容错机制,结合日志函数,将执行日志打印到本地的DB_MySQL_LOG.txt文件;

代码语言:python
代码运行次数:0
复制
import pandas as pd
import hs_udata as hs
from MySQLOperation import *
from sqlalchemy import create_engine
from datetime import datetime
import time
import configparser
import logging
import traceback
import warnings
warnings.filterwarnings("ignore")

class TB_Stock_List:

    def __init__(self,MySQL_Config,BD_Name,Table_Config,Table_Name):
        # 创建日志
        self.logging = logging
        self.logging.basicConfig(filename='DB_MySQL_LOG.txt', level=self.logging.DEBUG
                                 , format='%(asctime)s - %(levelname)s - %(message)s')
        # 读取配置文件中,恒有数和数据库参数
        configFilePath = MySQL_Config
        self.section1 = BD_Name
        config = configparser.ConfigParser()
        config.read(configFilePath)
        # 读取 恒有数(UData) 的 token
        self.token = eval(config.get(section=self.section1, option='token'))
        # MySQL连接参数读取
        self.host = eval(config.get(section=self.section1, option='host'))
        self.port = int(config.get(section=self.section1, option='port'))
        self.db = eval(config.get(section=self.section1, option='db'))
        self.user = eval(config.get(section=self.section1, option='user'))
        self.passwd = eval(config.get(section=self.section1, option='passwd'))
        # pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据
        self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd)
        # sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql
        self.engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(self.user
                                                                                      , self.passwd
                                                                                      , self.host
                                                                                      , self.port
                                                                                      , self.db))
        # 读取配置文件中,恒有数和数据库参数
        configFilePath = Table_Config
        self.section2 = Table_Name
        config = configparser.ConfigParser()
        config.read(configFilePath)
        self.DROP_TABLE_SQL = eval(config.get(section=self.section2, option='DROP_TABLE'))
        self.CREATE_TABLE_SQL = eval(config.get(section=self.section2, option='CREATE_TABLE'))
        self.DELETE_DATA_SQL = eval(config.get(section=self.section2, option='DELETE_DATA'))

        self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))

    def CREATE_TABLE(self):
        try:
            # 删除表结构
            self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0')
            self.MySQL.Execute_Code(self.DROP_TABLE_SQL)
            # 创建表结构
            self.MySQL.Execute_Code(self.CREATE_TABLE_SQL)
            self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2))
        except:
            self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())

    def UPDATE_DATA(self):
        try:
            # 设置token
            hs.set_token(self.token)
            time_start = time.time()  # 计时
            # 获取 股票列表 数据
            df = hs.stock_list()
            # 在最后一列增加系统时间戳
            dt = datetime.now()
            df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
            # 由于股票列表数据为全量更新,数据插入之前,先清空表中数据
            self.MySQL.Execute_Code(self.DELETE_DATA_SQL)
            # 将数据写入到MySQL中的数据表
            df.to_sql(name='tb_stock_list', con=self.engine, index=False, if_exists='append')
            time_end = time.time()  # 计时
            elapsed_time = round(time_end-time_start,2)
            # 向mysql库中记录一条数据更新记录:表名,数据日期,更新方式,更新条数,更新耗时,系统时间
            self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records 
            VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(self.section2
                                                                   ,dt.strftime('%Y-%m-%d'),len(df),elapsed_time)
            self.MySQL.Execute_Code(self.RECORDS_SQL)
            self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2))
        except:
            self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())

    def READ_DATA(self, WhereCondition=''):
        try:
            result = pd.read_sql('''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2)
                                 + WhereCondition, con=self.engine)
            self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2))
            return result
        except:
            self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())
            return 0
        
if __name__ == '__main__':
    MySQL_Config = 'DB_MySQL.config'
    BD_Name = 'udata'
    Table_Config = 'DB_Table.config'
    Table_Name = 'tb_stock_list'
    # 实例化
    TB_Stock_List_Main = TB_Stock_List(MySQL_Config,BD_Name,Table_Config,Table_Name)
    # 创建表结构
    TB_Stock_List_Main.CREATE_TABLE()
    # 更新数据
    TB_Stock_List_Main.UPDATE_DATA()
    # 读取数据
    data = TB_Stock_List_Main.READ_DATA()

四、代码及配置文件下载

(1)附件文件目录:

DB_MySQL.config

DB_Table.config

MySQLOperation.py

TB_Stock_List.py

(2)下载附件文件后,将配置文件DB_MySQL.config中的Token与数据库参数,修改为自己的参数;将代码放置在同一目录下执行。

下一节《Python量化数据仓库搭建系列4:股票数据落库》

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Python量化数据仓库搭建系列3:数据落库代码封装
  • 一、恒有数(UDATA)操作简介
    • 1、获取Token
      • 2、安装hs_udata
      • 二、数据落库示例
        • 1、准备工作
          • 2、建表
            • 3、数据落库
              • 三、落库代码封装
              • 四、代码及配置文件下载
              相关产品与服务
              云数据库 SQL Server
              腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档