这是FMZ官方开发的一个WebSocket行情模板,复制另存为模板,在新策略中勾选这个模板就可以使用:https://www.fmz.com/strategy/470349
目前FMZ策略主要是传统的REST API封装,每一步API访问都要建立一个网络连接,通过轮询的方式获取市场数据。这种方法简单易用,对于大部分需求,这样做完全足够。
但是,REST协议有固有的延迟问题,当需要多交易对、多交易所策略时,延时问题会被放大。虽然用平台的Go函数可以并发执行,但延时问题依旧存在,难以满足相对高频一点的策略交易的需求,并且交易对太多,轮询频率过快还会遇到交易平台的访问频率限制。
目前交易所的服务器负担也很重,都提供了完善的WebSocket协议,并且推荐API用户使用。相对REST协议,WebSocket提供了一种持久化的双向连接方式,使得交易所能够实时推送数据到客户端,避免了频繁的请求和响应,极大地降低了延迟。一般来说,如果访问REST API的延时在20ms左右,WebSocket的推送数据大概延时2ms左右。并且链接WebSocket协议不受平台访问频率限制,基本上可以做到一次订阅几十个交易对。
FMZ量化交易平台很早就支持了WebSocket协议,并且相对方便的调用,但对于新手用户来说,处理多个订阅,订阅多个交易所行情,并且高效方便的嵌入整个策略流程中,还是过于复杂。 这个公开的WebSocket实时行情数据加速模板,就解决了这个痛点,非常易用,完全和当前封装的API调用兼容,对于大部分原来的REST策略策略,简单改动直接使用,加速你的策略。
主要特点:
注意到这个策略使用了TypeScript,如果你只是熟悉在JavaScript的话,看起来会有点陌生。 TypeScript 在JavaScript 的基础上引入了类型系统和更丰富的语言特性,对于量化交易等需要处理复杂逻辑的应用,使用 TypeScript 可以减少潜在的错误,提高代码的可读性和可维护性。因此推荐可以简单的学习下。
另外策略使用了FMZ平台的异步机制,机制子线程可以通过 __threadPostMessage 函数向主线程发送消息。这种方式是异步的,适用于在子线程中产生的数据更新通知主线程。主线程和子线程之间可以通过 __threadGetData 和 __threadSetData 函数共享数据。这种方式允许线程访问和修改共享的状态。如果你想学习下多线程,结合平台文档,这个策略也是一个很好的学习范例。
这个策略的主要原理是通过WebSocket连接主流数字货币交易所,实时接收市场数据(如深度信息和交易信息),以便为量化交易决策提供数据支持。具体实现流程如下:
1.WebSocket 连接设置
setupWebsocket
函数用于初始化WebSocket连接,接收市场数据。它接收一个参数 main_exchanges
,表示需要连接的交易所。
MyDial
函数:创建WebSocket连接,记录连接时间,并在关闭连接时输出关闭时间。updateSymbols
函数:定时检查是否有新的订阅请求,并根据需要更新当前的交易对列表。2. 数据处理
supports
对象定义了支持的交易所及其处理函数(如 Binance
)。每个交易所的处理函数负责解析接收到的消息并提取相关数据。
processMsg
函数:处理来自交易所的消息,识别不同类型的数据(如深度更新、交易等),并格式化为统一的事件对象。3. 订阅数据
在每次连接时,系统会根据当前的交易对订阅相关的市场数据通道。
getFunction
函数:根据交易所名称获取相应的处理函数。this.wssPublic
函数:初始化WebSocket连接并启动数据接收。4. 线程管理
为每个交易所启动一个线程,实时接收数据并通过回调函数处理数据。
threadMarket
函数:在子线程中接收数据,解析并存储最新的深度和交易信息。5. 数据获取方法重写
为每个交易所重写获取深度和交易信息的方法,优先返回实时更新的数据。
$.setupWebsocket()
初始化目标交易所的WebSocket连接。GetDepth()
和 GetTrades()
函数,自动使用WebSocket实时数据进行市场深度和交易记录的返回。如果策略中加入EventLoop()函数,会改成触发机制,当有wss数据更新时会自动立即获取,没有最新数据就等待。相当于智能的Sleep函数,当然也可以直接使用Sleep。
function main() {
$.setupWebsocket()
while (true) {
exchanges.map(e=>{
Log(e.GetName(), e.GetDepth())
Log(e.GetName(), e.GetTrades())
})
EventLoop(100) // trigger by websocket
}
}
参考我的上一篇多币种交易策略指南 https://www.fmz.com/digest-topic/10506 ,这里可以非常方便的对其进行改造支持WebSocket:
function MakeOrder() {
for (let i in Info.trade_symbols) {
let symbol = Info.trade_symbols[i];
let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
let buy_amount = 50 / buy_price;
if (Info.position[symbol].value < 2000){
Trade(symbol, "buy", buy_price, buy_amount, symbol);
}
}
}
function OnTick() {
try {
UpdatePosition();
MakeOrder();
UpdateStatus();
} catch (error) {
Log("循环出错: " + error);
}
}
function main() {
$.setupWebsocket()
InitInfo();
while (true) {
let loop_start_time = Date.now();
if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
OnTick();
Info.time.last_loop_time = Date.now();
Info.time.loop_delay = Date.now() - loop_start_time;
}
Sleep(5);
}
}
只要按照策略的模板,自己仿照下面的格式,自己参考交易所API文档即可:
supports["Binance"] = function (ctx:ICtx) {
let processMsg = function (obj) {
let event = {
ts: obj.E,
instId: obj.s,
depth: null,
trades: [],
}
if (obj.e == "depthUpdate") {
let depth = {
asks: [],
bids: []
}
obj.b.forEach(function (item) {
depth.bids.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
obj.a.forEach(function (item) {
depth.asks.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
event.depth = depth
} else if (obj.e == 'bookTicker') {
event.depth = {
asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
}
} else if (obj.e == 'aggTrade') {
event.ts = obj.E
event.trades = [{
price: Number(obj.p),
qty: Number(obj.q),
ts: obj.T,
side: obj.m ? "sell" : "buy"
}]
} else if (typeof (obj.asks) !== 'undefined') {
event.ts = obj.E || new Date().getTime()
let depth = {
asks: [],
bids: []
}
obj.bids.forEach(function (item) {
depth.bids.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
obj.asks.forEach(function (item) {
depth.asks.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
event.depth = depth
} else {
return
}
return event
}
let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
let ws = null
let endPoint = "wss://stream.binance.com/stream"
if (ctx.name == "Futures_Binance") {
endPoint = "wss://fstream.binance.com/stream"
}
while (true) {
if (!ws) {
let subscribes = []
ctx.symbols.forEach(function (symbol) {
channels.forEach(function (channel) {
subscribes.push(symbol.toLowerCase() + "@" + channel)
})
})
ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
}
if (!ws) {
Sleep(1000)
continue
}
updateSymbols(ctx, function(symbol:string, method:string) {
ws.write(JSON.stringify({
"method": method.toUpperCase(),
"params": channels.map(c=>symbol.toLowerCase()+'@'+c),
"id": 2
}))
})
let msg = ws.read(1000)
if (!msg) {
if (msg == "") {
trace("websocket is closed")
ws.close()
ws = null
}
continue
}
if (msg == 'ping') {
ws.write('pong')
} else if (msg == 'pong') {
} else {
let obj = JSON.parse(msg)
if (obj.error) {
trace(obj.error.msg, "#ff0000")
continue
}
if (!obj.stream) {
continue
}
if (obj.stream.indexOf("depth") != -1) {
if (typeof(obj.data.s) !== 'string') {
// patch
obj.data.s = obj.stream.split('@')[0].toUpperCase()
}
}
let event = processMsg(obj.data)
if (event) {
ctx.callback(event)
}
}
}
}