Dans la conception initiale de la stratégie FMZ, les opérations asynchrones ne pouvaient être utilisées que si elles étaient nécessaires.exchange.Go()
Les fonctions pour réaliser la concomitance de l'interface FMZ ne peuvent pas exécuter simultanément certaines opérations personnalisées (fonctions). Bien que cette conception améliore considérablement l'efficacité de l'exécution des programmes stratégiques, les élèves qui ont déjà eu une expérience de la conception en parallèle dans un langage de programmation natif ne sont pas habitués à cette conception.
Même les nouveaux étudiants qui utilisent FMZ pour les transactions quantitatives ne comprennent pas.exchange.Go()
Utilisation des fonctionsexchange.Go()
Il semble toujours que les phrases soient exécutées une par une dans le code exécuté dans l'ordre. Dans cet article, nous allons explorer les nouvelles fonctionnalités de synchronisation de la plateforme FMZ:__Thread()
L'utilisation des fonctions de série, etc., est asynchrone avec la conception des procédures stratégiques.
Si nous voulons que le fil principal de la stratégie fonctionne en même temps que le fil secondaire pour exécuter les fonctions personnalisées que nous avons écrites, nous pouvons utiliser une conception similaire au code suivant.GetTickerAsync()
Cette fonction exécute un cercle mort, dans ce cas, la fonction est appelée un cercle mort.while
L'API de l'appel FMZ en boucle:GetTicker()
Il y a aussi des sites de téléchargement de vidéos pour les consommateurs.
Puis utilisez-le.__threadSetData(0, "ticker", t)
C'est une phrase qui écrit une donnée dans le fil principal, appelée nom de donnée.ticker
La valeur est:t
Je veux dire,GetTicker()
La valeur de retour de l'élément ≠ 2 {\displaystyle \mathbb {2}}}
__threadSetData(0, "ticker", t)
Une fois que nous avons conçu les fonctions personnalisées qui s'exécutent en parallèle, nous pouvons écriremain()
C'est le code dans la fonction.main()
La fonction commence avec:
__Thread(GetTickerAsync, 0) // GetTickerAsync为需要并发执行的自定义函数,0为这个传入GetTickerAsync函数的参数
Nous créons un fil en parallèle qui commence à exécuter.GetTickerAsync()
Les fonctions.main()
Les fonctions commencent à exécuter leur propre fonction.while
La circulation, la réception dans la circulationGetTickerAsync()
Les données mises à jour par la fonction sont imprimées:
var t = __threadGetData(0, "ticker")
Log(t)
L'exemple de code complet est ici:
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
Les tests sont effectués sur disque:
Il s'agit de la conception la plus simple d'applications, et nous allons voir d'autres conceptions de besoins.
Une fonction peut être conçue pour créer 10 threads simultanément, chacun exécutant une fonction d'opération de sous-ordre.main()
On peut créer une fonctionwhile
La stratégie d'interaction de la roue, de la détection.placeMultipleOrders
On peut appeler cette fonction en même temps.testPlaceMultipleOrders()
。
if (cmd == "placeMultipleOrders") {
// ...
}
Ajoutez la conception interactive de la politique sur la page d'édition de la politique, en définissant un bouton avec la commande: placeMultipleOrders
L'exemple de code complet est ici:
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}
Le test est effectué à l'aide d'une méthode de mise en attente, en augmentation de 80% à 90% du prix actuel, en utilisant un test d'environnement de disque analogue, en cliquant sur un bouton interactif pour déclencher le test de mise en attente:
En cliquant sur le bouton "placeMultipleOrders", vous obtenez:
Le journal des stratégies affiche les opérations suivantes:
Cette demande a été faite par un utilisateur de FMZ qui souhaitait avoir un exemple simple pour démontrer comment l'utiliser dans des fils en parallèle.WebSocketNous avons créé un système de connexion, et nous avons conçu un système de connexion pour transmettre les données vers les fils principaux.main()
Les fonctions.
Il est très simple de créer des fils simultanés comme dans l'exemple précédent.__threadPeekMessage()
Les fonctions et__threadPostMessage()
Fonction. À titre d'exemple de l'appel à l'interface API WebSocket de l'échange Bitcoin, nous devons également faire attention à la conception de l'opération de fermeture des connexions WebSocket.
L'exemple de code complet est ici:
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("创建WS连接:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("接收到WS链接推送的数据,data:", data)
// __threadPeekMessage 超时参数设置-1,不阻塞
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("并发线程Id:", __threadId(), "接收到stop指令")
break
}
}
}
Log("并发线程执行完毕,关闭ws连接")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("创建并发线程,线程Id:", tid)
while(true) {
// __threadPeekMessage 的超时参数设置为0,阻塞等待数据
var data = __threadPeekMessage(0)
Log("接收到并发线程", ", Id:", tid, ", 发送的数据,data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker频道推送消息",
cols : ["事件类型", "事件时间", "交易对", "24小时价格变化", "24小时价格变化百分比", "平均价格", "最新成交价格", "24小时内成交量", "24小时内成交额"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("扫尾函数,向Id为", tid, "的并发线程发送stop指令")
__threadPostMessage(tid, "stop")
Log("等待Id为", tid, "的并发线程停止")
__threadJoin(tid)
Log("扫尾函数执行完毕")
}
Les tests sont effectués sur disque:
Vous pouvez voirmain()
La fonction reçoit en permanence les données de marché provenant des connexions WebSocket créées par des fils simultanés.
Lorsque la politique est arrêtée, la fonction de balayage commence à fonctionner:
Les Spartans jouent à la quantificationUn dernier exemple, si vous avez beaucoup de fils ws et que vous avez plusieurs sujets souscrits, quelle est la meilleure performance pour la communication entre les fils, la méthode get/set ou la méthode peek/post?
Les Spartans jouent à la quantificationL'implémentation sous-jacente des variables partagées entre les threads est de ne pas prendre en charge les variables de référence et de devoir les réinitialiser à chaque mise à jour, ce qui est très inefficace.
L'inventeur de la quantification - un petit rêveLes deux façons ne sont pas différentes, mais elles sont toutes les deux possibles.