In FMZ's original strategy design, asynchronous concurrent operations could only be implemented with the exchange.Go() function, which runs FMZ's wrapped API interfaces concurrently; custom operations (functions) could not be executed concurrently. Although this design makes strategy programs very efficient to execute, it feels unfamiliar to those who have experience with concurrent design in native programming languages.
Even newcomers who start quantitative trading with FMZ do not understand how to use exchange.Go(): code that uses exchange.Go() still appears to execute one statement at a time in sequential code. So in this article we explore the FMZ platform's new concurrent thread functionality: the use of __Thread() and its related functions, and the asynchronous design of strategy programs.
If we want the strategy's main thread to keep running while a sub-thread concurrently executes a custom function we have written, we can use a design similar to the following code. Define a custom function GetTickerAsync() in the strategy code. This function runs an infinite while loop that repeatedly calls the FMZ API GetTicker() to fetch market data.
It then writes a piece of data to the main thread. The data name is ticker and the data value is t, that is, the return value of GetTicker(). The statement is:
__threadSetData(0, "ticker", t)
Once the custom function to be run by the concurrent thread is designed, we can write the code in the main() function. At the start of main() we use:
__Thread(GetTickerAsync, 0) // GetTickerAsync is the custom function to run concurrently; 0 is the argument passed to GetTickerAsync
This creates a concurrent thread, which starts executing the GetTickerAsync() function. The main() function then starts its own while loop, in which it receives the data updated by GetTickerAsync() and prints it:
var t = __threadGetData(0, "ticker")
Log(t)
Here's a full code example:
function GetTickerAsync(index) {
    while (true) {
        var t = exchanges[index].GetTicker()
        __threadSetData(0, "ticker", t)
        Sleep(500)
    }
}

function main() {
    __Thread(GetTickerAsync, 0)
    while (true) {
        var t = __threadGetData(0, "ticker")
        Log(t)
        Sleep(1000)
    }
}
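Because main() polls on its own schedule, the first read can happen before the sub-thread has written anything, and a later read can return a ticker that is no longer current. A minimal sketch of a guard, assuming that __threadGetData() yields an empty value until the first write and that the ticker object carries a millisecond Time field. The helper name isFreshTicker and the 2000 ms limit are hypothetical, and the sample values are made up:

```javascript
// Hypothetical helper: decide whether a ticker read from the shared data is usable.
// Assumes ticker.Time is a millisecond timestamp; maxAgeMs is an arbitrary example limit.
function isFreshTicker(ticker, nowMs, maxAgeMs) {
    if (!ticker || typeof ticker.Time !== "number") {
        return false    // nothing written yet, or not a ticker object
    }
    return nowMs - ticker.Time <= maxAgeMs
}

// Made-up sample values, for illustration only.
var sample = { Last: 100, Time: 1000000 }
console.log(isFreshTicker(null, 1000500, 2000))     // false
console.log(isFreshTicker(sample, 1000500, 2000))   // true
console.log(isFreshTicker(sample, 1005000, 2000))   // false
```

Inside the while loop of main(), the value returned by __threadGetData(0, "ticker") could be passed through a check like this before it is logged or used.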
Test run in live trading:
This is one of the simplest application designs. Next, we look at designs for some other requirements.
We can design a function that creates about 10 threads at once, each of which executes an order-placing function. In the main() function, design a while loop that checks for strategy interaction commands. When the interaction command placeMultipleOrders is received, call the concurrent order-placing function testPlaceMultipleOrders().
if (cmd == "placeMultipleOrders") {
// ...
}
On the strategy editing page, add a strategy interaction control: set up a button whose command is placeMultipleOrders.
Here's a full code example:
// Runs in its own thread: places a single buy or sell order.
function placeOrder(exIndex, type, price, amount) {
    var id = null
    if (type == "Buy") {
        id = exchanges[exIndex].Buy(price, amount)
    } else if (type == "Sell") {
        id = exchanges[exIndex].Sell(price, amount)
    } else {
        throw "type error! type:" + type
    }
}

// Starts one thread per price level from beginPrice to endPrice.
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
    Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
    var tids = []
    for (var p = beginPrice; p <= endPrice; p += step) {
        var tid = __Thread(placeOrder, index, type, p, amount)
        tids.push(tid)
        Sleep(10)
    }
    // Give the threads a second to place their orders, then terminate them.
    Sleep(1000)
    for (var i = 0; i < tids.length; i++) {
        __threadTerminate(tids[i])
    }
}

function main() {
    while (true) {
        LogStatus(_D())
        var cmd = GetCommand()
        if (cmd) {
            if (cmd == "placeMultipleOrders") {
                var t = _C(exchange.GetTicker)
                var beginPrice = t.Last * 0.8
                var endPrice = t.Last * 0.9
                var step = t.Last * 0.01
                testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
                // Print the pending orders to check the result.
                var orders = exchange.GetOrders()
                for (var i = 0; i < orders.length; i++) {
                    Log(orders[i])
                }
            }
        }
        Sleep(1000)
    }
}
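The for loop in testPlaceMultipleOrders() starts one thread per price level, and accumulating a floating-point step with p += step can gain or lose the last level through rounding. A minimal sketch, in plain JavaScript with no FMZ calls, of computing the levels by index instead. The helper name buildPriceLevels, the small tolerance, and the sample price are hypothetical:

```javascript
// Hypothetical helper: list the price levels from beginPrice to endPrice inclusive.
function buildPriceLevels(beginPrice, endPrice, step) {
    if (!(step > 0)) {
        throw new Error("step must be positive")
    }
    // Small tolerance so that endPrice itself is not lost to rounding.
    var count = Math.floor((endPrice - beginPrice) / step + 1e-9) + 1
    var levels = []
    for (var i = 0; i < count; i++) {
        levels.push(beginPrice + i * step)   // computed from the index, no accumulated error
    }
    return levels
}

// Made-up last price, using the same 80% to 90% range and 1% step as the example.
var last = 100
var levels = buildPriceLevels(last * 0.8, last * 0.9, last * 0.01)
console.log(levels.length)   // 11
```

With both ends of the range included, this sample produces 11 levels, so the number of threads started is worth checking before the orders are sent.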
This requirement came from an FMZ user who wanted a simple example demonstrating how to use a WebSocket connection in a concurrent thread, and how to pass the data to the main() function in the main thread.
It is actually very simple and much the same as creating concurrent threads in the previous examples, except that communication between threads uses the __threadPeekMessage() and __threadPostMessage() functions. Taking the WebSocket API of the Binance exchange as an example, the design also needs to take care of closing the WebSocket connection. The following example also shows how to notify a concurrent thread so that it stops.
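The shared-variable style used earlier (__threadSetData() and __threadGetData()) and the message style used here (__threadPostMessage() and __threadPeekMessage()) can be pictured with a plain-JavaScript model. This is only an illustration, not FMZ's implementation: it assumes that a shared variable keeps just the latest value written, while posted messages queue up until they are read. The names SharedSlot and Mailbox are hypothetical:

```javascript
// Model of a shared variable: each write overwrites the previous value.
function SharedSlot() {
    this.value = null
}
SharedSlot.prototype.set = function (v) { this.value = v }
SharedSlot.prototype.get = function () { return this.value }

// Model of a message queue: every posted message is kept until it is read.
function Mailbox() {
    this.queue = []
}
Mailbox.prototype.post = function (msg) { this.queue.push(msg) }
// In this model a read removes the oldest message; null means the queue is empty.
Mailbox.prototype.peek = function () {
    return this.queue.length > 0 ? this.queue.shift() : null
}

var slot = new SharedSlot()
var box = new Mailbox()

// The producer publishes three updates before the consumer gets to read.
var updates = ["tick-1", "tick-2", "tick-3"]
for (var i = 0; i < updates.length; i++) {
    slot.set(updates[i])
    box.post(updates[i])
}

var fromSlot = slot.get()
var fromBox = []
var msg
while ((msg = box.peek()) !== null) {
    fromBox.push(msg)
}
console.log(fromSlot)            // tick-3
console.log(fromBox.join(","))   // tick-1,tick-2,tick-3
```

Under these assumptions, a reader that polls the shared variable sees only the most recent value, which suits a latest-price display, while the queue hands over every message, which suits a WebSocket stream where each push matters.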
Here's a full code example:
var tid = null

function createWS() {
    // wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
    var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
    var ws = Dial(stream)
    Log("Creating WS connection:", stream)
    while (true) {
        var data = ws.read()
        if (data) {
            __threadPostMessage(0, data)
        }
        Log("Received data pushed by the WS connection, data:", data)
        // __threadPeekMessage with the timeout parameter set to -1: do not block
        var msg = __threadPeekMessage(-1)
        if (msg) {
            if (msg == "stop") {
                Log("Concurrent thread Id:", __threadId(), "received the stop command")
                break
            }
        }
    }
    Log("Concurrent thread finished, closing the ws connection")
    ws.close()
}

function main() {
    tid = __Thread(createWS)
    Log("Created concurrent thread, thread Id:", tid)
    while (true) {
        // __threadPeekMessage with the timeout parameter set to 0: block and wait for data
        var data = __threadPeekMessage(0)
        Log("Received data from concurrent thread", ", Id:", tid, ", data:", data, "#FF0000")
        var tbl = {
            type : "table",
            title : "<symbol>@ticker channel push message",
            cols : ["Event type", "Event time", "Symbol", "24h price change", "24h price change percent", "Average price", "Last price", "24h volume", "24h quote volume"],
            rows : []
        }
        try {
            data = JSON.parse(data)
            tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
        } catch (e) {
            Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
        }
        LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
    }
}

function onexit() {
    Log("Cleanup function: sending the stop command to the concurrent thread with Id", tid)
    __threadPostMessage(tid, "stop")
    Log("Waiting for the concurrent thread with Id", tid, "to stop")
    __threadJoin(tid)
    Log("Cleanup function finished")
}
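The row built inside the try block maps the short field names of the pushed message to the table columns. A minimal sketch of that mapping as a standalone function that can be tried outside FMZ. The function name tickerToRow is hypothetical, the sample message uses made-up values, and Date's toISOString() stands in for FMZ's _D():

```javascript
// Hypothetical helper: turn one raw <symbol>@ticker message into a table row.
// Returns null when the message cannot be used.
function tickerToRow(raw) {
    var d
    try {
        d = JSON.parse(raw)
    } catch (e) {
        return null     // not valid JSON: skip this message
    }
    if (!d || typeof d !== "object" || typeof d.E !== "number") {
        return null     // missing the event time field
    }
    // toISOString() is a stand-in for _D(), which exists only on FMZ.
    return [d.e, new Date(d.E).toISOString(), d.s, d.p, d.P, d.w, d.c, d.v, d.q]
}

// Made-up sample message, for illustration only.
var sampleMsg = JSON.stringify({
    e: "24hrTicker", E: 1700000000000, s: "BTCUSDT",
    p: "10.0", P: "0.5", w: "2005.0", c: "2010.0", v: "100.0", q: "200500.0"
})
console.log(tickerToRow(sampleMsg))
console.log(tickerToRow("not json"))    // null
```

Returning null for an unusable message lets the caller skip the row instead of pushing a partly filled one into the status table.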
Test run in live trading:
You can see that the main() function continuously receives market data from the WebSocket connection created by the concurrent thread.
When the strategy is stopped, the cleanup function onexit() starts working:
Spartan play quantified: About the last example: if there are many ws threads and multiple subscriptions, is it better to use the get/set method or the peek/post method for communication between threads?
Spartan play quantified: The underlying implementation of thread-shared variables does not support reference variables, so the value has to be set again every time it is updated, which is very inefficient.
Inventors quantify - small dreams: There is no real difference between the two methods. Either one works.