[TOC] ¿Qué quieres decir?
Con el rápido desarrollo de los mercados financieros y la popularidad del comercio cuantitativo, cada vez más operadores comienzan a confiar en estrategias automatizadas para el comercio. En este proceso, la comunicación y la coordinación entre las estrategias son particularmente importantes.
Este artículo explorará el protocolo de comunicación comercial en vivo de las estrategias comerciales en la plataforma FMZ, presentará su concepto de diseño, características funcionales y ventajas en aplicaciones prácticas.
Si usted es un entusiasta del comercio cuantitativo que acaba de comenzar con FMZ o un programador profesional experimentado, este artículo le proporcionará información valiosa y guías prácticas de operación. Exploremos las poderosas funciones de la plataforma FMZ y aprendamos cómo lograr la colaboración entre estrategias a través de protocolos de comunicación eficientes, mejorar la eficiencia del comercio y capturar oportunidades de mercado.
Estos escenarios de demanda demuestran las diversas posibilidades y ventajas del protocolo de comunicación comercial en vivo de la estrategia de trading FMZ en aplicaciones prácticas.
Después de comprender los requisitos de comunicación entre las operaciones en vivo, debemos considerar cómo implementar estos requisitos. No es más que el comercio en vivo A que espera intercambiar información con el comercio en vivo B. Aunque los requisitos parecen simples, hay varios detalles que deben acordarse al usar un conjunto de protocolos de comunicación. FMZ ha encapsulado varios protocolos de comunicación populares.
Mqtt / nats / amqp / kafka
La arquitectura de comunicación es:
El servidor (proxy). Se necesita un servidor que ejecute un protocolo de comunicación para retransmitir mensajes entre suscriptores y editores.
Cliente (suscriptor, editor). El programa de trading en vivo de estrategia en FMZ puede entenderse como un cliente de un protocolo de comunicación.
Cuando se aplican estos protocolos en la plataforma FMZ, se puede entender simplemente que los protocolos mqtt / nats / amqp / kafkap están integrados en la plataforma FMZ.Dial()
La función y elDial()
La función se utiliza para publicar y suscribir mensajes. Estos mensajes publicados son proxy (retransmitidos) al comercio en vivo suscrito a través del servidor de protocolo, por lo que primero se debe ejecutar un servidor de protocolo.
Función de marcado en la sección de documentación de la API:https://www.fmz.com/syntax-guide#fun_dial
Antes de implementar la imagen docker, recuerde instalar el software docker primero.
A continuación, exploremos y practicemos las aplicaciones de protocolo de comunicación soportadas por FMZ.
MQTT (Message Queuing Telemetry Transport) es un protocolo de transmisión de mensajes ligero que es particularmente adecuado para entornos de red de bajo ancho de banda, alta latencia o poco confiables. Fue propuesto por Andy Stanford-Clark y Arlen Nipper de IBM en 1999 y más tarde se convirtió en un estándar ISO (ISO / IEC PRF 20922).
Las principales características del protocolo MQTT: modo de publicación/suscripción
Publicación: El productor del mensaje envía el mensaje al tema.
Suscripción: un consumidor de mensajes se suscribe a un tema de interés, recibiendo así mensajes publicados en ese tema.
Intermediario: MQTT utiliza un intermediario de mensajes como intermediario para reenviar mensajes, asegurando el desacoplamiento entre los editores y los suscriptores.
Debido a que utilizamos la imagen de docker (imagen de eclipse-mosquito) de software que admite el protocolo MQTT para implementar el servidor proxy MQTT, hemos instalado docker por adelantado y no entraremos en detalles más adelante.
Antes de ejecutar el comando para desplegar la imagen, necesitamos escribir un servidor proxy archivo de configuraciónmosquitto.conf
.
# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true
Luego ejecuta el comando de despliegue:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Después de que se ejecute la imagen del servidor proxy, se muestra lo siguiente:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Entonces podemos probar la estrategia para ponerla en práctica.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
El uso principal de la función Dial en el código de estrategia es:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
El parámetro de la cadena de la función Dial comienza conmqtt://
, que es el nombre del protocolo, seguido de la dirección de escucha y el puerto. El símbolo test_topic
.
La estrategia anterior publica y suscribe un tema al mismo tiempo.
También podemos usar dos operaciones en vivo para suscribirnos entre sí y publicar información sobre el tema.
El protocolo NATS es un protocolo de estilo publish/subscribe basado en texto. El cliente se conecta a gnatsd (servidor NATS) y se comunica con gnatsd. La comunicación se basa en sockets TCP/IP ordinarios y define un conjunto muy pequeño de operaciones. Newline indica terminación. A diferencia de los sistemas tradicionales de comunicación de mensajes que utilizan formatos de mensajes binarios, el protocolo NATS basado en texto hace que la implementación del cliente sea muy simple y puede implementarse fácilmente en una variedad de lenguajes de programación o lenguajes de script.
Cada uno de los protocolos tiene sus propias características, por lo que puede consultar los documentos y materiales específicos, que no se detallarán aquí.
Despliegue el servidor de protocolo NATS:
el nombre de los usuarios de Docker Run
nombre nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Este comando docker descargará y ejecutará automáticamente la imagen nats, y el puerto 4222 es el puerto al que el cliente necesita acceder.
Listening for client connections on 0.0.0.0:4222
Server is ready
La imagen del servidor de Nats comienza a correr, escuchando en el puerto 4222.
Necesitamos crear dos estrategias (negociación en vivo), llamémoslas Estrategia A y Estrategia B. Los códigos de estas dos estrategias son básicamente los mismos. Están escritos en JavaScript, que es el lenguaje más fácil de usar en la plataforma FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Estas dos estrategias son casi iguales, excepto que se publican y se suscriben entre sí, y los temas suscritos, los temas publicados y la información publicada son diferentes.
Tomemos la Estrategia B como ejemplo:
Dial()
función para crear un objeto de conexión de servidor clienteconnPub
para la publicación de mensajes sobre temas:Var connPub = Dial ((
nats://admin@127.0.0.1:4222?topic=pubRobotB )
La cadena de parámetros de la función Dial comienza connats://
Indicando que el protocolo NATS se utiliza para la comunicación.admin
es la información de verificación simpleauth admin
El carácter 127.0.0.1:4222
Por último, está el tema de publicar/suscribirse:topic=pubRobotB
Tenga en cuenta que el símbolo
Dial()
función para crear un objeto de conexión de servidor clienteconnSub
para la suscripción de mensajes de tema:Var connSub = Dial ((
nats://admin@127.0.0.1:4222?topic=pubRobotA )
La única diferencia estopic=pubRobotA
, porque necesitamos suscribirnos al temapubRobotA
donde la estrategia A envía información.
La creación y el uso de objetos de conexión de suscripción y publicación en la estrategia A son los mismos que los descritos anteriormente.
De esta manera, se implementa un ejemplo simple de aplicación de protocolo NATS en el que el comercio en vivo A y el comercio en vivo B se suscriben y publican mensajes para comunicarse entre sí.
En la comunicación asíncrona, el mensaje no llega al receptor inmediatamente, sino que se almacena en un contenedor. Cuando se cumplen ciertas condiciones, el mensaje será enviado al receptor por el contenedor. Este contenedor es la cola de mensajes. Para completar esta función, ambas partes y el contenedor y sus componentes deben cumplir con acuerdos y reglas unificados.
Cada uno de los protocolos tiene sus propias características, por lo que puede consultar los documentos y materiales específicos, que no se detallarán aquí.
Despliegue el servidor de protocolo amqp:
el nombre de host de mi conejo
nombre de conejo -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Al desplegar una imagen de docker, se descargará y desplegará automáticamente, y cuando se complete se mostrará:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Después de desplegar la imagen del servidor, escriba un ejemplo de prueba:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Read
Log("read:", conn.read(1000), "#FF0000")
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Cuando se utiliza la cola de protocolo AMQP, tenga en cuenta que los mensajes publicados persistirán en la cola. Por ejemplo, si ejecutamos el código de ejemplo anterior, se escribirán 10 mensajes en la cola. Luego, cuando lo ejecutamos una segunda vez, podemos encontrar que el primer mensaje escrito se volverá a leer al leer. Como se muestra en la figura:
Podemos ver que los dos mensajes de registro señalados por las flechas rojas en la captura de pantalla tienen tiempos inconsistentes. La razón es que el mensaje rojo es el que se leyó y escribió en la cola cuando el código de estrategia se ejecutó por primera vez.
Por ejemplo, después de que se reinicie la estrategia, los datos de mercado registrados aún se pueden obtener de la cola para el cálculo de inicialización y otras operaciones.
Apache Kafka es un almacén de datos distribuido optimizado para ingerir y procesar datos de transmisión en tiempo real. Los datos de transmisión son datos que son generados continuamente por miles de fuentes de datos, a menudo enviando registros de datos simultáneamente.
Kafka ofrece tres funciones principales a sus usuarios:
Kafka se utiliza principalmente para construir tuberías de datos de transmisión en tiempo real y aplicaciones que se adaptan a los flujos de datos.
Despliegue la imagen del docker del proxy de Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Prueba con el código de prueba:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Echemos un vistazo a cómo usar el protocolo Kafka para publicar y suscribir mensajes en la función Dial.
Dial("kafka://localhost:9092/test_topic")
Al igual que otros protocolos, la primera parte es el nombre del protocolo.localhost:9092
. Luego use el símbolo test_topic
.
Resultados de las pruebas: