The resource loading... loading...

A policy template that allows you to use WebSocket seamlessly

Author: The grass, Created: 2024-10-30 09:49:20, Updated: 2024-11-05 17:45:31

img

This is a WebSocket Marketplace template developed by FMZ, which can be copied and used as a template by ticking this template in the new policy:https://www.fmz.com/strategy/470349

Why do you need a WebSocket?

The current FMZ strategy is mainly traditional REST API wrapping, whereby a network connection is established at each API access step to obtain market data through consultation. This method is simple and easy to use, and for most needs, this is quite sufficient.

However, the REST protocol has an inherent delay problem, which is amplified when multiple trading pairs, multi-exchange strategies are needed. Although the Go function of the platform can be executed in parallel, the delay problem still exists, which is difficult to meet the needs of a relatively high frequency strategy trading, and too many trades, too fast routing frequency and also encounter access frequency limitations of the trading platform.

Currently, the server load of the exchanges is also heavy, and they all provide a complete WebSocket protocol and recommend API users to use. Compared to the REST protocol, WebSocket provides a durable two-way connection that allows the exchange to push data to the client in real time, avoiding frequent requests and responses, greatly reducing the latency. Generally, if the REST API access delay is around 20 ms, WebSocket's push data delay is around 2 ms.

The WebSocket industry template is introduced

FMZ's quantitative trading platform has been supporting the WebSocket protocol since early days and is relatively easy to call, but for newcomers, it is too complex to handle multiple subscriptions, subscribe to multiple exchanges, and efficiently and easily embed into the entire strategy process. This open WebSocket Real-time Market Data Accelerator template solves this problem by being very easy to use, fully compatible with the current wrapped API calls, and for most of the original REST policy, a simple change directly to use to speed up your policy.

The main features:

  • Supported by multiple exchangesThis policy supports WebSocket connections to multiple exchanges such as Binance, OKX, Bybit, Bitget, etc. Users can emulate the wrapping method of this template to support more exchanges themselves.
  • Customized subscriptionsIt allows you to subscribe to specific market channels (e.g. depth, trading, etc.) and efficiently process the data received for immediate use in trading strategies.
  • High-level mismanagement: Built-in error tracking and WebSocket reconnection mechanisms to ensure reliability and continuity of data streams.

A brief introduction to the implementation process

Note that this policy uses TypeScript, which may seem a bit unfamiliar if you're just familiar with JavaScript. TypeScript introduces a type system and richer language features based on JavaScript, for applications that require processing complex logic such as quantitative transactions. Using TypeScript can reduce potential errors and improve code readability and maintainability.

The policy also uses the asynchronous mechanism of the FMZ platform, whereby a subthread can send a message to the main thread via the __threadPostMessage function. This is asynchronous and is used to notify the main thread of data updates generated in the subthread. Data can be shared between the main thread and the subthread via the __threadGetData and __threadSetData functions. This allows the thread to access and modify the shared state.

The main principle of this strategy is to connect mainstream digital currency exchanges via WebSocket to receive real-time market data (such as in-depth information and transaction information) in order to provide data support for quantitative transaction decisions.

1. WebSocket connection settings

setupWebsocketThe function is used to initialize WebSocket connections, receiving market data. It receives a parametermain_exchangesThe exchange is also known as the "Connected Exchange".

  • MyDialFunction: Create WebSocket connections, record connection times, and output shutdown times when connections are closed.
  • updateSymbolsFunction: Check regularly for new subscription requests and update the list of current transaction pairs as needed.

2. Processing of data

supportsObjects define supported exchanges and their processing functions (e.g.BinanceEach exchange's processing function is responsible for analyzing the received messages and extracting relevant data.

  • processMsgFunctionProcessing messages from exchanges, identifying different types of data (e.g. deep updates, transactions, etc.) and formatting them into a unified event object.

3. Subscription data

At each connection, the system tracks subscription-related market data channels based on the current transaction.

  • getFunctionFunction: Obtain the corresponding processing function according to the name of the exchange.
  • this.wssPublicFunctionInitialize the WebSocket connection and start receiving data.

4. Thread management

Start a thread for each exchange, receive data in real-time and process the data through callback functions.

  • threadMarketFunction: Receives data in sub-threads, parses and stores the latest depth and transaction information.

5. Rewrite the data acquisition method

For each exchange, rewrite the method of obtaining depth and transaction information, giving priority to returning real-time updated data.

How to use templates

  1. InitializationUse:$.setupWebsocket()Initialize the WebSocket connection to the target exchange.
  2. Subscribe: The system will automatically subscribe to the relevant channels (such as depth, trading, etc.) for the varieties you trade.
  3. Acquisition of dataBy calling:GetDepth()andGetTrades()Function that automatically returns market depth and transaction records using WebSocket real-time data.
  4. Mismanagement: The policy includes a tracking mechanism to record connection and data errors and to automatically try to reconnect when the connection is interrupted.

If you add the EventLoop () function to the policy, it will be converted into a trigger mechanism, which will automatically be immediately retrieved when there is a wss data update, without waiting for any updated data.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

Refer to my previous guide to multi-currency trading strategieshttps://www.fmz.com/digest-topic/10506Here's a very convenient way to modify it to support WebSocket:

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

How to add new exchanges yourself

If you follow the template in the policy and copy the format below, you can refer to the API documentation:

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }
    

More