[TOC]
Mit der rasanten Entwicklung der Finanzmärkte und der Verbreitung von quantitativen Transaktionen werden immer mehr Händler auf automatisierte Strategien angewiesen. Kommunikation und Koordination zwischen den Strategien sind in diesem Prozess besonders wichtig. FMZ (Quantitativen Handelsplattformen) helfen Händlern, ihre Strategien nahtlos zu verknüpfen und in Echtzeit Daten zu teilen, indem sie effiziente Handelsstrategien mit realen Kommunikationsprotokollen bereitstellen.
In diesem Artikel werden wir die Konzepte, Funktionsmerkmale und Vorteile in der Praxis der FMZ-Plattform im Detail analysieren. Wir werden zeigen, wie wir diese Plattform nutzen können, um effiziente und stabile strategische Kommunikation zu erzielen und die Ausführung und Ertragsperformance von Handelsstrategien zu verbessern.
Egal, ob Sie ein neuer Quantitative Trading-Fan von FMZ sind oder ein erfahrener professioneller Programmierer, dieser Artikel wird Ihnen wertvolle Einblicke und praktische Handbücher geben.
Diese Bedarfsszenarien zeigen die Möglichkeiten und Vorteile von FMZ-Trading-Strategie-Real-Disk-Kommunikations-Protokollen in der Praxis. Durch effektive Strategie-Intercommunication können Händler besser auf die komplexe Marktumgebung reagieren, ihre Handelsstrategien optimieren, ihre Handelseffizienz und ihre Gewinne steigern.
Nach dem Verständnis der Kommunikationsbedürfnisse zwischen den Festplatten ist es wichtig zu überlegen, wie diese erreicht werden können. Es ist nichts anderes, als dass Festplatte A Informationen mit Festplatte B interagieren möchte, obwohl die Bedürfnisse auf den ersten Blick einfach sind.
mqtt / nats / amqp / kafka
Die Kommunikationsarchitektur ist:
Bei der Anwendung dieser Protokolle auf der FMZ-Plattform kann man sie einfach als mqtt / nats / amqp / kafka verstehen.Dial()
In der Funktion verwendetDial()
Die Funktion erfolgt durch Messages, Abonnements und andere Vorgänge. Diese Messages werden über die Server-End-Agenten des Protokolls an die Festplatte des Abonnenten weitergeleitet, so dass zuerst ein Server des Protokolls ausgeführt wird.
Der Abschnitt der API-Dokumentation, in dem die Dial-Funktion befindet:https://www.fmz.com/syntax-guide#fun_dial
Denken Sie daran, die Docker-Software zu installieren, bevor Sie das Docker-Mirror bereitstellen.
Lassen Sie uns gemeinsam die praktischen Anwendungen des FMZ-unterstützten Kommunikationsprotokolls erforschen.
MQTT (Message Queuing Telemetry Transport) ist ein leichter Messaging-Protokoll, das speziell für Netzwerkumgebungen mit geringer Bandbreite, hoher Verzögerung oder unzuverlässigem Netzwerk verwendet wird. Es wurde 1999 von Andy Stanford-Clark und Arlen Nipper von IBM vorgeschlagen und wurde später zum ISO-Standard (ISO/IEC PRF 20922) gemacht.
Hauptmerkmale des MQTT-Protokolls: Veröffentlichungs-/Abonnementmodell
Da wir den MQTT-Proxy-Server mit einem Docker-Objektiv (eclipse-mosquitto-Objektiv), das den MQTT-Protokoll unterstützt, implementieren, ist die Installation von Docker im Voraus unerheblich.
Bevor wir den Befehl zum Deployment Mirror ausführen, müssen wir ein Proxy-Server-Profil schreiben.mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Dann führen Sie den Deployment-Befehl aus:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Die Proxy-Server-Images zeigen, wie es aussieht, wenn sie ausgeführt werden:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Dann können wir die Strategie testen und sie in die Praxis umsetzen.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Die Dial-Funktion wird in der Strategie-Code hauptsächlich verwendet:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Die Stringparameter der Dial-Funktion beginnen mitmqtt://
Der Name des Protokols, gefolgt von der Abhöradresse, des Ports, des Symbols "?" und dem Namen des Subscribe/Publishing-Themas.test_topic
。
Die oben beschriebenen Strategien werden bei der Veröffentlichung, beim Abonnieren und beim Laufen von Tests für ein Thema wie folgt ausgeführt:
Es ist auch möglich, sich mit zwei Festplatten zu abonnieren und Themeninformationen zu veröffentlichen, wie wir in der Praxis-Sektion des Nats-Protokolls verwendet haben, die in anderen Protokollen nicht mehr beschrieben wird.
Das NATS-Protokoll ist ein einfaches, textbasiertes Veröffentlichungs-/Abonnementprotokoll. Der Client verbindet sich mit gnatsd (NATS-Server) und kommuniziert mit gnatsd.
Jede Vereinbarung hat ihre eigenen Eigenschaften, und man kann sich die spezifischen Dokumente ansehen, die hier nicht beschrieben werden.
Die Nats-Protocol-Dienstleister werden hier eingesetzt:
Docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Der Docker-Befehl wird automatisch heruntergeladen und läuft das Nats-Bild auf Port 4222 für den Zugriff auf den Client.
Listening for client connections on 0.0.0.0:4222
Server is ready
Natts Server-End-Mirror läuft und überwacht den Port 4222.
Wir müssen zwei Strategien erstellen, und zwar die Politik A und die Politik B, die grundsätzlich den gleichen Code haben.
Strategie A
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Strategie B
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Die beiden Strategien sind im Wesentlichen identisch, nur dass sie unterschiedlich sind: Posting, Subscription, Subscribe-Theme, Posting-Theme, Posting-Information.
Das ist ein Beispiel für Strategie B:
1, VerwendungDial()
Funktionen zum Erstellen von Serverobjekten für die ClientverbindungconnPub
Die Nachrichten werden unter anderem über:
Ich bin nicht derjenige, der das Problem hat.127.0.0.1:4222?topic=pubRobotB”)
Die Parameterstring der Dial-Funktion beginnt mitnats://
Das bedeutet, dass wir mit dem Nats-Protokoll kommunizieren.admin
ist eine einfache Überprüfungsnachricht, die bei der Bereitstellung des Docker Mirrors festgelegt wirdauth admin
, mit dem Zeichen "@127.0.0.1:4222
Und schließlich:topic=pubRobotB
Bitte beachten Sie, dass zwischen den Anschriften und den vorhergehenden Anschriften ein "
2. VerwendungDial()
Funktionen zum Erstellen von Serverobjekten für die ClientverbindungconnSub
Die Nachrichten sind unter anderem auf:
Ich bin nicht derjenige, der das Problem hat.127.0.0.1:4222?topic=pubRobotA”)
Der Unterschied ist nur:topic=pubRobotA
Das ist ein sehr schwieriges Thema, denn es ist eine Subscription-Strategie, um Nachrichten zu senden.pubRobotA
。
Für die Erstellung und Nutzung von Subscription, Publishing Connection Objects in Strategie A ist dies mit der oben beschriebenen Symmetrie vergleichbar.
Strategie A läuft
Strategie B läuft
Dies ermöglicht ein einfaches Nats-Protokoll-Anwendungsbeispiel für die Kommunikation zwischen Festplatten A und B.
Bei asynchroner Kommunikation kommt die Nachricht nicht sofort an den Empfänger, sondern wird in einem Container gespeichert, in dem die Nachricht nach Erfüllung bestimmter Bedingungen vom Container an den Empfänger gesendet wird. Dieser Container ist die Nachrichtenschlange. Um diese Funktion zu erfüllen, müssen beide Seiten und der Container und seine Komponenten einheitliche Vereinbarungen und Regeln einhalten.
Jede Vereinbarung hat ihre eigenen Eigenschaften, und man kann sich die spezifischen Dokumente ansehen, die hier nicht beschrieben werden.
Die amqp-Prozess-Server-Server:
Docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Wenn Sie das Docker Mirror installieren, wird das Deployment automatisch heruntergeladen und dann angezeigt:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Nach der Einführung des Server-Mirror schreiben Sie einen Testbeispiel:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Es ist wichtig zu beachten, dass Nachrichten nach der Veröffentlichung in der Warteschlange bestehen, z. B. wenn wir den Beispielcode oben einmal ausführen. Wir schreiben 10 Nachrichten an die Warteschlange.
Wie man in dem Bild sehen kann, sind die beiden Login-Nachrichten, auf die der rote Pfeil in dem Bild zeigt, nicht zeitlich übereinstimmend, weil die rote Linie die gelesenen Nachrichten ist, die bei der ersten Ausführung des Strategic-Codes in die Warteschlange geschrieben wurden.
Aufgrund dieser Eigenschaft können einige Bedürfnisse erfüllt werden, z. B. dass nach dem Neustart der Strategie-Reality-Platte noch die aufgezeichneten Marktdaten aus der Warteschlange für die Initiierung von Berechnungen und andere Operationen abgerufen werden können.
Apache Kafka ist eine verteilte Datenspeicherung, die optimiert ist, um Stromdaten in Echtzeit zu extrahieren und zu verarbeiten.
Kafka bietet seinen Nutzern drei Hauptfunktionen:
Kafka wird hauptsächlich für den Aufbau von Echtzeit-Datenströmungs-Pipelungen und -Anwendungen verwendet, die sich an Datenströme anpassen. Es kombiniert Nachrichtenübertragung, Speicherung und Stromverarbeitung, um historische und Echtzeitdaten zu speichern.
Ein Docker-Bild, das Kafka-Vertreter bereitstellt:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Test mit Testcode:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Lassen Sie uns sehen, wie Sie mit dem Kafka-Protokoll Nachrichten in der Dial-Funktion veröffentlichen und abonnieren können.
Dial("kafka://localhost:9092/test_topic")
Wie bei einigen anderen Protokolle beginnt der Name des Protokolls.localhost:9092
Dann mit dem Symbol "/" als Intervall, schreiben Sie das Thema "Abonnieren / Veröffentlichen" hinterher, wo das Testthema auf "Abonnieren / Veröffentlichen" gesetzt ist.test_topic
。
Die Ergebnisse: