[TOC] Je vous en prie.
Avec l'évolution rapide des marchés financiers et la popularité des transactions quantitatives, de plus en plus de traders commencent à compter sur des stratégies automatisées pour effectuer des transactions. Dans ce processus, la communication et la coordination entre les stratégies sont particulièrement importantes. FMZ (Quantitative Trading Platform) aide les traders à réaliser des stratégies de partage de données en temps réel et en toute transparence en fournissant des protocoles de communication entre les disques de négociation efficaces.
Cet article explore en profondeur les protocoles de communication stratégique sur le terrain dans la plate-forme FMZ, en expliquant leurs concepts, leurs fonctionnalités et leurs avantages dans les applications concrètes. Nous montrerons, à travers des analyses de cas détaillées, comment utiliser ce protocole pour obtenir une communication stratégique efficace et stable, améliorer l'exécution et la performance des stratégies de négociation.
Que vous soyez un amateur de transactions quantitatives débutant dans FMZ ou un programmeur professionnel expérimenté, cet article vous fournira des informations précieuses et des guides pratiques. Explorons ensemble les fonctionnalités puissantes de FMZ et découvrons comment, grâce à des protocoles de communication efficaces, réaliser une collaboration synchrone entre stratégies, améliorer l'efficacité des transactions et saisir les opportunités du marché.
Ces scénarios de demande montrent les multiples possibilités et avantages des protocoles de communication entre les plateaux dans les applications pratiques des stratégies de trading FMZ. Grâce à une communication stratégique efficace, les traders peuvent mieux faire face à un environnement de marché complexe, optimiser leurs stratégies de trading, améliorer l'efficacité et les bénéfices des transactions.
Une fois les besoins de communication entre les disques physiques compris, il est nécessaire de réfléchir à la façon de les réaliser. Tout ce que le disque A veut, c'est pouvoir interagir avec le disque B, même si les besoins semblent simples.
Mqtt / nats / amqp / kafka
L'architecture de communication est la suivante:
Lorsque ces protocoles sont utilisés sur la plateforme FMZ, ils sont simplement interprétés comme mqtt / nats / amqp / kafka.Dial()
Dans les fonctions, utilisezDial()
Les fonctions effectuent des messages, des abonnements, etc. Ces messages sont envoyés par l'intermédiaire de l'agent du serveur du protocole (relayé) au disque dur du souscripteur, il faut donc d'abord lancer un serveur du protocole. Pour simplifier la démonstration, dans les exemples suivants, nous utilisons des déploiements de miroirs de serveur de protocole.
La section de la documentation API dans laquelle la fonction Dial est située:https://www.fmz.com/syntax-guide#fun_dial
N'oubliez pas d'installer le logiciel docker avant de déployer le miroir docker.
Nous allons ensuite explorer ensemble les applications pratiques des protocoles de communication pris en charge par FMZ.
MQTT (Message Queuing Telemetry Transport) est un protocole de transport de messages léger, particulièrement adapté aux environnements réseau à faible bande passante, à haute latence ou peu fiables. Il a été proposé par Andy Stanford-Clark et Arlen Nipper d'IBM en 1999 et est devenu une norme ISO (ISO/IEC PRF 20922) [2].
Principales caractéristiques du protocole MQTT: mode de publication/abonnement
Comme nous déployons un serveur proxy MQTT avec le logiciel supportant le protocole MQTT (eclipse-mosquitto mirror), il est préférable d'installer le docker à l'avance et de ne pas en parler plus tard.
Avant d'exécuter la commande de déploiement du miroir, nous devons écrire un profil de serveur proxy.mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Il a ensuite exécuté la commande de déploiement:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
L'image du serveur proxy apparaît comme suit après avoir été activée:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Nous pouvons alors tester les stratégies et les mettre en pratique.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Le code stratégique utilise principalement la fonction Dial:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Le début de la fonction Dialmqtt://
Il s'agit du nom du protocole, suivi de l'adresse d'écoute, du port, du symbole "?" et du nom du sujet de souscription/déclaration, où le sujet de test est appelé:test_topic
。
Les stratégies ci-dessus sont les suivantes:
Il est également possible d'utiliser deux disques pour s'abonner et publier des informations thématiques, un exemple que nous avons utilisé dans la section pratique du protocole nats, mais qui n'est plus mentionné dans les autres protocoles.
Le protocole NATS est un protocole simple, de type publication/abonnement, basé sur du texte. Le client se connecte au serveur gnatsd (NATS) et communique avec gnatsd. Le protocole est basé sur un ensemble de mots TCP/IP ordinaire et définit un très petit ensemble d'opérations, qui indiquent la fin des lignes.
Chaque accord a ses propres caractéristiques et vous pouvez consulter des documents ou des informations spécifiques, qui ne sont pas présentées ici.
Pour déployer le serveur du protocole nats:
-P -P 4222:4222 -P 8222:8222 nats
http_port 8222 auth admin
Cette commande docker télécharge et exécute automatiquement nats image, le port 4222 est le port auquel le client souhaite accéder. Une fois l'image déployée, un moniteur http est également ouvert sur le port 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
Les serveurs sont mis en place pour surveiller le port 4222.
Nous avons besoin de créer deux stratégies (disque virtuel) qui, pour l'instant, sont appelées stratégie A et stratégie B, et qui sont essentiellement du même code.
Stratégie A
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Stratégie B
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Les deux stratégies sont essentiellement les mêmes, mais elles diffèrent en termes de publication, d'abonnement, de thème d'abonnement, de thème de publication, de diffusion d'information.
L'exemple de la stratégie B:
1, UtilisationDial()
Fonction permettant de créer des objets serveurs connectés au clientconnPub
Le blogueur a publié un article intitulé:
Je ne sais pas comment faire.127.0.0.1:4222?topic=pubRobotB”)
La chaîne de paramètres de la fonction Dial, qui commence parnats://
C'est le protocole nats qui est utilisé pour communiquer.admin
C'est un simple message de vérification que vous mettez en place lorsque vous déployez un miroir docker.auth admin
Utilisez le caractère "@127.0.0.1:4222
Le blogueur a écrit:topic=pubRobotB
Attention à l'intervalle entre les adresses précédentes et celles ci-dessus avec le symbole?
2, UtilisationDial()
Fonction permettant de créer des objets serveurs connectés au clientconnSub
Pour les abonnements:
Je ne sais pas comment faire.127.0.0.1:4222?topic=pubRobotA”)
La seule différence.topic=pubRobotA
Il y a une différence entre les deux, car il faut s'abonner à la stratégie A pour envoyer des messages.pubRobotA
。
Pour la création et l'utilisation des objets de connexion de souscription et de publication dans la stratégie A, il en va de même pour ce qui est décrit ci-dessus.
La stratégie A est en marche.
La stratégie B est en marche.
Cela permet de réaliser un exemple simple d'application du protocole nats pour la communication entre le disque A et le disque B.
Dans la communication asynchrone, le message n'arrive pas immédiatement au destinataire, mais est stocké dans un conteneur, où, après avoir satisfait à certaines conditions, le message est envoyé par le conteneur au destinataire, le conteneur est la file d'attente du message. Pour ce faire, les parties et le conteneur ainsi que ses composants doivent respecter des conventions et des règles uniformes.
Chaque accord a ses propres caractéristiques et vous pouvez consulter des documents ou des informations spécifiques, qui ne sont pas présentées ici.
Pour déployer le serveur du protocole amqp:
Le nom de l'hôte de mon lapin est le suivant:
Lorsque vous déployez le Docker Mirror, il télécharge automatiquement le déploiement et affiche:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Une fois l'image du serveur déployée, écrivez un exemple de test:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Une queue utilisant le protocole amqp est une queue où les messages post-déclaration restent dans la queue, par exemple lorsque nous exécutons le code d'exemple ci-dessus une fois. Nous écrivons 10 messages à la queue.
Comme vous pouvez le voir dans la capture d'écran, les deux messages de journal pointés par une flèche rouge ne correspondent pas à l'heure, car le message en rouge est celui lu, mais celui écrit en queue lors de la première exécution du code de stratégie.
Des besoins peuvent être réalisés en fonction de cette fonctionnalité, par exemple: après le redémarrage du disque dur de la stratégie, il est toujours possible d'obtenir des données de marché enregistrées à partir de la file d'attente pour des opérations telles que le calcul d'initialisation.
Apache Kafka est un stockage de données distribué optimisé pour extraire et traiter les flux de données en temps réel. Les flux de données sont des données générées en continu par des milliers de sources de données, qui peuvent généralement être envoyées simultanément.
Kafka offre à ses utilisateurs trois fonctionnalités principales:
Kafka est principalement utilisé pour construire des pipelines et des applications de flux de données en temps réel adaptés aux flux de données. Il combine des fonctionnalités de transmission, stockage et traitement de flux pour stocker des données historiques et en temps réel.
Une image de Docker déployée par Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Tests avec des codes de test:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Nous allons voir comment utiliser le protocole kafka pour publier et s'abonner à des messages dans la fonction Dial.
Dial("kafka://localhost:9092/test_topic")
Comme pour d'autres protocoles, le nom du protocole commence par le nom; ensuite, suivez l'adresse de l'écoute:localhost:9092
Ensuite, en utilisant le symbole "/" comme intervalle, écrivez le sujet de l'abonnement / publication après, où le sujet de test est défini commetest_topic
。
Les résultats des tests: