日线数据是股票每日收盘后的信息。这块数据不用实时抓取,所以并不占用宝贵的交易时间的资源。于是我们抓取完数据后直接往切片后的数据库中保存。(转载请指明出于breaksoftware的csdn博客)
我们先要获取今天有交易信息的股票代码。因为存在股票停牌的情况,所以不需要这类股票信息
def _get_all_share_ids(self):
    """Return the distinct share ids that traded today.

    Suspended shares have no row in today's trade_info table, so they
    are naturally excluded.
    """
    table_name = "trade_info_%s" % time.strftime('%Y_%m_%d')
    query = fetch_data.select_db("daily_temp", table_name, ["share_id"], {}, pre="distinct")
    return fetch_data.get_data(query)
然后通过股票ID从股票基本信息表中获取股票代码和市场类型等数据
def _get_all_share_ids_market_type(self):
    """Return (share_id, market_type) rows for every share that traded today."""
    # Flatten the one-column result rows into a plain id list for the IN clause.
    ids = [row[0] for row in self._get_all_share_ids()]
    query = fetch_data.select_db("stock_db", "share_base_info",
                                 ["share_id", "market_type"],
                                 {"share_id": [ids, "in"]})
    return fetch_data.get_data(query)
然后就是抓取和保存数据
class update_stock_daily_info(job_base):
    """Job that downloads and stores today's daily quote for every traded share."""

    def __init__(self):
        pass

    def run(self):
        # For each traded share, fetch its quotes and persist them.
        for row in self._get_all_share_ids_market_type():
            self._query_save_data(row[0], row[1])
        LOG_INFO("run update_stock_daily_info")
这次我们数据从网易抓取。这儿要非常感谢网易,它提供一个通过指定起始和截止日期的接口拉取历史日线数据。如果起始和截止选择今天,则拉取的是今天的数据。
def _get_data(self, market_type, id, start_time, end_time):
    """Download the raw daily-quote data for one share from the 163 (NetEase)
    history endpoint; start == end == today yields today's data."""
    base = """http://quotes.money.163.com/service/chddata.html?code=%d%s&start=%s&end=%s&fields=TCLOSE;HIGH;LOW;TOPEN;LCLOSE;PCHG;TURNOVER;VOTURNOVER;VATURNOVER"""
    url = base % (market_type, id, start_time, end_time)
    return fetch_data.get_data(fetch_data.query_http(url))
最开始时,我们是一条数据都没有的,于是我们选择从1990年1月1日开始。之后我们有数据了,则从有数据的最后一天开始算起。
def _get_start_time(self, share_id, table_name):
    """Return the timestamp (local midnight) to start fetching from.

    With no stored data yet, start from 1990-01-01; otherwise start from
    the first weekday after the most recent stored row.
    """
    manager = stock_conn_manager()
    conn_name = manager.get_conn_name(share_id)
    rows = fetch_data.get_data(fetch_data.select_db(
        conn_name, table_name, ["time"], {},
        extend="order by time desc limit 1"))
    if not rows:
        time_str = "19900101"
    else:
        shanghai = pytz.timezone('Asia/Shanghai')
        day = datetime.datetime.fromtimestamp(rows[0][0], shanghai)
        # Advance one day, then keep skipping Saturday/Sunday.
        day += datetime.timedelta(days=1)
        while day.weekday() >= 5:
            day += datetime.timedelta(days=1)
        time_str = day.strftime("%Y%m%d")
    return time.mktime(time.strptime(time_str, '%Y%m%d'))
获取一个区间的数据后,我们通过正则表达式对结果进行拆分
def _filter_data(self, data):
    """Split the downloaded text into rows and normalize each row.

    Drops the header row and zero-volume rows, prepends a unix timestamp
    parsed from the date string, and removes two raw columns that the
    destination table does not store.
    """
    data = data.replace("None", "0")
    rows = fetch_data.get_data(fetch_data.regular_split("quotes_money_163", data))
    if rows:
        del rows[0]  # header line from the CSV endpoint
    useful = []
    for row in rows:
        if int(row[-2]) == 0:
            continue  # no volume means no real trade that day
        stamp = time.mktime(time.strptime(row[0], '%Y-%m-%d'))
        row.insert(0, stamp)
        del row[2:4]  # same as the original's two successive `del row[2]`
        useful.append(row)
    return useful
最后将数据保存到对应的表中
def _save_data(self, share_id, table_name, data):
    """Insert parsed daily rows into the per-share table.

    Rows whose length does not match the column list are logged and
    skipped.  Bug fix: the original's ``del item`` only unbound the loop
    variable (it never removed the row from ``data``), so malformed rows
    were still handed to insert_data; we now collect only valid rows.
    """
    into_db_columns = ["time","time_str","today_close","today_high","today_low","today_open","yesterday_close","pchg","turnover_rate","volume","turnover"]
    columns_count = len(into_db_columns)
    valid_rows = []
    for item in data:
        if len(item) != columns_count:
            LOG_INFO("%s length is not match for column length %d" %(str(item), columns_count))
            continue
        valid_rows.append(item)
    if not valid_rows:
        return
    stock_conn_manager_obj = stock_conn_manager()
    conn = stock_conn_manager_obj.get_conn(share_id)
    conn.insert_data(table_name, into_db_columns, valid_rows)
均线数据按类型分可以分为成交量均线和价格均线。按时间分可以分为5日、10日、20日、30日、60日、90日、120日、180日和360日均线。
为了方便计算,我引入了talib库
pip install TA-Lib -i http://pypi.douban.com/simple
首先获取所有股票代码以便之后枚举
class update_stock_daily_average_info(job_base):
    """Job that computes moving-average (MA) columns for every traded share."""

    def __init__(self):
        pass

    def run(self):
        # Update the averages share by share.
        for row in self._get_all_share_ids():
            self._update_average(row[0])
        LOG_INFO("run update_stock_daily_average_info")

    def _get_all_share_ids(self):
        """Return the distinct share ids present in today's trade_info table."""
        table_name = "trade_info_%s" % time.strftime('%Y_%m_%d')
        query = fetch_data.select_db("daily_temp", table_name, ["share_id"], {}, pre="distinct")
        return fetch_data.get_data(query)
然后查询每支股票最后一次计算均线的日期。判断规则就是查看价格5日均线值是否为0。因为均线计算量非常大,所以我们不能野蛮的全部重算。每次都要基于上次计算成果进行增量计算。
def _get_ma_empty_start_time(self, share_id, table_name):
    """Return the (local-midnight) timestamp of the earliest row whose
    close_ma5 is still 0 — i.e. the first row without computed averages —
    or 0 when every row has already been processed."""
    conn_name = stock_conn_manager().get_conn_name(share_id)
    rows = fetch_data.get_data(fetch_data.select_db(
        conn_name, table_name, ["time"], {"close_ma5":[0, "="]},
        extend="order by time asc limit 1"))
    if not rows:
        return 0
    shanghai = pytz.timezone('Asia/Shanghai')
    day = datetime.datetime.fromtimestamp(rows[0][0], shanghai)
    # Truncate the stored timestamp to the start of that day.
    return time.mktime(time.strptime(day.strftime("%Y%m%d"), '%Y%m%d'))
因为我们代码中最多分析180日均线数据,所以日期要从上面函数得到的日期前推180日;如果之前没有180日数据,则返回最早的那天。如果是新股,则返回当日。
def _get_start_time(self, share_id, table_name, ma_empty_start_time):
    """Step back up to 180 rows before ma_empty_start_time so the longest
    window has enough history; for a new share with no earlier rows, fall
    back to ma_empty_start_time itself."""
    conn_name = stock_conn_manager().get_conn_name(share_id)
    rows = fetch_data.get_data(fetch_data.select_db(
        conn_name, table_name, ["time"], {"time":[ma_empty_start_time, "<="]},
        extend="order by time desc limit 180"))
    if not rows:
        return ma_empty_start_time
    shanghai = pytz.timezone('Asia/Shanghai')
    day = datetime.datetime.fromtimestamp(rows[-1][0], shanghai)
    return time.mktime(time.strptime(day.strftime("%Y%m%d"), '%Y%m%d'))
下一步就是计算各个日期的均值
def _get_ma_data(self, ori_data, periods):
    """Compute a moving average series for each requested period.

    Returns {"<period>": [values...]} where each series has been run
    through _filter_data to replace the leading warm-up NaNs.
    """
    values = numpy.array([float(v) for v in ori_data])
    result = {}
    for period in periods:
        series = talib.MA(values, timeperiod=period).tolist()
        result["%d" % period] = self._filter_data(series)
    return result
然后将计算结果保存到数组中并保存
def _calc_average_data(self, share_id, table_name):
    """Compute close/volume moving averages for every date that is still
    missing them.

    Returns a list of {"time": ..., "close_maN": ..., "volume_maN": ...}
    dicts ready for _save_data.  Bug fix: the length-mismatch error path
    used to return None while the other paths return lists; it now returns
    [] so callers can always iterate the result.
    """
    ma_empty_start_time_int = self._get_ma_empty_start_time(share_id, table_name)
    if ma_empty_start_time_int == 0:
        return []  # every row already has averages
    start_time_int = self._get_start_time(share_id, table_name, ma_empty_start_time_int)
    stock_info = self._get_close_volume(share_id, table_name, start_time_int)
    periods = [5, 10, 20, 30, 60, 90, 120, 150, 180]
    close_data = self._get_ma_data(stock_info["close"], periods)
    volume_data = self._get_ma_data(stock_info["volume"], periods)
    # All three series must line up index-for-index.
    if not (len(stock_info["time"]) == len(close_data["180"]) == len(volume_data["180"])):
        LOG_WARNING("calc %s daily average error" % share_id)
        return []
    infos = []
    for index, time_int in enumerate(stock_info["time"]):
        if time_int < ma_empty_start_time_int:
            # History rows fetched only to seed the long windows; skip them.
            continue
        info = {"time": time_int}
        for period in periods:
            info["close_ma%s" % period] = close_data["%s" % period][index]
            info["volume_ma%s" % period] = volume_data["%s" % period][index]
        infos.append(info)
    return infos
def _filter_data(self, data):
for index in range(len(data)):
if math.isnan(data[index]):
data[index] = 0.01
else:
break
return data
def _save_data(self, share_id, table_name, data):
    """Write computed MA rows back to the share's table, keyed on time.

    Batches with fewer than two rows are ignored.
    """
    if len(data) < 2:
        return
    conn = stock_conn_manager().get_conn(share_id)
    conn.update(table_name, data, ["time"])
之前算的那些均值理论上来说是没什么用的!因为没有除权!这是我在对比我的数据和同花顺的数据之后得出的。于是只能再改改。
基本思路是要计算一个因子,因子=前一日数据中收盘价/今日数据中昨日收盘价。然后把除权日之前的价格都“乘以”该因子得出向后复权的价格,相应的把除权日之前的成交量都“除以”该因子得出向后复权的成交量。这样就会导致整个表进行一次更新(从后向前)。
有意思的是同花顺将成交量也“乘以”该因子,其实这个算法是错误的。举个例子,比如昨日股票收盘10元,成交量100股,则成交金额是1000元。今天除权,于是拉取数据中昨日的收盘价是5元。这样相当于单股价值缩水一半。那么因子是5/10=0.5。那么向后复权计算,昨日的股票收盘价是10*0.5=5元。成交量应该是100/0.5=200股。这样昨日的成交金额是5*200=1000。但是同花顺的算法昨日成交量100*0.5=50股,这明显是错误的。
最后贴上向后复权的算法
def _dividend_ori_data(self, share_id, from_table, to_table, start_time, compare = ">", yesterday_close = 0):
    """Apply backward ex-dividend ("hou fuquan") adjustment and persist the rows.

    Whenever the close recorded in one row disagrees with the adjacent row's
    yesterday_close, a factor div_value is derived and prices are multiplied
    by it while volume is divided by it, keeping price*volume consistent.

    share_id:    share being processed.
    from_table:  source table for _get_daily_info.
    to_table:    destination; same table -> per-row upsert, different -> bulk insert.
    start_time, compare: row-range selector passed to _get_daily_info.
    yesterday_close: close carried in from a previous batch; 0 means no
        previous batch, so the first row is kept unchanged as the seed.
    Returns the yesterday_close (column 6) of the last processed row, or 0
    when there was nothing to adjust.

    NOTE(review): pairing item[2] (today_close) with item[6] (yesterday_close)
    matches the blog's factor formula only if _get_daily_info returns rows
    newest-first, as the surrounding text suggests - confirm its ordering.
    """
    ori_data = self._get_daily_info(share_id, from_table, start_time, compare)
    if 0 == len(ori_data):
        return 0
    # Boundary rows agree -> no dividend in this range, nothing to rewrite.
    if ori_data[0][6] == yesterday_close:
        return 0
    ex_dividend_ori = []
    pre_div_value = 1
    for item in ori_data:
        if 0 == yesterday_close:
            # First row with no carried-in close: keep as-is and seed from it.
            ex_dividend_ori.append(item)
            yesterday_close = item[6]
            continue
        if len(ex_dividend_ori) > 0:
            # Always compare against the previously *adjusted* row.
            yesterday_close = ex_dividend_ori[-1][6]
        ori_close = item[2]
        if ori_close == 0 or yesterday_close == 0:
            # Missing price data: reuse the last factor instead of dividing by 0.
            div_value = pre_div_value
        else:
            if yesterday_close == ori_close:
                # Prices agree: no dividend between these rows, keep row unchanged.
                ex_dividend_ori.append(item)
                continue
            div_value = yesterday_close/ori_close
        pre_div_value = div_value
        # Scale price columns (2-6) by the factor and divide volume (9) by it;
        # ratio/amount columns (7, 8, 10) are left unchanged.
        ex_dividend_ori.append([item[0], item[1], item[2] * div_value, item[3] * div_value,item[4] * div_value,item[5] * div_value,item[6] * div_value,item[7],item[8],item[9] / div_value,item[10]])
    stock_conn_manager_obj = stock_conn_manager()
    conn = stock_conn_manager_obj.get_conn(share_id)
    if from_table != to_table:
        conn.insert_data(to_table, self._table_keys, ex_dividend_ori)
    else:
        # Same table: upsert each adjusted row keyed on its time column.
        for info_value in ex_dividend_ori:
            infos = {}
            for index in range(len(self._table_keys)):
                infos[self._table_keys[index]] = info_value[index]
            conn.insert_onduplicate(to_table, infos, ["time"])
        # Zero close_ma5 on the last row so the averaging job (which scans for
        # close_ma5 == 0) recomputes the moving averages from here.
        conn.insert_onduplicate(to_table, {"close_ma5":0, "time":ex_dividend_ori[-1][0]}, ["time"])
    last_yesterday_close = ex_dividend_ori[-1][6]
    return last_yesterday_close