وسائل لوڈ ہو رہے ہیں... لوڈنگ...

ویب ساکٹ تیز رفتار ڈرائیور

مصنف:ایجاد کنندہ کوانٹائزیشن، تاریخ: 2024-10-24 18:11:09

ویب ساکٹ تیز رفتار حکمت عملی


ریئل ٹائم مارکیٹ ڈیٹا پروسیسنگ کے لئے ویب ساکٹ ایکسلریشن (FMZ Quant)


یہ حکمت عملی ایف ایم زیڈ کوانٹ ٹریڈنگ پلیٹ فارم میں متعدد تبادلے کے لئے ریئل ٹائم ڈیٹا پروسیسنگ کو تیز کرنے کے لئے ویب ساکٹ کنکشن کے استعمال کو بہتر بناتی ہے۔ گہری آرڈر بکس اور تجارت کے لئے ویب ساکٹ کنکشن کا فائدہ اٹھاتے ہوئے ، یہ کوڈ مارکیٹ کے ڈیٹا کو حاصل کرنے میں تاخیر کو نمایاں طور پر کم کرتا ہے اور کارکردگی کو بہتر بناتا ہے ، خاص طور پر ہائی فریکوئینسی ٹریڈنگ (ایچ ایف ٹی) سسٹم کے لئے۔

اہم خصوصیات:

  • کثیر تبادلے کی حمایت:یہ حکمت عملی بائننس ، او کے ایکس ، بائی بٹ ، بٹ جیٹ ، اور دیگر تبادلے کو ویب ساکٹ کے ذریعے سپورٹ کرتی ہے ، روایتی ریسٹ API پولنگ کے مقابلے میں تیز اور زیادہ قابل اعتماد ڈیٹا فیڈ کو یقینی بناتی ہے۔
  • حسب ضرورت سبسکرپشن:یہ مخصوص مارکیٹ چینلز (گہرائی، تجارت، وغیرہ) کی رکنیت کی اجازت دیتا ہے اور فوری طور پر تجارتی حکمت عملی میں استعمال کرنے کے لئے موصول ہونے والے اعداد و شمار کو مؤثر طریقے سے عمل کرتا ہے.
  • اعلی درجے کی غلطی ہینڈلنگ:بلٹ ان غلطیوں کا سراغ لگانا اور ویب ساکٹ کے دوبارہ رابطے کے طریقہ کار ڈیٹا فیڈ کی وشوسنییتا اور تسلسل کو یقینی بناتے ہیں۔
  • CRC32 چیک سوم کی تصدیق:OKX جیسے تبادلے کے لئے، کوڈ میں وصول شدہ آرڈر بک کے اعداد و شمار کی سالمیت کو یقینی بنانے کے لئے CRC32 چیک سوم کی توثیق شامل ہے۔

یہ ویب ساکٹ پر مبنی حل روایتی اے پی آئی پولنگ کو ریئل ٹائم اپ ڈیٹس کے ساتھ تبدیل کرتا ہے ، جس سے یہ ان تاجروں کے لئے مثالی ہے جنھیں تاخیر کو کم سے کم کرنے اور مارکیٹ کی ردعمل کو زیادہ سے زیادہ کرنے کی ضرورت ہے۔

استعمال کا طریقہ:

  1. ابتدائیہ:استعمال$.setupWebsocket()آپ کے ہدف کے تبادلے کے لئے WebSocket کنکشن شروع کرنے کے لئے.
  2. اشتہار:سسٹم خود بخود آپ کی تجارت کی علامتوں کے لئے متعلقہ چینلز (گہرائی، تجارت، وغیرہ) کو سبسکرائب کرتا ہے.
  3. ڈیٹا بازیافت:مارکیٹ کے اعداد و شمار (گہرائی اور تجارت) پر عملدرآمد کیا جاتا ہے اور کال کرکے واپس کیا جاتا ہےGetDepth()اورGetTrades(). یہ افعال خود کار طریقے سے حقیقی وقت ویب ساکٹ کے اعداد و شمار کا استعمال کرتے ہیں اگر دستیاب ہو.
  4. غلطی ہینڈلنگ:اس حکمت عملی میں کنکشن اور ڈیٹا کی غلطیوں کو لاگ کرنے کے لئے ٹریس میکانزم شامل ہے ، اور جب کنکشن گر جاتے ہیں تو خود بخود دوبارہ رابطہ قائم کرنے کی کوشش کرتا ہے۔

یہ اسکرپٹ ایف ایم زیڈ کوانٹ پلیٹ فارم پر چلانے کے لئے ڈیزائن کیا گیا ہے ، جو متعدد تبادلے میں مارکیٹ کے ڈیٹا تک تیز ، قابل اعتماد اور توسیع پذیر رسائی فراہم کرتا ہے۔

function main() {
    while (true) {
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
            // support custom and auto subsribe Eg: e.GetDepth('ETH_USDT')
        EventLoop(100) // trigger by websocket or use Sleep control delay

// @ts-check

$.setupWebsocket = function(main_exchanges) {
    let crc32 = function (r) {
        for (var a, o = [], c = 0; c < 256; c++) {
            a = c;
            for (var f = 0; f < 8; f++) a = 1 & a ? 3988292384 ^ a >>> 1 : a >>> 1;
            o[c] = a
        for (var n = -1, t = 0; t < r.length; t++) n = n >>> 8 ^ o[255 & (n ^ r.charCodeAt(t))];
        return (-1 ^ n) >>> 0

    let trace = function (...args: any[]) {
        args.unshift('#' + __threadId())
        Log.apply(this, args)
    let MyDial = function(address) : IDial {
        let ws = Dial(address)
        if (ws) {
            let createTime = new Date().getTime()
            let oldClose = ws.close
            ws.close = function() {
                trace(address, "closed after", (new Date().getTime() - createTime)/1e6, "seconds")
        trace("connect", address, ws ? "success" : "failed")
        return ws

    interface ICtx {
        name: string
        symbols: string[]
        callback?: Function
    let lastUpdateSymbolsTime = new Date().getTime()
    let updateSymbols = function(ctx: ICtx, callBack: Function) {
        let ts = new Date().getTime()
        if (ts - lastUpdateSymbolsTime < 5000) {
        lastUpdateSymbolsTime = ts
        let e = __threadPeekMessage(-1)
        if (e) {
            trace("subscribe", e, "#ff0000")
            callBack(e.symbol, e.method)
            if (e.method == "subscribe") {
            } else {
                ctx.symbols = ctx.symbols.filter(symbol=>symbol!=e.symbol)

    let supports = {}
    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                obj.b.forEach(function (item) {
                        price: Number(item[0]),
                        qty: Number(item[1])
                obj.a.forEach(function (item) {
                        price: Number(item[0]),
                        qty: Number(item[1])
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                obj.bids.forEach(function (item) {
                        price: Number(item[0]),
                        qty: Number(item[1])
                obj.asks.forEach(function (item) {
                        price: Number(item[0]),
                        qty: Number(item[1])
                event.depth = depth
            } else {
            return event
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            if (!ws) {
            updateSymbols(ctx, function(symbol:string, method:string) {
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
            let ts = new Date().getTime()
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws = null
            if (msg == 'ping') {
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                if (!obj.stream) {
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                let event = processMsg(obj.data)
                if (event) {

    supports["OK"] = function (ctx:ICtx) {
        let depthDicAll = {}
        let processMsg = function (msg) {
            let obj = JSON.parse(msg)
            if (obj.error) {
                trace(obj.error.msg, "#ff0000")
            if (obj.event == 'subscribe') {
                // ignore
            } else if (obj.event == 'error') {
                trace(obj.msg, "#ff0000")
            } else if (obj.event == 'login') {
            } else {
                if (!obj.data) {
                let instId = obj.arg.instId
                let event = {
                    ts: new Date().getTime(),
                    instId: instId,
                    depth: null,
                    trades: [],
                obj.data.forEach(function (item) {
                    if (obj.arg.channel == 'trades') {
                        event.ts = Number(item.ts)
                            ts: Number(item.ts),
                            side: item.side,
                            price: Number(item.px),
                            qty: Number(item.sz)
                    } else if (obj.arg.channel == 'books5' || obj.arg.channel == 'books' || obj.arg.channel == 'books50-l2-tbt') {
                        let depth = {
                            asks: [],
                            bids: []
                        if (obj.arg.channel == 'books5') {
                            item.asks.forEach(function (pair) {
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                            item.bids.forEach(function (pair) {
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                        } else {
                            let depthDic = depthDicAll[instId]
                            if (typeof (depthDic) === 'undefined') {
                                depthDic = {
                                    count: 0,
                                    dic: {
                                        asks: {},
                                        bids: {}
                                depthDicAll[instId] = depthDic
                            depthDic.count += 1
                            for (let k in depthDic.dic) {
                                if (obj.action == 'snapshot') {
                                    depthDic.dic[k] = {}
                                let mp = depthDic.dic[k]
                                item[k].forEach(function (book) {
                                    if (book[1] == '0') {
                                        delete mp[book[0]]
                                    } else {
                                        mp[book[0]] = [book[1], book[3], item['ts']]
                            for (let k in depth) {
                                let n = k == 'asks' ? 1 : -1
                                let mp = depthDic.dic[k]
                                Object.keys(depthDic.dic[k]).sort(function (a, b) {
                                    return n * (Number(a) - Number(b))
                                }).forEach(function (x) {
                                    // keep string for 
                                        price: x,
                                        qty: mp[x][0]
                            if (depthDic.count % 5000 == 0) {
                                let s = []
                                for (let i = 0; i < 25; i++) {
                                    ['bids', 'asks'].forEach(function (k) {
                                        if (i < depth[k].length) {
                                            s.push(depth[k][i].price + ':' + depth[k][i].qty)
                                if (crc32(s.join(":")) != Uint32Array.from(Int32Array.of(item.checksum))[0]) {
                                    throw "depth checksum error"
                            // convert to number
                            for (let dir in depth) {
                                let books = depth[dir]
                                for (let i = 0; i < books.length; i++) {
                                    books[i].price = Number(books[i].price)
                                    books[i].qty = Number(books[i].qty)
                        event.depth = depth
                        event.ts = Number(item.ts)
                return event
        let channels = ['books5', 'trades']
        let ws = null
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                depthDicAll = {} // reset
                ws = MyDial("wss://ws.okx.com:8443/ws/v5/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                                channel: channel,
                                instId: symbol // "BTC-USDT"
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({ "op": "subscribe", "args": subscribes }))
            if (!ws) {
            updateSymbols(ctx, function(symbol:string, method:string) {
                    "op": method, 
                    "args": channels.map(c=>({channel: c, instId: symbol})) 
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                lastPing = ts
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws = null
            if (msg != 'pong') {
                let event = processMsg(msg)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {

    supports["Bybit"] = function (ctx:ICtx) {
        let depthMp = {}
        let processMsg = function (obj:any) {
            let event = {
                ts: obj.ts,
                instId: obj.data.s,
                depth: null,
                trades: [],

            if (obj.topic.startsWith("orderbook")) {
                let depthDic = depthMp[event.instId]
                if (typeof(depthDic) === 'undefined') {
                    depthDic = {}
                    depthMp[event.instId] = depthDic
                let data = {asks: obj.data.a, bids: obj.data.b}
                let depth = {asks: [], bids: []}
                if (obj.type == "snapshot") {
                    depthDic.asks = {}
                    depthDic.bids = {}
                    for (let k in depth) {
                        if (!data[k]) {
                        depth[k] = data[k].map(function(item) {
                            depthDic[k][item[0]] = item[1]
                            return {
                                price: Number(item[0]),
                                qty: Number(item[1]),
                } else if (obj.type == "delta") {
                    for (let k in depth) {
                        if (!data[k]) {
                        data[k].forEach(function(item) {
                            if (item[1] == '0') {
                                delete depthDic[k][item[0]]
                            } else {
                                depthDic[k][item[0]] = item[1]
                    // build depth
                    for (let k in depth) {
                        let n = k == 'asks' ? 1 : -1
                        let mp = depthDic[k]
                        Object.keys(depthDic[k]).sort(function(a, b) {
                            return n * (Number(a) - Number(b))
                        }).forEach(function(x) {
                                price: x,
                                qty: mp[x]
                    // convert to number
                    for (let dir in depth) {
                        depth[dir].forEach(function(item) {
                            item.price = Number(item.price)
                            item.qty = Number(item.qty)
                depth.asks = depth.asks.slice(0, 50)
                depth.bids = depth.bids.slice(0, 50)
                event.depth = depth
            } else if (obj.topic.startsWith("publicTrade")) {
                event.trades = obj.data.map(function(item) {
                    if (!event.instId) {
                        event.instId = item.s
                    return {
                        //id: item.i,
                        ts: item.T,
                        price: Number(item.p),
                        qty: Number(item.v),
                        side: item.S.toLowerCase()
            return event
        let channels = ["orderbook.50", "publicTrade"]

        let ws = null
        let endPoint = "wss://stream.bybit.com/v5/public/spot"
        let isInverse = false
            let prefix = s.split('.')[0]
            if (!prefix.endsWith('USDT') && !prefix.endsWith('USDC')) {
                isInverse = true;
            } else {
                if (isInverse) {
                    throw "symbols type not support concurrent"
        if (ctx.name == "Futures_Bybit") {
            endPoint = "wss://stream.bybit.com/v5/public/linear"
            if (isInverse) {
                endPoint = "wss://stream.bybit.com/v5/public/inverse"
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial(endPoint)
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push(channel+ "." + symbol)
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({ op:"subscribe", args: subscribes }))
            if (!ws) {
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write(JSON.stringify({ op: "ping" }))
                lastPing = ts

            updateSymbols(ctx, function(symbol:string, method:string) {
                    "op": method, 
                    "args": channels.map(c=>c+"."+symbol)
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws = null
                    depthMp = {}
            } else if (msg) {
                let obj = JSON.parse(msg)
                if (obj.op == "ping" && obj.ret_msg == "pong") {
                    // ignore
                } else if (obj.op == 'pong') {
                } else if (obj.op == 'subscribe' || obj.op == 'unsubscribe' || !obj.topic) {
                } else {
                    try {
                        let event = processMsg(obj)
                        if (event && (event.depth || event.trades.length > 0)) {
                    } catch(e) {
                        trace("Error:", msg)

    supports["Bitget"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: Number(obj.ts),
                instId: obj.arg.instId,
                depth: null,
                trades: [],

            if (obj.arg.channel == 'trade') {
                event.trades = obj.data.map(function(item) {
                    return {
                        //id: Number(item.tradeId),
                        ts: Number(item.ts),
                        price: Number(item.price),
                        qty: Number(item.size),
                        side: item.side.toLowerCase()
            } else if (obj.arg.channel == 'books15') {
                let depth = {asks: [], bids: []}
                for (let k in depth) {
                    if (!obj.data[0][k]) {
                    depth[k] = obj.data[0][k].map(function(item) {
                        return { price: Number(item[0]), qty: Number(item[1]) }
                event.depth = depth
            return event
        let geInstType = function(s) {
            if (ctx.name.indexOf("Futures_") == 0) {
                let supports = {
                    "USDT": "USDT-FUTURES",
                    "USD": "COIN-FUTURES",
                    "USDC": "USDC-FUTURES"
                        //"SUMCBL": "SUSDT-FUTURES",
                        //"SDMCBL": "SCOIN-FUTURES",
                        //"SCMCBL": "SUSDC-FUTURES"
                let quotes = Object.keys(supports)
                for (let i = 0; i < quotes.length; i++) {
                    if (s.endsWith(quotes[i])) {
                        return supports[quotes[i]]
            return "SPOT";
        let channels = ['books15', 'trade']
        let ws = null
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial("wss://ws.bitget.com/v2/ws/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({instType: geInstType(symbol), channel: channel, instId: symbol})
                    if (subscribes.length > 0) {                 
                        ws.write(JSON.stringify({ op: "subscribe", args: subscribes }))
            if (!ws) {
            updateSymbols(ctx, function(symbol:string, method:string) {
                    "op": method, 
                    "args": channels.map(c=>({instType: geInstType(symbol), channel: c, instId: symbol}))
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                lastPing = ts
            let msg = ws.read(1000)
            if (!msg) {
                // is closed
                if (msg == "") {
                    trace("websocket is closed")
                    ws = null
            if (msg == "ping" || msg == "pong") {
            let obj = JSON.parse(msg)
            if (obj.event == "ping") {
                ws.write(JSON.stringify({event: "ping"}))
            } else if (obj.data && obj.arg && obj.arg.channel) {
                let event = processMsg(obj)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
            } else {

    let getFunction = function (eName:String) {
        for (let key in supports) {
            if (eName.toLowerCase().indexOf(key.toLowerCase()) != -1) {
                return supports[key]

    this.wssPublic = function (ctx:ICtx) {
        let func = getFunction(ctx.name)
        if (!func) {
            throw "not support " + ctx.name
        trace("#"+__threadId(), "init websocket for", ctx.name)
        return func(ctx)
    function threadMarket(eName: string, symbols: string[]) {
        let trades = []
        let tradeId = 0
        let currentTid = __threadId()
        this.wssPublic({ name: eName, symbols: symbols, callback: function (event) {
            let tick = {event: 'tick', name: eName, method: ''}
            if (event.depth) {
                tick.method = event.instId+'_depth'
                __threadSetData(currentTid, tick.method, event)
            } else if (event.trades) {
                tick.method = event.instId+'_trades'
                        Id: ++tradeId,
                        Time: item.ts, 
                        Price: item.price,
                        Amount: item.qty,
                        Type: item.side == 'buy' ? 0 : 1
                let tradePos = __threadGetData(currentTid, "tradePos")
                if (typeof(tradePos) === 'number') {
                    while (trades.length > 0 && (trades[0].Id <= tradePos || trades.length > 1000)) {
                __threadSetData(currentTid, tick.method, trades)

    if (__threadId() > 0) {
    // init inside main thread
    let tidMap = {}
    if (typeof(main_exchanges) === 'undefined') {
        main_exchanges = exchanges
    main_exchanges.forEach(function(e) {
        let markets = e.GetMarkets()
        let eName = e.GetName()

        if (typeof(getFunction(eName)) !== 'function') {
            Log(eName, "websocket driver is not implemented yet")

        let tid = tidMap[eName]
        let subscribe = {}
        if (typeof(tid) !== 'number') {
            tid = __Thread([threadMarket, eName, []], [$.setupWebsocket])
            tidMap[eName] = tid

        let getSymbol = function(symbol) {
            if (typeof(symbol) === 'undefined') {
                symbol = e.GetCurrency()
                if (e.GetName().indexOf("Futures_") != -1) {
                    symbol += '.' + e.GetContractType()
            if (typeof(subscribe[symbol]) === 'undefined') {
                let info = markets[symbol]
                if (info) {
                    subscribe[symbol] = true
                    __threadPostMessage(tid, {method: 'subscribe', symbol: info.Symbol})
            return symbol                
        let origin_GetDepth = e.GetDepth
        let origin_GetTrades = e.GetTrades
        e.GetDepth = function(symbol) : IDepth {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_depth')
                if (obj) {
                    let delay = new Date().getTime() - obj.ts
                    if (delay < 3000) {
                        let ret = {Time: obj.ts, Asks: [], Bids: [], Symbol: symbol, Info: null, Alias: info.Symbol, Ref: 'websocket', Delay: delay};
                        obj.depth.asks.forEach(item=>{ret.Asks.push({Price: item.price, Amount: item.qty})})
                        obj.depth.bids.forEach(item=>{ret.Bids.push({Price: item.price, Amount: item.qty})})
                        return ret
            return origin_GetDepth(symbol) ;
        e.GetTrades = function(symbol) : ITrade[] {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_trades')
                if (obj) {
                    if (obj.length > 0) {
                        __threadSetData(tid, "tradePos", obj[obj.length-1].Id)
                    return obj
            return origin_GetTrades(symbol) ;

function main() {
    if (typeof(threading) === 'undefined') {
        throw "please update docker to latest version"
    while (true) {
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        EventLoop(100) // trigger by websocket
