With the rapid development of financial markets and the growing popularity of quantitative trading, more and more traders rely on automated strategies. In this setting, communication and coordination between strategies become particularly important. FMZ (a quantitative trading platform) provides efficient real-time communication protocols between live trading strategies, helping traders connect strategies seamlessly and share data in real time.
This article takes a close look at the communication protocols between live trading strategies on the FMZ platform, covering their design concepts, features, and advantages in practice. Through detailed examples, we show how to use these protocols to achieve efficient, stable communication between strategies and improve their execution and profitability.
Whether you are new to quantitative trading on FMZ or an experienced programmer, this article offers useful insights and a practical guide. Let's explore these FMZ features together and learn how efficient communication protocols let strategies work in concert, improving trading efficiency and capturing market opportunities.
These demand scenarios illustrate the possibilities and advantages of communication between live trading strategies on FMZ in real-world applications. With effective communication between strategies, traders can better respond to complex market environments, optimize their strategies, and improve trading efficiency and returns.
Once the communication needs between live bots are understood, the next question is how to meet them. In essence, live bot A simply wants to exchange information with live bot B. Although the requirement seems simple, many details have to be agreed on through a communication protocol. FMZ has integrated several popular communication protocols:
mqtt / nats / amqp / kafka
The communication architecture is:
When using these protocols on the FMZ platform, you can simply think of mqtt/nats/amqp/kafka as being integrated into the Dial() function. Use the Dial() function to publish messages, subscribe to them, and so on. Published messages are relayed by the protocol's server (broker) to the subscribing live bots, so a protocol server must be running first. For ease of demonstration, the examples below deploy each protocol server from a docker image.
The section of the API documentation that covers the Dial function: https://www.fmz.com/syntax-guide#fun_dial
Before deploying a docker image, remember to install docker first.
Next, let's explore and practice the communication protocols supported by FMZ.
MQTT (Message Queuing Telemetry Transport) is a lightweight message transport protocol designed specifically for low-bandwidth, high-latency, or unreliable networks. It was proposed in 1999 by Andy Stanford-Clark of IBM and Arlen Nipper, and later became an ISO standard (ISO/IEC 20922).
Key features of the MQTT protocol: publish/subscribe mode
We deploy the MQTT broker using a docker image that supports the MQTT protocol (the eclipse-mosquitto image). Docker should already be installed, so installation is not covered again below.
Before running the command that deploys the image, we need to write a broker configuration file, mosquitto.conf:
# Configure the port number and the IP address for remote access
listener 1883 0.0.0.0
# Allow anonymous access
allow_anonymous true
Then execute the deployment command:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Once the broker image is up and running, it displays the following:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Now we can write a test strategy and try it out.
var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("Current live bot robotId:", robotId)

    // Connect to the MQTT broker; test_topic is used for both publishing and subscribing
    conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    if (!conn) {
        Log("Connection failed!")
        return
    }

    for (var i = 0; i < 10; i++) {
        // Write
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
        conn.write(msg)
        Log("Wrote message to testQueue:", msg)

        // Read
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }
}

function onexit() {
    // conn is still null if Dial failed, so guard before closing
    if (conn) {
        conn.close()
        Log("Closed conn")
    }
}
The main use of the Dial function in the strategy code is:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Dial takes a single string parameter. It starts with mqtt://, the protocol name, followed by the listening address and port. After the "?" symbol comes the name of the subscribe/publish topic; here the test topic is named test_topic.
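To make the structure of that address string concrete, here is a minimal sketch that splits it into the parts just described. The function `parseDialAddress` is a hypothetical helper written for illustration and is not part of the FMZ API; it only handles the `protocol://host:port?key=value` shape used in this example.

```javascript
// Hypothetical helper (not an FMZ function): splits a Dial address of the
// form "protocol://host:port?key=value" into its parts.
function parseDialAddress(address) {
    var schemeEnd = address.indexOf("://")
    if (schemeEnd < 0) {
        throw new Error("missing protocol prefix: " + address)
    }
    var rest = address.slice(schemeEnd + 3)
    var queryStart = rest.indexOf("?")
    var hostPort = queryStart < 0 ? rest : rest.slice(0, queryStart)
    var query = queryStart < 0 ? "" : rest.slice(queryStart + 1)

    // Collect the key=value pairs that follow the "?" symbol
    var params = {}
    query.split("&").forEach(function (pair) {
        if (!pair) {
            return
        }
        var eq = pair.indexOf("=")
        if (eq < 0) {
            params[pair] = ""
        } else {
            params[pair.slice(0, eq)] = pair.slice(eq + 1)
        }
    })

    return {
        protocol: address.slice(0, schemeEnd),
        hostPort: hostPort,
        params: params
    }
}
```

Calling `parseDialAddress("mqtt://127.0.0.1:1883?topic=test_topic")` yields the protocol `mqtt`, the address `127.0.0.1:1883`, and a `topic` parameter of `test_topic`.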
The strategy code above publishes to and subscribes to the same topic. The running test:
It is also possible to have two live bots subscribe to each other and publish messages to each other's topics. We use that approach in the practice section for the nats protocol and do not repeat it for the other protocols.
The NATS protocol is a simple, text-based publish/subscribe protocol. Clients connect to gnatsd (the NATS server) and communicate with it over plain TCP/IP sockets, using a very small set of operations in which a newline marks termination. Unlike traditional messaging systems that use a binary message format, the text-based NATS protocol keeps clients simple to implement, so they can easily be written in a wide range of programming or scripting languages.
Each protocol has its own characteristics; consult the relevant documentation for details, which are not covered here.
Deploying the Nats protocol server:
docker run --name nats --rm -p 4222:4222 -p 8222:8222 nats --http_port 8222 --auth admin
This docker command automatically downloads and runs the nats image. Port 4222 is the port clients connect to. Once the image is deployed, an HTTP monitor is also available on port 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
The nats server image is now running and listening on port 4222.
We need to create two strategies (live bots); let's call them Strategy A and Strategy B. Their code is basically the same.
Strategy A
var connPub = null
var connSub = null

function main() {
    var robotId = _G()
    Log("Current live bot robotId:", robotId)

    // Connection for publishing on Strategy A's own topic
    connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
    if (!connPub) {
        Log("Connection failed!")
        return
    }

    // Connection for subscribing to the topic Strategy B publishes on
    connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
    if (!connSub) {
        Log("Connection failed!")
        return
    }

    while (true) {
        connPub.write("Message published by robotA, robotId: " + robotId + ", time:" + _D())
        var msgRead = connSub.read(10000)
        if (msgRead) {
            Log("msgRead:", msgRead)
        }
        LogStatus(_D())
        Sleep(10000)
    }
}

function onexit() {
    // Either connection may still be null if Dial failed
    if (connPub) {
        connPub.close()
    }
    if (connSub) {
        connSub.close()
    }
}
Strategy B
var connPub = null
var connSub = null

function main() {
    var robotId = _G()
    Log("Current live bot robotId:", robotId)

    // Connection for publishing on Strategy B's own topic
    connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
    if (!connPub) {
        Log("Connection failed!")
        return
    }

    // Connection for subscribing to the topic Strategy A publishes on
    connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
    if (!connSub) {
        Log("Connection failed!")
        return
    }

    while (true) {
        connPub.write("Message published by robotB, robotId: " + robotId + ", time:" + _D())
        var msgRead = connSub.read(10000)
        if (msgRead) {
            Log("msgRead:", msgRead)
        }
        LogStatus(_D())
        Sleep(10000)
    }
}

function onexit() {
    // Either connection may still be null if Dial failed
    if (connPub) {
        connPub.close()
    }
    if (connSub) {
        connSub.close()
    }
}
The two strategies are basically the same; they differ only in the topics they publish and subscribe to and in the content of the messages they publish.
Taking Strategy B as an example:
1. Use the Dial() function to create the client connection object connPub, which connects to the server and is used to publish messages to a topic:
var connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
The parameter string of the Dial function starts with nats://, which means the NATS protocol is used for communication. Next, admin is the simple authentication token set with --auth admin when the docker image was deployed. The "@" character separates it from what follows, which is the service address 127.0.0.1:4222. Finally comes the publish/subscribe topic: topic=pubRobotB. Note that the "?" symbol separates the topic from the preceding address.
2. Use the Dial() function to create the client connection object connSub, which connects to the server and is used to subscribe to messages on a topic:
var connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
The only difference is topic=pubRobotA, because this connection needs to subscribe to pubRobotA, the topic on which Strategy A publishes its messages.
In Strategy A, the subscribe and publish connection objects are created and used in the same way as described above.
Strategy A is running.
Strategy B is running.
This gives us a simple NATS application in which live bot A and live bot B subscribe to each other's topics and exchange messages.
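The strategies above exchange free-form strings. If the receiving bot needs to act on individual fields, one option is to wrap each message in a small JSON envelope before writing it and to decode it after reading. The sketch below is illustrative only: `encodeMessage` and `decodeMessage` are hypothetical helpers, the field names are assumptions, and it assumes that the read call returns the published string unchanged.

```javascript
// Hypothetical helpers (not FMZ functions) for a JSON message envelope.
// Field names robotId, time, and payload are chosen for illustration.
function encodeMessage(robotId, payload, timestampMs) {
    return JSON.stringify({
        robotId: robotId,
        time: timestampMs,
        payload: payload
    })
}

// Returns the decoded envelope, or null when the input is empty,
// is not valid JSON, or lacks a robotId field.
function decodeMessage(raw) {
    if (typeof raw !== "string" || raw === "") {
        return null
    }
    var msg = null
    try {
        msg = JSON.parse(raw)
    } catch (e) {
        return null
    }
    if (msg === null || typeof msg !== "object" || typeof msg.robotId === "undefined") {
        return null
    }
    return msg
}
```

In a strategy, this might look like `connPub.write(encodeMessage(robotId, {signal: "buy"}, Date.now()))` on the publishing side and `var msg = decodeMessage(connSub.read(10000))` on the subscribing side, where `msg` is null whenever nothing usable arrived.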
In asynchronous communication, a message does not reach the recipient immediately. Instead it is stored in a container, and when certain conditions are met the container delivers it to the recipient. This container is the message queue. For this to work, both parties, the container, and its components must follow a common set of agreements and rules. AMQP is such a protocol: by following it, the sender and the receiver can communicate asynchronously. The protocol defines the format of messages and how they are handled.
Each protocol has its own characteristics; consult the relevant documentation for details, which are not covered here.
Deploying the amqp protocol server:
docker run --rm --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
The docker image is downloaded and deployed automatically; once it has started, it displays:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
After the server image is deployed, write a test example:
var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("Current live bot robotId:", robotId)

    // Connect to RabbitMQ with user q and password admin; robotA_Queue is the queue name
    conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
    if (!conn) {
        Log("Connection failed!")
        return
    }

    for (var i = 0; i < 10; i++) {
        // Read
        Log("read:", conn.read(1000), "#FF0000")

        // Write
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
        conn.write(msg)
        Log("Wrote message to testQueue:", msg)

        Sleep(1000)
    }
}

function onexit() {
    // conn is still null if Dial failed, so guard before closing
    if (conn) {
        conn.close()
        Log("Closed conn")
    }
}
Note that with the amqp protocol, messages stay in the queue after they are published. For example, running the sample code above once writes 10 messages to the queue. When we run it a second time, the reads return messages that were written during the first run.
In the screenshot, the two log entries marked by the red arrows carry different timestamps, because the message shown in red was written to the queue during the first run of the strategy code.
This feature can serve certain needs. For example, after a live bot restarts, it can still retrieve previously recorded transaction data from the queue for initialization calculations and similar operations.
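As a rough sketch of that restart scenario, a strategy could drain whatever is still waiting in the queue before entering its main loop. The helper `drainQueue` below is hypothetical, and it assumes that `conn.read(timeoutMs)` returns a falsy value when no message arrives within the timeout. `makeFakeConn` is an in-memory stand-in for a Dial connection so that the sketch can be run outside FMZ.

```javascript
// Hypothetical helper: reads pending messages until a read comes back empty
// or maxMessages is reached. Assumes conn.read(timeoutMs) returns a falsy
// value when nothing arrives within the timeout.
function drainQueue(conn, timeoutMs, maxMessages) {
    var messages = []
    while (messages.length < maxMessages) {
        var msg = conn.read(timeoutMs)
        if (!msg) {
            break
        }
        messages.push(msg)
    }
    return messages
}

// In-memory stand-in for a Dial connection, for running the sketch outside FMZ.
// It ignores the timeout and simply returns null when the queue is empty.
function makeFakeConn(pending) {
    var queue = pending.slice()
    return {
        read: function (timeoutMs) {
            return queue.length > 0 ? queue.shift() : null
        },
        write: function (msg) {
            queue.push(msg)
        },
        close: function () {}
    }
}
```

Inside a strategy, the call might be `var backlog = drainQueue(conn, 1000, 100)` right after a successful Dial, with the backlog then fed into whatever initialization the strategy performs. The cap on the number of messages keeps startup time bounded if the queue has grown large.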
Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real time. Streaming data is data generated continuously by thousands of data sources, which typically send their records simultaneously. A streaming platform has to handle this constant influx and process the data sequentially and incrementally.
Kafka offers three main features to its users:
Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to those data streams. It combines messaging, storage, and stream processing to store both historical and real-time data.
Deploying the Kafka broker from a docker image:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Testing with test code:
var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("Current live bot robotId:", robotId)

    // Connect to the Kafka broker; test_topic is used for both publishing and subscribing
    conn = Dial("kafka://localhost:9092/test_topic")
    if (!conn) {
        Log("Connection failed!")
        return
    }

    for (var i = 0; i < 10; i++) {
        // Write
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
        conn.write(msg)
        Log("Wrote message to testQueue:", msg)

        // Read
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }
}

function onexit() {
    // conn is still null if Dial failed, so guard before closing
    if (conn) {
        conn.close()
        Log("Closed conn")
    }
}
Let's look at how to publish and subscribe to messages with the Kafka protocol in the Dial function.
Dial("kafka://localhost:9092/test_topic")
As with the other protocols, the first part is the protocol name, followed by the listening address: localhost:9092. The "/" symbol then acts as a separator, followed by the subscribe/publish topic, which here is set to test_topic.
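The four address formats used in this article differ only in how credentials and the topic or queue name are attached. The sketch below gathers them in one place. `buildDialAddress` is a hypothetical helper, not an FMZ function, and it only reproduces the address shapes that appear in the examples above; other parameters that Dial may accept are not covered.

```javascript
// Hypothetical helper (not an FMZ function): builds a Dial address in the
// shapes used in this article's examples.
//   protocol: "mqtt", "nats", "amqp", or "kafka"
//   hostPort: for example "127.0.0.1:1883"
//   name:     topic name (mqtt, nats, kafka) or queue name (amqp)
//   auth:     optional credentials, for example "admin" or "q:admin"
function buildDialAddress(protocol, hostPort, name, auth) {
    var prefix = protocol + "://" + (auth ? auth + "@" : "")
    switch (protocol) {
        case "mqtt":
        case "nats":
            // The topic follows the "?" symbol
            return prefix + hostPort + "?topic=" + name
        case "amqp":
            // The queue name follows "/?"
            return prefix + hostPort + "/?queue=" + name
        case "kafka":
            // The topic follows the "/" symbol
            return prefix + hostPort + "/" + name
        default:
            throw new Error("unsupported protocol: " + protocol)
    }
}
```

For example, `buildDialAddress("amqp", "127.0.0.1:5672", "robotA_Queue", "q:admin")` produces the same string that was passed to Dial in the amqp test above.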
The test results: