資源の読み込みに... 荷物...

ウェブソケットをシームレスに使える ポリシーテンプレート

作者: リン・ハーン小草, 作成: 2024-10-30 09:49:20, 更新: 2024-11-05 17:45:31

img

これはFMZが公式に開発したWebSocket市場テンプレートで,別のテンプレートとしてコピーして,新しいポリシーでこのテンプレートをクリックすると使用できます:https://www.fmz.com/strategy/470349

WebSocket はなぜ必要なのか

現在のFMZ戦略は,主に伝統的なREST API包装であり,APIアクセスごとにネットワーク接続を確立し,相談方式で市場データを入手する.この方法は簡単で,ほとんどのニーズに十分である.

しかし,REST プロトコルは,多取引対,多取引所戦略が必要であるときに,遅延の問題は増幅される. プラットフォームのGo 関数が同時に実行されるが,遅延の問題は依然として存在し,比較的高い周波数の戦略取引の需要を満たすのが困難で,取引があまりにも多く,ランキング頻度があまりにも速く取引プラットフォームへのアクセス頻度制限に直面する.

現在,取引所のサーバー負荷も重い. WebSocket プロトコルは完ぺきで,API ユーザが使用することを推奨している. REST プロトコルに対して,WebSocket は,交換所のクライアントにデータをリアルタイムにプッシュできるようにする,持続的な双方向接続方法を提供し,頻繁な要求と応答を避け,遅延を大幅に削減している. 一般的に,REST API へのアクセス遅延が20ms ほどである場合,WebSocket のプッシュデータはおそらく2ms ほど遅れている.また,リンク WebSocket プロトコルは,プラットフォームへのアクセス頻度制限を受けていないため,基本的には一度に数十の取引ペアをサブスクリブすることができます.

WebSocket 業界 テンプレートの紹介

FMZの量化取引プラットフォームは,WebSocketプロトコルを早期にサポートし,比較的便利な呼び出しが可能であったが,新規ユーザーにとって,複数のサブスクリプション,複数の取引所へのサブスクリプションを処理し,効率的に戦略プロセス全体に簡単に組み込めるのはあまりにも複雑であった. この公開WebSocketのリアルタイム市場データ加速テンプレートは,この痛みを解決し,非常に使いやすい,現在パッケージ化されたAPIコールと完全に互換性があり,ほとんどの元のRESTポリシーポリシーに,簡単な変更を直接使用して,あなたのポリシーを加速します.

基本的特徴:

  • 複数の取引所による支援この方針は,Binan,OKX,Bybit,Bitgetなどの複数の取引所のWebSocket接続をサポートし,ユーザーがこのテンプレートのパッケージ方法を模倣してより多くの取引所をサポートすることができます.
  • カスタマイズできるサブスクリプション: 特定の市場チャンネル (深度,取引など) に購読し,取引戦略の即時利用のために受信されたデータを効率的に処理することができます.
  • 高級エラー処理: データの流れの信頼性と継続性を保証する,組み込まれたエラー追跡とWebSocket再接続メカニズム.

実現原理の簡潔な説明

この策略がTypeScriptを使用していることに注意してください.この策略は,JavaScriptを熟知している方には少し見知らぬもののように見えるでしょう.TypeScriptはJavaScriptをベースにタイプシステムとより豊かな言語機能を導入し,量化取引などの複雑な論理処理を必要とするアプリケーションでは,TypeScriptを使用することで潜在的なエラーを減らすことができ,コードの読みやすさと保守性を向上させることができます.したがって,簡単な学習が推奨されます.

また,策略はFMZプラットフォームの異動メカニズムを使用し,子メカニズムスレッドは __threadPostMessage 関数で主糸にメッセージを送信する.この方法は異動で,子糸内で生成されたデータの更新通知主糸に適用されます.主糸と子糸の間に __threadGetData と __threadSetData 関数でデータを共有することができます.この方法は,スレッドがアクセスし,共有状態を変更することを許可します.

この戦略の主な原理は,WebSocketを通じて主流のデジタル通貨取引所に接続し,市場データ (深度情報や取引情報など) をリアルタイムに受信し,量的な取引決定にデータサポートを提供することです.具体的には,次のプロセスを実現します:

1. WebSocket 接続設定

setupWebsocketWebSocket接続を初期化し,市場データを受信する関数である.main_exchanges接続が必要な取引所を表示します.

  • MyDial関数: WebSocket 接続を作成し,接続時間を記録し,接続を終了すると終了時間を出力します.
  • updateSymbols関数定期的なチェック:新しいサブスクリプションのリクエストがないか確認し,必要に応じて現在の取引対リストを更新します.

2.データ処理

supportsオブジェクトは,サポートされる取引所とその処理機能を定義します.Binance) ・各取引所の処理機能は,受信したメッセージを解析し,関連するデータを抽出する.

  • processMsg関数取引所からのメッセージを処理し,さまざまな種類のデータ (深層更新,取引など) を識別し,統一されたイベントオブジェクトにフォーマットします.

3. サブスクリプションデータ

接続時に,システムは現在の取引に基づいて,サブスクリプションに関連する市場データチャンネルを表示します.

  • getFunction関数取引所の名前に基づいて対応する処理機能を取得します.
  • this.wssPublic関数ウェブソケット接続を初期化し,データ受信を開始します.

4. ストリーム管理

各取引所に対して1つのスレッドを起動し,リアルタイムでデータを受信し,リコール関数で処理します.

  • threadMarket関数:サブスレッドでデータを受信し,解析し,最新の深さや取引情報を保存する.

5. データ取得方法を書き直す

各取引所に対して,深度と取引情報を得る方法を書き直し,リアルタイムで更新されたデータを優先的に返します.

テンプレートの使い方

  1. 初期化使用:$.setupWebsocket()目標取引所のWebSocket接続を初期化します.
  2. サブスクリプションシステムでは自動的に,あなたが取引している品種 (深度,取引など) に関連するチャンネルに購読します.
  3. データ取得呼び出しによってGetDepth()そしてGetTrades()WebSocketのリアルタイムデータを使用して市場深さと取引記録の返還を自動化する機能.
  4. 誤った処理: ポリシーには,接続とデータエラーを記録し,接続が切断されたときに自動的に再接続を試みるトラッキングメカニズムが含まれます.

策略にEventLoop () を追加すると,wss データが更新されたときに自動的に即座に取得されるトリガーメカニズムに変更されます.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

複数の通貨で取引する戦略のガイドを参照してください.https://www.fmz.com/digest-topic/10506ウェブソケットのサポートを改造するには,

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

新しい取引所を自分で追加する方法

取引所のAPIドキュメントを参照してください. ウェブサイトのページをクリックすると,

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }
    

もっと