Les ressources ont été chargées... Je charge...

Exploration de FMZ: pratique du protocole de communication entre les stratégies de négociation en direct

Auteur:FMZ~Lydia, Créé: 2024-08-08 10:09:21, mis à jour: 2024-09-12 09:46:18

img

Avec le développement rapide des marchés financiers et la popularité du trading quantitatif, de plus en plus de traders commencent à compter sur des stratégies automatisées pour le trading. Dans ce processus, la communication et la coordination entre les stratégies sont particulièrement importantes.

Cet article explorera le protocole de communication de négociation en direct des stratégies de négociation dans la plate-forme FMZ, présentera son concept de conception, ses caractéristiques fonctionnelles et ses avantages dans les applications pratiques.

Que vous soyez un passionné de trading quantitatif qui commence tout juste avec FMZ ou un programmeur professionnel expérimenté, cet article vous fournira des informations précieuses et des guides d'opération pratiques.

Scénario de la demande

    1. Commerce collaboratif à plusieurs stratégies Scénario de la demande: Dans un environnement de marché complexe, une seule stratégie peut ne pas être en mesure de faire face à diverses urgences et changements du marché. Les traders veulent exécuter plusieurs stratégies en même temps, telles que des stratégies de suivi des tendances, des stratégies de réversion moyenne et des stratégies d'arbitrage, et laisser ces stratégies communiquer en temps réel pour partager des informations sur le marché et des signaux de trading, améliorant ainsi l'efficacité et la stabilité globales du trading.
    1. Arbitrage transfrontalier Scénario de la demande: Les traders souhaitent effectuer des transactions d'arbitrage entre différents marchés de négociation. Par exemple, l'arbitrage en utilisant la différence de prix entre le marché des actions A et le marché boursier de Hong Kong. Lorsqu'une anomalie de prix se produit sur un certain marché, la stratégie doit notifier rapidement les stratégies sur d'autres marchés pour effectuer les opérations d'achat et de vente correspondantes afin de saisir les opportunités d'arbitrage.
    1. Gestion des risques et couverture Scénario de la demande: Une stratégie est chargée de trouver et d'exécuter des transactions à haut risque et à haut rendement sur le marché, tandis qu'une autre stratégie se concentre sur le suivi du risque global et l'exécution d'opérations de couverture.
    1. Système de négociation distribué Scénario de la demande: Les grandes institutions de négociation souhaitent exécuter des systèmes de négociation distribués sur plusieurs serveurs physiques afin d'améliorer la tolérance aux pannes et les performances du système de négociation.
    1. Surveillance du marché et alerte précoce Scénario de la demande: Une stratégie est responsable de la surveillance en temps réel de la dynamique du marché. Lorsqu'il y a des changements majeurs sur le marché (tels qu'une chute ou une hausse soudaine des prix), la stratégie doit informer rapidement d'autres stratégies pour prendre les mesures de réponse correspondantes, telles que la fermeture de positions, l'ajustement de positions ou l'ajout de positions, afin de réduire les risques ou de saisir les opportunités de trading.
    1. Gestion de la stratégie de portefeuille Scénario de la demande: Les traders utilisent un portefeuille de stratégies pour gérer les investissements dans différentes classes d'actifs, chaque stratégie se concentrant sur une classe d'actifs spécifique (comme les actions, les obligations, les contrats à terme, etc.).

Ces scénarios de demande démontrent les diverses possibilités et avantages du protocole de communication de trading en direct de la stratégie de trading FMZ dans les applications pratiques.

Protocole de communication encapsulé FMZ et fonction de composition

Après avoir compris les exigences de communication entre les transactions en direct, nous devons considérer comment mettre en œuvre ces exigences. Ce n'est rien de plus que le trading en direct A espérant échanger des informations avec le trading en direct B. Bien que les exigences semblent simples, il y a divers détails qui doivent être convenus lors de l'utilisation d'un ensemble de protocoles de communication. FMZ a encapsulé plusieurs protocoles de communication populaires.

Mqtt / nats / amqp / kafka

Architecture de la communication

L'architecture de communication est la suivante:

  • Serveur (proxy). Un serveur qui exécute un protocole de communication est nécessaire pour relayer les messages entre les abonnés et les éditeurs. Ce serveur peut être déployé localement sur le système dockers (pour la communication commerciale en direct locale) ou comme un service distant (pour la communication commerciale en direct entre serveurs).
  • Client (abonné, éditeur). Le programme de trading en direct de stratégie sur FMZ peut être compris comme un client d'un protocole de communication.

Fonction de numérotation

Lors de l'application de ces protocoles sur la plateforme FMZ, on peut simplement comprendre que les protocoles mqtt / nats / amqp / kafkap sont intégrés dans le système de gestion des données.Dial()fonction, et leDial()La fonction est utilisée pour publier et souscrire des messages. Ces messages publiés sont proxyés (rélayés) au trading en direct souscrit via le serveur de protocole, donc un serveur de protocole doit être exécuté en premier. Pour des raisons de démonstration, nous utilisons divers déploiements d'images de serveur de protocole dans les exemples suivants.

Fonction de numérotation dans la section de documentation API:https://www.fmz.com/syntax-guide#fun_dial

Avant de déployer l'image docker, n'oubliez pas d'installer le logiciel docker en premier.

img

Ensuite, explorons et pratiquons les applications de protocole de communication prises en charge par FMZ.

Plateforme FMZ Pratique de protocole de communication de négociation en direct

Le protocole mqtt

MQTT (Message Queuing Telemetry Transport) est un protocole de transmission de messages léger qui convient particulièrement aux environnements réseau à faible bande passante, à haute latence ou peu fiables. Il a été proposé par Andy Stanford-Clark et Arlen Nipper d'IBM en 1999 et est devenu plus tard une norme ISO (ISO / IEC PRF 20922).

Les principales caractéristiques du protocole MQTT: mode publication/abonnement

  • Publiation: le producteur du message envoie le message au sujet.
  • Abonnement: Un consommateur de message s'abonne à un sujet d'intérêt, recevant ainsi des messages publiés sur ce sujet.
  • Broker: MQTT utilise un courtier de messages comme intermédiaire pour transmettre des messages, assurant le découplage entre les éditeurs et les abonnés.

Publication et abonnement des messages

Parce que nous utilisons l'image docker (image eclipse-mosquitto) du logiciel qui prend en charge le protocole MQTT pour déployer le serveur proxy MQTT, nous avons installé docker à l'avance et ne viendrons pas dans les détails plus tard.

Avant d'exécuter la commande pour déployer l'image, nous devons écrire un fichier de configuration de serveur proxymosquitto.conf.

# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true

Puis exécutez la commande de déploiement:

docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

Après l'exécution de l'image du serveur proxy, ce qui suit s'affiche:

1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running

Ensuite, nous pouvons tester la stratégie pour la mettre en pratique.

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("Current live trading robotId:", robotId)

    conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    if (!conn) {
        Log("Communication failure!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // Write
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("Write a message to testQueue:", msg)

        // Read
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("close conn")
}

L'utilisation principale de la fonction Dial dans le code de stratégie est la suivante:

Dial("mqtt://127.0.0.1:1883?topic=test_topic")

Le paramètre de chaîne de la fonction Dial commence parmqtt://, qui est le nom du protocole, suivi de l'adresse d'écoute et du port. Le symbole ? est suivi du nom du sujet d'abonnement/publication.test_topic.

La stratégie ci-dessus publie et souscrit à un sujet en même temps.

img

Nous pouvons également utiliser deux transactions en direct pour nous abonner et publier des informations sur le sujet.

Nats Protocole

Le protocole NATS est un protocole de style publication/abonnement simple et textuel. Le client se connecte à gnatsd (serveur NATS) et communique avec gnatsd. La communication est basée sur des sockets TCP/IP ordinaires et définit un très petit ensemble d'opérations. Newline indique la terminaison. Contrairement aux systèmes de communication de messages traditionnels qui utilisent des formats de messages binaires, le protocole NATS textuel rend la mise en œuvre du client très simple et peut être facilement implémentée dans une variété de langages de programmation ou de langages de script.

Chaque protocole a ses propres caractéristiques, vous pouvez consulter les documents et les documents spécifiques, qui ne seront pas détaillés ici.

Déployer le serveur de protocole NATS:

-P -P 4222:4222 -P 8222:8222 nats http_port 8222 auth admin

Cette commande docker téléchargera et exécutera automatiquement l'image nats, et le port 4222 est le port auquel le client doit accéder.

Listening for client connections on 0.0.0.0:4222
Server is ready

L'image du serveur Nats commence à fonctionner, écoutant sur le port 4222.

Communication entre les stratégies de négociation en direct des appareils locaux

Nous devons créer deux stratégies (live trading), appelons-les Stratégie A et Stratégie B. Les codes de ces deux stratégies sont fondamentalement les mêmes.

  • Stratégie A
var connPub = null 
var connSub = null

function main() {
    var robotId = _G()
    Log("Current live trading robotId:", robotId)

    connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
    if (!connPub) {
        Log("Communication failure!")
        return 
    }

    connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
    if (!connSub) {
        Log("Communication failure!")
        return 
    }

    while (true) {
        connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
        var msgRead = connSub.read(10000)
        if (msgRead) {
            Log("msgRead:", msgRead)
        }

        LogStatus(_D())
        Sleep(10000)
    }
}

function onexit() {
    connPub.close()
    connSub.close()
}
  • Stratégie B
var connPub = null 
var connSub = null

function main() {
    var robotId = _G()
    Log("Current live trading robotId:", robotId)

    connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
    if (!connPub) {
        Log("Communication failure!")
        return 
    }

    connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
    if (!connSub) {
        Log("Communication failure!")
        return 
    }

    while (true) {
        connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
        var msgRead = connSub.read(10000)
        if (msgRead) {
            Log("msgRead:", msgRead)
        }

        LogStatus(_D())
        Sleep(10000)
    }
}

function onexit() {
    connPub.close()
    connSub.close()
}

Ces deux stratégies sont presque les mêmes, sauf qu'elles publient et s'abonnent l'une à l'autre, et que les sujets abonnés, les sujets publiés et les informations publiées sont différents.

Prenons par exemple la stratégie B:

    1. Utilisez leDial()fonction pour créer un objet serveur de connexion clientconnPubpour la publication de messages sur le sujet:

Je ne sais pas comment faire.127.0.0.1:4222?topic=pubRobotB”)

La chaîne de paramètres de la fonction Dial commence parnats://Le protocole NATS est utilisé pour la communication.adminest l'information de vérification simpleauth adminLe caractère @ est utilisé pour séparer le contenu suivant. Ensuite, il y a l'adresse du service et le port127.0.0.1:4222Enfin, il y a le thème de la publication et de l'abonnement:topic=pubRobotBNotez que le symbole ? est utilisé pour le séparer de l'adresse précédente.

    1. Utilisez leDial()fonction pour créer un objet serveur de connexion clientconnSubpour l'abonnement au message de sujet:

Je ne sais pas comment faire.127.0.0.1:4222?topic=pubRobotA”)

La seule différence esttopic=pubRobotA, parce que nous avons besoin de souscrire au sujetpubRobotAoù la stratégie A envoie des informations.

La création et l'utilisation d'objets de connexion d'abonnement et de publication dans la stratégie A sont les mêmes que celles décrites ci-dessus.

  • La stratégie A fonctionne

img

  • La stratégie B fonctionne

img

De cette façon, un exemple simple d'application de protocole NATS est mis en œuvre dans lequel le trading en direct A et le trading en direct B s'abonnent et publient des messages pour communiquer entre eux.

Protocole amqp

amqp Protocole à la file d'attente

Dans la communication asynchrone, le message n'atteint pas le récepteur immédiatement, mais il est stocké dans un conteneur. Lorsque certaines conditions sont remplies, le message est envoyé au récepteur par le conteneur. Ce conteneur est la file d'attente du message. Pour remplir cette fonction, les deux parties et le conteneur et ses composants doivent respecter des accords et des règles unifiés.

Chaque protocole a ses propres caractéristiques, vous pouvez consulter les documents et les documents spécifiques, qui ne seront pas détaillés ici.

Déployer le serveur de protocole amqp:

Le nom de l'hôte de mon lapin est le suivant:

Lors du déploiement d'une image docker, elle téléchargera et déploiera automatiquement, et lorsqu'elle sera terminée, elle affichera:

2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms

Une fois l'image du serveur déployée, écrivez un exemple de test:

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("Current live trading robotId:", robotId)

    conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
    if (!conn) {
        Log("Communication failure!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // Read
        Log("read:", conn.read(1000), "#FF0000")
        
        // Write
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("Write a message to testQueue:", msg)

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("close conn")
}

Lorsque vous utilisez la file d'attente du protocole AMQP, veuillez noter que les messages publiés persisteront dans la file d'attente. Par exemple, si nous exécutons le code d'exemple ci-dessus, 10 messages seront écrits dans la file d'attente. Ensuite, lorsque nous l'exécutons une deuxième fois, nous pouvons constater que le premier message écrit sera lu à nouveau lors de la lecture. Comme indiqué sur la figure:

img

Nous pouvons voir que les deux messages de journal pointés par les flèches rouges dans la capture d'écran ont des temps incohérents. La raison en est que le message rouge est celui qui a été lu et écrit à la file d'attente lorsque le code de stratégie a été exécuté pour la première fois.

Par exemple, après le redémarrage de la stratégie, les données de marché enregistrées peuvent encore être obtenues à partir de la file d'attente pour le calcul de l'initialisation et d'autres opérations.

Le protocole kafka

Apache Kafka est un entrepôt de données distribué optimisé pour ingérer et traiter les données en streaming en temps réel. Les données en streaming sont des données générées en continu par des milliers de sources de données, envoyant souvent des enregistrements de données simultanément.

Kafka fournit trois fonctions principales à ses utilisateurs:

  • Publication et abonnement à des flux de disques
  • Stocker efficacement un flux d'enregistrements dans l'ordre dans lequel ils ont été générés
  • Traitement des flux d'enregistrements en temps réel

Kafka est principalement utilisé pour créer des pipelines de données en streaming en temps réel et des applications qui s'adaptent aux flux de données.

Publication et abonnement des messages

Déployer l'image du docker du proxy Kafka:

docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
        -e KAFKA_CFG_NODE_ID=0 \
        -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
        -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
        -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
        -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
        -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
        -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
        bitnami/kafka:latest

Test à l'aide du code de test:

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("Current live trading robotId:", robotId)

    conn = Dial("kafka://localhost:9092/test_topic")
    if (!conn) {
        Log("Communication failure!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // Write
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("Write a message to testQueue:", msg)

        // Read
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("close conn")
}

Examinons comment utiliser le protocole Kafka pour publier et souscrire des messages dans la fonction Dial.

Dial("kafka://localhost:9092/test_topic")

Comme pour les autres protocoles, la première partie est le nom du protocole, suivi de l'adresse d'écoute:localhost:9092. Utilisez ensuite le symbole / comme séparateur, suivi du sujet d'abonnement/publication.test_topic.

Résultats des tests:

img


Plus de