基于WebSocket的实时行情数据加速模板类库
该策略通过多线程WebSocket连接优化了多个交易所的实时数据处理。在FMZ量化交易平台中使用WebSocket连接获取深度订单簿和交易数据,显著降低了获取市场数据的延迟,并提高了高频交易系统的性能。
主要特点:
- 多交易所支持:该策略支持币安、OKX、Bybit、Bitget等多个交易所的WebSocket连接,提供比传统REST API轮询更快、更稳定的数据流。
- 可定制订阅:允许订阅特定市场频道(如深度、交易等),并高效处理接收的数据,供交易策略即时使用。
- 高级错误处理:内置错误跟踪和WebSocket重连机制,确保数据流的可靠性和持续性。
- CRC32校验:对于OKX等交易所,集成了CRC32校验功能,确保接收到的订单簿数据的完整性。
此基于WebSocket的解决方案取代了传统的API轮询,适用于需要最小化延迟、最大化市场响应速度的交易者。
使用方法:调用 `$.setupWebsocket()` 初始化目标交易所的WebSocket连接;此后 `GetDepth()` 和 `GetTrades()` 函数会自动使用WebSocket实时数据返回市场深度和交易记录。此脚本旨在FMZ量化平台上运行,为多个交易所提供快速、可靠且可扩展的市场数据访问。
// Usage example: switch the exchanges to websocket-backed market data, then
// log depth and trades for every configured exchange in an endless loop.
function main() {
    $.setupWebsocket()
    for (;;) {
        exchanges.forEach(function (ex) {
            // custom and automatic subscription are supported, e.g. ex.GetDepth('ETH_USDT')
            Log(ex.GetName(), ex.GetDepth())
            Log(ex.GetName(), ex.GetTrades())
        })
        // triggered by websocket events; Sleep can be used instead to control the delay
        EventLoop(100)
    }
}
// @ts-check

/**
 * Replaces GetDepth() / GetTrades() on the given exchange objects with versions
 * that are served from public websocket streams.
 *
 * For every distinct exchange name one worker thread is started (via __Thread)
 * that keeps a websocket connection open and publishes the latest order book and
 * recent trades through __threadSetData. The patched GetDepth()/GetTrades() read
 * that data and fall back to the original methods when no websocket data (or, for
 * depth, no data younger than 3 seconds) is available.
 *
 * This function is also handed to __Thread so that it runs inside each worker;
 * there it only defines `this.wssPublic` and returns (see the __threadId() check).
 *
 * @param main_exchanges Optional array of exchange objects to patch; defaults to
 *                       the global `exchanges`.
 */
$.setupWebsocket = function(main_exchanges) {
    // CRC-32 (reflected polynomial 0xEDB88320) of a string, returned as an
    // unsigned 32-bit integer. Used for the OKX order book checksum.
    let crc32 = function (text) {
        let table = []
        for (let i = 0; i < 256; i++) {
            let v = i
            for (let bit = 0; bit < 8; bit++) {
                v = (v & 1) ? (0xEDB88320 ^ (v >>> 1)) : (v >>> 1)
            }
            table[i] = v
        }
        let crc = -1
        for (let i = 0; i < text.length; i++) {
            crc = (crc >>> 8) ^ table[(crc ^ text.charCodeAt(i)) & 255]
        }
        return (-1 ^ crc) >>> 0
    }

    // Log with the current thread id as first argument.
    let trace = function (...args: any[]) {
        args.unshift('#' + __threadId())
        Log.apply(this, args)
    }

    // Dial() wrapper that logs the connect result and, on close(), how long the
    // connection was alive.
    let MyDial = function(address) : IDial {
        let ws = Dial(address)
        if (ws) {
            let createTime = new Date().getTime()
            let oldClose = ws.close
            ws.close = function() {
                oldClose.apply(ws)
                // getTime() is in milliseconds, so divide by 1000 to report seconds
                trace(address, "closed after", (new Date().getTime() - createTime) / 1000, "seconds")
            }
        }
        trace("connect", address, ws ? "success" : "failed")
        return ws
    }

    interface ICtx {
        name: string
        symbols: string[]
        callback?: Function
    }

    let lastUpdateSymbolsTime = new Date().getTime()

    // Takes at most one pending subscribe/unsubscribe request from the thread's
    // message queue, forwards it to callBack(symbol, method) and keeps
    // ctx.symbols (the list re-subscribed after a reconnect) in sync.
    // Throttled: does nothing if called again within 5 seconds.
    let updateSymbols = function(ctx: ICtx, callBack: Function) {
        let ts = new Date().getTime()
        if (ts - lastUpdateSymbolsTime < 5000) {
            return
        }
        lastUpdateSymbolsTime = ts
        // NOTE(review): -1 is presumably the non-blocking timeout - confirm against the FMZ API
        let e = __threadPeekMessage(-1)
        if (e) {
            trace("subscribe", e, "#ff0000")
            callBack(e.symbol, e.method)
            if (e.method == "subscribe") {
                // avoid duplicates, otherwise a reconnect would subscribe the symbol twice
                if (ctx.symbols.indexOf(e.symbol) == -1) {
                    ctx.symbols.push(e.symbol)
                }
            } else {
                ctx.symbols = ctx.symbols.filter(symbol=>symbol!=e.symbol)
            }
        }
    }

    // Exchange drivers, looked up by (case-insensitive) substring of the exchange name.
    // Each driver loops forever: (re)connect, subscribe, read, and hand normalized
    // events {ts, instId, depth, trades} to ctx.callback.
    let supports = {}

    supports["Binance"] = function (ctx:ICtx) {
        // [[price, qty], ...] -> [{price, qty}, ...] with numeric fields
        let toLevels = function (pairs) {
            return pairs.map(function (item) {
                return {
                    price: Number(item[0]),
                    qty: Number(item[1])
                }
            })
        }

        // Normalizes one stream payload; returns undefined for unknown payloads.
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                event.depth = {
                    asks: toLevels(obj.a),
                    bids: toLevels(obj.b)
                }
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{
                        price: Number(obj.a),
                        qty: Number(obj.A)
                    }],
                    bids: [{
                        price: Number(obj.b),
                        qty: Number(obj.B)
                    }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                // payload without event type but with asks/bids; it may lack a timestamp
                event.ts = obj.E || new Date().getTime()
                event.depth = {
                    asks: toLevels(obj.asks),
                    bids: toLevels(obj.bids)
                }
            } else {
                return
            }
            return event
        }

        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }

        while (true) {
            if (!ws) {
                // subscribe to all known symbols through the connection URL
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }

            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({
                    "method": method.toUpperCase(),
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })

            let msg = ws.read(1000)
            if (!msg) {
                // an empty string is treated as "connection closed"; other falsy values just retry
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {
                // ignore
            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch: take the symbol from the stream name when the payload has none
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }

    supports["OK"] = function (ctx:ICtx) {
        // instId -> {count, dic: {asks: {price: [...]}, bids: {...}}} for incremental channels
        let depthDicAll = {}

        // Parses one raw message; returns a normalized event or undefined.
        let processMsg = function (msg) {
            let obj = JSON.parse(msg)
            if (obj.error) {
                trace(obj.error.msg, "#ff0000")
                return
            }
            if (obj.event == 'subscribe') {
                // ignore
            } else if (obj.event == 'error') {
                trace(obj.msg, "#ff0000")
                return
            } else if (obj.event == 'login') {
                // ignore
            } else {
                if (!obj.data) {
                    trace(msg)
                    return
                }
                let instId = obj.arg.instId
                let event = {
                    ts: new Date().getTime(),
                    instId: instId,
                    depth: null,
                    trades: [],
                }

                obj.data.forEach(function (item) {
                    if (obj.arg.channel == 'trades') {
                        event.ts = Number(item.ts)
                        event.trades.push({
                            ts: Number(item.ts),
                            side: item.side,
                            price: Number(item.px),
                            qty: Number(item.sz)
                        })
                    } else if (obj.arg.channel == 'books5' || obj.arg.channel == 'books' || obj.arg.channel == 'books50-l2-tbt') {
                        let depth = {
                            asks: [],
                            bids: []
                        }
                        if (obj.arg.channel == 'books5') {
                            // books5 carries the full book on every push
                            item.asks.forEach(function (pair) {
                                depth.asks.push({
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                                })
                            })
                            item.bids.forEach(function (pair) {
                                depth.bids.push({
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                                })
                            })
                        } else {
                            // incremental channels: merge the update into the local book
                            let depthDic = depthDicAll[instId]
                            if (typeof (depthDic) === 'undefined') {
                                depthDic = {
                                    count: 0,
                                    dic: {
                                        asks: {},
                                        bids: {}
                                    }
                                }
                                depthDicAll[instId] = depthDic
                            }
                            depthDic.count += 1

                            for (let k in depthDic.dic) {
                                if (obj.action == 'snapshot') {
                                    depthDic.dic[k] = {}
                                }
                                let mp = depthDic.dic[k]
                                item[k].forEach(function (book) {
                                    if (book[1] == '0') {
                                        delete mp[book[0]]
                                    } else {
                                        mp[book[0]] = [book[1], book[3], item['ts']]
                                    }
                                })
                            }

                            // asks ascending, bids descending
                            for (let k in depth) {
                                let n = k == 'asks' ? 1 : -1
                                let mp = depthDic.dic[k]
                                Object.keys(depthDic.dic[k]).sort(function (a, b) {
                                    return n * (Number(a) - Number(b))
                                }).forEach(function (x) {
                                    // keep strings here, the checksum is computed over the original text
                                    depth[k].push({
                                        price: x,
                                        qty: mp[x][0]
                                    })
                                })
                            }

                            // verify the book against the exchange checksum every 5000 updates
                            if (depthDic.count % 5000 == 0) {
                                let sides = ['bids', 'asks']
                                let s = []
                                for (let i = 0; i < 25; i++) {
                                    sides.forEach(function (k) {
                                        if (i < depth[k].length) {
                                            s.push(depth[k][i].price + ':' + depth[k][i].qty)
                                        }
                                    })
                                }
                                // item.checksum is signed; reinterpret it as unsigned before comparing
                                if (crc32(s.join(":")) != Uint32Array.from(Int32Array.of(item.checksum))[0]) {
                                    // not caught anywhere in this function
                                    throw "depth checksum error"
                                }
                            }

                            // convert to number
                            for (let dir in depth) {
                                let books = depth[dir]
                                for (let i = 0; i < books.length; i++) {
                                    books[i].price = Number(books[i].price)
                                    books[i].qty = Number(books[i].qty)
                                }
                            }
                        }
                        event.depth = depth
                        event.ts = Number(item.ts)
                    }
                })
                return event
            }
        }

        let channels = ['books5', 'trades']
        let ws = null
        let lastPing = new Date().getTime()

        while (true) {
            if (!ws) {
                depthDicAll = {} // reset: a new connection starts from a fresh snapshot
                ws = MyDial("wss://ws.okx.com:8443/ws/v5/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({
                                channel: channel,
                                instId: symbol // "BTC-USDT"
                            })
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({
                            "op": "subscribe",
                            "args": subscribes
                        }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }

            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({
                    "op": method,
                    "args": channels.map(c=>({channel: c, instId: symbol}))
                }))
            })

            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write("ping")
                lastPing = ts
            }

            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg != 'pong') {
                let event = processMsg(msg)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
                    ctx.callback(event)
                }
            }
        }
    }

    supports["Bybit"] = function (ctx:ICtx) {
        // instId -> {asks: {price: qty}, bids: {price: qty}}
        let depthMp = {}

        // Normalizes one topic message. Throws (TypeError) when a delta arrives
        // before a snapshot; the caller handles that by reconnecting.
        let processMsg = function (obj:any) {
            let event = {
                ts: obj.ts,
                instId: obj.data.s,
                depth: null,
                trades: [],
            }

            if (obj.topic.startsWith("orderbook")) {
                let depthDic = depthMp[event.instId]
                if (typeof(depthDic) === 'undefined') {
                    depthDic = {}
                    depthMp[event.instId] = depthDic
                }
                let data = {asks: obj.data.a, bids: obj.data.b}
                let depth = {asks: [], bids: []}

                if (obj.type == "snapshot") {
                    depthDic.asks = {}
                    depthDic.bids = {}
                    for (let k in depth) {
                        if (!data[k]) {
                            continue
                        }
                        depth[k] = data[k].map(function(item) {
                            depthDic[k][item[0]] = item[1]
                            return {
                                price: Number(item[0]),
                                qty: Number(item[1]),
                            }
                        })
                    }
                } else if (obj.type == "delta") {
                    for (let k in depth) {
                        if (!data[k]) {
                            continue
                        }
                        data[k].forEach(function(item) {
                            if (item[1] == '0') {
                                delete depthDic[k][item[0]]
                            } else {
                                depthDic[k][item[0]] = item[1]
                            }
                        })
                    }

                    // build depth: asks ascending, bids descending, numeric fields
                    for (let k in depth) {
                        let n = k == 'asks' ? 1 : -1
                        let mp = depthDic[k]
                        Object.keys(depthDic[k]).sort(function(a, b) {
                            return n * (Number(a) - Number(b))
                        }).forEach(function(x) {
                            depth[k].push({
                                price: Number(x),
                                qty: Number(mp[x])
                            })
                        })
                    }
                }

                depth.asks = depth.asks.slice(0, 50)
                depth.bids = depth.bids.slice(0, 50)
                event.depth = depth
            } else if (obj.topic.startsWith("publicTrade")) {
                event.trades = obj.data.map(function(item) {
                    // trade payloads are arrays, so the symbol comes from the items
                    if (!event.instId) {
                        event.instId = item.s
                    }
                    return {
                        //id: item.i,
                        ts: item.T,
                        price: Number(item.p),
                        qty: Number(item.v),
                        side: item.S.toLowerCase()
                    }
                })
            }
            return event
        }

        let channels = ["orderbook.50", "publicTrade"]
        let ws = null
        let endPoint = "wss://stream.bybit.com/v5/public/spot"

        // Symbols not quoted in USDT/USDC are treated as inverse contracts.
        // Mixing both kinds is rejected regardless of the order they appear in.
        let hasInverse = false
        let hasLinear = false
        ctx.symbols.forEach(s=>{
            let prefix = s.split('.')[0]
            if (!prefix.endsWith('USDT') && !prefix.endsWith('USDC')) {
                hasInverse = true
            } else {
                hasLinear = true
            }
        })
        if (hasInverse && hasLinear) {
            throw "symbols type not support concurrent"
        }
        let isInverse = hasInverse

        if (ctx.name == "Futures_Bybit") {
            endPoint = "wss://stream.bybit.com/v5/public/linear"
            if (isInverse) {
                endPoint = "wss://stream.bybit.com/v5/public/inverse"
            }
        }

        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial(endPoint)
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push(channel+ "." + symbol)
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({
                            op:"subscribe",
                            args: subscribes
                        }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }

            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write(JSON.stringify({
                    op: "ping"
                }))
                lastPing = ts
            }

            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({
                    "op": method,
                    "args": channels.map(c=>c+"."+symbol)
                }))
            })

            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                    depthMp = {}
                }
                continue
            }

            let obj = JSON.parse(msg)
            if (obj.op == "ping" && obj.ret_msg == "pong") {
                // ignore
            } else if (obj.op == 'pong') {
                // ignore
            } else if (obj.op == 'subscribe' || obj.op == 'unsubscribe' || !obj.topic) {
                trace(obj)
            } else {
                try {
                    let event = processMsg(obj)
                    if (event && (event.depth || event.trades.length > 0)) {
                        ctx.callback(event)
                    }
                } catch(e) {
                    trace("Error:", e, msg)
                    // drop the connection and the local books so the next
                    // iteration reconnects and starts from a fresh snapshot
                    ws.close()
                    ws = null
                    depthMp = {}
                }
            }
        }
    }

    supports["Bitget"] = function (ctx:ICtx) {
        // Normalizes one channel push into an event.
        let processMsg = function (obj) {
            let event = {
                ts: Number(obj.ts),
                instId: obj.arg.instId,
                depth: null,
                trades: [],
            }

            if (obj.arg.channel == 'trade') {
                event.trades = obj.data.map(function(item) {
                    return {
                        //id: Number(item.tradeId),
                        ts: Number(item.ts),
                        price: Number(item.price),
                        qty: Number(item.size),
                        side: item.side.toLowerCase()
                    }
                })
            } else if (obj.arg.channel == 'books15') {
                let depth = {asks: [], bids: []}
                for (let k in depth) {
                    if (!obj.data[0][k]) {
                        continue
                    }
                    depth[k] = obj.data[0][k].map(function(item) {
                        return {
                            price: Number(item[0]),
                            qty: Number(item[1])
                        }
                    })
                }
                event.depth = depth
            }
            return event
        }

        // Maps a symbol to the instType used in subscriptions: for futures
        // exchanges it is chosen by the symbol's suffix, otherwise "SPOT".
        let getInstType = function(s) {
            if (ctx.name.indexOf("Futures_") == 0) {
                let instTypes = {
                    "USDT": "USDT-FUTURES",
                    "USD": "COIN-FUTURES",
                    "USDC": "USDC-FUTURES"
                    //"SUMCBL": "SUSDT-FUTURES",
                    //"SDMCBL": "SCOIN-FUTURES",
                    //"SCMCBL": "SUSDC-FUTURES"
                }
                let quotes = Object.keys(instTypes)
                for (let i = 0; i < quotes.length; i++) {
                    if (s.endsWith(quotes[i])) {
                        return instTypes[quotes[i]]
                    }
                }
            }
            return "SPOT";
        }

        let channels = ['books15', 'trade']
        let ws = null
        let lastPing = new Date().getTime()

        while (true) {
            if (!ws) {
                ws = MyDial("wss://ws.bitget.com/v2/ws/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({instType: getInstType(symbol), channel: channel, instId: symbol})
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({
                            op: "subscribe",
                            args: subscribes
                        }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }

            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({
                    "op": method,
                    "args": channels.map(c=>({instType: getInstType(symbol), channel: c, instId: symbol}))
                }))
            })

            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write("ping")
                lastPing = ts
            }

            let msg = ws.read(1000)
            if (!msg) {
                // is closed
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == "ping" || msg == "pong") {
                continue
            }

            let obj = JSON.parse(msg)
            if (obj.event == "ping") {
                ws.write(JSON.stringify({event: "ping"}))
            } else if (obj.data && obj.arg && obj.arg.channel) {
                let event = processMsg(obj)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
                    ctx.callback(event)
                }
            } else {
                trace(msg)
            }
        }
    }

    // Returns the driver whose key is contained (case-insensitively) in the
    // exchange name, or undefined when none matches.
    let getFunction = function (eName:String) {
        for (let key in supports) {
            if (eName.toLowerCase().indexOf(key.toLowerCase()) != -1) {
                return supports[key]
            }
        }
    }

    // Entry point used by the worker thread: runs the matching driver (never
    // returns while the driver loop is running).
    this.wssPublic = function (ctx:ICtx) {
        let func = getFunction(ctx.name)
        if (!func) {
            throw "not support " + ctx.name
        }
        // trace() already prefixes the thread id
        trace("init websocket for", ctx.name)
        return func(ctx)
    }

    // Worker thread body. It is passed to __Thread, so it only relies on
    // this.wssPublic and platform globals, not on closure variables.
    // Publishes per symbol:
    //   <instId>_depth   latest depth event
    //   <instId>_trades  buffered trades (at most 1000)
    // and reads <instId>_tradePos, the Id of the last trade the main thread consumed.
    function threadMarket(eName: string, symbols: string[]) {
        let tradesBySymbol = {}
        let tradeId = 0
        let currentTid = __threadId()

        this.wssPublic({
            name: eName,
            symbols: symbols,
            callback: function (event) {
                if (event.depth) {
                    __threadSetData(currentTid, event.instId+'_depth', event)
                } else if (event.trades) {
                    // one buffer per symbol so trades of different symbols never mix
                    let trades = tradesBySymbol[event.instId]
                    if (typeof(trades) === 'undefined') {
                        trades = []
                        tradesBySymbol[event.instId] = trades
                    }
                    event.trades.forEach(item=>{
                        trades.push({
                            Id: ++tradeId,
                            Time: item.ts,
                            Price: item.price,
                            Amount: item.qty,
                            Type: item.side == 'buy' ? 0 : 1
                        })
                    })

                    // drop what the main thread has already read, and always cap the
                    // buffer at 1000 entries even if nobody ever reads it
                    let tradePos = __threadGetData(currentTid, event.instId+'_tradePos')
                    let hasPos = typeof(tradePos) === 'number'
                    while (trades.length > 0 && ((hasPos && trades[0].Id <= tradePos) || trades.length > 1000)) {
                        trades.shift()
                    }
                    __threadSetData(currentTid, event.instId+'_trades', trades)
                }
            }
        })
    }

    if (__threadId() > 0) {
        // inside a worker thread: only the definitions above are needed
        return
    }

    // init inside main thread
    let tidMap = {}
    if (typeof(main_exchanges) === 'undefined') {
        main_exchanges = exchanges
    }

    main_exchanges.forEach(function(e) {
        // fall back to an empty table so a failed GetMarkets() only disables the
        // websocket path instead of making every patched call throw
        let markets = e.GetMarkets() || {}
        let eName = e.GetName()
        if (typeof(getFunction(eName)) !== 'function') {
            Log(eName, "websocket driver is not implemented yet")
            return
        }

        // one worker thread per exchange name, shared by exchange objects with the same name
        let tid = tidMap[eName]
        let subscribe = {}
        if (typeof(tid) !== 'number') {
            tid = __Thread([threadMarket, eName, []], [$.setupWebsocket])
            tidMap[eName] = tid
        }

        // Resolves the symbol (defaulting to the exchange's current pair, plus the
        // contract type on futures exchanges) and asks the worker to subscribe to
        // it the first time it is seen.
        let getSymbol = function(symbol) {
            if (typeof(symbol) === 'undefined') {
                symbol = e.GetCurrency()
                if (e.GetName().indexOf("Futures_") != -1) {
                    symbol += '.' + e.GetContractType()
                }
            }
            if (typeof(subscribe[symbol]) === 'undefined') {
                let info = markets[symbol]
                if (info) {
                    subscribe[symbol] = true
                    __threadPostMessage(tid, {method: 'subscribe', symbol: info.Symbol})
                }
            }
            return symbol
        }

        let origin_GetDepth = e.GetDepth
        let origin_GetTrades = e.GetTrades

        e.GetDepth = function(symbol) : IDepth {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_depth')
                if (obj) {
                    let delay = new Date().getTime() - obj.ts
                    // only use websocket data younger than 3 seconds
                    if (delay < 3000) {
                        let ret = {Time: obj.ts, Asks: [], Bids: [], Symbol: symbol, Info: null, Alias: info.Symbol, Ref: 'websocket', Delay: delay};
                        obj.depth.asks.forEach(item=>{ret.Asks.push({Price: item.price, Amount: item.qty})})
                        obj.depth.bids.forEach(item=>{ret.Bids.push({Price: item.price, Amount: item.qty})})
                        return ret
                    }
                }
            }
            return origin_GetDepth(symbol);
        }

        e.GetTrades = function(symbol) : ITrade[] {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_trades')
                if (obj) {
                    if (obj.length > 0) {
                        // tell the worker which trades of this symbol were consumed
                        __threadSetData(tid, info.Symbol+'_tradePos', obj[obj.length-1].Id)
                    }
                    return obj
                }
            }
            return origin_GetTrades(symbol);
        }
    })
}

// Demo entry point: logs websocket-backed depth and trades for all exchanges.
function main() {
    // check the platform version before any thread is started
    if (typeof(threading) === 'undefined') {
        throw "please update docker to latest version"
    }
    $.setupWebsocket()
    while (true) {
        exchanges.forEach(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}