[TOC]
Com o rápido desenvolvimento dos mercados financeiros e a popularidade da negociação quantitativa, mais e mais comerciantes começam a confiar em estratégias automatizadas para negociação. Neste processo, a comunicação e coordenação entre as estratégias são particularmente importantes.
Este artigo irá explorar o protocolo de comunicação de negociação ao vivo de estratégias de negociação na plataforma FMZ, apresentar seu conceito de design, características funcionais e vantagens em aplicações práticas. Através de uma análise detalhada de casos, mostraremos como usar este protocolo para alcançar uma comunicação de estratégia eficiente e estável e melhorar a execução e o desempenho de lucro de estratégias de negociação.
Se você é um entusiasta de negociação quantitativa que está apenas começando com FMZ ou um programador profissional experiente, este artigo irá fornecer-lhe insights valiosos e guias práticos de operação.
Esses cenários de demanda demonstram as várias possibilidades e vantagens do protocolo de comunicação de negociação ao vivo da estratégia de negociação FMZ em aplicações práticas.
Após entender os requisitos de comunicação entre as negociações ao vivo, precisamos considerar como implementar esses requisitos. Não é mais do que a negociação ao vivo A esperando trocar informações com a negociação ao vivo B. Embora os requisitos pareçam simples, existem vários detalhes que precisam ser acordados ao usar um conjunto de protocolos de comunicação.
mqtt / nats / amqp / kafka
A arquitetura de comunicação é:
Servidor (Proxy). Um servidor que execute um protocolo de comunicação é necessário para retransmitir mensagens entre assinantes e editores.
Cliente (assinante, editor). O programa de negociação de estratégia ao vivo no FMZ pode ser entendido como um cliente de um protocolo de comunicação.
Quando se aplicam estes protocolos na plataforma FMZ, pode-se simplesmente entender que os protocolos mqtt / nats / amqp / kafkap são integrados na plataforma FMZ.Dial()
Função, e oDial()
A função é usada para publicar e subscrever mensagens. Estas mensagens publicadas são proxyadas (retransmitidas) para a negociação ao vivo assinada através do servidor de protocolo, portanto, um servidor de protocolo deve ser executado primeiro. Para fins de demonstração, usamos várias implantações de imagem de servidor de protocolo nos exemplos a seguir.
Função de discagem na secção de documentação da API:https://www.fmz.com/syntax-guide#fun_dial
Antes de implantar a imagem do docker, lembre-se de instalar o software docker primeiro.
Em seguida, vamos explorar e praticar as aplicações de protocolo de comunicação suportadas pelo FMZ.
MQTT (Message Queuing Telemetry Transport) é um protocolo de transmissão de mensagens leve que é particularmente adequado para ambientes de rede de baixa largura de banda, alta latência ou não confiável.
Principais características do protocolo MQTT: modo de publicação/assinatura
Publicação: O produtor da mensagem envia a mensagem para o tópico.
Assinatura: um consumidor de mensagens se inscreve em um tópico de interesse, recebendo assim mensagens publicadas nesse tópico.
Corretor: o MQTT utiliza um corretor de mensagens como intermediário para encaminhar mensagens, garantindo a descoplagem entre editores e assinantes.
Porque usamos a imagem docker (eclipse-mosquitto imagem) de software que suporta o protocolo MQTT para implantar o servidor proxy MQTT, temos instalado docker com antecedência e não entrar em detalhes mais tarde.
Antes de executar o comando para implantar a imagem, precisamos escrever um arquivo de configuração do servidor proxymosquitto.conf
.
# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true
Em seguida, execute o comando de implantação:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Após a execução da imagem do servidor proxy, o seguinte é exibido:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Depois podemos testar a estratégia para a pôr em prática.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
O principal uso da função Dial no código de estratégia é:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
O parâmetro de cadeia da função Dial começa commqtt://
, que é o nome do protocolo, seguido pelo endereço de escuta e porta. O símbolo test_topic
.
A estratégia acima publica e subscreve um tópico ao mesmo tempo.
Também podemos usar dois comércios ao vivo para assinar uns aos outros e publicar informações sobre o tópico.
O protocolo NATS é um protocolo de estilo de publicação/subscrição simples e baseado em texto. O cliente se conecta ao gnatsd (servidor NATS) e comunica com o gnatsd. A comunicação é baseada em soquetes TCP/IP comuns e define um conjunto muito pequeno de operações.
Cada protocolo tem as suas próprias características, podendo consultar os documentos e materiais específicos, que não serão tratados aqui.
Implementar o servidor de protocolo NATS:
docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Este comando do docker irá automaticamente baixar e executar a imagem nats, e a porta 4222 é a porta que o cliente precisa acessar.
Listening for client connections on 0.0.0.0:4222
Server is ready
A imagem do servidor Nats começa a funcionar, escutando na porta 4222.
Precisamos criar duas estratégias (comércio ao vivo), vamos chamá-las de Estratégia A e Estratégia B. Os códigos dessas duas estratégias são basicamente os mesmos.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Estas duas estratégias são quase as mesmas, exceto que elas publicam e se inscrevem uma na outra, e os tópicos inscritos, tópicos publicados e informações publicadas são diferentes.
Tomemos a Estratégia B como exemplo:
Dial()
função para criar um objeto do servidor de conexão do clienteconnPub
para a publicação de mensagens temáticas:Var connPub = Dial ((
nats://admin@127.0.0.1:4222?topic=pubRobotB )
A cadeia de parâmetros da função Dial começa comnats://
Indicando que o protocolo NATS é utilizado para comunicação.admin
é a informação de verificação simplesauth admin
O caráter 127.0.0.1:4222
Finalmente, há o tópico publicar/subscrever:topic=pubRobotB
Observe que o símbolo
Dial()
função para criar um objeto do servidor de conexão do clienteconnSub
para assinatura de mensagens de tópicos:Var connSub = Dial ((
nats://admin@127.0.0.1:4222?topic=pubRobotA )
A única diferença étopic=pubRobotA
, porque precisamos de subscrever o tópicopubRobotA
onde a estratégia A envia informações.
A criação e utilização de objetos de ligação de assinatura e publicação na estratégia A são as mesmas descritas acima.
Desta forma, um exemplo simples de aplicação de protocolo NATS é implementado em que a negociação ao vivo A e a negociação ao vivo B se inscrevem e publicam mensagens para se comunicar entre si.
Na comunicação assíncrona, a mensagem não chega ao receptor imediatamente, mas é armazenada em um contêiner. Quando certas condições são atendidas, a mensagem é enviada ao receptor pelo contêiner. Este contêiner é a fila de mensagens. Para completar essa função, ambas as partes e o contêiner e seus componentes devem cumprir acordos e regras unificados.
Cada protocolo tem as suas próprias características, podendo consultar os documentos e materiais específicos, que não serão tratados aqui.
Implementar o servidor de protocolo amqp:
docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Ao implantar uma imagem docker, ele irá baixar e implantar automaticamente, e quando estiver concluído ele irá exibir:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Após a imagem do servidor ser implantada, escreva um exemplo de teste:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Read
Log("read:", conn.read(1000), "#FF0000")
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Ao usar a fila do protocolo AMQP, observe que as mensagens publicadas persistirão na fila. Por exemplo, se executarmos o código de exemplo acima, 10 mensagens serão escritas na fila. Então, quando executá-lo pela segunda vez, podemos descobrir que a primeira mensagem escrita será lida novamente ao ler. Como mostrado na figura:
Podemos ver que as duas mensagens de log apontadas pelas setas vermelhas na captura de tela têm tempos inconsistentes. A razão é que a mensagem vermelha é a que foi lida e escrita na fila quando o código de estratégia foi executado pela primeira vez.
Com base nesta característica, alguns requisitos podem ser cumpridos. Por exemplo, após a reinicialização da estratégia, os dados de mercado registados ainda podem ser obtidos a partir da fila para o cálculo de inicialização e outras operações.
O Apache Kafka é um armazenamento de dados distribuído otimizado para ingerir e processar dados de streaming em tempo real. Os dados de streaming são dados que são gerados continuamente por milhares de fontes de dados, muitas vezes enviando registros de dados simultaneamente.
O Kafka oferece três funções principais aos seus utilizadores:
O Kafka é usado principalmente para construir pipelines de dados de streaming em tempo real e aplicativos que se adaptam aos fluxos de dados.
Implementar a imagem do docker do proxy Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Ensaio com o código de ensaio:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Vamos dar uma olhada em como usar o protocolo Kafka para publicar e subscrever mensagens na função Dial.
Dial("kafka://localhost:9092/test_topic")
Como outros protocolos, a primeira parte é o nome do protocolo.localhost:9092
. Em seguida, use o símbolo test_topic
.
Resultados dos testes: