[TOC]
Con el rápido desarrollo de los mercados financieros y la popularidad del trading cuantitativo, cada vez más operadores comienzan a confiar en estrategias automatizadas para operar. En este proceso, la comunicación y la coordinación entre estrategias son especialmente importantes. FMZ (Plataforma de negociación cuantitativa) ayuda a los operadores a lograr una integración perfecta de estrategias y un intercambio de datos en tiempo real al proporcionar un protocolo de comunicación eficiente entre estrategias de negociación reales.
Este artículo explorará en profundidad el protocolo de comunicación en tiempo real de las estrategias comerciales en la plataforma FMZ y presentará su concepto de diseño, características funcionales y ventajas en aplicaciones prácticas. A través de un análisis de caso detallado, demostraremos cómo utilizar este protocolo para lograr una comunicación de estrategia eficiente y estable y mejorar la ejecución y la rentabilidad de las estrategias comerciales.
Ya sea un entusiasta del trading cuantitativo que recién comienza a utilizar FMZ o un programador profesional experimentado, este artículo le brindará información valiosa y guías prácticas de operación. Exploremos las poderosas funciones de FMZ y aprendamos cómo lograr la colaboración entre estrategias a través de protocolos de comunicación eficientes, mejorar la eficiencia comercial y capturar oportunidades de mercado.
Estos escenarios de demanda demuestran las diversas posibilidades y ventajas del protocolo de comunicación en tiempo real de la estrategia comercial FMZ en aplicaciones prácticas. A través de una comunicación efectiva entre estrategias, los traders pueden afrontar mejor entornos de mercado complejos, optimizar las estrategias comerciales y mejorar la eficiencia y las ganancias comerciales.
Después de comprender los requisitos de comunicación entre discos reales, debemos considerar cómo realizar estos requisitos. No es más que que el mercado real A espera intercambiar información con el mercado real B, aunque la demanda parezca simple. Sin embargo, es necesario acordar varios detalles mediante un conjunto de protocolos de comunicación. FMZ ha encapsulado varios protocolos de comunicación populares.
mqtt / nats / amqp / kafka
La arquitectura de la comunicación es:
Al aplicar estos protocolos en la plataforma FMZ, se puede entender simplemente como mqtt/nats/amqp/kafka. Estos protocolos están integrados enDial()
En la función, utiliceDial()
Las funciones realizan operaciones como publicación y suscripción de mensajes. Estos mensajes publicados se transmiten al disco real suscrito a través del servidor de protocolo, por lo que primero se debe ejecutar un servidor de protocolo. Para facilitar la demostración, utilizamos varias implementaciones de imágenes de servidor de protocolo en los siguientes ejemplos.
Sección de documentación de la API de la función de marcado: https://www.fmz.com/syntax-guide#fun_dial
Antes de implementar la imagen de Docker, recuerde instalar primero el software de Docker.
A continuación, exploremos y practiquemos las aplicaciones del protocolo de comunicación compatibles con FMZ.
MQTT (Message Queuing Telemetry Transport) es un protocolo de transmisión de mensajes liviano que es particularmente adecuado para entornos de red con bajo ancho de banda, alta latencia o poco confiables. Fue propuesto por Andy Stanford-Clark y Arlen Nipper de IBM en 1999 y posteriormente se convirtió en un estándar ISO (ISO/IEC PRF 20922).
Las principales características del protocolo MQTT: modo publicación/suscripción
Dado que utilizamos la imagen de Docker (imagen eclipse-mosquitto) del software que admite el protocolo MQTT para implementar el servidor proxy MQTT, hemos instalado Docker con anticipación y no entraremos en detalles más adelante.
Antes de ejecutar el comando para implementar la imagen, necesitamos escribir un archivo de configuración del servidor proxymosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Luego ejecute el comando de implementación:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Una vez ejecutada la imagen del servidor proxy, aparece la siguiente pantalla:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Luego podremos probar la estrategia para ponerla en práctica.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
El uso principal de la función Dial en el código de estrategia es:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
El parámetro de cadena de la función Dial comienza conmqtt://
Es el nombre del protocolo, seguido de la dirección de escucha y el puerto. El símbolo “?” va seguido del nombre del tema de suscripción o publicación. El nombre del tema probado aquí es:test_topic
。
La estrategia anterior publica y se suscribe a un tema al mismo tiempo. La prueba en ejecución se muestra en la figura:
También puede utilizar dos discos reales para suscribirse entre sí y publicar información sobre temas. Utilizamos un ejemplo de este tipo en la sección de práctica del protocolo NATS y no repetiremos este método en otros protocolos.
El protocolo NATS es un protocolo de estilo publicación/suscripción simple y basado en texto. El cliente se conecta a gnatsd (servidor NATS) y se comunica con él. La comunicación se basa en sockets TCP/IP comunes y define un conjunto muy pequeño de operaciones. La nueva línea indica la terminación. A diferencia de los sistemas de mensajería tradicionales que utilizan formatos de mensajes binarios, el protocolo NATS basado en texto hace que la implementación del cliente sea sencilla y puede implementarse fácilmente en una variedad de lenguajes de programación o lenguajes de script.
Cada protocolo tiene sus propias características. Puedes consultar los documentos y materiales específicos, que no se detallarán aquí.
Implementar el servidor de protocolo NATS:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Este comando de Docker descargará y ejecutará automáticamente la imagen nats. El puerto 4222 es el puerto al que el cliente desea acceder. Después de implementar la imagen, se abrirá un monitor http en el puerto 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
La imagen del servidor nats comienza a ejecutarse y escucha en el puerto 4222.
Necesitamos crear dos estrategias (operaciones reales), que por ahora denominaremos Estrategia A y Estrategia B. Los códigos de estas dos estrategias son básicamente los mismos. Escrito en Javascript, el lenguaje más fácil de usar en la plataforma FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Estas dos estrategias son básicamente las mismas, excepto que se publican y se suscriben entre sí, y los temas suscritos, los temas publicados y la información publicada son diferentes.
Tomemos como ejemplo la estrategia B:
Dial()
La función crea un objeto de servidor de conexión de clienteconnPub
, utilizado para publicar mensajes de tema:var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)
La cadena de parámetros de la función Dial comienza connats://
Indica que se utiliza el protocolo NATS para la comunicación, entoncesadmin
Es un conjunto de información de verificación simple que se establece al implementar la imagen de Docker.auth admin
, utilice el carácter “@” para separar el siguiente contenido y luego la dirección del servicio y el puerto.127.0.0.1:4222
, y por último el tema publicar/suscribirse:topic=pubRobotB
Tenga en cuenta que hay un símbolo “?” entre la dirección anterior.
Dial()
La función crea un objeto de servidor de conexión de clienteconnSub
, utilizado para suscripción de mensajes de tema:var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)
La única diferenciatopic=pubRobotA
Diferente, porque necesitas suscribirte al tema donde la estrategia A envía informaciónpubRobotA
。
La creación y el uso de objetos de conexión de suscripción y publicación en la estrategia A son los mismos que los descritos anteriormente.
De esta manera se implementa un ejemplo sencillo de aplicación del protocolo NATS en el que el disco real A y el disco real B se suscriben y publican mensajes para comunicarse entre sí.
En la comunicación asincrónica, el mensaje no llegará al receptor inmediatamente, sino que se almacenará en un contenedor. Cuando se cumplan ciertas condiciones, el mensaje será enviado al receptor por el contenedor. Este contenedor es la cola de mensajes. Para completar esta función Ambas partes deben cumplir con las convenciones y reglas unificadas. AMQP es un protocolo de este tipo. Tanto el remitente como el receptor de mensajes pueden lograr una comunicación asincrónica al cumplir con este protocolo. Este protocolo especifica el formato de los mensajes y cómo funcionan.
Cada protocolo tiene sus propias características. Puedes consultar los documentos y materiales específicos, que no se detallarán aquí.
Implementar el servidor de protocolo amqp:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Al implementar una imagen de Docker, se descargará e implementará automáticamente y, cuando se complete, se mostrará lo siguiente:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Una vez implementada la imagen del servidor, escriba un ejemplo de prueba:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Al utilizar la cola del protocolo AMQP, tenga en cuenta que los mensajes publicados permanecerán en la cola. Por ejemplo, ejecutemos el código de ejemplo anterior una vez. Se escribirán 10 mensajes en la cola. Luego, cuando lo ejecutamos por segunda vez, podemos encontrar que al leer, la información escrita por primera vez se leerá nuevamente. Como se muestra en la figura:
Puede ver que los dos mensajes de registro señalados por las flechas rojas en la captura de pantalla tienen tiempos inconsistentes. La razón es que el mensaje rojo es el que se leyó y escribió en la cola cuando se ejecutó por primera vez el código de estrategia.
Con esta función se pueden cumplir algunos requisitos. Por ejemplo, después de reiniciar la estrategia, los datos de mercado registrados aún se pueden obtener de la cola para el cálculo de inicialización y otras operaciones.
Apache Kafka es un almacén de datos distribuido optimizado para ingerir y procesar datos de transmisión en tiempo real. Los datos de transmisión se refieren a datos que se generan continuamente por miles de fuentes de datos, que a menudo envían registros de datos simultáneamente. La plataforma de transmisión necesita procesar estos datos que fluyen continuamente y procesarlos paso a paso en secuencia.
Kafka ofrece tres funciones principales a sus usuarios:
Kafka se utiliza principalmente para crear canales y aplicaciones de transmisión de datos en tiempo real que se adaptan a los flujos de datos. Combina capacidades de mensajería, almacenamiento y procesamiento de flujo para almacenar datos históricos y en tiempo real.
Implementar la imagen de Docker del proxy de Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Prueba usando el código de prueba:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Veamos cómo utilizar el protocolo Kafka para publicar y suscribirse a mensajes en la función Dial.
Dial("kafka://localhost:9092/test_topic")
Al igual que varios otros protocolos, la primera parte es el nombre del protocolo. A continuación la dirección de escucha:localhost:9092
. Luego, utilice el símbolo “/” como separador, seguido del tema de suscripción/publicación. Aquí, el tema de prueba se establece entest_topic
。
Resultados de la prueba: