原文:
www.backtrader.com/blog/2019-05-20-momentum-strategy/momentum-strategy/
在另一篇很棒的文章中,Teddy Koker 再次展示了 算法交易 策略的发展路径:
pandas
进行研究
backtrader
进行回测
真棒!!!
文章可以在以下位置找到:
Teddy Koker 给我留了言,询问我是否可以评论 backtrader 的使用。我的观点可以在下面看到。这仅仅是我的个人意见,因为作为 backtrader 的作者,我对如何最好地使用该平台有偏见。
我个人对某些结构如何表述的偏好,不必与其他人使用平台的偏好相匹配。
注意
实际上,让平台能够插入几乎任何内容,并以不同的方式执行相同的操作,是一个有意识的决定,让人们按照他们认为合适的方式使用它(在平台旨在实现的约束条件、语言可能性和我所做的失败设计决定的范围内)。
在这里,我们只关注可以以不同方式完成的事情。是否 “不同” 更好总是一个看法问题。而 backtrader 的作者并不总是必须在使用 backtrader 进行开发时始终正确(因为实际的开发必须适合开发者,而不是 backtrader 的作者)
dict
vs tuple of tuples
backtrader
提供的许多示例,以及文档和/或博客中提供的示例,都使用 tuple of tuples
模式进行参数设置。例如,来自代码的示例:
class Momentum(bt.Indicator):
lines = ('trend',)
params = (('period', 90),)
在这种范例中,一直有机会使用 dict
。
class Momentum(bt.Indicator):
lines = ('trend',)
params = dict(period=90) # or params = {'period': 90}
随着时间的推移,这种方式变得更易于使用,并成为作者首选的模式。
注意
作者更喜欢 dict(period=90)
,因为它更易于输入,不需要引号。但是,许多其他人更喜欢花括号表示法,{'period': 90}
。
dict
和 tuple
方法之间的根本区别:
tuple of tuples
参数保留了声明顺序,这在枚举参数时可能很重要。
提示
在 Python 3.7
(以及 3.6
,如果使用 CPython,即使这是一个实现细节)中,默认排序字典使声明顺序不会成为问题。
在下面作者修改的示例中,将使用 dict
表示法。
Momentum
指标在文章中,这就是指标的定义方式
class Momentum(bt.Indicator):
lines = ('trend',)
params = (('period', 90),)
def __init__(self):
self.addminperiod(self.params.period)
def next(self):
returns = np.log(self.data.get(size=self.p.period))
x = np.arange(len(returns))
slope, _, rvalue, _, _ = linregress(x, returns)
annualized = (1 + slope) ** 252
self.lines.trend[0] = annualized * (rvalue ** 2)
使用力量,即使用已经存在的东西,比如 PeriodN
指标,它:
period
参数,并知道如何将其传递给系统因此,这可能更好
class Momentum(bt.ind.PeriodN):
lines = ('trend',)
params = dict(period=50)
def next(self):
...
我们已经跳过了为了使用 addminperiod
而定义 __init__
的需要,这只应在特殊情况下使用。
继续进行,backtrader 定义了一个 OperationN
指标,它必须具有定义的属性 func
,该属性将作为参数传递 period
个 bars,并将返回值放入定义的线中。
有了这个想法,一个人可以将以下内容想象成潜在的代码
def momentum_func(the_array):
r = np.log(the_array)
slope, _, rvalue, _, _ = linregress(np.arange(len(r)), r)
annualized = (1 + slope) ** 252
return annualized * (rvalue ** 2)
class Momentum(bt.ind.OperationN):
lines = ('trend',)
params = dict(period=50)
func = momentum_func
这意味着我们已经将指标的复杂性移出了指标。我们甚至可以从外部库导入 momentum_func
,如果底层函数发生变化,指标就不需要进行任何更改以反映新的行为。作为额外的奖励,我们拥有纯粹的声明性指标。没有 __init__
,没有 addminperiod
,也没有 next
。
让我们看看 __init__
部分。
class Strategy(bt.Strategy):
def __init__(self):
self.i = 0
self.inds = {}
self.spy = self.datas[0]
self.stocks = self.datas[1:]
self.spy_sma200 = bt.indicators.SimpleMovingAverage(self.spy.close,
period=200)
for d in self.stocks:
self.inds[d] = {}
self.inds[d]["momentum"] = Momentum(d.close,
period=90)
self.inds[d]["sma100"] = bt.indicators.SimpleMovingAverage(d.close,
period=100)
self.inds[d]["atr20"] = bt.indicators.ATR(d,
period=20)
关于风格的一些事情:
close
。通用地传递数据源,它将使用 close。这可能看起来不相关,但在尝试在各处保持代码的通用性时(比如在指标中),这确实有所帮助。
一个人应该考虑的第一件事:如果可能的话,将一切都保持为参数。因此
class Strategy(bt.Strategy):
params = dict(
momentum=Momentum, # parametrize the momentum and its period
momentum_period=90,
movav=bt.ind.SMA, # parametrize the moving average and its periods
idx_period=200,
stock_period=100,
volatr=bt.ind.ATR, # parametrize the volatility and its period
vol_period=20,
)
def __init__(self):
# self.i = 0 # See below as to why the counter is commented out
self.inds = collections.defaultdict(dict) # avoid per data dct in for
# Use "self.data0" (or self.data) in the script to make the naming not
# fixed on this being a "spy" strategy. Keep things generic
# self.spy = self.datas[0]
self.stocks = self.datas[1:]
# Again ... remove the name "spy"
self.idx_mav = self.p.movav(self.data0, period=self.p.idx_period)
for d in self.stocks:
self.inds[d]['mom'] = self.p.momentum(d, period=self.momentum_period)
self.inds[d]['mav'] = self.p.movav(d, period=self.p.stock_period)
self.inds[d]['vol'] = self.p.volatr(d, period=self.p.vol_period)
通过使用 params
并更改几个命名约定,我们使 __init__
(以及策略)完全可定制且通用(任何地方都没有 spy
的引用)
next
及其 len
backtrader 尽可能使用 Python 范式。它肯定有时会失败,但它会尝试。
让我们看看 next
发生了什么
def next(self):
if self.i % 5 == 0:
self.rebalance_portfolio()
if self.i % 10 == 0:
self.rebalance_positions()
self.i += 1
Python 的 len
范式正是所需之处。让我们来使用它。
def next(self):
l = len(self)
if l % 5 == 0:
self.rebalance_portfolio()
if l % 10 == 0:
self.rebalance_positions()
正如你所见,没有必要保留 self.i
计数器。策略和大多数对象的长度由系统一直提供、计算和更新。
next
和 prenext
代码包含了这种转发
def prenext(self):
# call next() even when data is not available for all tickers
self.next()
在进入 next
时没有保障
def next(self):
if self.i % 5 == 0:
self.rebalance_portfolio()
...
好吧,我们知道正在使用一个无幸存者偏差的数据集,但一般来说,不保护 prenext => next
转发不是一个好主意。
next
。一个 100-bar
移动平均线显然只有在从数据源获取了 100 个数据点时才会提供数据。
这意味着在进入 next
时,数据源将有 100 个数据点
可供检查,而移动平均值只有 1 个数据点
prenext
作为钩子,让开发者在上述保证能够实现之前访问事物。例如,当有多个数据源并且它们的开始日期不同时,这是有用的。开发者可能希望在满足所有数据源(和相关指标)的所有保证之前进行一些检查或操作,并且在第一次调用 next
之前。
在一般情况下,prenext => next
转发应该有这样的保护措施:
def prenext(self):
# call next() even when data is not available for all tickers
self.next()
def next(self):
d_with_len = [d for d in self.datas if len(d)]
...
这意味着只有来自self.datas
的子集d_with_len
才能得到保证使用。
注意
对于指标也必须使用类似的保护措施。
因为对于策略的整个生命周期来说这样做似乎是毫无意义的,可以进行如此优化
def __init__(self):
...
self.d_with_len = []
def prenext(self):
# Populate d_with_len
self.d_with_len = [d for d in self.datas if len(d)]
# call next() even when data is not available for all tickers
self.next()
def nextstart(self):
# This is called exactly ONCE, when next is 1st called and defaults to
# call `next`
self.d_with_len = self.datas # all data sets fulfill the guarantees now
self.next() # delegate the work to next
def next(self):
# we can now always work with self.d_with_len with no calculation
...
保护计算已移至prenext
,当保证满足时将停止调用它。然后将调用nextstart
,通过重写它,我们可以重置数据集的list
,以便与之一起工作,即:self.datas
并且通过这样做,所有保护措施都已从next
中删除。
next
虽然作者的意图是每 5/10 天重新平衡(投资组合/头寸),但这可能意味着每周/两周重新平衡。
len(self) % period
方法在以下情况下会失败:
为了克服这一点,可以使用backtrader中的内置功能。
使用它们将确保在应该发生时进行重新平衡。让我们假设意图是在星期五重新平衡
让我们在我们的策略中为params
和__init__
增加一点魔法
class Strategy(bt.Strategy):
params = dict(
...
rebal_weekday=5, # rebalance 5 is Friday
)
def __init__(self):
...
self.add_timer(
when=bt.Timer.SESSION_START,
weekdays=[self.p.rebal_weekday],
weekcarry=True, # if a day isn't there, execute on the next
)
...
现在我们已经准备好知道今天是星期五了。即使星期五恰好是交易日,添加weekcarry=True
也确保我们会在星期一收到通知(或者如果星期一也是假日则为星期二,等等)
定时器的通知在notify_timer
中进行
def notify_timer(self, timer, when, *args, **kwargs):
self.rebalance_portfolio()
因为原始代码中还有每10
个条形图进行一次rebalance_positions
,所以可以:
allow=callable
参数
注意
定时器甚至可以更好地用于实现模式,比如:
rebalance_positions
仅在每个月的第 4 个星期五进行。
其他一些事情可能纯粹是个人喜好的问题。
个人喜好 1
始终使用预先构建的比较而不是在next
期间比较事物。例如来自代码(多次使用)
if self.spy < self.spy_sma200:
return
我们可以做以下事情。首先在__init__
期间
def __init__(self):
...
self.spy_filter = self.spe < self.spy_sma200
以后
if self.spy_filter:
return
考虑到这一点,如果我们想要改变spy_filter
条件,我们只需在__init__
中执行一次,而不是在代码中的多个位置执行。
同样的情况也可能适用于此处的另一个比较d < self.inds[d]["sma100"]
:
# sell stocks based on criteria
for i, d in enumerate(self.rankings):
if self.getposition(self.data).size:
if i > num_stocks * 0.2 or d < self.inds[d]["sma100"]:
self.close(d)
这也可以在__init__
期间预先构建,并因此更改为如下所示
# sell stocks based on criteria
for i, d in enumerate(self.rankings):
if self.getposition(self.data).size:
if i > num_stocks * 0.2 or self.inds[d]['sma_signal']:
self.close(d)
个人喜好 2
将一切都作为参数。例如,在上面的几行中,我们看到一个0.2
,它在代码的几个部分中都被使用:将其作为参数。同样,还有其他值,如0.001
和100
(实际上已经建议将其作为创建移动平均值的参数)。
将所有东西都作为参数,可以通过只改变策略的实例化而不是策略本身来打包代码并尝试不同的方法。
原文:
www.backtrader.com/blog/posts/2018-04-22-improving-code/improving-code/
每隔一段时间,互联网上会出现带有backtrader代码的样本。在我看来有几个是中文。最新的一个在这里:
标题是:backtrader-学习笔记 2,显然(谢谢谷歌)被翻译为backtrader-学习笔记 2。如果那些是学习笔记,让我们尝试在那里真正可以改进代码的地方进行改进,在我个人看来,那就是backtrader最擅长的地方。
在学习笔记中策略的__init__
方法中,我们发现以下内容
def __init__(self):
...
self.ma1 = bt.indicators.SMA(self.datas[0],
period=self.p.period
)
self.ma2 = bt.indicators.SMA(self.datas[1],
period=self.p.period
)
这里没有什么好争论的(风格是非常个人的事情,我不会触及那方面)
在策略的next
方法中,以下是买入和卖出的逻辑决策。
...
# Not yet ... we MIGHT BUY if ...
if (self.ma1[0]-self.ma1[-1])/self.ma1[-1]>(self.ma2[0]-self.ma2[-1])/self.ma2[-1]:
...
和
...
# Already in the market ... we might sell
if (self.ma1[0]-self.ma1[-1])/self.ma1[-1]<=(self.ma2[0]-self.ma2[-1])/self.ma2[-1]:
...
这两个逻辑块是可以做得更好的,这样也会增加可读性、可维护性和调整性(如果需要的话)
不是将移动平均值的比较(当前点0
和上一个点-1
)后跟一些除法,让我们看看如何让它预先计算。
让我们调整__init__
def __init__(self):
...
# Let's create the moving averages as before
ma1 = bt.ind.SMA(self.data0, period=self.p.period)
ma2 = bt.ind.SMA(self.data1, period=self.p.period)
# Use line delay notation (-x) to get a ref to the -1 point
ma1_pct = ma1 / ma1(-1) - 1.0 # The ma1 percentage part
ma2_pct = ma2 / ma2(-1) - 1.0 # The ma2 percentage part
self.buy_sig = ma1_pct > ma2_pct # buy signal
self.sell_sig = ma1_pct <= ma2_pct # sell signal
现在我们可以将其带到next
方法并执行以下操作:
def next(self):
...
# Not yet ... we MIGHT BUY if ...
if self.buy_sig:
...
...
# Already in the market ... we might sell
if self.sell_sig:
...
注意,我们甚至不必使用self.buy_sig[0]
,因为通过if self.buy_sig
进行的布尔测试已经被backtrader机制翻译成了对[0]
的检查
在我看来,通过在__init__
中使用标准算术和逻辑操作来定义逻辑(并使用行延迟符号(-x)
)使代码变得更好。
无论如何,作为结束语,人们也可以尝试使用内置的PercentChange
指标(又名PctChange
)
正如名称所示,它已经计算了一定周期内的百分比变化。现在__init__
中的代码看起来是这样的
def __init__(self):
...
# Let's create the moving averages as before
ma1 = bt.ind.SMA(self.data0, period=self.p.period)
ma2 = bt.ind.SMA(self.data1, period=self.p.period)
ma1_pct = bt.ind.PctChange(ma1, period=1) # The ma1 percentage part
ma2_pct = bt.ind.PctChange(ma2, period=1) # The ma2 percentage part
self.buy_sig = ma1_pct > ma2_pct # buy signal
self.sell_sig = ma1_pct <= ma2_pct # sell signal
在这种情况下,并没有太大的区别,但如果计算更大更复杂的话,这绝对可以为你省下很多麻烦。
祝愉快的回溯交易!
原文:
www.backtrader.com/blog/posts/2018-02-06-dynamic-indicator/dynamic-indicator/
指标是棘手的东西。不是因为它们在一般情况下很难编码,而是因为名称具有误导性,并且人们对指标有不同的期望。
让我们至少尝试定义在backtrader生态系统内什么是指标。
它是一个定义了至少一个输出线的对象,可能定义影响其行为的参数,并接受一个或多个数据源作为输入。
为了尽可能使指标通用,选择了以下设计原则:
datetime
线负载。这是因为输入可能没有自己的datetime
负载进行同步。并且根据系统范围内的通用datetime
进行同步可能是不正确的,因为指标可能正在使用来自周时间框架的数据,而系统时间可能以秒为单位进行计时,因为这是多个数据源之一的最低分辨率。
0
。否则它将被命名为Study
。Study
将寻找模式并在过去写入输出值。
例如请参阅Backtrader 社区 - ZigZag
一旦(在backtrader生态系统中)定义清晰,让我们尝试看看如何实际编写一个动态指标。似乎我们不能,因为从上述的设计原则来看,指标的操作过程多多少少是……不可变的。
通常启动的一个指标是Highest
(别名为MaxN
),以获得给定周期内的最高某物。如
import backtrader as bt
class MyStrategy(bt.Strategy)
def __init__(self):
self.the_highest_high_15 = bt.ind.Highest(self.data.high, period=15)
def next(self):
if self.the_highest_high_15 > X:
print('ABOUT TO DO SOMETHING')
在此代码片段中,我们实例化Highest
以跟踪最近 15 个周期内的最高高点。如果最高高点大于X
,将执行某些操作。
这里的关键是:
15
有时,我们需要指标是动态的,并且根据实时条件改变其行为。例如,请参阅 backtrader 社区中的这个问题:自开仓以来的最高高点
当然,我们不知道何时会开/平仓,并且将 period
设置为固定值如 15
是没有意义的。让我们看看我们如何做,将所有东西打包到一个指标中
我们首先将使用我们将在指标生命周期中更改的参数,通过它实现动态性。
import backtrader as bt
class DynamicHighest(bt.Indicator):
lines = ('dyn_highest',)
params = dict(tradeopen=False)
def next(self):
if self.p.tradeopen:
self.lines.dyn_highest[0] = max(self.data[0], self.dyn_highest[-1])
class MyStrategy(bt.Strategy)
def __init__(self):
self.dyn_highest = DynamicHighest(self.data.high)
def notify_trade(self, trade):
self.dyn_highest.p.tradeopen = trade.isopen
def next(self):
if self.dyn_highest > X:
print('ABOUT TO DO SOMETHING')
让我们来吧!我们拥有它,到目前为止我们还没有违反为我们的指标制定的规则。让我们看看指标
dyn_highest
的输出 line
tradeopen=False
Indicator
的子类)
next
,它将始终返回相同的值
唯一的事情:
我们在 notify_trade
中使用这个来影响我们的 DynamicHighest
trade
的值 isopen
作为一个标志,以知道我们是否需要记录输入数据的最高点
trade
关闭时,isopen
的值将为 False
,我们将停止记录最高值
供参考:Backtrader Documentation Trade
简单!!!
有些人会反对修改在指标声明中的 param
,并且应该只在实例化期间设置。
好的,让我们使用一个方法。
import backtrader as bt
class DynamicHighest(bt.Indicator):
lines = ('dyn_highest',)
def __init__(self):
self._tradeopen = False
def tradeopen(self, yesno):
self._tradeopen = yesno
def next(self):
if self._tradeopen:
self.lines.dyn_highest[0] = max(self.data[0], self.dyn_highest[-1])
class MyStrategy(bt.Strategy)
def __init__(self):
self.dyn_highest = DynamicHighest(self.data.high)
def notify_trade(self, trade):
self.dyn_highest.tradeopen(trade.isopen)
def next(self):
if self.dyn_highest > X:
print('ABOUT TO DO SOMETHING')
并没有太大的区别,但现在指标有了一些额外的样板代码,包括 __init__
和方法 tradeopen(self, yesno)
。但是我们的 DynamicHighest
的动态性是相同的。
让我们恢复 params
并使指标成为可以应用不同函数而不仅仅是 max
的指标
import backtrader as bt
class DynamicFn(bt.Indicator):
lines = ('dyn_highest',)
params = dict(fn=None)
def __init__(self):
self._tradeopen = False
# Safeguard for not set function
self._fn = self.p.fn or lambda x, y: x
def tradeopen(self, yesno):
self._tradeopen = yesno
def next(self):
if self._tradeopen:
self.lines.dyn_highest[0] = self._fn(self.data[0], self.dyn_highest[-1])
class MyStrategy(bt.Strategy)
def __init__(self):
self.dyn_highest = DynamicHighest(self.data.high, fn=max)
def notify_trade(self, trade):
self.dyn_highest.tradeopen(trade.isopen)
def next(self):
if self.dyn_highest > X:
print('ABOUT TO DO SOMETHING')
说了做到!我们已经添加了:
params=dict(fn=None)
收集最终用户想要使用的函数
如果用户没有传递特定函数,则使用占位符函数的保护措施:
# Safeguard for not set function
self._fn = self.p.fn or lambda x, y: x`
并且我们使用函数(或占位符)进行计算:
self.lines.dyn_highest[0] = self._fn(self.data[0], self.dyn_highest[-1])`
在我们(现在命名为)DynamicFn
指标的调用中声明我们想要使用的函数… max
(这里没有惊喜):
self.dyn_highest = DynamicHighest(self.data.high, fn=max)`
今天剩下的不多了…享受它!!!
原文:
www.backtrader.com/blog/posts/2018-02-01-stop-trading/stop-trading/
交易可能是危险的,使用止损订单可以帮助您避免巨额损失或确保利润。backtrader为您提供了几种实现基于Stop的策略的机制
将使用经典的Fast EMA
穿过Slow EMA
的方法。但:
buy
订单
sell
将通过Stop
进行
因此,策略将从这个简单的框架开始
class BaseStrategy(bt.Strategy):
params = dict(
fast_ma=10,
slow_ma=20,
)
def __init__(self):
# omitting a data implies self.datas[0] (aka self.data and self.data0)
fast_ma = bt.ind.EMA(period=self.p.fast_ma)
slow_ma = bt.ind.EMA(period=self.p.slow_ma)
# our entry point
self.crossup = bt.ind.CrossUp(fast_ma, slow_ma)
并且使用继承,我们将解决如何实现Stops的不同方法
为了避免太多的方法,我们基本策略的这个子类将允许:
Stop
StopTrail
,它随着价格的变动而移动(在这种情况下使用点)
class ManualStopOrStopTrail(BaseStrategy):
params = dict(
stop_loss=0.02, # price is 2% less than the entry point
trail=False,
)
def notify_order(self, order):
if not order.status == order.Completed:
return # discard any other notification
if not self.position: # we left the market
print('SELL@price: {:.2f}'.format(order.executed.price))
return
# We have entered the market
print('BUY @price: {:.2f}'.format(order.executed.price))
if not self.p.trail:
stop_price = order.executed.price * (1.0 - self.p.stop_loss)
self.sell(exectype=bt.Order.Stop, price=stop_price)
else:
self.sell(exectype=bt.Order.StopTrail, trailamount=self.p.trail)
def next(self):
if not self.position and self.crossup > 0:
# not in the market and signal triggered
self.buy()
正如您所见,我们已添加了参数:
stop_loss=0.02
(2%)
trail=False
,设置为数值时,策略将使用StopTrail
有关订单的文档,请参见:
让我们使用固定的Stop
执行我们的脚本:
$ ./stop-loss-approaches.py manual --plot
BUY @price: 3073.40
SELL@price: 3009.93
BUY @price: 3034.88
以及图表
正如我们所见:
buy
buy
通知为Completed
时,我们会以执行价格下stop_loss
百分比发出Stop
订单。
结果:
stop_loss
百分比以下
让我们使用相同的方法,但应用StopTrail
订单:
$ ./stop-loss-approaches.py manual --plot --strat trail=20
BUY @price: 3073.40
SELL@price: 3070.72
BUY @price: 3034.88
SELL@price: 3076.54
BUY @price: 3349.72
SELL@price: 3339.65
BUY @price: 3364.26
SELL@price: 3393.96
BUY @price: 3684.38
SELL@price: 3708.25
BUY @price: 3884.57
SELL@price: 3867.00
BUY @price: 3664.59
SELL@price: 3650.75
BUY @price: 3635.17
SELL@price: 3661.55
BUY @price: 4100.49
SELL@price: 4120.66
以及图表
现在我们看到,与之前的方法相比,这不是那么有效。
20
个点(我们的trail值)
notify_order
?因为这确保了必须由Stop
控制的订单实际已被执行。在进行实时交易时,这可能不是一个大问题,但在回测时是。
让我们简化回测方法,使用 backtrader 提供的cheat-on-close
模式。
class ManualStopOrStopTrailCheat(BaseStrategy):
params = dict(
stop_loss=0.02, # price is 2% less than the entry point
trail=False,
)
def __init__(self):
super().__init__()
self.broker.set_coc(True)
def notify_order(self, order):
if not order.status == order.Completed:
return # discard any other notification
if not self.position: # we left the market
print('SELL@price: {:.2f}'.format(order.executed.price))
return
# We have entered the market
print('BUY @price: {:.2f}'.format(order.executed.price))
def next(self):
if not self.position and self.crossup > 0:
# not in the market and signal triggered
self.buy()
if not self.p.trail:
stop_price = self.data.close[0] * (1.0 - self.p.stop_loss)
self.sell(exectype=bt.Order.Stop, price=stop_price)
else:
self.sell(exectype=bt.Order.StopTrail,
trailamount=self.p.trail)
在这种情况下:
__init__
阶段,经纪人会激活cheat-on-close
模式。
StopOrder
在 buy
订单之后立即发布。这是因为 cheat-on-close
确保它将在不等待下一个柱状图的情况下被执行。
请注意,使用收盘价格(self.data.close[0]
)用于止损,因为尚未有执行价格。我们知道它将是收盘价格,这要归功于 cheat-on-close
notify_order
方法纯粹是一个记录方法,告诉我们何时购买或出售。
使用 StopTrail
进行示例运行:
$ ./stop-loss-approaches.py manualcheat --plot --strat trail=20
BUY @price: 3076.23
SELL@price: 3070.72
BUY @price: 3036.30
SELL@price: 3076.54
BUY @price: 3349.46
SELL@price: 3339.65
BUY @price: 3362.83
SELL@price: 3393.96
SELL@price: 3685.48
SELL@price: 3665.48
SELL@price: 3888.46
SELL@price: 3868.46
BUY @price: 3662.92
SELL@price: 3650.75
BUY @price: 3631.50
SELL@price: 3661.55
BUY @price: 4094.33
SELL@price: 4120.66
和图表
注意:
cheat-on-close
给策略提供了收盘价格(这是不现实的,但可以是一个很好的近似值),而不是下一个可用价格(这是下一个开盘价)
如果可以将订单的逻辑保持在 next
中,并且不必使用 cheat-on-close
,那将是完美的。这是可以实现的!!!
让我们使用
注意
这是 Bracket Order
功能的一部分。
class AutoStopOrStopTrail(BaseStrategy):
params = dict(
stop_loss=0.02, # price is 2% less than the entry point
trail=False,
buy_limit=False,
)
buy_order = None # default value for a potential buy_order
def notify_order(self, order):
if order.status == order.Cancelled:
print('CANCEL@price: {:.2f} {}'.format(
order.executed.price, 'buy' if order.isbuy() else 'sell'))
return
if not order.status == order.Completed:
return # discard any other notification
if not self.position: # we left the market
print('SELL@price: {:.2f}'.format(order.executed.price))
return
# We have entered the market
print('BUY @price: {:.2f}'.format(order.executed.price))
def next(self):
if not self.position and self.crossup > 0:
if self.buy_order: # something was pending
self.cancel(self.buy_order)
# not in the market and signal triggered
if not self.p.buy_limit:
self.buy_order = self.buy(transmit=False)
else:
price = self.data.close[0] * (1.0 - self.p.buy_limit)
# transmit = False ... await child order before transmission
self.buy_order = self.buy(price=price, exectype=bt.Order.Limit,
transmit=False)
# Setting parent=buy_order ... sends both together
if not self.p.trail:
stop_price = self.data.close[0] * (1.0 - self.p.stop_loss)
self.sell(exectype=bt.Order.Stop, price=stop_price,
parent=self.buy_order)
else:
self.sell(exectype=bt.Order.StopTrail,
trailamount=self.p.trail,
parent=self.buy_order)
这种新策略仍然基于 BaseStrategy
,做了以下工作:
buy
订单作为 Limit
订单发布的可能性
参数 buy_limit
(当不为 False
时)将是一个百分比,以从当前价格中减去来设置预期的购买点。
buy
订单设置 transmit=False
。这意味着订单不会立即传输到经纪人。它将等待来自 子 订单的传输信号
parent=buy_order
在 buy
订单放置之前,没有 Stop
订单执行的风险。
Limit
订单可能永远不会被执行,并且当新信号出现时仍然处于活动状态。在这种情况下,示例将简单地取消待处理的 buy
订单,并继续以当前价格水平发出新订单。
如上所述,这将取消子 Stop
订单。
让我们试图在当前收盘价下购买 0.5%
,并且使用 trail=30
来执行
使用 StopTrail
进行示例运行:
$ ./stop-loss-approaches.py auto --plot --strat trail=30,buy_limit=0.005
BUY @price: 3060.85
SELL@price: 3050.54
CANCEL@price: 0.00 buy
CANCEL@price: 0.00 sell
BUY @price: 3332.71
SELL@price: 3329.65
CANCEL@price: 0.00 buy
CANCEL@price: 0.00 sell
BUY @price: 3667.05
SELL@price: 3698.25
BUY @price: 3869.02
SELL@price: 3858.46
BUY @price: 3644.61
SELL@price: 3624.02
CANCEL@price: 0.00 buy
CANCEL@price: 0.00 sell
BUY @price: 4073.86
和图表
日志和图表上的 buy/sell 标志表明,没有没有对应的 buy
订单被执行,并且取消的 buy
订单立即被子 sell
订单的取消所跟随(无需任何手动编码)
已经展示了使用不同方法进行带止损交易。这可以用来避免损失或确保利润。
警告:非常严格的止损订单也可能只是使您的持仓退出市场的效果,如果止损设置在价格正常波动范围内。
$ ./stop-loss-approaches.py --help
usage: stop-loss-approaches.py [-h] [--data0 DATA0] [--fromdate FROMDATE]
[--todate TODATE] [--cerebro kwargs]
[--broker kwargs] [--sizer kwargs]
[--strat kwargs] [--plot [kwargs]]
{manual,manualcheat,auto}
Stop-Loss Approaches
positional arguments:
{manual,manualcheat,auto}
Stop approach to use
optional arguments:
-h, --help show this help message and exit
--data0 DATA0 Data to read in (default:
../../datas/2005-2006-day-001.txt)
--fromdate FROMDATE Date[time] in YYYY-MM-DD[THH:MM:SS] format (default: )
--todate TODATE Date[time] in YYYY-MM-DD[THH:MM:SS] format (default: )
--cerebro kwargs kwargs in key=value format (default: )
--broker kwargs kwargs in key=value format (default: )
--sizer kwargs kwargs in key=value format (default: )
--strat kwargs kwargs in key=value format (default: )
--plot [kwargs] kwargs in key=value format (default: )
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import datetime
import backtrader as bt
class BaseStrategy(bt.Strategy):
params = dict(
fast_ma=10,
slow_ma=20,
)
def __init__(self):
# omitting a data implies self.datas[0] (aka self.data and self.data0)
fast_ma = bt.ind.EMA(period=self.p.fast_ma)
slow_ma = bt.ind.EMA(period=self.p.slow_ma)
# our entry point
self.crossup = bt.ind.CrossUp(fast_ma, slow_ma)
class ManualStopOrStopTrail(BaseStrategy):
params = dict(
stop_loss=0.02, # price is 2% less than the entry point
trail=False,
)
def notify_order(self, order):
if not order.status == order.Completed:
return # discard any other notification
if not self.position: # we left the market
print('SELL@price: {:.2f}'.format(order.executed.price))
return
# We have entered the market
print('BUY @price: {:.2f}'.format(order.executed.price))
if not self.p.trail:
stop_price = order.executed.price * (1.0 - self.p.stop_loss)
self.sell(exectype=bt.Order.Stop, price=stop_price)
else:
self.sell(exectype=bt.Order.StopTrail, trailamount=self.p.trail)
def next(self):
if not self.position and self.crossup > 0:
# not in the market and signal triggered
self.buy()
class ManualStopOrStopTrailCheat(BaseStrategy):
params = dict(
stop_loss=0.02, # price is 2% less than the entry point
trail=False,
)
def __init__(self):
super().__init__()
self.broker.set_coc(True)
def notify_order(self, order):
if not order.status == order.Completed:
return # discard any other notification
if not self.position: # we left the market
print('SELL@price: {:.2f}'.format(order.executed.price))
return
# We have entered the market
print('BUY @price: {:.2f}'.format(order.executed.price))
def next(self):
if not self.position and self.crossup > 0:
# not in the market and signal triggered
self.buy()
if not self.p.trail:
stop_price = self.data.close[0] * (1.0 - self.p.stop_loss)
self.sell(exectype=bt.Order.Stop, price=stop_price)
else:
self.sell(exectype=bt.Order.StopTrail,
trailamount=self.p.trail)
class AutoStopOrStopTrail(BaseStrategy):
params = dict(
stop_loss=0.02, # price is 2% less than the entry point
trail=False,
buy_limit=False,
)
buy_order = None # default value for a potential buy_order
def notify_order(self, order):
if order.status == order.Cancelled:
print('CANCEL@price: {:.2f} {}'.format(
order.executed.price, 'buy' if order.isbuy() else 'sell'))
return
if not order.status == order.Completed:
return # discard any other notification
if not self.position: # we left the market
print('SELL@price: {:.2f}'.format(order.executed.price))
return
# We have entered the market
print('BUY @price: {:.2f}'.format(order.executed.price))
def next(self):
if not self.position and self.crossup > 0:
if self.buy_order: # something was pending
self.cancel(self.buy_order)
# not in the market and signal triggered
if not self.p.buy_limit:
self.buy_order = self.buy(transmit=False)
else:
price = self.data.close[0] * (1.0 - self.p.buy_limit)
# transmit = False ... await child order before transmission
self.buy_order = self.buy(price=price, exectype=bt.Order.Limit,
transmit=False)
# Setting parent=buy_order ... sends both together
if not self.p.trail:
stop_price = self.data.close[0] * (1.0 - self.p.stop_loss)
self.sell(exectype=bt.Order.Stop, price=stop_price,
parent=self.buy_order)
else:
self.sell(exectype=bt.Order.StopTrail,
trailamount=self.p.trail,
parent=self.buy_order)
APPROACHES = dict(
manual=ManualStopOrStopTrail,
manualcheat=ManualStopOrStopTrailCheat,
auto=AutoStopOrStopTrail,
)
def runstrat(args=None):
args = parse_args(args)
cerebro = bt.Cerebro()
# Data feed kwargs
kwargs = dict()
# Parse from/to-date
dtfmt, tmfmt = '%Y-%m-%d', 'T%H:%M:%S'
for a, d in ((getattr(args, x), x) for x in ['fromdate', 'todate']):
if a:
strpfmt = dtfmt + tmfmt * ('T' in a)
kwargs[d] = datetime.datetime.strptime(a, strpfmt)
data0 = bt.feeds.BacktraderCSVData(dataname=args.data0, **kwargs)
cerebro.adddata(data0)
# Broker
cerebro.broker = bt.brokers.BackBroker(**eval('dict(' + args.broker + ')'))
# Sizer
cerebro.addsizer(bt.sizers.FixedSize, **eval('dict(' + args.sizer + ')'))
# Strategy
StClass = APPROACHES[args.approach]
cerebro.addstrategy(StClass, **eval('dict(' + args.strat + ')'))
# Execute
cerebro.run(**eval('dict(' + args.cerebro + ')'))
if args.plot: # Plot if requested to
cerebro.plot(**eval('dict(' + args.plot + ')'))
def parse_args(pargs=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description=(
'Stop-Loss Approaches'
)
)
parser.add_argument('--data0', default='../../datas/2005-2006-day-001.txt',
required=False, help='Data to read in')
# Strategy to choose
parser.add_argument('approach', choices=APPROACHES.keys(),
help='Stop approach to use')
# Defaults for dates
parser.add_argument('--fromdate', required=False, default='',
help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')
parser.add_argument('--todate', required=False, default='',
help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')
parser.add_argument('--cerebro', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--broker', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--sizer', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--strat', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--plot', required=False, default='',
nargs='?', const='{}',
metavar='kwargs', help='kwargs in key=value format')
return parser.parse_args(pargs)
if __name__ == '__main__':
runstrat()
原文:
www.backtrader.com/blog/posts/2018-01-27-recursive-indicators/recursive-indicator/
backtrader 的最初目标之一是:
它不必是一个完美的指标,但能够快速轻松地开发它们确实有所帮助。为了确认设计是正确的,backtrader标准武器库中的第一个指标之一是指数移动平均线(又称EMA),根据定义是:递归。
注意
小知识:您可能会想象第一个指标是简单移动平均线
由于在backtrader 社区中发布了如何开发递归指标的问题,让我们快速开发一个ExponentialMovingAverage
指标。
像这样的递归指标
您可以在Wikipedia - 指数移动平均线中看到数学示例
如果您足够勇敢读完所有内容,您会发现期间用于计算指数平滑。我们将使用它。
为了解决计算第一个值的难题,行业决定使用前period
个值的简单平均值。
作为杠杆,我们将使用bt.indicators.PeriodN
,它:
period
参数
period
在此处查看其定义:文档 - 指标参考
然后让我们开发我们的EMA
import backtrader as bt
class EMA(bt.indicators.PeriodN):
params = {'period': 30} # even if defined, we can redefine the default value
lines = ('ema',) # our output line
def __init__(self):
self.alpha = 2.0 / (1.0 + self.p.period) # period -> exp smoothing factor
def nextstart(self): # calculate here the seed value
self.lines.ema[0] = sum(self.data.get(size=self.p.period)) / self.p.period
def next(self):
ema1 = self.lines.ema[-1] # previous EMA value
self.lines.ema[0] = ema1 * (1.0 - self.alpha) + self.data[0] * self.alpha
几乎比说更容易。关键在于在nextstart
中提供种子值,其中
next
相反,它将为系统传递的每个新数据值调用
nextstart
的默认实现只是将工作委托给next
,对于大多数指标(例如简单移动平均线)来说,这是正确的做法。但在这种情况下,重写并提供种子值是关键。
作为一个移动平均线,如果指标绘制在计算平均值的数据的同一轴上会很好。因为我们从PeriodN
继承了绘图的默认值(在文档中查看):
subplot=True
这当然意味着我们的指标将创建一个subplot
(图表上的另一个轴)。这可以很容易地被覆盖。
import backtrader as bt
class EMA(bt.indicators.PeriodN):
plot = dict(subplot=False)
完成。如果您想控制更多绘图选项,请查看文档 - 绘图
祝你好运!
原文:
www.backtrader.com/blog/posts/2017-08-08-dow-10-day-streak/dow-10-day-streak/
这已经成为新闻了。道琼斯工业平均指数创下了历史新高,已经连续上涨了 10 天,创下了 9 次历史新高。例如:
许多人肯定已经注意到道琼斯正在经历这样的连涨,文章只是告诉我们这已成为主流。但是一些问题出现了:
让我们通过启动 backtrader 来回答这些问题,通过制定一个 分析器 来做到这一点:分析情况并回答问题(有关代码,请参见下文)
我们的样本数据包含5923
个交易日。让我们看看当前的连续上涨日在哪里。
执行我们的代码显示,连续10
天这样的日子至少是非凡的,如果不是特别的话。
$ ./updaystreaks.py --data 099I-DJI --upstreak hilo=True
count rank upstreak upleg upleg % drawdown rel drawdown
1987-01-02 1 1 13 219.069946 0.116193 0.017616 0.171407
2017-02-09 2 2 12 822.109375 0.041074 0.001875 0.047548
1970-11-19 3 2 12 66.900024 0.088986 0.010321 0.127055
1929-06-20 4 2 12 32.000000 0.101716 0.031134 0.340625
1991-12-18 5 3 11 315.100098 0.109167 0.011113 0.113614
1955-01-18 6 3 11 22.200012 0.057290 0.014334 0.265765
2017-07-25 7 4 10 622.289062 0.028949 NaN NaN
2013-03-01 8 4 10 488.959961 0.034801 0.008102 0.240919
1996-11-04 9 4 10 348.839844 0.058148 0.004792 0.087605
1973-07-16 10 4 10 53.600037 0.060695 0.095935 1.686565
1959-11-17 11 4 10 31.599976 0.049945 0.011216 0.237342
1959-06-24 12 4 10 36.200012 0.057680 0.020649 0.381215
1955-08-23 13 4 10 25.400024 0.056344 0.008772 0.165353
1933-03-03 14 4 10 12.600002 0.250497 0.142415 0.730158
1920-12-29 15 4 10 8.099998 0.119118 0.022339 0.209876
2016-07-08 16 5 9 778.378906 0.043688 0.016552 0.396003
1996-05-08 17 5 9 334.369629 0.061755 0.002442 0.041990
1989-07-03 18 5 9 141.890137 0.058804 0.007179 0.129677
1968-04-23 19 5 9 38.000000 0.043123 0.070535 1.736842
1967-04-13 20 5 9 49.700012 0.059061 0.006593 0.118713
1967-01-03 21 5 9 55.799988 0.071603 0.006321 0.094982
1965-01-22 22 5 9 18.500000 0.020838 0.031326 1.540541
1964-03-06 23 5 9 19.600037 0.024506 0.016127 0.678570
1955-06-15 24 5 9 12.399994 0.028343 0.005537 0.201613
1955-04-05 25 5 9 16.299988 0.039553 0.010465 0.276074
1954-09-01 26 5 9 18.599976 0.055822 0.009325 0.177419
1945-04-06 27 5 9 9.000000 0.058140 0.008526 0.155555
1929-02-18 28 5 9 21.800018 0.072812 0.086005 1.279815
1921-10-18 29 5 9 4.300003 0.061871 0.008130 0.139536
当前的连涨,尚未结束,排名(并列)第 4。请注意:
即使表格已经显示了持续上涨日结束后的回撤和相对回撤(从上涨日开始计算,因此可能 > 100%),但最好通过视觉方式来回答问题。
图表很快就会显示出来:
在排名第 1 和第 2 的 remarkable 日期中,我们有:
count rank upstreak upleg upleg % drawdown rel drawdown
1987-01-02 1 1 13 219.069946 0.116193 0.017616 0.171407
2017-02-09 2 2 12 822.109375 0.041074 0.001875 0.047548
1970-11-19 3 2 12 66.900024 0.088986 0.010321 0.127055
1929-06-20 4 2 12 32.000000 0.101716 0.031134 0.340625
...
的确,因为1987
和1929
后来确实有非常大的熊市。但是并不是连涨结束后立即发生,如统计数据所示:相对回撤没有超过 100%,因此新高跟随了那些连涨结束后。
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import collections
import datetime
import itertools
import matplotlib.pyplot as plt
import pandas as pd
import backtrader as bt
class UpStreak(bt.Analyzer):
params = dict(
sep=',',
hilo=False,
)
def __init__(self):
self.upday = bt.ind.UpDayBool()
self.curdt = None # streak start date
self.incs = dict() # upleg in points
self.pincs = dict() # upleg in percentage
self.close0 = dict() # starting price for upleg
self.peaks = collections.deque() # endng price for upleg
self.ddown = dict() # absolute drawdowns
self.ddownrel = dict() # relative drawdown (% of upleg retraced)
self.rets = collections.defaultdict(int) # holds main results
def next(self):
curclose = self.data.close[0]
lastclose = self.data.close[-1]
self.peaks.append((None, None))
while True:
dt, peak = self.peaks.popleft()
if dt is None:
break # all elements seen
if peak > curclose: # peak not overdone, update drawdown
ddown = 1.0 - curclose / peak
self.ddown[dt] = max(self.ddown[dt], ddown)
self.peaks.append((dt, peak)) # not done yet
inc = self.incs[dt]
fall = peak - curclose
ddownrel = fall / inc
self.ddownrel[dt] = max(self.ddownrel[dt], ddownrel)
if self.upday:
if self.curdt is None: # streak begins
self.curdt = self.strategy.datetime.date()
if self.p.hilo:
lastclose = self.data.low[-1]
self.close0[self.curdt] = lastclose
self.incs[self.curdt] = inc = curclose - self.close0[self.curdt]
self.pincs[self.curdt] = inc / self.close0[self.curdt]
self.rets[self.curdt] += 1 # update current streak
else:
if self.curdt is not None: # streak ends
if self.p.hilo:
lastclose = self.data.high[-1]
inc = self.incs[self.curdt]
fall = lastclose - curclose
self.ddownrel[self.curdt] = fall / inc
self.ddown[self.curdt] = 1.0 - curclose / lastclose
self.peaks.append((self.curdt, lastclose))
self.curdt = None
def stop(self):
s = sorted(
self.rets.items(),
reverse=True,
key=lambda item: (item[1], item[0])
)
# keep it in dict format
self.rets = collections.OrderedDict(s)
self.s = collections.OrderedDict(s)
self.headers = [
'date',
'count', 'rank', 'upstreak',
'upleg', 'upleg %',
'drawdown', 'rel drawdown',
]
i = 0
count = itertools.count(1)
last = float('inf')
for dt, streak in self.s.items():
if streak < last:
i += 1
last = streak
ddown = self.ddown.get(dt, None)
ddownrel = self.ddownrel.get(dt, None)
inc = self.incs.get(dt, None)
pinc = self.pincs.get(dt, None)
self.s[dt] = [
next(count), i,
streak,
inc, pinc,
ddown, ddownrel
]
def get_dataframe(self):
return pd.DataFrame.from_items(
self.s.items(),
orient='index',
columns=self.headers[1:], # skip index
)
def print_ranking(self):
i = 0
last = float('inf')
print(self.p.sep.join(self.headers))
for dt, items in self.s.items():
print(
self.p.sep.join(
str(x) for x in itertools.chain([dt], items)
)
)
def runstrat(args=None):
args = parse_args(args)
cerebro = bt.Cerebro()
kwargs = dict() # Data feed kwargs
# Parse from/to-date
dtfmt, tmfmt = '%Y-%m-%d', 'T%H:%M:%S'
for a, d in ((getattr(args, x), x) for x in ['fromdate', 'todate']):
if a:
strpfmt = dtfmt + tmfmt * ('T' in a)
kwargs[d] = datetime.datetime.strptime(a, strpfmt)
fromdate = kwargs.get('fromdate', datetime.date.min)
store = bt.stores.VChartFile()
data = store.getdata(dataname=args.data, **kwargs)
cerebro.adddata(data)
cerebro.addanalyzer(UpStreak, **eval('dict(' + args.upstreak + ')'))
result = cerebro.run()
st0 = result[0]
a = st0.analyzers.upstreak
# Plot some things
# pd.set_option('display.max_columns', 500)
pd.set_option('display.expand_frame_repr', False)
df = a.get_dataframe()
up = df['upstreak']
up9 = df[up >= 9]
print(up9)
up7 = df[up >= 7]
x = up7['upstreak']
y = up7['rel drawdown'] * 100.0
plt.scatter(x, y)
plt.ylabel('% Relative Drawdown')
plt.xlabel('Updays streak')
plt.title('DJI Relative Drawdown after N consecutive UpDays')
plt.show()
# Plot some things
y = up7['drawdown'] * 100.0
plt.ylabel('% Absolute Drawdown')
plt.xlabel('Updays streak')
plt.title('DJI Drawdown after N consecutive UpDays')
plt.scatter(x, y)
plt.show()
def parse_args(pargs=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description=(
'UpDayStreaks'
)
)
parser.add_argument('--data', default='', required=True,
help='Data Ticker')
parser.add_argument('--fromdate', required=False, default='',
help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')
parser.add_argument('--todate', required=False, default='',
help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')
parser.add_argument('--cerebro', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--upstreak', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--strat', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--plot', required=False, default='',
nargs='?', const='{}',
metavar='kwargs', help='kwargs in key=value format')
return parser.parse_args(pargs)
if __name__ == '__main__':
runstrat()
原文:
www.backtrader.com/blog/posts/2017-07-05-order-history/order-history/
使用版本1.9.55.122
,backtrader现在可以用于评估外部一组订单的性能。 例如可以用于:
...
cerebro.adddata(mydata)
...
cerebro.add_order_history(orders, notify=True or False)
...
cerebro.run()
显而易见的问题在于orders
应该是什么样子。让我们引用文档:
orders
:是一个可迭代对象(例如:列表、元组、迭代器、生成器),其中每个元素也将是一个具有以下子元素的可迭代对象(有 2 种格式)
[datetime, size, price]
或[datetime, size, price, data]
注意:它必须按排序(或产生排序元素)
datetime ascending`
其中:
datetime
是一个 python date/datetime
实例或具有格式 YYYY-MM-DD[THH:MM:SS[.us]]的字符串,其中括号中的元素是可选的
size
是一个整数(买为正,卖为负)
price
是一个浮点数/整数
data
如果存在,可以取以下任何值
cerebro.addata(data, name=value)
分配的,将是目标
对于notify
的情况:
notify
(默认值:True)
如果True
,则系统中插入的第一个策略将收到根据orders
中每个订单信息创建的人为订单的通知
注意
注意上面的示例是如何添加数据源的。是的,这是必要的。
orders 可能的样子的一个实际例子
ORDER_HISTORY = (
('2005-02-01', 1, 2984.63),
('2005-03-04', -1, 3079.93),
...
('2006-12-18', 1, 4140.99),
)
一个有 3 个元素的可迭代对象,可以完全从CSV文件中加载。
下面的示例执行两件事:
notify_order
和notify_trade
接收订单和交易通知
在两种情况下加载了一组分析器(时间回报以月和年计以及一个TradeAnalyzer
)……它们应该返回相同的值。
$ ./order-history.py --plot --cerebro writer=True
这产生了一个图表
和一些文本输出(为简洁起见截断):
Creating Signal Strategy
2005-02-01,1,2984.63
2005-03-04,-1,3079.93
...
2006-12-01,-1,3993.03
profit 177.9000000000001
2006-12-18,1,4140.99
===============================================================================
Cerebro:
...
- timereturn1:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Params:
- timeframe: 8
- compression: None
- _doprenext: True
- data: None
- firstopen: True
- fund: None
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Analysis:
- 2005-12-31: 0.03580099999999975
- 2006-12-31: 0.01649448108275653
.......................................................................
- tradeanalyzer:
- Params: None
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Analysis:
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
- total:
- total: 14
- open: 1
- closed: 13
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
- streak:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- won:
- current: 2
- longest: 2
...
$ ./order-history.py --plot --cerebro writer=True --order-history
这产生了一个看起来没有差异的图表
和一些文本输出(为了简洁起见再次截断):
Creating Empty Strategy
2005-02-01,1,2984.63
2005-03-04,-1,3079.93
...
.......................................................................
- timereturn1:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Params:
- timeframe: 8
- compression: None
- _doprenext: True
- data: None
- firstopen: True
- fund: None
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Analysis:
- 2005-12-31: 0.03580099999999975
- 2006-12-31: 0.01649448108275653
.......................................................................
- tradeanalyzer:
- Params: None
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Analysis:
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
- total:
- total: 14
- open: 1
- closed: 13
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
- streak:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- won:
- current: 2
- longest: 2
...
并且所期望的值与参考值相匹配。
例如,可以测量判断性交易的表现。有时会与算法交易结合使用,其中算法生成信号,但人类最终决定是否将信号转化为实际交易。
$ ./order-history.py --help
usage: order-history.py [-h] [--data0 DATA0] [--fromdate FROMDATE]
[--todate TODATE] [--order-history] [--cerebro kwargs]
[--broker kwargs] [--sizer kwargs] [--strat kwargs]
[--plot [kwargs]]
Order History Sample
optional arguments:
-h, --help show this help message and exit
--data0 DATA0 Data to read in (default:
../../datas/2005-2006-day-001.txt)
--fromdate FROMDATE Date[time] in YYYY-MM-DD[THH:MM:SS] format (default: )
--todate TODATE Date[time] in YYYY-MM-DD[THH:MM:SS] format (default: )
--order-history use order history (default: False)
--cerebro kwargs kwargs in key=value format (default: )
--broker kwargs kwargs in key=value format (default: )
--sizer kwargs kwargs in key=value format (default: )
--strat kwargs kwargs in key=value format (default: )
--plot [kwargs] kwargs in key=value format (default: )
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import datetime
import backtrader as bt
ORDER_HISTORY = (
('2005-02-01', 1, 2984.63),
('2005-03-04', -1, 3079.93),
('2005-03-08', 1, 3113.82),
('2005-03-22', -1, 3040.55),
('2005-04-08', 1, 3092.07),
('2005-04-20', -1, 2957.92),
('2005-05-13', 1, 2991.71),
('2005-08-19', -1, 3284.35),
('2005-08-22', 1, 3328.84),
('2005-08-25', -1, 3293.69),
('2005-09-12', 1, 3361.1),
('2005-10-18', -1, 3356.73),
('2005-11-09', 1, 3361.92),
('2006-01-24', -1, 3544.78),
('2006-02-06', 1, 3678.87),
('2006-03-13', -1, 3801.03),
('2006-03-20', 1, 3833.25),
('2006-04-13', -1, 3777.24),
('2006-05-02', 1, 3839.24),
('2006-05-16', -1, 3711.46),
('2006-06-30', 1, 3592.01),
('2006-07-21', -1, 3580.53),
('2006-08-01', 1, 3687.82),
('2006-09-14', -1, 3809.08),
('2006-09-25', 1, 3815.13),
('2006-12-01', -1, 3993.03),
('2006-12-18', 1, 4140.99),
)
class SmaCross(bt.SignalStrategy):
params = dict(sma1=10, sma2=20)
def notify_order(self, order):
if not order.alive():
print(','.join(str(x) for x in
(self.data.num2date(order.executed.dt).date(),
order.executed.size * 1 if order.isbuy() else -1,
order.executed.price)))
def notify_trade(self, trade):
if trade.isclosed:
print('profit {}'.format(trade.pnlcomm))
def __init__(self):
print('Creating Signal Strategy')
sma1 = bt.ind.SMA(period=self.params.sma1)
sma2 = bt.ind.SMA(period=self.params.sma2)
crossover = bt.ind.CrossOver(sma1, sma2)
self.signal_add(bt.SIGNAL_LONG, crossover)
class St(bt.Strategy):
params = dict(
)
def notify_order(self, order):
if not order.alive():
print(','.join(str(x) for x in
(self.data.num2date(order.executed.dt).date(),
order.executed.size * 1 if order.isbuy() else -1,
order.executed.price)))
def notify_trade(self, trade):
if trade.isclosed:
print('profit {}'.format(trade.pnlcomm))
def __init__(self):
print('Creating Empty Strategy')
pass
def next(self):
pass
def runstrat(args=None):
args = parse_args(args)
cerebro = bt.Cerebro()
# Data feed kwargs
kwargs = dict()
# Parse from/to-date
dtfmt, tmfmt = '%Y-%m-%d', 'T%H:%M:%S'
for a, d in ((getattr(args, x), x) for x in ['fromdate', 'todate']):
if a:
strpfmt = dtfmt + tmfmt * ('T' in a)
kwargs[d] = datetime.datetime.strptime(a, strpfmt)
data0 = bt.feeds.BacktraderCSVData(dataname=args.data0, **kwargs)
cerebro.adddata(data0)
# Broker
cerebro.broker = bt.brokers.BackBroker(**eval('dict(' + args.broker + ')'))
# Sizer
cerebro.addsizer(bt.sizers.FixedSize, **eval('dict(' + args.sizer + ')'))
# Strategy
if not args.order_history:
cerebro.addstrategy(SmaCross, **eval('dict(' + args.strat + ')'))
else:
cerebro.addstrategy(St, **eval('dict(' + args.strat + ')'))
cerebro.add_order_history(ORDER_HISTORY, notify=True)
cerebro.addanalyzer(bt.analyzers.TimeReturn, timeframe=bt.TimeFrame.Months)
cerebro.addanalyzer(bt.analyzers.TimeReturn, timeframe=bt.TimeFrame.Years)
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer)
# Execute
cerebro.run(**eval('dict(' + args.cerebro + ')'))
if args.plot: # Plot if requested to
cerebro.plot(**eval('dict(' + args.plot + ')'))
def parse_args(pargs=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description=(
'Order History Sample'
)
)
parser.add_argument('--data0', default='../../datas/2005-2006-day-001.txt',
required=False, help='Data to read in')
# Defaults for dates
parser.add_argument('--fromdate', required=False, default='',
help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')
parser.add_argument('--todate', required=False, default='',
help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')
parser.add_argument('--order-history', required=False, action='store_true',
help='use order history')
parser.add_argument('--cerebro', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--broker', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--sizer', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--strat', required=False, default='',
metavar='kwargs', help='kwargs in key=value format')
parser.add_argument('--plot', required=False, default='',
nargs='?', const='{}',
metavar='kwargs', help='kwargs in key=value format')
return parser.parse_args(pargs)
if __name__ == '__main__':
runstrat()