一般来说,一个股票信息应该保存在一张表中。但是由于我机器资源限制,且我希望尽快频率的抓取数据。所以每天我将所有股票的实时交易信息放在daily_temp库中的一个以日期命名的表中。主力动向信息也是如此。但是盘后分析股票时,我们会以单只股票进行分析。这样就需要跨越很多天,而这样的设计将导致需要查询若干个表,且随着日期增加,读取的表也将增加。我觉得这样是不合适的。(转载请指明出于breaksoftware的csdn博客)
目前我们系统繁忙的时间和交易时间同步。为了最大幅度的利用资源,我决定在盘后对每日的数据按照股票代码进行拆分备份。这样我们就可以查询一张表得到该股票所有历史数据。
首先我们要从股票基本信息表中读取所有股票代码
def _get_all_share_ids(self):
date_info = time.strftime('%Y_%m_%d')
trade_table_name = "trade_info_%s" % (date_info)
share_ids = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, trade_table_name, ["share_id"],{}, pre = "distinct"))
return share_ids
fetch_data是我用于封装获取数据的接口。其中包含通过网络获取数据,通过数据库获取数据和通过正则拆分数据
class select_db:
def __init__(self, conn_name, table_name, select_columns, conditions, pre="", extend=""):
self._conn_name = conn_name
self._table_name = table_name
self._select_columns = select_columns
self._conditions = conditions
self._pre = pre
self._extend = extend
def get_data(self):
db_manager = mysql_manager()
conn = db_manager.get_mysql_conn(self._conn_name)
result = conn.select(self._table_name, self._select_columns, self._conditions, self._pre, self._extend)
return result
class query_http:
def __init__(self, url):
self._url = url
def get_data(self):
res = ""
tried = False
while True:
try:
socket.setdefaulttimeout(15)
req = urllib2.Request(self._url)
res_data = urllib2.urlopen(req)
res = res_data.read()
break
except Exception as e:
LOG_ERROR("request error: %s %s" % (self._url ,e))
if tried:
break
else:
tried = True
return res
class regular_split:
def __init__(self, regular_name, data):
self._regular_name = regular_name
self._data = data
def get_data(self):
regular_split_mgr = regular_split_manager()
ret_array = regular_split_mgr.get_split_data(self._data, self._regular_name)
return ret_array
def get_data(query_item):
if False == hasattr(query_item, "get_data"):
return None
result = query_item.get_data()
return result
下一步通过股票代码查询当天所有数据
def _bak_trade_info(self, share_id):
date_info = time.strftime('%Y_%m_%d')
table_name = "trade_info_%s" % (date_info)
db_manager = mysql_manager()
conn = db_manager.get_mysql_conn(self._daily_temp_conn_name)
fields_array = ["today_open","yesterday_close","cur","today_high","today_low","compete_buy_price","compete_sale_price",
"trade_num","trade_price","buy_1_num","buy_1_price","buy_2_num","buy_2_price","buy_3_num","buy_3_price","buy_4_num","buy_4_price",
"buy_5_num","buy_5_price","sale_1_num","sale_1_price","sale_2_num","sale_2_price","sale_3_num","sale_3_price","sale_4_num","sale_4_price",
"sale_5_num","sale_5_price","time_date_str","time_str"]
daily_data = conn.select(table_name, fields_array, {"share_id":[share_id, "="]})
self._bak_single_market_maker_info(share_id, daily_data)
由于抓取时间和数据源时间存在差异,所以我们可能会抓取到交易时间之外的数据。于是我们要对这些数据进行归一化处理。比如我们有11.29、11:31和11:32三个数据,则对交易时间之外的数据11:31和11:32数据归一为11:30的数据并保存。
def _bak_single_market_maker_info(self, share_id, daily_data):
daily_data_list = []
has_between_11_30_and_13_00 = False
after_15_00 = False
keys_list = []
for item in daily_data:
item_list = list(item)
date_str = item[-2] + " " + item[-1]
today_11_30 = date_str[:date_str.find(" ")] + " 11:30:00"
today_13_00 = date_str[:date_str.find(" ")] + " 13:00:00"
today_15_00 = date_str[:date_str.find(" ")] + " 15:00:00"
today_11_30_int = time.mktime(time.strptime(today_11_30,'%Y-%m-%d %H:%M:%S'))
today_13_00_int = time.mktime(time.strptime(today_13_00,'%Y-%m-%d %H:%M:%S'))
today_15_00_int = time.mktime(time.strptime(today_15_00,'%Y-%m-%d %H:%M:%S'))
date_int = time.mktime(time.strptime(date_str,'%Y-%m-%d %H:%M:%S'))
if date_int >= today_11_30_int and date_int < today_13_00_int:
if has_between_11_30_and_13_00:
continue
else:
has_between_11_30_and_13_00 = True
if date_int >= today_15_00_int:
if after_15_00:
continue
else:
after_15_00 = True
if date_int in keys_list:
continue
else:
keys_list.append(date_int)
item_list.insert(0, date_int)
del item_list[-1]
del item_list[-1]
daily_data_list.append(item_list)
keys_array = ["time","today_open","yesterday_close","cur","today_high","today_low","compete_buy_price","compete_sale_price",
"trade_num","trade_price","buy_1_num","buy_1_price","buy_2_num","buy_2_price","buy_3_num","buy_3_price","buy_4_num","buy_4_price",
"buy_5_num","buy_5_price","sale_1_num","sale_1_price","sale_2_num","sale_2_price","sale_3_num","sale_3_price","sale_4_num","sale_4_price",
"sale_5_num","sale_5_price"]
share_trade_info_table_name = "trade_info_detail_" +share_id
self._create_table_if_not_exist(share_id, share_trade_info_table_name)
stock_conn_manager_obj = stock_conn_manager()
conn = stock_conn_manager_obj.get_conn(share_id)
conn.insert_data(share_trade_info_table_name, keys_array, daily_data_list)
此处我们并没有使用直接检查并创建表的方式,而是使用了_create_table_if_not_exist方法
def _create_table_if_not_exist(self, share_id, table_name):
stock_conn_manager_obj = stock_conn_manager()
conn_name = stock_conn_manager_obj.get_conn_name(share_id)
prepare_table_obj = prepare_table(conn_name, "trade_info")
prepare_table_obj.prepare(table_name)
为什么要这么用?因为我们要将三千多支股票信息保存分片到300个不同的数据库中。那么当前这支股票在哪个库中,则需要一个中间层去代理管理。
@singleton
class stock_conn_manager():
def __init__(self):
pass
def get_conn(self, share_id):
conn_name = self.get_conn_name(share_id)
db_manager = mysql_manager()
conn = db_manager.get_mysql_conn(conn_name)
return conn
def get_conn_name(self, share_id):
share_id_int = int(share_id)
share_id_part_no = share_id_int % 300
conn_name = "stock_part_%d" % (share_id_part_no)
return conn_name
stock_conn_manager类将股票代码和300取余数,得出分片ID。然后连接该ID对应的库。这层设计非常重要,因为不仅此处我们备份数据要用到,之后对全部股票进行分析时也要用到它。
主要逻辑同实时交易信息。故只贴出代码
class bak_today_market_maker(job_base):
def __init__(self):
self._db_manager = mysql_manager()
self._daily_temp_conn_name = "daily_temp"
def run(self):
share_ids = self._get_all_share_ids()
for share_id in share_ids:
self._bak_market_maker_info(share_id[0])
LOG_INFO("run bak_today_market_maker")
def _bak_market_maker_info(self, share_id):
date_info = time.strftime('%Y_%m_%d')
table_name = "market_maker_%s" % (date_info)
fields_array =["time_str", "price", "up_percent", "market_maker_net_inflow", "market_maker_net_inflow_per",
"huge_inflow", "huge_inflow_per", "large_inflow", "large_inflow_per", "medium_inflow", "medium_inflow_per", "small_inflow", "small_inflow_per"]
daily_data = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, table_name, fields_array, {"share_id":[share_id, "="]}))
self._bak_single_market_maker_info(share_id, daily_data)
def _bak_single_market_maker_info(self, share_id, daily_data):
daily_data_list = []
has_between_11_30_and_13_00 = False
after_15_00 = False
keys_list = []
for item in daily_data:
item_list = list(item)
date_str = item[0]
today_11_30 = date_str[:date_str.find(" ")] + " 11:30:00"
today_13_00 = date_str[:date_str.find(" ")] + " 13:00:00"
today_15_00 = date_str[:date_str.find(" ")] + " 15:00:00"
today_11_30_int = time.mktime(time.strptime(today_11_30,'%Y-%m-%d %H:%M:%S'))
today_13_00_int = time.mktime(time.strptime(today_13_00,'%Y-%m-%d %H:%M:%S'))
today_15_00_int = time.mktime(time.strptime(today_15_00,'%Y-%m-%d %H:%M:%S'))
date_int = time.mktime(time.strptime(date_str,'%Y-%m-%d %H:%M:%S'))
if date_int >= today_11_30_int and date_int < today_13_00_int:
if has_between_11_30_and_13_00:
continue
else:
has_between_11_30_and_13_00 = True
if date_int >= today_15_00_int:
if after_15_00:
continue
else:
after_15_00 = True
if date_int in keys_list:
continue
else:
keys_list.append(date_int)
item_list[0] = date_int
daily_data_list.append(item_list)
keys_array =["time", "price", "up_percent", "market_maker_net_inflow", "market_maker_net_inflow_per",
"huge_inflow", "huge_inflow_per", "large_inflow", "large_inflow_per", "medium_inflow", "medium_inflow_per", "small_inflow", "small_inflow_per"]
share_market_maker_table_name = "market_maker_detail_" + share_id
self._create_table_if_not_exist(share_id, share_market_maker_table_name)
stock_conn_manager_obj = stock_conn_manager()
conn = stock_conn_manager_obj.get_conn(share_id)
conn.insert_data(share_market_maker_table_name, keys_array, daily_data_list)
def _get_all_share_ids(self):
date_info = time.strftime('%Y_%m_%d')
trade_table_name = "trade_info_%s" % (date_info)
share_ids = fetch_data.get_data(fetch_data.select_db(self._daily_temp_conn_name, trade_table_name, ["share_id"],{}, pre = "distinct"))
return share_ids
def _create_table_if_not_exist(self, share_id, table_name):
stock_conn_manager_obj = stock_conn_manager()
conn_name = stock_conn_manager_obj.get_conn_name(share_id)
prepare_table_obj = prepare_table(conn_name, "market_maker")
prepare_table_obj.prepare(table_name)
实时交易和主力动向拆分备份的任务配置如下。因为这两个数据库比较大,我给每个任务留了一个小时的处理时间。
[bak_today_market_maker]
type=cron
class=bak_today_market_maker
day_of_week=1-5
hour=16
minute=50
timezone = Asia/Shanghai
[bak_today_trade]
type=cron
class=bak_today_trade
day_of_week=1-5
hour=15
minute=50
timezone = Asia/Shanghai