资源加载中... loading...

WebSocket 加速驱动

Author: 发明者量化, Date: 2024-10-24 18:11:09
Tags:

基于WebSocket的实时行情数据加速模板类库

描述:

该策略通过多线程WebSocket连接优化了多个交易所的实时数据处理。在FMZ量化交易平台中使用WebSocket连接获取深度订单簿和交易数据,显著降低了获取市场数据的延迟,并提高了高频交易系统的性能。

主要特点: - 多交易所支持:该策略支持币安、OKX、Bybit、Bitget等多个交易所的WebSocket连接,提供比传统REST API轮询更快、更稳定的数据流。 - 可定制订阅:允许订阅特定市场频道(如深度、交易等),并高效处理接收的数据,供交易策略即时使用。 - 高级错误处理:内置错误跟踪和WebSocket重连机制,确保数据流的可靠性和持续性。 - CRC32校验:对于OKX等交易所,集成了CRC32校验功能,确保接收到的订单簿数据的完整性。

此基于WebSocket的解决方案取代了传统的API轮询,适用于需要最小化延迟、最大化市场响应速度的交易者。

使用方法:

  1. 初始化:使用 $.setupWebsocket() 初始化目标交易所的WebSocket连接。
  2. 订阅:系统会自动为你交易的品种订阅相关频道(如深度、交易等)。
  3. 数据获取:通过调用 GetDepth()GetTrades() 函数,自动使用WebSocket实时数据进行市场深度和交易记录的返回。
  4. 错误处理:策略包括一个追踪机制,用于记录连接和数据错误,并在连接中断时自动尝试重新连接。

此脚本旨在FMZ量化平台上运行,为多个交易所提供快速、可靠且可扩展的市场数据访问。

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
            // support custom and auto subsribe Eg: e.GetDepth('ETH_USDT')
        })
        EventLoop(100) // trigger by websocket or use Sleep control delay
    }
}

// @ts-check

$.setupWebsocket = function(main_exchanges) {
    let crc32 = function (r) {
        for (var a, o = [], c = 0; c < 256; c++) {
            a = c;
            for (var f = 0; f < 8; f++) a = 1 & a ? 3988292384 ^ a >>> 1 : a >>> 1;
            o[c] = a
        }
        for (var n = -1, t = 0; t < r.length; t++) n = n >>> 8 ^ o[255 & (n ^ r.charCodeAt(t))];
        return (-1 ^ n) >>> 0
    }

    let trace = function (...args: any[]) {
        args.unshift('#' + __threadId())
        Log.apply(this, args)
    }
    
    let MyDial = function(address) : IDial {
        let ws = Dial(address)
        if (ws) {
            let createTime = new Date().getTime()
            let oldClose = ws.close
            ws.close = function() {
                oldClose.apply(ws)
                trace(address, "closed after", (new Date().getTime() - createTime)/1e6, "seconds")
            }
        }
        trace("connect", address, ws ? "success" : "failed")
        return ws
    }

    interface ICtx {
        name: string
        symbols: string[]
        callback?: Function
    }
    let lastUpdateSymbolsTime = new Date().getTime()
    let updateSymbols = function(ctx: ICtx, callBack: Function) {
        let ts = new Date().getTime()
        if (ts - lastUpdateSymbolsTime < 5000) {
            return
        }
        lastUpdateSymbolsTime = ts
        let e = __threadPeekMessage(-1)
        if (e) {
            trace("subscribe", e, "#ff0000")
            callBack(e.symbol, e.method)
            if (e.method == "subscribe") {
                ctx.symbols.push(e.symbol)
            } else {
                ctx.symbols = ctx.symbols.filter(symbol=>symbol!=e.symbol)
            }
        }
    }

    let supports = {}
    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let ts = new Date().getTime()
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }

    supports["OK"] = function (ctx:ICtx) {
        let depthDicAll = {}
        let processMsg = function (msg) {
            let obj = JSON.parse(msg)
            if (obj.error) {
                trace(obj.error.msg, "#ff0000")
                return
            }
    
            if (obj.event == 'subscribe') {
                // ignore
            } else if (obj.event == 'error') {
                trace(obj.msg, "#ff0000")
                return
            } else if (obj.event == 'login') {
            } else {
                if (!obj.data) {
                    trace(msg)
                    return
                }
                let instId = obj.arg.instId
    
                let event = {
                    ts: new Date().getTime(),
                    instId: instId,
                    depth: null,
                    trades: [],
                }
    
                obj.data.forEach(function (item) {
                    if (obj.arg.channel == 'trades') {
                        event.ts = Number(item.ts)
                        event.trades.push({
                            ts: Number(item.ts),
                            side: item.side,
                            price: Number(item.px),
                            qty: Number(item.sz)
                        })
                    } else if (obj.arg.channel == 'books5' || obj.arg.channel == 'books' || obj.arg.channel == 'books50-l2-tbt') {
                        let depth = {
                            asks: [],
                            bids: []
                        }
                        if (obj.arg.channel == 'books5') {
                            item.asks.forEach(function (pair) {
                                depth.asks.push({
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                                })
                            })
                            item.bids.forEach(function (pair) {
                                depth.bids.push({
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                                })
                            })
                        } else {
                            let depthDic = depthDicAll[instId]
                            if (typeof (depthDic) === 'undefined') {
                                depthDic = {
                                    count: 0,
                                    dic: {
                                        asks: {},
                                        bids: {}
                                    }
                                }
                                depthDicAll[instId] = depthDic
                            }
                            depthDic.count += 1
                            for (let k in depthDic.dic) {
                                if (obj.action == 'snapshot') {
                                    depthDic.dic[k] = {}
                                }
                                let mp = depthDic.dic[k]
                                item[k].forEach(function (book) {
                                    if (book[1] == '0') {
                                        delete mp[book[0]]
                                    } else {
                                        mp[book[0]] = [book[1], book[3], item['ts']]
                                    }
                                })
                            }
    
                            for (let k in depth) {
                                let n = k == 'asks' ? 1 : -1
                                let mp = depthDic.dic[k]
                                Object.keys(depthDic.dic[k]).sort(function (a, b) {
                                    return n * (Number(a) - Number(b))
                                }).forEach(function (x) {
                                    // keep string for 
                                    depth[k].push({
                                        price: x,
                                        qty: mp[x][0]
                                    })
                                })
                            }
    
                            if (depthDic.count % 5000 == 0) {
                                let s = []
                                for (let i = 0; i < 25; i++) {
                                    ['bids', 'asks'].forEach(function (k) {
                                        if (i < depth[k].length) {
                                            s.push(depth[k][i].price + ':' + depth[k][i].qty)
                                        }
                                    })
                                }
                                if (crc32(s.join(":")) != Uint32Array.from(Int32Array.of(item.checksum))[0]) {
                                    throw "depth checksum error"
                                }
                            }
                            // convert to number
                            for (let dir in depth) {
                                let books = depth[dir]
                                for (let i = 0; i < books.length; i++) {
                                    books[i].price = Number(books[i].price)
                                    books[i].qty = Number(books[i].qty)
                                }
                            }
                        }
                        event.depth = depth
                        event.ts = Number(item.ts)
                    }
                })
                return event
            }
        }
        let channels = ['books5', 'trades']
        let ws = null
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                depthDicAll = {} // reset
                ws = MyDial("wss://ws.okx.com:8443/ws/v5/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({
                                channel: channel,
                                instId: symbol // "BTC-USDT"
                            })
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({ "op": "subscribe", "args": subscribes }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "op": method, 
                    "args": channels.map(c=>({channel: c, instId: symbol})) 
                }))
            })
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write("ping")
                lastPing = ts
            }
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            } 
            if (msg != 'pong') {
                let event = processMsg(msg)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
                    ctx.callback(event)
                }
            }
        }
    }

    supports["Bybit"] = function (ctx:ICtx) {
        let depthMp = {}
        let processMsg = function (obj:any) {
            let event = {
                ts: obj.ts,
                instId: obj.data.s,
                depth: null,
                trades: [],
            }

            if (obj.topic.startsWith("orderbook")) {
                let depthDic = depthMp[event.instId]
                if (typeof(depthDic) === 'undefined') {
                    depthDic = {}
                    depthMp[event.instId] = depthDic
                }
                let data = {asks: obj.data.a, bids: obj.data.b}
                let depth = {asks: [], bids: []}
                if (obj.type == "snapshot") {
                    depthDic.asks = {}
                    depthDic.bids = {}
                    for (let k in depth) {
                        if (!data[k]) {
                            continue
                        }
                        depth[k] = data[k].map(function(item) {
                            depthDic[k][item[0]] = item[1]
                            return {
                                price: Number(item[0]),
                                qty: Number(item[1]),
                            }
                        })
                    }
                } else if (obj.type == "delta") {
                    depthDic.count++
                    for (let k in depth) {
                        if (!data[k]) {
                            continue
                        }
                        data[k].forEach(function(item) {
                            if (item[1] == '0') {
                                delete depthDic[k][item[0]]
                            } else {
                                depthDic[k][item[0]] = item[1]
                            }
                        })
                    }
                    // build depth
                    for (let k in depth) {
                        let n = k == 'asks' ? 1 : -1
                        let mp = depthDic[k]
                        Object.keys(depthDic[k]).sort(function(a, b) {
                            return n * (Number(a) - Number(b))
                        }).forEach(function(x) {
                            depth[k].push({
                                price: x,
                                qty: mp[x]
                            })
                        })
                    }
                    // convert to number
                    for (let dir in depth) {
                        depth[dir].forEach(function(item) {
                            item.price = Number(item.price)
                            item.qty = Number(item.qty)
                        })
                    }
                }
                depth.asks = depth.asks.slice(0, 50)
                depth.bids = depth.bids.slice(0, 50)
                event.depth = depth
            } else if (obj.topic.startsWith("publicTrade")) {
                event.trades = obj.data.map(function(item) {
                    if (!event.instId) {
                        event.instId = item.s
                    }
                    return {
                        //id: item.i,
                        ts: item.T,
                        price: Number(item.p),
                        qty: Number(item.v),
                        side: item.S.toLowerCase()
                    }
                })
            }
            return event
        }
        let channels = ["orderbook.50", "publicTrade"]

        let ws = null
        let endPoint = "wss://stream.bybit.com/v5/public/spot"
        let isInverse = false
        ctx.symbols.forEach(s=>{
            let prefix = s.split('.')[0]
            if (!prefix.endsWith('USDT') && !prefix.endsWith('USDC')) {
                isInverse = true;
            } else {
                if (isInverse) {
                    throw "symbols type not support concurrent"
                }
            }
        })
        if (ctx.name == "Futures_Bybit") {
            endPoint = "wss://stream.bybit.com/v5/public/linear"
            if (isInverse) {
                endPoint = "wss://stream.bybit.com/v5/public/inverse"
            }
        }
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial(endPoint)
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push(channel+ "." + symbol)
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({ op:"subscribe", args: subscribes }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write(JSON.stringify({ op: "ping" }))
                lastPing = ts
            }

            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "op": method, 
                    "args": channels.map(c=>c+"."+symbol)
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                    depthMp = {}
                }
                continue
            } else if (msg) {
                let obj = JSON.parse(msg)
                if (obj.op == "ping" && obj.ret_msg == "pong") {
                    // ignore
                } else if (obj.op == 'pong') {
                } else if (obj.op == 'subscribe' || obj.op == 'unsubscribe' || !obj.topic) {
                    trace(obj)
                } else {
                    try {
                        let event = processMsg(obj)
                        if (event && (event.depth || event.trades.length > 0)) {
                            ctx.callback(event)
                        }
                    } catch(e) {
                        trace("Error:", msg)
                        ws.close()
                    }
                }
            }
        }
    }

    supports["Bitget"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: Number(obj.ts),
                instId: obj.arg.instId,
                depth: null,
                trades: [],
            }

            if (obj.arg.channel == 'trade') {
                event.trades = obj.data.map(function(item) {
                    return {
                        //id: Number(item.tradeId),
                        ts: Number(item.ts),
                        price: Number(item.price),
                        qty: Number(item.size),
                        side: item.side.toLowerCase()
                    }
                })
            } else if (obj.arg.channel == 'books15') {
                let depth = {asks: [], bids: []}
                for (let k in depth) {
                    if (!obj.data[0][k]) {
                        continue
                    }
                    depth[k] = obj.data[0][k].map(function(item) {
                        return { price: Number(item[0]), qty: Number(item[1]) }
                    })
                }
                event.depth = depth
            }
            return event
        }
        let geInstType = function(s) {
            if (ctx.name.indexOf("Futures_") == 0) {
                let supports = {
                    "USDT": "USDT-FUTURES",
                    "USD": "COIN-FUTURES",
                    "USDC": "USDC-FUTURES"
                        //"SUMCBL": "SUSDT-FUTURES",
                        //"SDMCBL": "SCOIN-FUTURES",
                        //"SCMCBL": "SUSDC-FUTURES"
                }
                let quotes = Object.keys(supports)
                for (let i = 0; i < quotes.length; i++) {
                    if (s.endsWith(quotes[i])) {
                        return supports[quotes[i]]
                    }
                }
            }
            return "SPOT";
        }
        
        let channels = ['books15', 'trade']
        let ws = null
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial("wss://ws.bitget.com/v2/ws/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({instType: geInstType(symbol), channel: channel, instId: symbol})
                        })
                    })
                    if (subscribes.length > 0) {                 
                        ws.write(JSON.stringify({ op: "subscribe", args: subscribes }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "op": method, 
                    "args": channels.map(c=>({instType: geInstType(symbol), channel: c, instId: symbol}))
                }))
            })
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write("ping")
                lastPing = ts
            }
            let msg = ws.read(1000)
            if (!msg) {
                // is closed
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == "ping" || msg == "pong") {
                continue
            }
            let obj = JSON.parse(msg)
            if (obj.event == "ping") {
                ws.write(JSON.stringify({event: "ping"}))
            } else if (obj.data && obj.arg && obj.arg.channel) {
                let event = processMsg(obj)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
                    ctx.callback(event)
                }
            } else {
                trace(msg)
            }
        }
    }

    let getFunction = function (eName:String) {
        for (let key in supports) {
            if (eName.toLowerCase().indexOf(key.toLowerCase()) != -1) {
                return supports[key]
            }
        }
    }

    this.wssPublic = function (ctx:ICtx) {
        let func = getFunction(ctx.name)
        if (!func) {
            throw "not support " + ctx.name
        }
        trace("#"+__threadId(), "init websocket for", ctx.name)
        return func(ctx)
    }
    
    function threadMarket(eName: string, symbols: string[]) {
        let trades = []
        let tradeId = 0
        let currentTid = __threadId()
        this.wssPublic({ name: eName, symbols: symbols, callback: function (event) {
            let tick = {event: 'tick', name: eName, method: ''}
            if (event.depth) {
                tick.method = event.instId+'_depth'
                __threadSetData(currentTid, tick.method, event)
            } else if (event.trades) {
                tick.method = event.instId+'_trades'
                event.trades.forEach(item=>{
                    trades.push({
                        Id: ++tradeId,
                        Time: item.ts, 
                        Price: item.price,
                        Amount: item.qty,
                        Type: item.side == 'buy' ? 0 : 1
                    })
                })
                let tradePos = __threadGetData(currentTid, "tradePos")
                if (typeof(tradePos) === 'number') {
                    while (trades.length > 0 && (trades[0].Id <= tradePos || trades.length > 1000)) {
                        trades.shift()
                    }
                }
                __threadSetData(currentTid, tick.method, trades)
            }
        }})
    }

    if (__threadId() > 0) {
        return
    }
    // init inside main thread
    let tidMap = {}
    if (typeof(main_exchanges) === 'undefined') {
        main_exchanges = exchanges
    }
    main_exchanges.forEach(function(e) {
        let markets = e.GetMarkets()
        let eName = e.GetName()

        if (typeof(getFunction(eName)) !== 'function') {
            Log(eName, "websocket driver is not implemented yet")
            return
        }

        let tid = tidMap[eName]
        let subscribe = {}
        if (typeof(tid) !== 'number') {
            tid = __Thread([threadMarket, eName, []], [$.setupWebsocket])
            tidMap[eName] = tid
        }

        let getSymbol = function(symbol) {
            if (typeof(symbol) === 'undefined') {
                symbol = e.GetCurrency()
                if (e.GetName().indexOf("Futures_") != -1) {
                    symbol += '.' + e.GetContractType()
                }
            }
            if (typeof(subscribe[symbol]) === 'undefined') {
                let info = markets[symbol]
                if (info) {
                    subscribe[symbol] = true
                    __threadPostMessage(tid, {method: 'subscribe', symbol: info.Symbol})
                }
            }
            return symbol                
        }
        let origin_GetDepth = e.GetDepth
        let origin_GetTrades = e.GetTrades
        e.GetDepth = function(symbol) : IDepth {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_depth')
                if (obj) {
                    let delay = new Date().getTime() - obj.ts
                    if (delay < 3000) {
                        let ret = {Time: obj.ts, Asks: [], Bids: [], Symbol: symbol, Info: null, Alias: info.Symbol, Ref: 'websocket', Delay: delay};
                        obj.depth.asks.forEach(item=>{ret.Asks.push({Price: item.price, Amount: item.qty})})
                        obj.depth.bids.forEach(item=>{ret.Bids.push({Price: item.price, Amount: item.qty})})
                        return ret
                    }
                }
            }
            return origin_GetDepth(symbol) ;
        }
        e.GetTrades = function(symbol) : ITrade[] {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_trades')
                if (obj) {
                    if (obj.length > 0) {
                        __threadSetData(tid, "tradePos", obj[obj.length-1].Id)
                    }
                    return obj
                }
            }
            return origin_GetTrades(symbol) ;
        }
    })
}

function main() {
    $.setupWebsocket()
    if (typeof(threading) === 'undefined') {
        throw "please update docker to latest version"
    }
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}


更多内容