O recurso está a ser carregado... Carregamento...

Um modelo de estratégia para usar o WebSocket sem problemas

Autora:Ervas daninhas, Criado: 2024-10-30 09:49:20, Atualizado: 2024-11-05 17:45:31

img

Este é um modelo de mercado WebSocket desenvolvido oficialmente pela FMZ, que pode ser copiado para ser usado na nova política:https://www.fmz.com/strategy/470349

Por que precisamos de um WebSocket?

A estratégia atual do FMZ é basicamente o envelopamento tradicional da API REST, que estabelece uma conexão de rede para cada acesso à API e obtém dados de mercado por meio de consultas. Este método é simples e fácil de usar e é suficiente para a maioria das necessidades.

No entanto, o protocolo REST tem problemas de atraso inerentes, que são amplificados quando são necessários vários pares de negociações, políticas de várias bolsas. Embora a função Go da plataforma possa ser executada em simultâneo, o problema de atraso ainda existe, sendo difícil atender às necessidades de negociações estratégicas de uma freqüência relativamente alta, e com muitas transações, a freqüência de consultas é muito rápida e o acesso à plataforma é limitado.

Os servidores dos exchanges atuais também são pesados, oferecendo protocolos WebSocket perfeitos e recomendando o uso de API. Com relação ao protocolo REST, o WebSocket oferece um modo de conexão bidirecional persistente que permite que os exchanges impulsionem dados para os clientes em tempo real, evitando pedidos e respostas frequentes, reduzindo drasticamente o atraso. Geralmente, se o atraso de acesso ao REST API for de cerca de 20 ms, o WebSocket provavelmente atrasará os dados de impulso em cerca de 2 ms.

Introdução ao modelo de mercado do WebSocket

A plataforma de negociação quantitativa FMZ tem suporte ao protocolo WebSocket desde o início e é relativamente fácil de usar, mas para os usuários novatos, é muito complexo lidar com vários assinaturas, assinaturas de vários mercados de negociação e uma integração eficiente e fácil em todo o processo de estratégia. Este modelo aberto de aceleração de dados de mercado em tempo real do WebSocket resolve esse problema, sendo muito fácil de usar e totalmente compatível com as chamadas de API atualmente embaladas. Para a maioria das políticas de REST originais, é simples mudar diretamente para acelerar suas políticas.

As principais características:

  • Apoio de várias bolsasA política é que os usuários possam copiar o padrão para suportar mais exchanges.
  • Subscrições personalizáveisPermite subscrever canais de mercado específicos (como profundidade, negociação, etc.) e processar de forma eficiente os dados recebidos para uso imediato de estratégias de negociação.
  • Erros de alto nívelO mecanismo de rastreamento de erros e de religação do WebSocket é construído para garantir a confiabilidade e a continuidade do fluxo de dados.

Introdução ao processo de implementação

Observe que esta estratégia usa o TypeScript, o que pode parecer um pouco estranho se você é apenas familiar com o JavaScript. O TypeScript introduz um sistema de tipos e recursos linguísticos mais ricos baseados no JavaScript, para aplicações que precisam processar lógica complexa, como transações de quantificação. O uso do TypeScript pode reduzir os erros potenciais e melhorar a legibilidade e a manutenção do código.

Além disso, a política usa o mecanismo de sincronização da plataforma FMZ, que permite que os sub-trechos enviem mensagens para o principal através da função __threadPostMessage. Este método é sincronizado e é usado para notificar o thread principal de atualizações de dados gerados em sub-trechos. Os dados podem ser compartilhados entre o thread principal e o sub-trecho através das funções __threadGetData e __threadSetData. Este método permite que o thread acesse e modifique o estado do compartilhamento.

O principal princípio desta estratégia é conectar as principais exchanges de moeda digital através do WebSocket e receber dados de mercado em tempo real (como informações de profundidade e informações de transações) para fornecer suporte de dados para decisões de transações quantitativas.

1. Configuração de conexão do WebSocket

setupWebsocketA função é usada para inicializar a conexão do WebSocket, recebendo dados de mercado.main_exchangesO site do Facebook, que é o site oficial do Facebook, também está disponível no site do Facebook.

  • MyDialFunções: Criação de conexões WebSocket, gravação do tempo de conexão e saída do tempo de desligamento quando a conexão é desligada.
  • updateSymbolsFunçõesA lista de transações atuais é atualizada conforme necessário.

2. Processamento de dados

supportsOs objetos definem as trocas suportadas e suas funções de processamento (comoBinanceA função de processamento de cada bolsa é responsável por analisar as mensagens recebidas e extrair os dados relevantes.

  • processMsgFunçõesOs dados são processados de acordo com as seguintes regras: processar as mensagens provenientes das bolsas, identificar diferentes tipos de dados (como atualizações profundas, transações, etc.) e formatá-los em objetos de eventos uniformes.

3. Dados de subscrição

Em cada conexão, o sistema passa o canal de dados de mercado para a assinatura de acordo com a transação atual.

  • getFunctionFunçõesA função de processamento correspondente é obtida com base no nome da bolsa.
  • this.wssPublicFunçõesInitialize a conexão do WebSocket e inicie a recepção de dados.

4. Gestão de threads

Inicie um thread para cada bolsa, receba dados em tempo real e processe os dados através de funções de retorno.

  • threadMarketFunçõesA função é a seguinte: receber dados em sub-trechos, analisá-los e armazená-los com a última informação de profundidade e transação.

5. Reescrever o método de obtenção de dados

O método de reescrever a profundidade e a informação de negociação para cada bolsa, com prioridade de retornar dados atualizados em tempo real.

Como usar o modelo

  1. InicializaçãoUtilização:$.setupWebsocket()Iniciar a conexão do WebSocket com o exchange alvo.
  2. SubscreverO sistema subscreve automaticamente os canais relacionados com a variedade que você está negociando (por exemplo, profundidade, negociação, etc.).
  3. Obtenção de dadosPor telefone:GetDepth()eGetTrades()Função que utiliza automaticamente dados WebSocket em tempo real para retornar profundidade de mercado e registros de transações.
  4. Mal-tratamentoA política inclui um mecanismo de rastreamento para registrar erros de conexão e dados e tentar automaticamente se conectar novamente quando a conexão é interrompida.

Se a função EventLoop ((() for adicionada à política, ela será transformada em um mecanismo de gatilho, que será automaticamente obtido imediatamente quando houver um atualização de dados do wss, em vez de esperar sem dados atualizados. O equivalente à função Sleep inteligente, é claro, também pode ser usado diretamente no Sleep.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

Veja meu último guia de estratégias de negociação em várias moedas.https://www.fmz.com/digest-topic/10506A partir de agora, o site está disponível para download gratuito, e você pode fazer uma modificação muito fácil para suportar o WebSocket aqui:

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

Como adicionar novas casas de câmbio

A partir daí, você pode usar o formato abaixo para copiar o modelo da política e consultar o documento da API da bolsa:

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }
    

Mais.