[TOC]
Angesichts der rasanten Entwicklung der Finanzmärkte und der Popularität des quantitativen Handels verlassen sich immer mehr Händler beim Handel auf automatisierte Strategien. Dabei kommt der Kommunikation und Abstimmung der Strategien eine besondere Bedeutung zu. FMZ (Quantitative Trading Platform) unterstützt Händler bei der nahtlosen Strategieintegration und dem Echtzeit-Datenaustausch, indem es ein effizientes Kommunikationsprotokoll zwischen realen Handelsstrategien bereitstellt.
In diesem Artikel wird das Echtzeit-Kommunikationsprotokoll von Handelsstrategien auf der FMZ-Plattform eingehend untersucht und sein Designkonzept, seine Funktionsmerkmale und seine Vorteile in der praktischen Anwendung vorgestellt. Anhand einer detaillierten Fallanalyse zeigen wir, wie Sie mit diesem Protokoll eine effiziente und stabile Strategiekommunikation erreichen und die Ausführung und Rentabilität von Handelsstrategien verbessern können.
Egal, ob Sie ein quantitativer Handelsenthusiast sind, der gerade erst mit FMZ anfängt, oder ein erfahrener professioneller Programmierer, dieser Artikel wird Ihnen wertvolle Einblicke und praktische Bedienungshandbücher liefern. Lassen Sie uns die leistungsstarken Funktionen von FMZ erkunden und erfahren, wie Sie durch effiziente Kommunikationsprotokolle eine Zusammenarbeit zwischen Strategien erreichen, die Handelseffizienz verbessern und Marktchancen nutzen können.
Diese Anforderungsszenarien demonstrieren die vielfältigen Möglichkeiten und Vorteile des FMZ Handelsstrategie Echtzeit-Kommunikationsprotokolls in der praktischen Anwendung. Durch eine effektive Kommunikation zwischen Strategien können Händler besser mit komplexen Marktumgebungen umgehen, Handelsstrategien optimieren und die Handelseffizienz und den Gewinn verbessern.
Nachdem wir die Kommunikationsanforderungen zwischen realen Festplatten verstanden haben, müssen wir überlegen, wie wir diese Anforderungen realisieren können. Es handelt sich lediglich darum, dass der reale Fall A einen Informationsaustausch mit dem realen Fall B erhofft, auch wenn die Forderung einfach erscheint. Allerdings müssen verschiedene Details mithilfe einer Reihe von Kommunikationsprotokollen vereinbart werden. FMZ hat mehrere gängige Kommunikationsprotokolle gekapselt.
mqtt / nats / amqp / kafka
Die Kommunikationsarchitektur ist:
Wenn diese Protokolle auf der FMZ-Plattform angewendet werden, kann man sie einfach als mqtt/nats/amqp/kafka verstehen. Diese Protokolle sind integriert inDial()
Verwenden Sie in der FunktionDial()
Funktionen führen Vorgänge wie die Veröffentlichung und das Abonnieren von Nachrichten aus. Diese veröffentlichten Nachrichten werden über den Protokollserver an die abonnierte reale Festplatte weitergeleitet, daher muss zuerst ein Protokollserver ausgeführt werden. Zur Vereinfachung der Demonstration verwenden wir in den folgenden Beispielen verschiedene Bereitstellungen von Protokollserver-Images.
Abschnitt zur API-Dokumentation der Wählfunktion: https://www.fmz.com/syntax-guide#fun_dial
Denken Sie daran, vor der Bereitstellung des Docker-Image zuerst die Docker-Software zu installieren.
Lassen Sie uns als Nächstes die von FMZ unterstützten Kommunikationsprotokollanwendungen erkunden und üben.
MQTT (Message Queuing Telemetry Transport) ist ein leichtes Nachrichtenübertragungsprotokoll, das sich besonders für Netzwerkumgebungen mit geringer Bandbreite, hoher Latenz oder unzuverlässigen Netzwerkumgebungen eignet. Es wurde 1999 von Andy Stanford-Clark und Arlen Nipper von IBM vorgeschlagen und wurde später zum ISO-Standard (ISO/IEC PRF 20922).
Die Hauptfunktionen des MQTT-Protokolls: Publish/Subscribe-Modus
Da wir zum Bereitstellen des MQTT-Proxyservers das Docker-Image (Eclipse-Mosquitto-Image) einer Software verwenden, die das MQTT-Protokoll unterstützt, haben wir Docker im Voraus installiert und gehen später nicht näher darauf ein.
Bevor wir den Befehl zum Bereitstellen des Images ausführen, müssen wir eine Proxyserver-Konfigurationsdatei schreibenmosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Führen Sie dann den Bereitstellungsbefehl aus:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Nachdem das Proxy-Server-Image ausgeführt wurde, wird die folgende Anzeige angezeigt:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Dann können wir die Strategie testen und in die Praxis umsetzen.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Die Dial-Funktion wird im Strategiecode hauptsächlich wie folgt verwendet:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Der String-Parameter der Dial-Funktion beginnt mitmqtt://
Es handelt sich um den Protokollnamen, gefolgt von der Abhöradresse und dem Port. Auf das Symbol “?” folgt der Name des Abonnement-/Veröffentlichungsthemas. Der hier getestete Themenname lautet:test_topic
。
Die obige Strategie veröffentlicht und abonniert gleichzeitig ein Thema. Der laufende Test sieht wie in der Abbildung dargestellt aus:
Sie können auch zwei reale Festplatten verwenden, um sich gegenseitig zu abonnieren und Themeninformationen zu veröffentlichen. Wir verwenden ein solches Beispiel im Praxisabschnitt des NATS-Protokolls und werden diese Methode in anderen Protokollen nicht wiederholen.
Das NATS-Protokoll ist ein einfaches, textbasiertes Publish/Subscribe-Protokoll. Der Client stellt eine Verbindung zu gnatsd (NATS-Server) her und kommuniziert mit gnatsd. Die Kommunikation basiert auf normalen TCP/IP-Sockets und definiert eine sehr kleine Anzahl von Operationen. Ein Zeilenumbruch zeigt die Beendigung an. Im Gegensatz zu herkömmlichen Nachrichtensystemen, die binäre Nachrichtenformate verwenden, vereinfacht das textbasierte NATS-Protokoll die Client-Implementierung und kann problemlos in einer Vielzahl von Programmiersprachen oder Skriptsprachen implementiert werden.
Jedes Protokoll hat seine eigenen Merkmale. Sie können auf die spezifischen Dokumente und Materialien verweisen, auf die hier nicht näher eingegangen wird.
Stellen Sie den NATS-Protokollserver bereit:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Dieser Docker-Befehl lädt das NAT-Image automatisch herunter und führt es aus. Port 4222 ist der Port, auf den der Client zugreifen möchte. Nachdem das Image bereitgestellt wurde, wird auf Port 8222 ein HTTP-Monitor geöffnet.
Listening for client connections on 0.0.0.0:4222
Server is ready
Das NAT-Server-Image wird gestartet und lauscht auf Port 4222.
Wir müssen zwei Strategien erstellen (echtes Trading), nennen wir sie vorerst Strategie A und Strategie B. Die Codes dieser beiden Strategien sind grundsätzlich gleich. Geschrieben in Javascript, der am einfachsten zu verwendenden Sprache auf der FMZ-Plattform.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Diese beiden Strategien sind grundsätzlich gleich, mit der Ausnahme, dass sie sich gegenseitig veröffentlichen und abonnieren und dass die abonnierten Themen, die veröffentlichten Themen und die veröffentlichten Informationen unterschiedlich sind.
Nehmen wir Strategie B als Beispiel:
Dial()
Funktion erstellt ein Client-VerbindungsserverobjektconnPub
, wird zum Veröffentlichen von Themennachrichten verwendet:var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)
Der Parameterstring der Dial-Funktion beginnt mitnats://
Zeigt an, dass das NATS-Protokoll für die Kommunikation verwendet wird.admin
Es handelt sich um eine einfache Überprüfungsinformation, die beim Bereitstellen des Docker-Image festgelegt wirdauth admin
, verwenden Sie das Zeichen “@”, um den folgenden Inhalt zu trennen, und dann die Serviceadresse und den Port127.0.0.1:4222
und schließlich das Publizieren/Abonnieren-Thema:topic=pubRobotB
Beachten Sie, dass zwischen der vorherigen Adresse ein „?“-Symbol steht.
Dial()
Funktion erstellt ein Client-VerbindungsserverobjektconnSub
, wird für das Abonnement von Themennachrichten verwendet:var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)
Der einzige Unterschiedtopic=pubRobotA
Anders, weil Sie das Thema abonnieren müssen, an das Strategie A Informationen sendetpubRobotA
。
Die Erstellung und Verwendung von Abonnement- und Veröffentlichungsverbindungsobjekten in Strategie A sind dieselben wie oben beschrieben.
Auf diese Weise wird ein einfaches Beispiel einer NATS-Protokollanwendung implementiert, bei der die reale Festplatte A und die reale Festplatte B Nachrichten abonnieren und veröffentlichen, um miteinander zu kommunizieren.
Bei der asynchronen Kommunikation erreicht die Nachricht den Empfänger nicht sofort, sondern wird in einem Container gespeichert. Wenn bestimmte Bedingungen erfüllt sind, wird die Nachricht vom Container an den Empfänger gesendet. Dieser Container ist die Nachrichtenwarteschlange. Um diese Funktion abzuschließen , müssen beide Parteien Der Container und seine Komponenten müssen einheitliche Konventionen und Regeln einhalten. AMQP ist ein solches Protokoll. Sowohl der Absender als auch der Empfänger von Nachrichten können durch die Einhaltung dieses Protokolls eine asynchrone Kommunikation erreichen. Dieses Protokoll gibt das Nachrichtenformat und ihre Funktionsweise an.
Jedes Protokoll hat seine eigenen Merkmale. Sie können auf die spezifischen Dokumente und Materialien verweisen, auf die hier nicht näher eingegangen wird.
Stellen Sie den AMQP-Protokollserver bereit:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Beim Bereitstellen eines Docker-Images wird es automatisch heruntergeladen und bereitgestellt. Nach Abschluss wird Folgendes angezeigt:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Nachdem das Server-Image bereitgestellt wurde, schreiben Sie einen Beispieltest:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Beachten Sie bei Verwendung der AMQP-Protokollwarteschlange, dass die veröffentlichten Nachrichten in der Warteschlange verbleiben. Lassen Sie uns beispielsweise den obigen Beispielcode einmal ausführen. 10 Nachrichten werden in die Warteschlange geschrieben. Wenn wir es dann zum zweiten Mal ausführen, können wir feststellen, dass beim Lesen die beim ersten Mal geschriebenen Informationen erneut gelesen werden. Wie in der Abbildung gezeigt:
Sie können sehen, dass die beiden Protokollmeldungen, auf die im Screenshot die roten Pfeile zeigen, inkonsistente Zeiten aufweisen. Der Grund dafür ist, dass die rote Meldung diejenige ist, die gelesen und in die Warteschlange geschrieben wurde, als der Strategiecode zum ersten Mal ausgeführt wurde.
Basierend auf dieser Funktion können einige Anforderungen erfüllt werden. Beispielsweise können nach dem Neustart der Strategie die aufgezeichneten Marktdaten weiterhin aus der Warteschlange für Initialisierungsberechnungen und andere Vorgänge abgerufen werden.
Apache Kafka ist ein verteilter Datenspeicher, der für die Aufnahme und Verarbeitung von Streaming-Daten in Echtzeit optimiert ist. Beim Streaming-Daten handelt es sich um Daten, die kontinuierlich von Tausenden von Datenquellen generiert werden, wobei Datensätze häufig gleichzeitig gesendet werden. Die Streaming-Plattform muss diese kontinuierlich fließenden Daten verarbeiten und sie Schritt für Schritt nacheinander abarbeiten.
Kafka bietet seinen Benutzern drei Hauptfunktionen:
Kafka wird hauptsächlich zum Erstellen von Echtzeit-Streaming-Datenpipelines und Anwendungen verwendet, die sich an Datenströme anpassen. Es kombiniert Messaging-, Speicher- und Stream-Verarbeitungsfunktionen, um sowohl historische als auch Echtzeitdaten zu speichern.
Stellen Sie das Docker-Image des Kafka-Proxys bereit:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Testen Sie mit dem Testcode:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Sehen wir uns an, wie das Kafka-Protokoll zum Veröffentlichen und Abonnieren von Nachrichten in der Dial-Funktion verwendet wird.
Dial("kafka://localhost:9092/test_topic")
Wie bei mehreren anderen Protokollen ist der erste Teil der Protokollname. Anschließend folgt die Höradresse:localhost:9092
. Dann verwenden Sie das Symbol “/” als Trennzeichen, gefolgt vom Thema des Abonnements/der Veröffentlichung. Hier wird das Testthema auftest_topic
。
Testergebnisse: