Dalam desain kebijakan FMZ awal, operasi asynchronous hanya dapat digunakan jika diperlukanexchange.Go()
Fungsi untuk mengimplementasikan FMZ packing interface, tidak dapat melakukan beberapa operasi kustom (fungsi) secara bersamaan. Meskipun desain ini membuat program kebijakan dapat dijalankan dengan sangat efisien, tetapi untuk teman sekelas yang memiliki pengalaman desain paralel dalam bahasa pemrograman asli, perasaan umum sangat tidak biasa.
Bahkan teman-teman baru yang menggunakan transaksi kuantitatif untuk masuk FMZ tidak mengerti.exchange.Go()
Penggunaan fungsi, penggunaanexchange.Go()
Sepertinya masih ada satu kalimat pelaksanaan dalam kode yang dijalankan secara berurutan. Jadi, dalam artikel ini kita akan mengeksplorasi fitur baru dari FMZ:__Thread()
Penggunaan fungsi deret dan lain-lain tidak seiring dengan desain prosedur strategi.
Jika kita ingin membuat thread utama kebijakan berjalan bersamaan dengan menjalankan sub-thread untuk menjalankan fungsi kustom yang kita tulis, kita dapat menggunakan desain yang mirip dengan kode berikut.GetTickerAsync()
, menulis fungsi tertentu untuk fungsi ini. Fungsi ini melakukan sebuah loop mati, di manawhile
Interface API untuk FMZ yang terus-menerus dipanggil dalam lingkaran:GetTicker()
Untuk mendapatkan data pasar.
Kemudian gunakan lagi.__threadSetData(0, "ticker", t)
Kata ini menulis data ke thread utama, yang disebut nama data.ticker
Nilai data adalaht
Ini adalahGetTicker()
Nilai yang dikembalikan adalah
__threadSetData(0, "ticker", t)
Setelah kita merancang fungsi kustom yang akan dijalankan secara bersamaan, kita bisa menulismain()
Jadi jika kita menggunakan kode dalam fungsi, kita bisa menggunakan kode dalam fungsi.main()
Fungsi dimulai dengan menggunakan:
__Thread(GetTickerAsync, 0) // GetTickerAsync为需要并发执行的自定义函数,0为这个传入GetTickerAsync函数的参数
Anda membuat sebuah thread paralel, dan thread ini akan mulai dijalankan.GetTickerAsync()
Fungsi. Selanjutnya.main()
Fungsi mulai melakukan sendiri.while
Daur ulang, menerima dalam siklusGetTickerAsync()
Fungsi ini memperbarui data dan kemudian mencetak:
var t = __threadGetData(0, "ticker")
Log(t)
Contoh kode lengkap:
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
Percobaan berjalan pada hardisk:
Ini adalah desain aplikasi yang paling sederhana, dan kita akan melihat beberapa desain kebutuhan lainnya.
Fungsi dapat dirancang untuk menciptakan 10 thread secara bersamaan, masing-masing thread melakukan operasi suborder.main()
Jadi kita bisa membuat sebuah fungsi.while
Perintah interaksi siklus, deteksi kebijakan.placeMultipleOrders
Jadi kita akan menggunakan fungsi yang sama ini untuk memanggilnya.testPlaceMultipleOrders()
。
if (cmd == "placeMultipleOrders") {
// ...
}
Menambahkan desain interaksi kebijakan pada halaman edit kebijakan, mengatur tombol, perintah: placeMultipleOrders
Contoh kode lengkap:
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}
Permintaan ini dikemukakan oleh pengguna FMZ yang berharap untuk memiliki contoh sederhana yang menunjukkan bagaimana menggunakannya dalam thread paralel.WebSocketIni adalah bagian dari proses yang sangat penting untuk menghubungkan, dan merancang bagaimana data dikirim ke thread utama.main()
Fungsi tersebut.
Pada kenyataannya sangat sederhana, dan hampir sama dengan contoh sebelumnya, membuat thread secara bersamaan; hanya saja komunikasi antar thread digunakan.__threadPeekMessage()
Fungsi dan__threadPostMessage()
Fungsi. Sebagai contoh panggilan ke API WebSocket dari Bitcoin Exchange, dalam desain kita juga perlu memperhatikan operasi penutupan koneksi WebSocket, contoh berikut juga menunjukkan bagaimana memberi pemberitahuan pada thread paralel dan membuatnya berhenti.
Contoh kode lengkap:
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("创建WS连接:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("接收到WS链接推送的数据,data:", data)
// __threadPeekMessage 超时参数设置-1,不阻塞
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("并发线程Id:", __threadId(), "接收到stop指令")
break
}
}
}
Log("并发线程执行完毕,关闭ws连接")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("创建并发线程,线程Id:", tid)
while(true) {
// __threadPeekMessage 的超时参数设置为0,阻塞等待数据
var data = __threadPeekMessage(0)
Log("接收到并发线程", ", Id:", tid, ", 发送的数据,data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker频道推送消息",
cols : ["事件类型", "事件时间", "交易对", "24小时价格变化", "24小时价格变化百分比", "平均价格", "最新成交价格", "24小时内成交量", "24小时内成交额"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("扫尾函数,向Id为", tid, "的并发线程发送stop指令")
__threadPostMessage(tid, "stop")
Log("等待Id为", tid, "的并发线程停止")
__threadJoin(tid)
Log("扫尾函数执行完毕")
}
Percobaan berjalan pada hardisk:
Anda bisa lihatmain()
Fungsi ini terus menerima data pasar yang diterima dari koneksi WebSocket yang dibuat pada thread paralel.
Ketika Anda menghentikan kebijakan real disk, fungsi sweep tail akan mulai bekerja:
Spada bermain kuantitatifContoh terakhir, jika ada banyak thread ws, dan ada banyak topik yang terdaftar, maka komunikasi antar thread, mana yang lebih baik menggunakan metode get/set atau metode peek/post?
Spada bermain kuantitatifImplementasi dasar untuk berbagi variabel antar-threaded adalah tidak mendukung variabel referensi, yang harus disetel ulang setiap kali diperbarui, yang sangat tidak efisien.
Penemu Kuantitas - Mimpi KecilDia mengatakan, "Saya tidak tahu apa yang akan terjadi.