[TOC]
Mit der rasanten Entwicklung der Finanzmärkte und der Popularität des quantitativen Handels verlassen sich immer mehr Händler auf automatisierte Strategien für den Handel. In diesem Prozess sind Kommunikation und Koordination zwischen Strategien besonders wichtig.
In diesem Artikel wird das Live-Handelskommunikationsprotokoll von Handelsstrategien in der FMZ-Plattform untersucht, sein Designkonzept, seine Funktionsmerkmale und Vorteile in praktischen Anwendungen vorgestellt.
Egal, ob Sie ein quantitativer Handelsenthusiast sind, der gerade erst mit FMZ beginnt oder ein erfahrener professioneller Programmierer, dieser Artikel wird Ihnen wertvolle Einblicke und praktische Betriebsführer geben.
Diese Nachfrage-Szenarien zeigen die verschiedenen Möglichkeiten und Vorteile des FMZ-Handelsstrategien-Live-Handelskommunikationsprotokolls in praktischen Anwendungen. Durch effektive Kommunikation zwischen den Strategien können Händler besser mit komplexen Marktumgebungen umgehen, Handelsstrategien optimieren und die Handelseffizienz und -gewinne verbessern.
Nach dem Verständnis der Kommunikationsanforderungen zwischen Live-Handeln müssen wir überlegen, wie diese Anforderungen umgesetzt werden können. Es ist nichts anderes als Live-Handel A, der hofft, Informationen mit Live-Handel B auszutauschen. Obwohl die Anforderungen einfach erscheinen, gibt es verschiedene Details, die bei der Verwendung einer Reihe von Kommunikationsprotokollen vereinbart werden müssen. FMZ hat mehrere beliebte Kommunikationsprotokolle zusammengefasst.
mqtt / nats / amqp / kafka
Die Kommunikationsarchitektur ist:
Server (Proxy).
Ein Server, der ein Kommunikationsprotokoll ausführt, ist erforderlich, um Nachrichten zwischen Abonnenten und Publishern weiterzuleiten. Dieser Server kann lokal auf dem Docker
Kunden (Abonnent, Verleger). Das Strategie-Live-Trading-Programm auf FMZ kann als Client eines Kommunikationsprotokolls verstanden werden.
Bei der Anwendung dieser Protokolle auf der FMZ-Plattform kann einfach verstanden werden, daß die mqtt/nats/amqp/kafkap-Protokolle in die FMZ-Plattform integriert sind.Dial()
Funktion undDial()
Diese veröffentlichten Nachrichten werden dem abonnierten Live-Trading über den Protokollserver weitergeleitet, so dass zuerst ein Protokollserver ausgeführt werden muss.
Wählen Sie in der API-Dokumentation:https://www.fmz.com/syntax-guide#fun_dial
Bevor Sie das Docker-Image bereitstellen, müssen Sie die Docker-Software installieren.
Als nächstes wollen wir die von FMZ unterstützten Kommunikationsprotokollanwendungen erforschen und üben.
MQTT (Message Queuing Telemetry Transport) ist ein leichtes Nachrichtenübertragungsprotokoll, das besonders für Netzwerkumgebungen mit geringer Bandbreite, hoher Latenz oder unzuverlässigem Netzwerk geeignet ist. Es wurde 1999 von Andy Stanford-Clark und Arlen Nipper von IBM vorgeschlagen und wurde später zu einem ISO-Standard (ISO/IEC PRF 20922).
Hauptmerkmale des MQTT-Protokolls: Veröffentlichungs-/Abonnementmodus
Veröffentlichung: Der Nachrichtenproduzent sendet die Nachricht an das Thema.
Abonnement: Ein Nachrichtenverbraucher abonniert ein Thema, das ihn interessiert, und erhält damit Nachrichten, die zu diesem Thema veröffentlicht werden.
Broker: MQTT verwendet einen Nachrichtenbroker als Vermittler zur Weiterleitung von Nachrichten und sorgt so für eine Entkopplung zwischen Publishern und Abonnenten.
Da wir das Docker-Image (Eclipse-Mosquitto-Image) einer Software, die das MQTT-Protokoll unterstützt, zum Bereitstellen des MQTT-Proxy-Servers verwenden, haben wir Docker im Voraus installiert und werden später nicht näher eingehen.
Bevor Sie den Befehl ausführen, um das Bild bereitzustellen, müssen wir eine Proxy-Server-Konfigurationsdatei schreibenmosquitto.conf
.
# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true
Dann führen Sie den Befehl "Deployment" aus:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Nachdem das Proxy-Server-Image ausgeführt wurde, wird Folgendes angezeigt:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Dann können wir die Strategie testen, um sie in die Praxis umzusetzen.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Der Hauptzweck der Dial-Funktion im Strategiecode ist:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Der Stringparameter der Dial-Funktion beginnt mitmqtt://
, der der Protokollname ist, gefolgt von der Höradresse und dem Port. Das Symbol test_topic
.
Die obige Strategie veröffentlicht und abonniert gleichzeitig ein Thema.
Wir können auch zwei Live-Tradings verwenden, um sich gegenseitig zu abonnieren und Themeninformationen zu veröffentlichen.
Das NATS-Protokoll ist ein einfaches, textbasiertes Publish/Subscribe-Stil-Protokoll. Der Client verbindet sich mit gnatsd (NATS-Server) und kommuniziert mit gnatsd. Die Kommunikation basiert auf normalen TCP/IP-Sockeln und definiert einen sehr kleinen Satz von Operationen. Newline zeigt die Beendigung an. Im Gegensatz zu traditionellen Nachrichtenkommunikationssystemen, die binäre Nachrichtenformate verwenden, macht das textbasierte NATS-Protokoll die Clientimplementierung sehr einfach und kann leicht in einer Vielzahl von Programmiersprachen oder Skriptsprachen implementiert werden.
Jedes Protokoll hat seine eigenen Merkmale. Sie können sich auf die spezifischen Dokumente und Materialien beziehen, die hier nicht näher erläutert werden.
Bereitstellung des NATS-Protokollservers:
Docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Dieser Docker-Befehl lädt automatisch das Nats-Image herunter und führt es aus, und Port 4222 ist der Port, auf den der Client zugreifen muss.
Listening for client connections on 0.0.0.0:4222
Server is ready
Das Nats-Server-Bild läuft und hört auf Port 4222.
Wir müssen zwei Strategien (Live-Handel) erstellen, nennen wir sie Strategie A und Strategie B. Die Codes dieser beiden Strategien sind grundsätzlich gleich. Sie sind in Javascript geschrieben, die die am einfachsten zu verwendende Sprache auf der FMZ-Plattform ist.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Diese beiden Strategien sind fast identisch, außer dass sie sich gegenseitig veröffentlichen und abonnieren, und die abonnierten Themen, veröffentlichten Themen und veröffentlichten Informationen sind unterschiedlich.
Nehmen wir Strategie B als Beispiel:
Dial()
Funktion zum Erstellen eines ClientverbindungsserverobjektsconnPub
für die Veröffentlichung von Themennachrichten:Ich bin nicht derjenige, der das Problem hat, aber ich bin derjenige, der das Problem hat.
Die Parameterfolge der Funktion Dial beginnt mitnats://
Die Daten werden mit dem NATS-Protokoll verknüpft, das für die Kommunikation verwendet wird.admin
ist die einfache Überprüfungsinformationauth admin
Das Zeichen 127.0.0.1:4222
Schließlich gibt es das Thema Veröffentlichen/Abonnieren:topic=pubRobotB
Beachten Sie, dass das
Dial()
Funktion zum Erstellen eines ClientverbindungsserverobjektsconnSub
für das Abonnieren von Themennachrichten:Ich bin nicht derjenige, der das Problem hat, aber ich bin derjenige, der das Problem hat.
Der einzige Unterschied ist:topic=pubRobotA
, weil wir das Thema abonnieren müssenpubRobotA
wo Strategie A Informationen sendet.
Die Erstellung und Nutzung von Abonnement- und Veröffentlichungsverbindungsgegenständen in Strategie A ist wie oben beschrieben.
Auf diese Weise wird ein einfaches Beispiel für die Anwendung des NATS-Protokolls implementiert, bei dem Live-Trading A und Live-Trading B Nachrichten abonnieren und veröffentlichen, um miteinander zu kommunizieren.
Bei der asynchronen Kommunikation erreicht die Nachricht den Empfänger nicht sofort, sondern wird in einem Container gespeichert. Wenn bestimmte Bedingungen erfüllt sind, wird die Nachricht vom Container an den Empfänger gesendet. Dieser Container ist die Nachrichtenwarteschlange. Um diese Funktion zu erfüllen, müssen beide Parteien und der Container und seine Komponenten einheitliche Vereinbarungen und Regeln einhalten. AMQP ist ein solches Protokoll. Sowohl der Absender als auch der Empfänger der Nachricht können eine asynchrone Kommunikation durch Einhaltung dieses Protokolls umsetzen. Dieses Protokoll legt das Format und die Arbeitsweise der Nachricht fest.
Jedes Protokoll hat seine eigenen Merkmale. Sie können sich auf die spezifischen Dokumente und Materialien beziehen, die hier nicht näher erläutert werden.
Bereitstellung des amqp-Protokollservers:
Docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Bei der Bereitstellung eines Docker-Images wird es automatisch heruntergeladen und bereitgestellt, und wenn es abgeschlossen ist, wird angezeigt:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Nach der Bereitstellung des Serverimages schreiben Sie ein Testbeispiel:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Read
Log("read:", conn.read(1000), "#FF0000")
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Bei der Verwendung der AMQP-Protokoll-Warteschlange beachten Sie bitte, dass die veröffentlichten Nachrichten in der Warteschlange bestehen bleiben. Wenn wir beispielsweise den obigen Beispielcode ausführen, werden 10 Nachrichten in die Warteschlange geschrieben. Wenn wir es dann ein zweites Mal ausführen, können wir feststellen, dass die erste geschriebene Nachricht beim Lesen erneut gelesen wird. Wie in der Abbildung gezeigt:
Wir können sehen, dass die beiden Log-Nachrichten, die durch die roten Pfeile im Screenshot gezeigt werden, inkonsistente Zeiten haben. Der Grund dafür ist, dass die rote Nachricht die ist, die in die Warteschlange gelesen und geschrieben wurde, als der Strategiecode zum ersten Mal ausgeführt wurde.
Auf der Grundlage dieser Eigenschaft können einige Anforderungen erfüllt werden. Zum Beispiel können nach dem Neustart der Strategie die erfassten Marktdaten noch aus der Warteschlange für die Berechnung der Initialisierung und andere Operationen gewonnen werden.
Apache Kafka ist ein verteiltes Datenspeicher, das optimiert ist, um Streaming-Daten in Echtzeit aufzunehmen und zu verarbeiten. Streaming-Daten sind Daten, die kontinuierlich von Tausenden von Datenquellen generiert werden und oft gleichzeitig Datensätze senden. Eine Streaming-Plattform muss diesen kontinuierlichen Datenzufluss handhaben und sie sequenziell und inkrementell verarbeiten.
Kafka bietet seinen Nutzern drei Hauptfunktionen:
Kafka wird hauptsächlich zum Erstellen von Echtzeit-Streaming-Datenpipelines und Anwendungen verwendet, die sich an Datenströme anpassen.
Das Docker-Image des Kafka-Proxys bereitstellen:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Prüfung unter Verwendung des Prüfcodes:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Lassen Sie uns einen Blick darauf werfen, wie Sie das Kafka-Protokoll zum Veröffentlichen und Abonnieren von Nachrichten in der Dial-Funktion verwenden.
Dial("kafka://localhost:9092/test_topic")
Wie bei anderen Protokollen ist der erste Teil der Protokollname, gefolgt von der Höradresse:localhost:9092
. Verwenden Sie dann das Symbol test_topic
.
Testergebnisse: