[TOC]
Avec le développement rapide des marchés financiers et la popularité du trading quantitatif, de plus en plus de traders commencent à s’appuyer sur des stratégies automatisées pour trader. Dans ce processus, la communication et la coordination entre les stratégies sont particulièrement importantes. FMZ (Quantitative Trading Platform) aide les traders à réaliser une intégration transparente des stratégies et un partage de données en temps réel en fournissant un protocole de communication efficace entre les stratégies de trading réelles.
Cet article explorera en profondeur le protocole de communication en temps réel des stratégies de trading sur la plateforme FMZ et présentera son concept de conception, ses caractéristiques fonctionnelles et ses avantages dans les applications pratiques. Grâce à une analyse de cas détaillée, nous démontrerons comment utiliser ce protocole pour parvenir à une communication stratégique efficace et stable et améliorer l’exécution et la rentabilité des stratégies de trading.
Que vous soyez un passionné de trading quantitatif qui débute avec FMZ ou un programmeur professionnel expérimenté, cet article vous fournira des informations précieuses et des guides d’utilisation pratiques. Explorons les puissantes fonctions de FMZ et apprenons à réaliser une collaboration entre les stratégies grâce à des protocoles de communication efficaces, à améliorer l’efficacité des transactions et à saisir les opportunités du marché.
Ces scénarios de demande démontrent les différentes possibilités et avantages du protocole de communication en temps réel de la stratégie de trading FMZ dans des applications pratiques. Grâce à une communication efficace entre les stratégies, les traders peuvent mieux faire face aux environnements de marché complexes, optimiser les stratégies de trading et améliorer l’efficacité et les bénéfices du trading.
Après avoir compris les exigences de communication entre les disques réels, nous devons réfléchir à la manière de réaliser ces exigences. Il ne s’agit de rien d’autre que du fait que le cas réel A espère échanger des informations avec le cas réel B, même si la demande semble simple. Toutefois, il est nécessaire de convenir de plusieurs détails à l’aide d’un ensemble de protocoles de communication. FMZ a encapsulé plusieurs protocoles de communication populaires.
mqtt / nats / amqp / kafka
L’architecture de communication est :
Lors de l’application de ces protocoles sur la plateforme FMZ, on peut simplement les comprendre comme mqtt/nats/amqp/kafka. Ces protocoles sont intégrés dansDial()
Dans la fonction, utilisezDial()
Les fonctions exécutent des opérations telles que la publication de messages et l’abonnement. Ces messages publiés sont transmis par proxy (relayés) au disque réel abonné via le serveur de protocole, un serveur de protocole doit donc être exécuté en premier. Pour faciliter la démonstration, nous utilisons divers déploiements d’images de serveur de protocole dans les exemples suivants.
Section de documentation de l’API de la fonction Dial : https://www.fmz.com/syntax-guide#fun_dial
Avant de déployer l’image Docker, n’oubliez pas d’installer d’abord le logiciel Docker.
Ensuite, explorons et pratiquons les applications du protocole de communication prises en charge par FMZ.
MQTT (Message Queuing Telemetry Transport) est un protocole de transmission de messages léger particulièrement adapté aux environnements réseau à faible bande passante, à latence élevée ou peu fiables. Elle a été proposée par Andy Stanford-Clark et Arlen Nipper d’IBM en 1999 et est devenue plus tard une norme ISO (ISO/IEC PRF 20922).
Les principales fonctionnalités du protocole MQTT : mode publication/abonnement
Étant donné que nous utilisons l’image Docker (image eclipse-mosquitto) du logiciel prenant en charge le protocole MQTT pour déployer le serveur proxy MQTT, nous avons installé Docker à l’avance et n’entrerons pas dans les détails plus tard.
Avant d’exécuter la commande pour déployer l’image, nous devons écrire un fichier de configuration de serveur proxymosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Exécutez ensuite la commande de déploiement :
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Une fois l’image du serveur proxy exécutée, l’écran suivant s’affiche :
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Nous pouvons ensuite tester la stratégie pour la mettre en pratique.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
L’utilisation principale de la fonction Dial dans le code de stratégie est :
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Le paramètre de chaîne de la fonction Dial commence parmqtt://
Il s’agit du nom du protocole, suivi de l’adresse d’écoute et du port. Le symbole « ? » est suivi du nom du sujet d’abonnement/publication. Le nom du sujet testé ici est :test_topic
。
La stratégie ci-dessus consiste à publier et à souscrire à un sujet en même temps. Le test en cours est le suivant :
Vous pouvez également utiliser deux disques réels pour vous abonner l’un à l’autre et publier des informations sur les sujets. Nous utilisons un tel exemple dans la section Pratique du protocole NATS et ne répéterons pas cette méthode dans d’autres protocoles.
Le protocole NATS est un protocole simple de type publication/abonnement basé sur du texte. Le client se connecte à gnatsd (serveur NATS) et communique avec gnatsd. La communication est basée sur des sockets TCP/IP ordinaires et définit un très petit ensemble d’opérations. Une nouvelle ligne indique la fin de l’opération. Contrairement aux systèmes de messagerie traditionnels qui utilisent des formats de messages binaires, le protocole NATS basé sur du texte simplifie la mise en œuvre du client et peut être facilement implémenté dans une variété de langages de programmation ou de langages de script.
Chaque protocole a ses propres caractéristiques. Vous pouvez vous référer aux documents et matériels spécifiques, qui ne seront pas développés ici.
Déployer le serveur de protocole NATS :
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Cette commande Docker téléchargera et exécutera automatiquement l’image nats. Le port 4222 est le port auquel le client souhaite accéder. Une fois l’image déployée, un moniteur http sera ouvert sur le port 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
L’image du serveur nats commence à s’exécuter, en écoutant sur le port 4222.
Nous devons créer deux stratégies (trading réel), appelons-les pour l’instant Stratégie A et Stratégie B. Les codes de ces deux stratégies sont fondamentalement les mêmes. Écrit en Javascript, le langage le plus simple à utiliser sur la plateforme FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Ces deux stratégies sont fondamentalement les mêmes, sauf qu’elles publient et s’abonnent l’une à l’autre, et les sujets souscrits, les sujets publiés et les informations publiées sont différents.
Prenons l’exemple de la stratégie B :
Dial()
La fonction crée un objet serveur de connexion clientconnPub
, utilisé pour la publication de messages thématiques :var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)
La chaîne de paramètres de la fonction Dial commence parnats://
Indique que le protocole NATS est utilisé pour la communication, alorsadmin
Il s’agit d’un ensemble d’informations de vérification simple lors du déploiement de l’image Dockerauth admin
, utilisez le caractère « @ » pour séparer le contenu suivant, puis l’adresse du service et le port127.0.0.1:4222
, et enfin le sujet de publication/abonnement :topic=pubRobotB
Notez qu’il y a un symbole « ? » entre l’adresse précédente.
Dial()
La fonction crée un objet serveur de connexion clientconnSub
, utilisé pour l’abonnement aux messages thématiques :var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)
La seule différencetopic=pubRobotA
Différent, car vous devez vous abonner au sujet où la stratégie A envoie des informationspubRobotA
。
La création et l’utilisation des objets de connexion d’abonnement et de publication dans la stratégie A sont les mêmes que celles décrites ci-dessus.
De cette manière, un exemple simple d’application du protocole NATS est mis en œuvre dans lequel le disque réel A et le disque réel B s’abonnent et publient des messages pour communiquer entre eux.
Dans une communication asynchrone, le message n’atteindra pas immédiatement le récepteur, mais sera stocké dans un conteneur. Lorsque certaines conditions sont remplies, le message sera envoyé au récepteur par le conteneur. Ce conteneur est la file d’attente des messages. Pour compléter cette fonction , les deux parties doivent Le conteneur et ses composants doivent respecter des conventions et des règles unifiées. AMQP est un tel protocole. L’expéditeur et le destinataire des messages peuvent tous deux réaliser une communication asynchrone en se conformant à ce protocole. Ce protocole spécifie le format des messages et leur fonctionnement.
Chaque protocole a ses propres caractéristiques. Vous pouvez vous référer aux documents et matériels spécifiques, qui ne seront pas développés ici.
Déployer le serveur de protocole amqp :
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Lors du déploiement d’une image Docker, elle sera automatiquement téléchargée et déployée, et une fois terminée, elle s’affichera :
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Une fois l’image du serveur déployée, écrivez un exemple de test :
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Lorsque vous utilisez la file d’attente du protocole AMQP, veuillez noter que les messages publiés persisteront dans la file d’attente. Par exemple, exécutons une fois l’exemple de code ci-dessus. 10 messages seront écrits dans la file d’attente. Ensuite, lorsque nous l’exécutons une deuxième fois, nous pouvons constater que lors de la lecture, les informations écrites la première fois seront lues à nouveau. Comme le montre la figure :
Vous pouvez constater que les deux messages de journal pointés par les flèches rouges dans la capture d’écran ont des heures incohérentes. La raison en est que le message rouge est celui qui a été lu et écrit dans la file d’attente lors de la première exécution du code de stratégie.
Sur la base de cette fonctionnalité, certaines exigences peuvent être satisfaites. Par exemple, après le redémarrage de la stratégie, les données de marché enregistrées peuvent toujours être obtenues à partir de la file d’attente pour le calcul d’initialisation et d’autres opérations.
Apache Kafka est un magasin de données distribué optimisé pour l’ingestion et le traitement de données en streaming en temps réel. Les données en streaming font référence aux données générées en continu par des milliers de sources de données, envoyant souvent des enregistrements de données simultanément. La plateforme de streaming doit traiter ces données en flux continu et les traiter étape par étape, en séquence.
Kafka fournit trois fonctions principales à ses utilisateurs :
Kafka est principalement utilisé pour créer des pipelines de données en streaming en temps réel et des applications qui s’adaptent aux flux de données. Il combine des capacités de messagerie, de stockage et de traitement de flux pour stocker des données historiques et en temps réel.
Déployez l’image Docker du proxy Kafka :
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Testez en utilisant le code de test :
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Voyons comment utiliser le protocole Kafka pour publier et souscrire des messages dans la fonction Dial.
Dial("kafka://localhost:9092/test_topic")
Comme plusieurs autres protocoles, la première partie est le nom du protocole. Suivez ensuite l’adresse d’écoute :localhost:9092
. Utilisez ensuite le symbole « / » comme séparateur, suivi du sujet de l’abonnement/publication. Ici, le sujet du test est défini surtest_topic
。
Résultats des tests :