相信很多买过股票的同学应该都听过PEG估值选股法,这个策略是美国的传奇基金经理彼得林奇极力推广的。
我们先来介绍下彼得林奇,在1977年至1990年的13年间,他在富达公司掌管的麦哲伦基金规模由2000万美元增长到140亿美元,13年的平均复利报酬率达29%,(如果只是比较从业期间的收益率,他甚至超过了巴菲特老爷子)在事业的巅峰时刻,选择退休。跟乔布斯一样,他们的传奇故事都在最精彩的地方停留。
彼得林奇年轻的时候去高尔夫球场当球童,因为他觉得那里是富人聚集的地方,他想更加深入的了解富人,挤进富人的圈子,而他能够进入富达公司实习也是因为当球童时结识了富达公司的经理。这一点被很多人都效仿,李嘉诚的二公子李泽楷年轻时也当过高尔夫球场的球童。
彼得林奇买过15000多只股票,这是成了很多人调侃的话题,美国一个节目上甚至有个题目是:彼得林奇没买过哪些股票?
扯的有点远了,我们再回到PEG上面来,彼得林奇的论断是:任何一家公司股票如果定价合理的话,市盈率就会与收益增长率相等。这句话也就是大名鼎鼎的PEG估值法了,那PEG到底是什么意思呢? 这里再补充一下巴菲特老爷子的观点,巴老爷子认为一个公司股票每年的长期收益率等于该公司净资产收益率ROE的倒数,所以到底增长率跟哪个因子的关系更大一些,相信每个人心里都会有自己的判断。这篇文章中,我们就认为彼得林奇说的是对的,并用PEG策略做回测。
首先,介绍几个基本概念,相信股民们已经是耳熟能详了。
简单一点说,就是净利润除以发行的股票数量,即平均一股能赚多少钱。
EPS的算法有多种,也就导致了PE也有多种:
这里,我们取滚动市盈率,更加准确一些。因为静态市盈率是按去年的EPS算的,时间差的可能有点多,而动态市盈率是预测的值,未来的事情,谁能说的清楚?
收益增长率的计算公式也有多种,因为增长率这东西嘛,既可以计算净利润增长率,也可以计算EPS的增长率。
因此G可以写成
也可以写成
介绍完上面几个概念,我们就可以引入PEG的公式了
下面我们举个简单的例子来说一下上面几个指标,方便小白用户理解,如果是股市老手可以略过此段。 话说小明有一家上市公司,主营卖包子,去年的净利润是1个亿,总共发行了1000万股,目前的股票价钱是50/股,那么EPS=1亿/1000万=10,PE=50/10=5;由于公司旁新开了写字楼,来吃包子的人突然增多,净利润变成了1.2个亿,股价也涨到了60,那今年的EPS=1.2亿/1000万=12,PE=60/12=5。增长率G=(1.2-1)/1=20%,PEG=5/(0.2*100)=0.25。
PEG越高,表示PE越大或者G越小,说明该公司的股价被高估,或者说明该公司业绩的增长太慢,不建议购买;相反,如果PEG越小,表示PE越小或者G越大,说明公司股价被低估,或者说明公司增益较快,可以考虑买入。
PEG估值法的适用范围
几乎每种策略都有它的适用范围,PEG估值法不适用下面两种情况:
原因也很简单,周期性行业公司赢利了未必是公司本身发展的好,而是行业周期来了,此时的增长率可能很高,但不可持续;第二种和第三种就更明显了,吃了上顿没下顿的公司就更不适合了。周期性行业很容易判断,比如煤炭、钢铁;但第二种和第三种就需要从财报中找了。像我们上面举的例子,业绩增长了20%是由于旁边新开了一家写字楼,买包子的人变多了,这种就是不可持续的。
下表列出了PEG范围和股票估值的关系。
PEG | 股票估值 |
---|---|
0~0.5 | 相对低估 |
0.5~1 | 相对合理 |
1~2 | 相对高估 |
>2 | 高风险 |
用PEG策略进行回测,思路如下:
的回测结果如下:
可以看出,参数合适的话,PEG策略的收益是要远大于市场平均收益的,如果我们还可以判断市场上的明显高位(比如2007年的6000点,2015年的5000点),在相对高位离场,那我们的收益还会更高。毕竟A股这种只能靠做多赚钱的熊市里,再好的策略都是无效的。
PEG的主要难点不在PE,而在于G,它的核心还是我们要发现有潜力的行业或者公司,这就要通过其他手段来辅助我们判断了;这些我们都会在后面的文章中讲到。
为了避免给某个量化平台做广告的嫌疑,我们这里给出jointquant和uqer两家平台上的回测程序和结果。
jointquant:(注:该程序为joinquant官网上量化课堂中的示例程序)
import pandas as pd
'''
================================================================================
总体回测前
================================================================================
'''
#总体回测前要做的事情
def initialize(context):
set_params() # 设置策略常量
set_variables() # 设置中间变量
set_backtest() # 设置回测条件
#1
#设置策略参数
def set_params():
g.tc = 15 # 调仓天数
g.num_stocks = 10 # 每次调仓选取的最大股票数量
#2
#设置中间变量
def set_variables():
g.t = 0 # 记录回测运行的天数
g.if_trade = False # 当天是否交易
#3
#设置回测条件
def set_backtest():
set_option('use_real_price',True) # 用真实价格交易
log.set_level('order','error') # 设置报错等级
'''
================================================================================
每天开盘前
================================================================================
'''
#每天开盘前要做的事情
def before_trading_start(context):
if g.t%g.tc==0:
g.if_trade=True # 每g.tc天,调仓一次
set_slip_fee(context) # 设置手续费与手续费
g.stocks=get_index_stocks('000300.XSHG') # 设置沪深300为初始股票池
# 设置可行股票池
g.feasible_stocks = set_feasible_stocks(g.stocks,context)
g.t+=1
#4
# 设置可行股票池:过滤掉当日停牌的股票
# 输入:initial_stocks为list类型,表示初始股票池; context(见API)
# 输出:unsuspened_stocks为list类型,表示当日未停牌的股票池,即:可行股票池
def set_feasible_stocks(initial_stocks,context):
# 判断初始股票池的股票是否停牌,返回list
paused_info = []
current_data = get_current_data()
for i in initial_stocks:
paused_info.append(current_data[i].paused)
df_paused_info = pd.DataFrame({'paused_info':paused_info},index = initial_stocks)
unsuspened_stocks =list(df_paused_info.index[df_paused_info.paused_info == False])
return unsuspened_stocks
#5
# 根据不同的时间段设置滑点与手续费
# 输入:context(见API)
# 输出:none
def set_slip_fee(context):
# 将滑点设置为0
set_slippage(FixedSlippage(0))
# 根据不同的时间段设置手续费
dt=context.current_dt
if dt>datetime.datetime(2013,1, 1):
set_commission(PerTrade(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
elif dt>datetime.datetime(2011,1, 1):
set_commission(PerTrade(buy_cost=0.001, sell_cost=0.002, min_cost=5))
elif dt>datetime.datetime(2009,1, 1):
set_commission(PerTrade(buy_cost=0.002, sell_cost=0.003, min_cost=5))
else:
set_commission(PerTrade(buy_cost=0.003, sell_cost=0.004, min_cost=5))
'''
================================================================================
每天交易时
================================================================================
'''
# 每天回测时做的事情
def handle_data(context,data):
if g.if_trade == True:
# 待买入的g.num_stocks支股票,list类型
list_to_buy = stocks_to_buy(context)
# 待卖出的股票,list类型
list_to_sell = stocks_to_sell(context, list_to_buy)
# 卖出操作
sell_operation(list_to_sell)
# 买入操作
buy_operation(context, list_to_buy)
g.if_trade = False
#6
# 计算股票的PEG值
# 输入:context(见API);stock_list为list类型,表示股票池
# 输出:df_PEG为dataframe: index为股票代码,data为相应的PEG值
def get_PEG(context, stock_list):
# 查询股票池里股票的市盈率,收益增长率
q_PE_G = query(valuation.code, valuation.pe_ratio, indicator.inc_net_profit_year_on_year
).filter(valuation.code.in_(stock_list))
# 得到一个dataframe:包含股票代码、市盈率PE、收益增长率G
# 默认date = context.current_dt的前一天,使用默认值,避免未来函数,不建议修改
df_PE_G = get_fundamentals(q_PE_G)
# 筛选出成长股:删除市盈率或收益增长率为负值的股票
df_Growth_PE_G = df_PE_G[(df_PE_G.pe_ratio >0)&(df_PE_G.pe_ratio <80) \
&(df_PE_G.inc_net_profit_year_on_year >0)&(df_PE_G.inc_net_profit_year_on_year <200) ]
# 去除PE或G值为非数字的股票所在行
df_Growth_PE_G.dropna()
# 得到一个Series:存放股票的市盈率TTM,即PE值
Series_PE = df_Growth_PE_G.ix[:,'pe_ratio']
# 得到一个Series:存放股票的收益增长率,即G值
Series_G = df_Growth_PE_G.ix[:,'inc_net_profit_year_on_year']
# 得到一个Series:存放股票的PEG值
Series_PEG = Series_PE/Series_G
# 将股票与其PEG值对应
Series_PEG.index = df_Growth_PE_G.ix[:,0]
# 将Series类型转换成dataframe类型
df_PEG = pd.DataFrame(Series_PEG)
return df_PEG
#7
# 获得买入信号
# 输入:context(见API)
# 输出:list_to_buy为list类型,表示待买入的g.num_stocks支股票
def stocks_to_buy(context):
list_to_buy = []
# 得到一个dataframe:index为股票代码,data为相应的PEG值
df_PEG = get_PEG(context, g.feasible_stocks)
# 将股票按PEG升序排列,返回daraframe类型
df_sort_PEG = df_PEG.sort(columns=[0], ascending=[1])
# 将存储有序股票代码index转换成list并取前g.num_stocks个为待买入的股票,返回list
for i in range(g.num_stocks):
if df_sort_PEG.ix[i,0] < 0.5:
list_to_buy.append(df_sort_PEG.index[i])
return list_to_buy
#8
# 获得卖出信号
# 输入:context(见API文档), list_to_buy为list类型,代表待买入的股票
# 输出:list_to_sell为list类型,表示待卖出的股票
def stocks_to_sell(context, list_to_buy):
list_to_sell=[]
# 对于不需要持仓的股票,全仓卖出
for stock_sell in context.portfolio.positions:
if stock_sell not in list_to_buy:
list_to_sell.append(stock_sell)
return list_to_sell
#9
# 执行卖出操作
# 输入:list_to_sell为list类型,表示待卖出的股票
# 输出:none
def sell_operation(list_to_sell):
for stock_sell in list_to_sell:
order_target_value(stock_sell, 0)
#10
# 执行买入操作
# 输入:context(见API);list_to_buy为list类型,表示待买入的股票
# 输出:none
def buy_operation(context, list_to_buy):
for stock_sell in list_to_buy:
# 为每个持仓股票分配资金
g.capital_unit=context.portfolio.portfolio_value/len(list_to_buy)
# 买入在“待买股票列表”的股票
for stock_buy in list_to_buy:
order_target_value(stock_buy, g.capital_unit)
'''
uqer:
uqer中回测的结果跟joinquant中回测的不同,因为下面的程序中没有添加可行股票池,即每日未停牌股票的筛选,而且每次投入的比例也不太相同,大家可以观察这几个参数带来的回测差异。
# uqer的数据中没有收益增长率,即G,因此用净利润增长率来代替
import pandas as pd
import numpy as np
import datetime
# system parameters
start = '2012-01-06' # 回测起始时间
end = '2018-01-06' # 回测结束时间
universe = DynamicUniverse('HS300').apply_filter(Factor.PE.nsmall(100)) # 证券池,支持股票、基金、期货、指数四种资产
benchmark = 'HS300' # 策略参考标准
freq = 'd' # 策略类型,'d'表示日间策略使用日线回测,'m'表示日内策略使用分钟线回测
refresh_rate = 15 # 调仓频率,表示执行handle_data的时间间隔,若freq = 'd'时间间隔的单位为交易日,若freq = 'm'时间间隔为分钟
# my parameters
num_stocks = 10 # 每次调仓的最大股票数量
capital_unit = 1.0/num_stocks
# 配置账户信息,支持多资产多账户
accounts = {
'stock_account': AccountConfig(account_type='security',
capital_base=10000000,
slippage = Slippage(value=0.001, unit='perValue'), # 滑点设置成百分比滑点0.001
commission = Commission(buycost = 0.0003, sellcost = 0.002, unit = 'perValue') #手续费
)
}
def initialize(context):
print 'initialize...'
# 每个单位时间(如果按天回测,则每天调用一次,如果按分钟,则每分钟调用一次)调用一次
def handle_data(context):
print 'handle_data...'
print context.current_date
cur_date = context.current_date
stock_account = context.get_account('stock_account')
current_universe = context.get_universe('stock', exclude_halt=True) #可行股票池
list_to_buy = stocks_to_buy(context,current_universe, cur_date)
# print list_to_buy
# list_to_sell = stocks_to_sell(context, stock_account, list_to_buy)
# sell_operation(list_to_buy,stock_account)
# buy_operation(context,list_to_buy,stock_account)
current_position = stock_account.get_positions(exclude_halt=True)
for stock in set(current_position).difference(list_to_buy):
stock_account.order_to(stock, 0)
for stock_buy in list_to_buy:
print stock_buy
stock_account.order(stock_buy, 10000)#(stock_buy,capital_unit)
def get_PEG(context, current_universe, cur_date):
# 获取PE和G的数据
df_PE_G = DataAPI.MktStockFactorsOneDayGet(tradeDate=cur_date,secID=current_universe,ticker='',field=['secID','PE','NetProfitGrowRate'],pandas="1")
df_Growth_PE_G = df_PE_G[(df_PE_G['PE']>0) & (df_PE_G['NetProfitGrowRate']>0)]
# print df_Growth_PE_G
df_Growth_PE_G.dropna()
Serial_PE = df_Growth_PE_G.loc[:,'PE']
Serial_G = df_Growth_PE_G.loc[:,'NetProfitGrowRate']
Serial_PEG = Serial_PE/(100 * Serial_G)
# print Serial_PEG
Serial_PEG.index = df_Growth_PE_G.iloc[:,0]
df_PEG = pd.DataFrame(Serial_PEG)
# print('get PEG done')
return df_PEG
def stocks_to_buy(context,current_universe, cur_data):
list_to_buy = []
# print current_universe
# print cur_data
df_PEG = get_PEG(context,current_universe, cur_data)
# print df_PEG
df_sort_PEG = df_PEG.sort(columns = [0], ascending = [1]) #升序
# print df_sort_PEG
# 选出num_stocks个股票
for i in range(num_stocks):
if df_sort_PEG.iloc[i,0] < 0.5: # PEG--0.5:
list_to_buy.append(df_sort_PEG.index[i])
return list_to_buy
def stocks_to_sell(context, stock_account, list_to_buy):
list_to_sell = []
# 若不在要买的股票池里即为要卖出的股票
for stock_sell in stock_account.get_positions().keys():
if stock_sell not in list_to_buy:
list_to_sell.append(stock_sell)
return list_to_sell
def sell_operation(list_to_buy,stock_account):
for stock in set(stock_account.get_positions(exclude_halt=True)).difference(set(list_to_buy)):
stock_account.order_to(stock, 0)
def buy_operation(context,list_to_buy,stock_account):
for stock_buy in list_to_buy:
print stock_buy
# stock_account.order_pct(stock_buy,capital_unit)
stock_account.order(stock_buy, 10000)