Dual Thrust交易算法是由Michael Chalek开发的著名量化交易策略。它通常用于期货,外汇和股票市场。Dual Thrust的概念属于典型的突破交易系统,其运用“双推力”系统根据历史价格构建更新的回溯期,这在理论上使其在任何给定时期内更加稳定。
在这篇文章中,我们给出了此策略的详细逻辑细节,并展示了如何在发明者量化平台上实现此算法。首先,我们要选择所交易标的的历史价格,该范围基于最近N天的收盘价,最高价和最低价计算。当市场从开盘价移动一定范围时,执行开仓。
我们在常见的两个市场状态下用单个交易对测试了此策略,即趋势市场和震荡市场。结果表明,这种动量交易系统在趋势市场中运行得更好,在波动较大的市场中会触发一些无效买卖信号。在区间市场下,我们可以调整参数以获得更好的回报。作为个别参照交易标的的比较,我们还测试了国内商品期货市商。结果表明该策略好于平均表现。
它的逻辑原型是常见的日内交易策略。开盘区间突破策略基于今天的开盘价加上或减去昨天幅度的一定百分比来确定上下轨。当价格突破上方轨道时,它会开仓买入,当它突破下方轨道时,它会开仓做空。
策略原理
在收盘后,计算两个价值:最高价 - 收盘价,收盘价 - 最低价。然后取这两个值中较大的值,将该值乘以0.7。让我们称之为值K,K值我们称为触发值。
第二日开市后,记录开盘价,然后在价格超过(开盘价+触发价值)时立即买入,或在价格低于(开盘价 - 触发价值)时卖空。
此策略没有明显的止损。这个系统是一个反向系统,也就是说,如果在价格超过(开盘价+触发值)时有一个空头仓位订单,那么它将发送两个买单(一个关闭错误的仓位,另一个打开正确方向的仓位)。出于同样的原因,如果有一个多头仓位价格低于(开盘价-触发价值),那么它将发送两个卖单。
范围=最大值(HH-LC,HC-LL)
多头信号的计算方法是
cap = open + K1 × Rangecap = open + K1 × Range
空头短信号的计算方法是
floor = open – K2 × Rangefloor = open – K2 × Range
其中K1和K2是参数。当K1大于K2时,触发多头信号,反之亦然。为了演示,我们选择K1 = K2 = 0.5。在实际交易中,我们仍然可以使用历史数据来优化这些参数或根据市场趋势调整参数。如果您看涨市场,K1应小于K2,如果您看跌市场,则K1应大于K2。
该系统是一个反转系统,因此如果投资者在价格突破上轨时持有空头仓位,则在开多仓前要先平掉空头仓位。如果投资者在价格突破下轨时持有多头仓位,则在开新的空头仓位之前应先平掉多头仓位。
在范围设置中,引入前N天的四个价格点(高,开,低,收),使得一定时期内的范围相对稳定,可应用于日线趋势跟踪。
此策略的开多和空的触发条件,考虑不对称幅度,多空交易可参考范围应该选择不同数量的周期,也可以通过参数K1和K2来确定。当K1<K2时,多头信号相对容易被触发,而当K1>K2时,空头信号相对容易被触发。
因此,在使用此策略时,一方面可以参考历史数据回测的最佳参数。另一方面,您可以根据自己对后势的判断或其他主要周期技术指标分阶段调整K1和K2。
这是一种等待信号,进入市场,套利,然后离开市场的典型交易方式,但效果非常出色。
我们打开,FMZ.COM, 登陆账户,点击控制中心,部署托管者和机器人。
关于如何部署托管者和机器人,请参考我之前的文章:https://www.fmz.com/bbs-topic/4140
想购买自己云计算服务器部署托管者的读者,可以参考这篇文章:https://www.fmz.com/bbs-topic/2848
接下来,我们点击左侧栏目当中的策略库,点击新建策略
在编写策略页面右上角记得选择编程语言为Python,如图:
接下来我们把Python代码写入代码编辑页面中,下面的代码,有着非常详细的逐行注释,各位读者可以慢慢理解和体会。
我们就用OKCoin期货来测试这个策略:
import time # 这里需要引入python自带的时间库,后边的程序会用到
class Error_noSupport(BaseException): # 我们定义一个名为ChartCfg的全局class,用来初始化策略图表设置。对象有很多关于图表功能的属性。图表库为:HighCharts
def __init__(self): # log出提示信息
Log("只支持OKCoin期货!#FF0000")
class Error_AtBeginHasPosition(BaseException):
def __init__(self):
Log("启动时有期货持仓! #FF0000")
ChartCfg = {
'__isStock': True, # 该属性用于控制是否显示为单独控制数据序列(可以在图表上取消单独一个数据序列的显示),如果指定__isStock: false, 则显示为普通图表
'title': { # title为图表的主要标题
'text': 'Dual Thrust 上下轨图' # title的一个属性text为标题的文本,这里设置为'Dual Thrust 上下轨图'该文本就会显示在标题位置
},
'yAxis': { # 图表坐标Y轴的相关设置
'plotLines': [{ # Y轴上的水平线(和Y轴垂直),该属性的值是一个数组,即多条水平线的设置
'value': 0, # 水平线在Y轴上的坐标值
'color': 'red', # 水平线的颜色
'width': 2, # 水平线的线宽
'label': { # 水平线上的标签
'text': '上轨', # 标签的文本
'align': 'center' # 标签的显示位置,这里设置为居中(即 :'center')
},
}, { # 第二条水平线([{...},{...}]数组中的第二个元素)
'value': 0, # 水平线在Y轴上的坐标值
'color': 'green', # 水平线的颜色
'width': 2, # 水平线的线宽
'label': { # 标签
'text': '下轨',
'align': 'center'
},
}]
},
'series': [{ # 数据序列,即用来在图表上显示数据线、K线、标记等等内容的数据。也是一个数组第一个索引为0。
'type': 'candlestick', # 索引为0数据序列的类型:'candlestick' 表示为K线图
'name': '当前周期', # 数据序列的名称
'id': 'primary', # 数据序列的ID,用于下一个数据序列相关设置。
'data': [] # 数据序列的数组,用于储存具体的K线数据
}, {
'type': 'flags', # 数据序列,类型:'flags',在图表上显示标签,表示做多和做空。索引为1。
'onSeries': 'primary', # 这个属性表示标签显示在id为'primary'上。
'data': [] # 保存标签数据的数组。
}]
}
STATE_IDLE = 0 # 状态常量,表示空闲
STATE_LONG = 1 # 状态常量,表示持多仓
STATE_SHORT = 2 # 状态常量,表示持空仓
State = STATE_IDLE # 表示当前程序状态 ,初始赋值为空闲
LastBarTime = 0 # K线最后一柱的时间戳(单位为毫秒,1000毫秒等于1秒,时间戳是1970年1月1日到现在时刻的毫秒数是一个很大的正整数)
UpTrack = 0 # 上轨值
BottomTrack = 0 # 下轨值
chart = None # 用于接受Chart这个API函数返回的图表控制对象。用该对象(chart)可以调用其成员函数向图表内写入数据。
InitAccount = None # 初始账户情况
LastAccount = None # 最新账户情况
Counter = { # 计数器,用于记录盈亏次数
'w': 0, # 赢次数
'l': 0 # 亏次数
}
def GetPosition(posType): # 定义一个函数,用来存储账户持仓信息
positions = exchange.GetPosition() # exchange.GetPosition()是发明者量化的官方API,关于它的用法,请参考我的官方API文档:https://www.fmz.com/api
return [{'Price': position['Price'], 'Amount': position['Amount']} for position in positions if position['Type'] == posType] # 返回各种持仓信息
def CancelPendingOrders(): # 定义一个函数,专门用来撤单
while True: # 循环检查
orders = exchange.GetOrders() # 如果有持仓
[exchange.CancelOrder(order['Id']) for order in orders if not Sleep(500)] # 撤单语句
if len(orders) == 0: # 逻辑判断
break
def Trade(currentState,nextState): # 定义一个函数,用来判断下单逻辑
global InitAccount,LastAccount,OpenPrice,ClosePrice # 定义全局作用域
ticker = _C(exchange.GetTicker) # 关于_C的用法,请参考:https://www.fmz.com/api
slidePrice = 1 # 定义滑点值
pfn = exchange.Buy if nextState == STATE_LONG else exchange.Sell # 买卖判断逻辑
if currentState != STATE_IDLE: # 循环开始
Log(_C(exchange.GetPosition)) # 日志信息
exchange.SetDirection("closebuy" if currentState == STATE_LONG else "closesell") # 调整下单方向,特别是下过单后
while True:
ID = pfn( (ticker['Last'] - slidePrice) if currentState == STATE_LONG else (ticker['Last'] + slidePrice), AmountOP) # 限价单,ID = pfn(-1, AmountOP)为市价单,ID = pfn(AmountOP)为市价单
Sleep(Interval) # 休息一阵,防止API访问频率过快,账户被封。
Log(exchange.GetOrder(ID)) # Log信息
ClosePrice = (exchange.GetOrder(ID))['AvgPrice'] # 设置收盘价
CancelPendingOrders() # 调用撤单函数
if len(GetPosition(PD_LONG if currentState == STATE_LONG else PD_SHORT)) == 0: # 撤单逻辑
break
account = exchange.GetAccount() # 获取账户信息
if account['Stocks'] > LastAccount['Stocks']: # 如果当前账户币值大于之前账户币值
Counter['w'] += 1 # 盈亏计数器中,盈利次数加一
else:
Counter['l'] += 1 # 否者亏损次数加一
Log(account) # log信息
LogProfit((account['Stocks'] - InitAccount['Stocks']),"收益率:", ((account['Stocks'] - InitAccount['Stocks']) * 100 / InitAccount['Stocks']),'%')
Cal(OpenPrice,ClosePrice)
LastAccount = account
exchange.SetDirection("buy" if nextState == STATE_LONG else "sell") # 这一段的逻辑同上,不再详述
Log(_C(exchange.GetAccount))
while True:
ID = pfn( (ticker['Last'] + slidePrice) if nextState == STATE_LONG else (ticker['Last'] - slidePrice), AmountOP)
Sleep(Interval)
Log(exchange.GetOrder(ID))
CancelPendingOrders()
pos = GetPosition(PD_LONG if nextState == STATE_LONG else PD_SHORT)
if len(pos) != 0:
Log("持仓均价",pos[0]['Price'],"数量:",pos[0]['Amount'])
OpenPrice = (exchange.GetOrder(ID))['AvgPrice']
Log("now account:",exchange.GetAccount())
break
def onTick(exchange): # 程序主要函数,程序主要逻辑都是在该函数内处理。
global LastBarTime,chart,State,UpTrack,DownTrack,LastAccount # 定义全局作用域
records = exchange.GetRecords() # 关于exchange.GetRecords()的用法,请参见:https://www.fmz.com/api
if not records or len(records) <= NPeriod: # 防止发生意外的判断语句
return
Bar = records[-1] # 取records K线数据的倒数第一个元素,也就是最后一个bar
if LastBarTime != Bar['Time']:
HH = TA.Highest(records, NPeriod, 'High') # 声明HH变量,调用TA.Highest函数计算当前K线数据NPeriod周期内最高价的最大值赋值给HH。
HC = TA.Highest(records, NPeriod, 'Close') # 声明HC变量,获取NPeriod周期内的收盘价的最大值。
LL = TA.Lowest(records, NPeriod, 'Low') # 声明LL变量,获取NPeriod周期内的最低价的最小值。
LC = TA.Lowest(records, NPeriod, 'Close') # 声明LC变量,获取NPeriod周期内的收盘价的最小值。具体TA相关的应用,请参见官方API文档。
Range = max(HH - LC, HC - LL) # 计算出范围
UpTrack = _N(Bar['Open'] + (Ks * Range)) # 根据界面参数的上轨系数Ks最新K线柱的开盘价等,计算出上轨值。
DownTrack = _N(Bar['Open'] - (Kx * Range)) # 计算下轨值
if LastBarTime > 0: # 由于LastBarTime该变量初始化设置的值为0,所以第一次运行到此处LastBarTime > 0必定是false,不会执行if块内的代码,而是会执行else块内的代码
PreBar = records[-2] # 声明一个变量含义是“前一个Bar”把当前K线的倒数第二Bar赋值给它。
chart.add(0, [PreBar['Time'], PreBar['Open'], PreBar['High'], PreBar['Low'], PreBar['Close']], -1) # 调用chart图标控制类的add函数更新K线数据(用获取的K线数据的倒数第二Bar去更新图标的倒数第一个Bar,因为有新的K线Bar生成)
else: # chart.add函数的具体用法请参见API文档,和论坛里的文章。程序第一次运行到此必定执行else块内代码,主要作用是把第一次获取的K线一次性全部添加到图表上。
for i in range(len(records) - min(len(records), NPeriod * 3), len(records)): # 此处执行一个for循环,循环次数使用K线长度和NPeriod的3倍二者中最小的值,可以保证初始的K线不会画的太多太长。索引是从大到小的。
b = records[i] # 声明一个临时变量b用来取每次循环索引为records.length - i的K线柱数据。
chart.add(0,[b['Time'], b['Open'], b['High'], b['Low'], b['Close']]) # 调用chart.add函数向图表添加K线柱,注意add函数最后一个参数如果传入-1就是更新图表上最后一个Bar(柱),如果没传参数,就是向最后添加Bar。执行完i等于2这次循环后(i-- 了已经,此时为1了),就会触发i > 1为false停止循环,可见此处代码只处理到records.length - 2这个Bar,最后一个Bar没有处理。
chart.add(0,[Bar['Time'], Bar['Open'], Bar['High'], Bar['Low'], Bar['Close']]) # 由于以上if的2个分支都没处理records.length - 1这个Bar,所以此处处理。添加最新出现的Bar到图表中。
ChartCfg['yAxis']['plotLines'][0]['value'] = UpTrack # 把计算出来的上轨值赋值给图表对象(区别于图表控制对象chart),用于稍后显示。
ChartCfg['yAxis']['plotLines'][1]['value'] = DownTrack # 赋值下轨值
ChartCfg['subtitle'] = { # 设置副标题
'text': '上轨' + str(UpTrack) + '下轨' + str(DownTrack) # 副标题文本设置,在副标题上显示出上轨下轨值。
}
chart.update(ChartCfg) # 用图表类ChartCfg更新图表
chart.reset(PeriodShow) # 刷新根据界面参数设置的PeriodShow变量,只保留PeriodShow的值数量的K线柱。
LastBarTime = Bar['Time'] # 此次新产生的Bar的时间戳更新,给LastBarTime用于判断下次循环获取的K线数据最后一个Bar,是否是新产生的。
else: # 如果LastBarTime等于Bar.Time即:没有新的K线Bar产生。则执行一下{..}内代码
chart.add(0,[Bar['Time'], Bar['Open'], Bar['High'], Bar['Low'], Bar['Close']], -1) # 用当前K线数据的最后一个Bar(K线的最后一个Bar即当前周期的Bar是不断在变化的),更新图表上的最后一个K线柱。
LogStatus("Price:", Bar["Close"], "up:", UpTrack, "down:", DownTrack, "wins:", Counter['w'], "losses:", Counter['l'], "Date:", time.time()) # 调用LogStatus函数显示当前策略的数据在状态栏上。
msg = "" # 定义一个变量msg。
if State == STATE_IDLE or State == STATE_SHORT: # 判断当前状态变量State是否等于空闲或者State是否等于持空仓,在空闲状态下可以触发做多,在持空仓状态下可以触发平多仓,并反手。
if Bar['Close'] >= UpTrack: # 如果当前K线的收盘价大于上轨值,执行if块内代码。
msg = "做多,触发价:" + str(Bar['Close']) + "上轨" + str(UpTrack) # 给msg赋值,把需要显示的数值组合成字符串。
Log(msg) # 信息
Trade(State, STATE_LONG) # 调用上边的Trade函数进行交易
State = STATE_LONG # 无论开多仓还是反手,此刻程序状态要更新为持多仓。
chart.add(1,{'x': Bar['Time'], 'color': 'red', 'shape': 'flag', 'title': '多', 'text': msg}) # 在K线相应的位置添加一个标记显示开多。
if State == STATE_IDLE or State == STATE_LONG: # 做空方向与以上同理,不在赘述。代码完全一致。
if Bar['Close'] <= DownTrack:
msg = "做空,触发价:" + str(Bar['Close']) + "下轨" + str(DownTrack)
Log(msg)
Trade(State, STATE_SHORT)
State = STATE_SHORT
chart.add(1,{'x': Bar['Time'], 'color': 'green', 'shape': 'circlepin', 'title': '空', 'text': msg})
OpenPrice = 0 # 初始化OpenPrice和ClosePrice
ClosePrice = 0
def Cal(OpenPrice, ClosePrice): # 定义一个Cal函数,用来计算策略运行后的盈亏情况
global AmountOP,State
if State == STATE_SHORT:
Log(AmountOP,OpenPrice,ClosePrice,"策略盈亏:", (AmountOP * 100) / ClosePrice - (AmountOP * 100) / OpenPrice, "个币, 手续费:", - (100 * AmountOP * 0.0003), "美元,折合:", _N( - 100 * AmountOP * 0.0003/OpenPrice,8), "个币")
Log(((AmountOP * 100) / ClosePrice - (AmountOP * 100) / OpenPrice) + (- 100 * AmountOP * 0.0003/OpenPrice))
if State == STATE_LONG:
Log(AmountOP,OpenPrice,ClosePrice,"策略盈亏:", (AmountOP * 100) / OpenPrice - (AmountOP * 100) / ClosePrice, "个币, 手续费:", - (100 * AmountOP * 0.0003), "美元,折合:", _N( - 100 * AmountOP * 0.0003/OpenPrice,8), "个币")
Log(((AmountOP * 100) / OpenPrice - (AmountOP * 100) / ClosePrice) + (- 100 * AmountOP * 0.0003/OpenPrice))
def main(): # 策略程序的主函数。(入口函数)
global LoopInterval,chart,LastAccount,InitAccount # 定义全局作用域
if exchange.GetName() != 'Futures_OKCoin': # 判断添加的交易所对象的名称(通过exchange.GetName函数获取)如果不等于'Futures_OKCoin'即:添加的不是OKCoin期货交易所对象。
raise Error_noSupport # 抛出异常
exchange.SetRate(1) # 设置交易所的各种参数
exchange.SetContractType(["this_week","next_week","quarter"][ContractTypeIdx]) # 确定要交易的哪种具体合约。
exchange.SetMarginLevel([10,20][MarginLevelIdx]) # 设置保证金率,也就是杠杆。
if len(exchange.GetPosition()) > 0: # 设置容错机制
raise Error_AtBeginHasPosition
CancelPendingOrders()
InitAccount = LastAccount = exchange.GetAccount()
LoopInterval = min(1,LoopInterval)
Log("交易平台:",exchange.GetName(), InitAccount)
LogStatus("Ready...")
LogProfitReset()
chart = Chart(ChartCfg)
chart.reset()
LoopInterval = max(LoopInterval, 1)
while True: # 循环整个交易逻辑,调用onTick函数
onTick(exchange)
Sleep(LoopInterval * 1000) # 休息一阵,防止API访问频率过快,账户被封。
写完代码后,请注意我们还未完成整个策略的编写部分,接下来,我们需要把策略中用的参数添加到策略编写页面,添加的方法十分简单,直接点击策略编写对话框下方的加号逐个添加即可。
需要添加的内容:
到此,我们终于完成策略的编写部分,接下来,我们就开始回测这个策略吧。
写完策略后,我们首先要做的就是回测它,看它在历史数据中表现如何,但是请各位读者千万注意,回测的结果不等于未来的预判,回测只能作为一种参考信息来考虑我们的策略有效性。一旦市场发生变化,策略开始有大的亏损出现,我们应该及时去发现问题,然后改变策略以适应新的市场环境,比如上文提到的阀值,如果策略出现大于百分之10的亏损,我们就应该马上停止策略运行,然后查找问题,可以先从调节阀值开始入手。
点击策略编辑页面中的模拟回测,在回测页面,参数的调节可以根据需求的不同,进行方便快捷的调试,特别是对于逻辑复杂,参数众多的策略,不用再回去源码,进行逐个修改。
回测时间我们选最近半年的,点击添加OKCoin期货交易所,选择BTC交易标的。
可以看到,最近半年由于BTC的单边趋势非常不错,策略收获了很好的收益。
有问题的朋友可以到 https://www.fmz.com/bbs 留言,无论是关于策略还是平台的技术,发明者量化平台有专业的人员随时为您解答。
MAIDOVE 这个策略应该没有带止损跟止盈?