资源加载中... loading...

一个策略模板让你无缝使用WebSocket行情

Author: 小草, Created: 2024-10-30 09:49:20, Updated: 2024-11-05 17:45:31

一个策略模板让你无缝使用WebSocket行情

这是FMZ官方开发的一个WebSocket行情模板,复制另存为模板,在新策略中勾选这个模板就可以使用:https://www.fmz.com/strategy/470349

为什么需要WebSocket

目前FMZ策略主要是传统的REST API封装,每一步API访问都要建立一个网络连接,通过轮询的方式获取市场数据。这种方法简单易用,对于大部分需求,这样做完全足够。

但是,REST协议有固有的延迟问题,当需要多交易对、多交易所策略时,延时问题会被放大。虽然用平台的Go函数可以并发执行,但延时问题依旧存在,难以满足相对高频一点的策略交易的需求,并且交易对太多,轮询频率过快还会遇到交易平台的访问频率限制。

目前交易所的服务器负担也很重,都提供了完善的WebSocket协议,并且推荐API用户使用。相对REST协议,WebSocket提供了一种持久化的双向连接方式,使得交易所能够实时推送数据到客户端,避免了频繁的请求和响应,极大地降低了延迟。一般来说,如果访问REST API的延时在20ms左右,WebSocket的推送数据大概延时2ms左右。并且链接WebSocket协议不受平台访问频率限制,基本上可以做到一次订阅几十个交易对。

WebSocket行情模板介绍

FMZ量化交易平台很早就支持了WebSocket协议,并且相对方便的调用,但对于新手用户来说,处理多个订阅,订阅多个交易所行情,并且高效方便的嵌入整个策略流程中,还是过于复杂。 这个公开的WebSocket实时行情数据加速模板,就解决了这个痛点,非常易用,完全和当前封装的API调用兼容,对于大部分原来的REST策略策略,简单改动直接使用,加速你的策略。

主要特点:

  • 多交易所支持:该策略支持币安、OKX、Bybit、Bitget等多个交易所的WebSocket连接,用户自己可以仿照这个模板的封装方法,自己支持更多的交易所。
  • 可定制订阅:允许订阅特定市场频道(如深度、交易等),并高效处理接收的数据,供交易策略即时使用。
  • 高级错误处理:内置错误跟踪和WebSocket重连机制,确保数据流的可靠性和持续性。

实现原理简单介绍

注意到这个策略使用了TypeScript,如果你只是熟悉在JavaScript的话,看起来会有点陌生。 TypeScript 在JavaScript 的基础上引入了类型系统和更丰富的语言特性,对于量化交易等需要处理复杂逻辑的应用,使用 TypeScript 可以减少潜在的错误,提高代码的可读性和可维护性。因此推荐可以简单的学习下。

另外策略使用了FMZ平台的异步机制,机制子线程可以通过 __threadPostMessage 函数向主线程发送消息。这种方式是异步的,适用于在子线程中产生的数据更新通知主线程。主线程和子线程之间可以通过 __threadGetData 和 __threadSetData 函数共享数据。这种方式允许线程访问和修改共享的状态。如果你想学习下多线程,结合平台文档,这个策略也是一个很好的学习范例。

这个策略的主要原理是通过WebSocket连接主流数字货币交易所,实时接收市场数据(如深度信息和交易信息),以便为量化交易决策提供数据支持。具体实现流程如下:

1.WebSocket 连接设置

setupWebsocket 函数用于初始化WebSocket连接,接收市场数据。它接收一个参数 main_exchanges,表示需要连接的交易所。

  • MyDial 函数:创建WebSocket连接,记录连接时间,并在关闭连接时输出关闭时间。
  • updateSymbols 函数:定时检查是否有新的订阅请求,并根据需要更新当前的交易对列表。

2. 数据处理

supports 对象定义了支持的交易所及其处理函数(如 Binance)。每个交易所的处理函数负责解析接收到的消息并提取相关数据。

  • processMsg 函数:处理来自交易所的消息,识别不同类型的数据(如深度更新、交易等),并格式化为统一的事件对象。

3. 订阅数据

在每次连接时,系统会根据当前的交易对订阅相关的市场数据通道。

  • getFunction 函数:根据交易所名称获取相应的处理函数。
  • this.wssPublic 函数:初始化WebSocket连接并启动数据接收。

4. 线程管理

为每个交易所启动一个线程,实时接收数据并通过回调函数处理数据。

  • threadMarket 函数:在子线程中接收数据,解析并存储最新的深度和交易信息。

5. 数据获取方法重写

为每个交易所重写获取深度和交易信息的方法,优先返回实时更新的数据。

模板使用方法

  1. 初始化:使用 $.setupWebsocket() 初始化目标交易所的WebSocket连接。
  2. 订阅:系统会自动为你交易的品种订阅相关频道(如深度、交易等)。
  3. 数据获取:通过调用 GetDepth()GetTrades() 函数,自动使用WebSocket实时数据进行市场深度和交易记录的返回。
  4. 错误处理:策略包括一个追踪机制,用于记录连接和数据错误,并在连接中断时自动尝试重新连接。

如果策略中加入EventLoop()函数,会改成触发机制,当有wss数据更新时会自动立即获取,没有最新数据就等待。相当于智能的Sleep函数,当然也可以直接使用Sleep。

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

参考我的上一篇多币种交易策略指南 https://www.fmz.com/digest-topic/10506 ,这里可以非常方便的对其进行改造支持WebSocket:

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

如何自己添加新的交易所

只要按照策略的模板,自己仿照下面的格式,自己参考交易所API文档即可:

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }
    

更多内容