[TOC] Je vous en prie.
Avec le développement rapide des marchés financiers et la popularité du trading quantitatif, de plus en plus de traders commencent à compter sur des stratégies automatisées pour le trading. Dans ce processus, la communication et la coordination entre les stratégies sont particulièrement importantes.
Cet article explorera le protocole de communication de négociation en direct des stratégies de négociation dans la plate-forme FMZ, présentera son concept de conception, ses caractéristiques fonctionnelles et ses avantages dans les applications pratiques.
Que vous soyez un passionné de trading quantitatif qui commence tout juste avec FMZ ou un programmeur professionnel expérimenté, cet article vous fournira des informations précieuses et des guides d'opération pratiques.
Ces scénarios de demande démontrent les diverses possibilités et avantages du protocole de communication de trading en direct de la stratégie de trading FMZ dans les applications pratiques.
Après avoir compris les exigences de communication entre les transactions en direct, nous devons considérer comment mettre en œuvre ces exigences. Ce n'est rien de plus que le trading en direct A espérant échanger des informations avec le trading en direct B. Bien que les exigences semblent simples, il y a divers détails qui doivent être convenus lors de l'utilisation d'un ensemble de protocoles de communication. FMZ a encapsulé plusieurs protocoles de communication populaires.
Mqtt / nats / amqp / kafka
L'architecture de communication est la suivante:
Serveur (proxy).
Un serveur qui exécute un protocole de communication est nécessaire pour relayer les messages entre les abonnés et les éditeurs. Ce serveur peut être déployé localement sur le système docker
Client (abonné, éditeur). Le programme de trading en direct de stratégie sur FMZ peut être compris comme un client d'un protocole de communication.
Lors de l'application de ces protocoles sur la plateforme FMZ, on peut simplement comprendre que les protocoles mqtt / nats / amqp / kafkap sont intégrés dans le système de gestion des données.Dial()
fonction, et leDial()
La fonction est utilisée pour publier et souscrire des messages. Ces messages publiés sont proxyés (rélayés) au trading en direct souscrit via le serveur de protocole, donc un serveur de protocole doit être exécuté en premier. Pour des raisons de démonstration, nous utilisons divers déploiements d'images de serveur de protocole dans les exemples suivants.
Fonction de numérotation dans la section de documentation API:https://www.fmz.com/syntax-guide#fun_dial
Avant de déployer l'image docker, n'oubliez pas d'installer le logiciel docker en premier.
Ensuite, explorons et pratiquons les applications de protocole de communication prises en charge par FMZ.
MQTT (Message Queuing Telemetry Transport) est un protocole de transmission de messages léger qui convient particulièrement aux environnements réseau à faible bande passante, à haute latence ou peu fiables. Il a été proposé par Andy Stanford-Clark et Arlen Nipper d'IBM en 1999 et est devenu plus tard une norme ISO (ISO / IEC PRF 20922).
Les principales caractéristiques du protocole MQTT: mode publication/abonnement
Publiation: le producteur du message envoie le message au sujet.
Abonnement: Un consommateur de message s'abonne à un sujet d'intérêt, recevant ainsi des messages publiés sur ce sujet.
Broker: MQTT utilise un courtier de messages comme intermédiaire pour transmettre des messages, assurant le découplage entre les éditeurs et les abonnés.
Parce que nous utilisons l'image docker (image eclipse-mosquitto) du logiciel qui prend en charge le protocole MQTT pour déployer le serveur proxy MQTT, nous avons installé docker à l'avance et ne viendrons pas dans les détails plus tard.
Avant d'exécuter la commande pour déployer l'image, nous devons écrire un fichier de configuration de serveur proxymosquitto.conf
.
# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true
Puis exécutez la commande de déploiement:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Après l'exécution de l'image du serveur proxy, ce qui suit s'affiche:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Ensuite, nous pouvons tester la stratégie pour la mettre en pratique.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
L'utilisation principale de la fonction Dial dans le code de stratégie est la suivante:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Le paramètre de chaîne de la fonction Dial commence parmqtt://
, qui est le nom du protocole, suivi de l'adresse d'écoute et du port. Le symbole test_topic
.
La stratégie ci-dessus publie et souscrit à un sujet en même temps.
Nous pouvons également utiliser deux transactions en direct pour nous abonner et publier des informations sur le sujet.
Le protocole NATS est un protocole de style publication/abonnement simple et textuel. Le client se connecte à gnatsd (serveur NATS) et communique avec gnatsd. La communication est basée sur des sockets TCP/IP ordinaires et définit un très petit ensemble d'opérations. Newline indique la terminaison. Contrairement aux systèmes de communication de messages traditionnels qui utilisent des formats de messages binaires, le protocole NATS textuel rend la mise en œuvre du client très simple et peut être facilement implémentée dans une variété de langages de programmation ou de langages de script.
Chaque protocole a ses propres caractéristiques, vous pouvez consulter les documents et les documents spécifiques, qui ne seront pas détaillés ici.
Déployer le serveur de protocole NATS:
-P -P 4222:4222 -P 8222:8222 nats
http_port 8222 auth admin
Cette commande docker téléchargera et exécutera automatiquement l'image nats, et le port 4222 est le port auquel le client doit accéder.
Listening for client connections on 0.0.0.0:4222
Server is ready
L'image du serveur Nats commence à fonctionner, écoutant sur le port 4222.
Nous devons créer deux stratégies (live trading), appelons-les Stratégie A et Stratégie B. Les codes de ces deux stratégies sont fondamentalement les mêmes.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Ces deux stratégies sont presque les mêmes, sauf qu'elles publient et s'abonnent l'une à l'autre, et que les sujets abonnés, les sujets publiés et les informations publiées sont différents.
Prenons par exemple la stratégie B:
Dial()
fonction pour créer un objet serveur de connexion clientconnPub
pour la publication de messages sur le sujet:Je suis désolée, mais je ne peux pas vous aider.
La chaîne de paramètres de la fonction Dial commence parnats://
Le protocole NATS est utilisé pour la communication.admin
est l'information de vérification simpleauth admin
Le caractère 127.0.0.1:4222
Enfin, il y a le thème de la publication et de l'abonnement:topic=pubRobotB
Notez que le symbole
Dial()
fonction pour créer un objet serveur de connexion clientconnSub
pour l'abonnement au message de sujet:Je suis désolée, mais je n'ai pas le choix.
La seule différence esttopic=pubRobotA
, parce que nous avons besoin de souscrire au sujetpubRobotA
où la stratégie A envoie des informations.
La création et l'utilisation d'objets de connexion d'abonnement et de publication dans la stratégie A sont les mêmes que celles décrites ci-dessus.
De cette façon, un exemple simple d'application de protocole NATS est mis en œuvre dans lequel le trading en direct A et le trading en direct B s'abonnent et publient des messages pour communiquer entre eux.
Dans la communication asynchrone, le message n'atteint pas le récepteur immédiatement, mais il est stocké dans un conteneur. Lorsque certaines conditions sont remplies, le message est envoyé au récepteur par le conteneur. Ce conteneur est la file d'attente du message. Pour remplir cette fonction, les deux parties et le conteneur et ses composants doivent respecter des accords et des règles unifiés.
Chaque protocole a ses propres caractéristiques, vous pouvez consulter les documents et les documents spécifiques, qui ne seront pas détaillés ici.
Déployer le serveur de protocole amqp:
Le nom de l'hôte de mon lapin est le suivant:
Lors du déploiement d'une image docker, elle téléchargera et déploiera automatiquement, et lorsqu'elle sera terminée, elle affichera:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Une fois l'image du serveur déployée, écrivez un exemple de test:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Read
Log("read:", conn.read(1000), "#FF0000")
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Lorsque vous utilisez la file d'attente du protocole AMQP, veuillez noter que les messages publiés persisteront dans la file d'attente. Par exemple, si nous exécutons le code d'exemple ci-dessus, 10 messages seront écrits dans la file d'attente. Ensuite, lorsque nous l'exécutons une deuxième fois, nous pouvons constater que le premier message écrit sera lu à nouveau lors de la lecture. Comme indiqué sur la figure:
Nous pouvons voir que les deux messages de journal pointés par les flèches rouges dans la capture d'écran ont des temps incohérents. La raison en est que le message rouge est celui qui a été lu et écrit à la file d'attente lorsque le code de stratégie a été exécuté pour la première fois.
Par exemple, après le redémarrage de la stratégie, les données de marché enregistrées peuvent encore être obtenues à partir de la file d'attente pour le calcul de l'initialisation et d'autres opérations.
Apache Kafka est un entrepôt de données distribué optimisé pour ingérer et traiter les données en streaming en temps réel. Les données en streaming sont des données générées en continu par des milliers de sources de données, envoyant souvent des enregistrements de données simultanément.
Kafka fournit trois fonctions principales à ses utilisateurs:
Kafka est principalement utilisé pour créer des pipelines de données en streaming en temps réel et des applications qui s'adaptent aux flux de données.
Déployer l'image du docker du proxy Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Test à l'aide du code de test:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Examinons comment utiliser le protocole Kafka pour publier et souscrire des messages dans la fonction Dial.
Dial("kafka://localhost:9092/test_topic")
Comme pour les autres protocoles, la première partie est le nom du protocole, suivi de l'adresse d'écoute:localhost:9092
. Utilisez ensuite le symbole test_topic
.
Résultats des tests: