在最初的FMZ策略设计中,如果需要使用异步并发的操作只能使用exchange.Go()
函数来实现FMZ封装接口的并发,无法并发执行一些自定义的操作(函数)。虽然这种设计让策略程序执行起来效率提高不少,但是对于原生编程语言中有过并发设计经验的同学总感觉十分不习惯。
甚至有使用FMZ入门量化交易的新同学不理解exchange.Go()
函数的使用,使用exchange.Go()
看起来依然是在顺序执行的代码中挨个执行语句。那么本篇文章我们就一起来探索FMZ平台新增的并发线程功能:__Thread()
等系列函数的使用与策略程序异步设计。
如果我们想让策略主线程运行的同时,并发运行一个子线程来执行我们编写的自定义函数,就可以使用类似以下代码的设计。在策略代码中自定义一个函数GetTickerAsync()
,编写这个函数的具体功能。这个函数执行一个死循环,在这个while
循环中不停的调用FMZ的API接口:GetTicker()
来获取行情数据。
然后再使用__threadSetData(0, "ticker", t)
这句向主线程写入一个数据,数据名称为ticker
,数据值为t
即GetTicker()
的返回值。
__threadSetData(0, "ticker", t)
设计好线程并发执行的自定义函数后,我们就可以编写main()
函数中的代码了,在main()
函数开始,我们使用:
__Thread(GetTickerAsync, 0) // GetTickerAsync为需要并发执行的自定义函数,0为这个传入GetTickerAsync函数的参数
创建一个并发的线程,这个线程开始执行GetTickerAsync()
函数。接着main()
函数开始执行自己的while
循环,在循环中接收GetTickerAsync()
函数更新的数据然后打印:
var t = __threadGetData(0, "ticker")
Log(t)
完整的代码例子:
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
实盘运行测试:
这是一种最简单的应用设计,接下来我们来看下另外的一些需求设计。
可以设计一个函数同时创建10个线程,每个线程都执行一个下单操作函数。在main()
函数中设计一个while
循环,检测策略交互指令。接收到交互指令:placeMultipleOrders
就去调用这个并发下单函数testPlaceMultipleOrders()
。
if (cmd == "placeMultipleOrders") {
// ...
}
在策略编辑页面增加策略交互设计,设置一个按钮,命令为:placeMultipleOrders
完整的代码例子:
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}
测试使用挂单的方式,从当前价格80% ~ 90%递增,使用模拟盘环境测试,点击交互按钮触发测试下单:
点击「placeMultipleOrders」按钮后,提示信息:
策略日志上显示并发下单操作:
这个需求是一个FMZ用户提出的,希望有一个简单的例子演示如何在并发的线程中使用WebSocket连接,并且设计如何把数据传递到主线程的main()
函数。
其实非常简单,和前面的例子中创建并发的线程差不多。只不过线程间通信使用__threadPeekMessage()
函数和__threadPostMessage()
函数。以币安交易所的WebSocket API接口调用为例,在设计中我们还需要注意对于WebSocket连接的关闭操作,以下例子中也展示了如何给一个并发的线程通知,让其停止。
完整的代码例子:
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("创建WS连接:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("接收到WS链接推送的数据,data:", data)
// __threadPeekMessage 超时参数设置-1,不阻塞
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("并发线程Id:", __threadId(), "接收到stop指令")
break
}
}
}
Log("并发线程执行完毕,关闭ws连接")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("创建并发线程,线程Id:", tid)
while(true) {
// __threadPeekMessage 的超时参数设置为0,阻塞等待数据
var data = __threadPeekMessage(0)
Log("接收到并发线程", ", Id:", tid, ", 发送的数据,data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker频道推送消息",
cols : ["事件类型", "事件时间", "交易对", "24小时价格变化", "24小时价格变化百分比", "平均价格", "最新成交价格", "24小时内成交量", "24小时内成交额"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("扫尾函数,向Id为", tid, "的并发线程发送stop指令")
__threadPostMessage(tid, "stop")
Log("等待Id为", tid, "的并发线程停止")
__threadJoin(tid)
Log("扫尾函数执行完毕")
}
实盘运行测试:
可以看到main()
函数中不停收到来自并发线程创建的WebSocket连接收到的行情数据。
当停止策略实盘时,扫尾函数会开始工作:
斯巴达玩量化 最后一个实例,如果ws线程很多,并且订阅了多个话题,那么线程间通信的话,使用get/set的方式还是peek/post的方式哪种性能更好呢
斯巴达玩量化 线程间共享变量的底层实现是不支持引用变量的,每次更新都得重新set,这效率很低
发明者量化-小小梦 两种方式差别不大的,都可以。