리소스 로딩... 로딩...

웹소켓을 원활하게 사용할 수 있는 정책 템플릿

저자:초목, 창작: 2024-10-30 09:49:20, 업데이트: 2024-11-05 17:45:31

img

FMZ가 공식 개발한 웹소켓 시장 템플릿입니다. 다른 템플릿으로 복사하여 새 정책에서 템플릿을 선택하면 사용할 수 있습니다.https://www.fmz.com/strategy/470349

왜 웹소켓이 필요한지

현재 FMZ의 전략은 주로 전통적인 REST API 포장을 통해 API 접속 단계마다 네트워크 연결을 구축하여 컨설팅 방식으로 시장 데이터를 취득하는 것입니다. 이 방법은 간단하고 사용하기 쉽습니다. 대부분의 요구 사항에 대해 그렇게하는 것이 충분합니다.

그러나, REST 프로토콜에는 고유의 지연 문제가 있으며, 여러 거래 쌍, 여러 거래소 전략이 필요할 때 지연 문제는 증폭된다. 플랫폼의 Go 함수를 사용하여 동시에 실행할 수 있지만, 지연 문제는 여전히 존재하며, 상대적으로 높은 빈도의 전략 거래의 요구를 충족시키는 것이 어렵고, 거래가 너무 많고, 라운징 빈도가 너무 빠르면 거래 플랫폼의 액세스 빈도 제한을 겪을 수 있다.

현재 거래소의 서버 부하도 무겁다. 모두 완벽한 웹소켓 프로토콜을 제공하고 API 사용자가 사용하는 것이 좋습니다. REST 프로토콜에 비해 웹소켓은 지속 가능한 양방향 연결 방식을 제공하여 거래소가 실시간으로 데이터를 클라이언트로 푸시 할 수 있으며, 빈번한 요청과 응답을 피하고 지연을 크게 감소시킵니다. 일반적으로 REST API에 액세스 할 수 있는 지연이 20ms 정도라면 웹소켓의 푸시 데이터가 2ms 정도 지연됩니다. 그리고 링크 웹소켓 프로토콜은 플랫폼 액세스 주파수 제한이 없으며, 기본적으로 한 번에 수십 개의 거래 쌍을 구독 할 수 있습니다.

웹소켓 시장 템플릿 소개

FMZ 양적 거래 플랫폼은 일찍부터 웹소켓 프로토콜을 지원하고 비교적 편리하게 호출되었지만, 초보자에게는 여러 가입을 처리하고 여러 거래소에 가입하며 전체 전략 프로세스에 효율적으로 편리하게 통합하는 것이 너무 복잡합니다. 공개된 웹소켓 실시간 시장 데이터 가속화 템플릿은 이 문제를 해결하고, 매우 사용하기 쉽고, 현재 포괄된 API 호출과 완전히 호환되며, 대부분의 원래의 REST 정책 정책에 대해 간단하게 변경하여 직접 사용하여 정책을 가속시킵니다.

주요 특징:

  • 다중 거래소 지원: 이 정책은 Bitcoin, OKX, ByBit, Bitget 등 여러 거래소와 웹소켓 연결을 지원하고 사용자가 이 템플릿의 포장을 따라가 더 많은 거래소를 지원할 수 있습니다.
  • 사용자 지정 할 수 있습니다: 특정 시장 채널에 가입 할 수 있습니다 (예: 깊이, 거래 등) 그리고 거래 전략에 즉각적으로 사용할 수 있도록 수신된 데이터를 효율적으로 처리합니다.
  • 고도의 오류 처리: 데이터 스트림의 안정성과 지속성을 보장하기 위해 내장된 오류 추적 및 웹소켓 재연결 메커니즘.

구현 원리에 대한 간단한 설명

참고로, 이 전략은 타입스크립트를 사용한다. 만약 당신이 단지 자바스크립트에 익숙한 사람이라면 약간 낯설어 보일 것이다. 타입스크립트는 자바스크립트 기반의 타입 시스템과 더 풍부한 언어 특성을 도입하고, 양적 거래와 같은 복잡한 논리를 처리해야 하는 애플리케이션을 위해, 타입스크립트를 사용하는 것은 잠재적인 오류를 줄이고 코드의 가독성 및 유지보수성을 향상시킬 수 있다. 따라서 간단한 학습이 권장된다.

또한 FMZ 플랫폼의 비시시 메커니즘을 사용하며, 메커니즘 하위 스레드는 __threadPostMessage 함수를 통해 메인 스레드에 메시지를 보낼 수 있다. 이 방법은 비시시이며, 하위 스레드에서 생성된 데이터 업데이트 알림을 메인 스레드에 적용된다. 메인 스레드와 하위 스레드 사이에 __threadGetData와 __threadSetData 함수를 통해 데이터를 공유할 수 있다. 이 방법은 스레드에 액세스하고 공유 상태를 수정할 수 있다. 만약 당신이 다음 여러 스레드를 배우고 싶다면, 플랫폼 문서와 결합하여 이 메커니즘은 또한 좋은 학습 예시이다.

이 전략의 주요 원리는 웹소켓을 통해 주요 디지털 통화 거래소를 연결하여 실시간 시장 데이터를 (지속 정보 및 거래 정보와 같은) 수신하여 양적 거래 결정을 위한 데이터 지원을 제공하는 것입니다. 구체적으로 프로세스를 구현하는 방법은 다음과 같습니다.

1. 웹소켓 연결 설정

setupWebsocket함수는 웹소켓 연결을 초기화하여 시장 데이터를 수신하는 데 사용됩니다.main_exchanges이 사이트는 인터넷에 접속할 수 있는 사이트입니다.

  • MyDial함수: 웹소켓 연결을 생성하고 연결 시간을 기록하고 연결을 종료할 때 종료 시간을 출력합니다.
  • updateSymbols함수: 정기적으로 새로운 가입 요청이 있는지 확인하고 필요에 따라 현재 거래 쌍 목록을 업데이트합니다.

2. 데이터 처리

supports객체는 지원되는 거래소와 처리 기능을 정의합니다.Binance각 거래소의 처리 기능은 수신된 메시지를 분석하고 관련 데이터를 추출하는 역할을 합니다.

  • processMsg함수: 거래소로부터의 메시지를 처리하고, 다양한 유형의 데이터를 식별하고 (지속 업데이트, 거래 등) 통일된 이벤트 객체로 포맷합니다.

3. 가입자 데이터

각 연결시, 시스템은 현재 거래에 따라 가입에 관련된 시장 데이터 채널을 사용합니다.

  • getFunction함수: 거래소 이름에 따라 해당 처리 기능을 얻습니다.
  • this.wssPublic함수: 웹소켓 연결을 초기화하고 데이터 수신을 시작합니다.

4. 스레드 관리

각 거래소에 대해 하나의 스레드를 시작하여 실시간으로 데이터를 수신하고 회환 함수를 통해 데이터를 처리합니다.

  • threadMarket함수: 하위 스레드에서 데이터를 수신하고, 최신 깊이와 거래 정보를 분석하고 저장합니다.

5. 데이터 취득 방법을 다시 쓰기

각 거래소에 대한 깊이와 거래 정보를 얻는 방법을 다시 작성하여 실시간 업데이트 데이터를 우선적으로 반환합니다.

템플릿 사용 방법

  1. 초기화사용:$.setupWebsocket()초기화 대상 거래소의 웹소켓 연결.
  2. 가입: 시스템에서는 자동으로 당신이 거래하는 품종에 관련된 채널을 구독합니다 (예: 깊이, 거래 등).
  3. 데이터 획득전화로:GetDepth()그리고GetTrades()함수, 웹소켓 실시간 데이터를 사용하여 시장 깊이 및 거래 기록을 자동으로 반환합니다.
  4. 잘못된 처리: 정책에는 연결과 데이터 오류를 기록하고 연결이 끊어지면 자동으로 다시 연결을 시도하는 추적 메커니즘이 포함되어 있습니다.

만약 정책에 EventLoop () 함수를 추가한다면, 트리거 메커니즘으로 바뀌게 되고,WSS 데이터 업데이트가 있을 때 자동으로 즉시 가져오게 되고, 최신 데이터가 없을 때 기다리게 된다.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

이 글은 제가 전에 다화폐 거래 전략에 대해 쓴 글입니다.https://www.fmz.com/digest-topic/10506웹소켓을 지원하기 위해 웹소켓을 변경하는 방법은 다음과 같습니다.

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

새로운 거래소를 직접 추가하는 방법

이 정책의 템플릿을 따라 아래의 형식을 그대로 따라서 거래소 API 문서를 참조하면 됩니다.

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }
    

더 많은