The resource loading... loading...

Explore FMZ: Trading strategies and the practice of real-world communication protocols

Author: Inventors quantify - small dreams, Created: 2024-08-06 14:13:40, Updated: 2024-11-05 17:50:35

[TOC]

img

With the rapid development of financial markets and the popularity of quantitative trading, more and more traders are relying on automated strategies for trading. In this process, communication and coordination between strategies are particularly important. FMZ (Quantitative Trading Platform) helps traders achieve seamless pairing of strategies and real-time data sharing by providing efficient trading strategies with real-time communication protocols.

This article will delve into the real-world communication protocols for trading strategies in the FMZ platform, its design concepts, functional features, and advantages in practical applications. We will demonstrate through detailed case studies how to use this protocol to achieve efficient, stable strategic communication and improve the execution and earnings performance of trading strategies.

Whether you are a new enthusiast of quantitative trading in FMZ or an experienced professional programmer, this article will provide you with valuable insights and practical operating guides. Let's explore the powerful features of FMZ together and learn how to achieve synergy between strategies through efficient communication protocols, improve trading efficiency and capture market opportunities.


The demand scenario

    1. Multilateral cooperation The demand scenario: In a complex market environment, a single strategy may not be able to cope with a variety of emergencies and market changes. Traders want to run multiple strategies simultaneously, such as trend-tracking strategies, mean return strategies, and arbitrage strategies, and have real-time communication between these strategies to share market information and trading signals, thereby improving overall trading efficiency and stability.
    1. Cross-market leverage The demand scenario: Traders want to trade with leverage between different trading markets. For example, leverage the price difference between the A-Stock market and the Hong Kong stock market. When price abnormalities occur in one market, the strategy needs to notify the strategy in the other markets in a timely manner to make the corresponding buy-sell operation to capture leverage opportunities.
    1. Risk management and hedging The demand scenario: One strategy is responsible for finding and executing high-risk, high-return trades in the market, while the other strategy focuses on monitoring overall risk and executing hedging operations. To ensure that excessive losses are not incurred during high-risk trading, both strategies require real-time communication and data sharing in order to adjust positions and hedge risks in a timely manner.
    1. Distributed trading systems The demand scenario: Large exchanges want to run distributed trading systems on multiple physical servers to improve the fault tolerance and performance of the trading system. The strategy on these servers requires data synchronization and coordinated operations through communication protocols, thus ensuring the stable and efficient operation of the overall trading system.
    1. Market monitoring and early warning The demand scenario: One strategy is specifically responsible for real-time monitoring of market dynamics, and when there are significant changes in the market (such as a sudden price crash or spike), the strategy needs to promptly notify the other strategy to take corresponding countermeasures, such as placing, repositioning or raising the position, to reduce risk or seize trading opportunities.
    1. Management of combined strategies The demand scenario: Traders use a combination of strategies to manage investments in different asset classes, each focusing on a specific asset class (e.g. stocks, bonds, futures, etc.). These strategies need to be communicated and coordinated to achieve overall optimization of portfolio investments and maximize returns.

These demand scenarios demonstrate the many possibilities and advantages of FMZ trading strategies in real-world applications. Through effective strategic communication, traders can better respond to complex market environments, optimize trading strategies, and improve trading efficiency and returns.


FMZ wrapped communication protocol with Dial function

After understanding the communication needs between disks, it is necessary to consider how to achieve these needs. It is nothing more than the fact that disk A wants to interact with disk B, although the requirements seem simple. However, there are various details that need to be agreed upon using a set of communication protocols, and FMZ has packaged several more popular communication protocols.

mqtt / nats / amqp / kafka

Communication architecture

The communication architecture is:

  • The server ((agent) ⇒ the server (agent) ⇒ the server (agent) ⇒ the server (agent). A communication protocol server is required to relay messages between subscribers and publishers. The server can be deployed on the host's system (local disk communications) or a remote service (server-to-server disk communications).
  • The client ((subscriber, publisher) ‒ is the name of the user. The policy disk program on FMZ can be understood as a client of a communication protocol, the policy disk can be a publisher (pub) or a subscriber (sub).

Dial function

When implemented on the FMZ platform, these protocols are simply understood as mqtt/nats/amqp/kafka.Dial()In the function, useDial()The function performs messages, subscriptions, etc. These messages are sent via the protocol's server-side agent (relay) to the subscriber's disk, so a protocol server must first be run. For ease of demonstration, we use various protocol server-side mirror deployments in the following examples.

The section of the API documentation where the Dial function is located:https://www.fmz.com/syntax-guide#fun_dial

Before deploying the docker mirror, remember to install the docker software first.

img

Next, let's explore and practice the communication protocols supported by FMZ.


The FMZ platform is a real-time communication protocol.

The mqtt protocol

MQTT (Message Queuing Telemetry Transport) is a lightweight message transport protocol specifically designed for low bandwidth, high latency, or unreliable network environments. It was proposed by Andy Stanford-Clark and Arlen Nipper of IBM in 1999 and later became an ISO standard (ISO/IEC PRF 20922).

Key features of the MQTT protocol: publish/subscribe mode

  • Posting: News producers are sending messages to the topic.
  • Subscription: News consumers subscribe to a topic of interest and receive messages posted on that topic.
  • Mediator: MQTT uses a messaging agent (Broker) as an intermediary to forward messages, ensuring a disconnect between publisher and subscriber.

News and subscriptions

Since we are deploying the MQTT proxy server using a docker mirror (eclipse-mosquitto mirror) that supports the MQTT protocol, we install the docker in advance and do not discuss the follow-up.

We need to write a proxy server profile before we can run the command to deploy the mirror.mosquitto.conf

# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true

Then execute the deployment command:

docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

The proxy server mirror shows the following when it is up and running:

1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running

Then we can test the strategy and practice it.

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        // 读取
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

The main use of the Dial function in the strategy code is:

Dial("mqtt://127.0.0.1:1883?topic=test_topic")

Dial has a string parameter that starts withmqtt://is the name of the protocol, followed by the listening address, port, and the symbol "?" followed by the name of the Subscribe/Publish topic, where the test topic is called:test_topic

The above strategies are used to publish, subscribe, and run tests on a topic:

img

It is also possible to use two disks to subscribe to each other, to publish topical information, an example of which we use in the practice section of the nats protocol, which is not described in this way in other protocols.


The nats agreement

The NATS protocol is a simple, text-based publish/subscribe-style protocol. The client connects to gnatsd (NATS server) and communicates with gnatsd, communicating based on the common TCP/IP suffix, and defining a very small set of operations, switching to termination. Unlike the traditional, text-based NATS protocol, which uses a binary message format, the text-based NATS protocol allows the client to easily choose from a variety of programming languages or scripting languages to implement.

Each agreement has its own characteristics, and you can consult specific documents and information, which are not described here.

Deploying the Nats protocol server:

docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin

This docker command automatically downloads and runs the nats mirror, port 4222 for the port the client is accessing. Once the mirror is deployed, there will also be an http monitor open on port 8222.

Listening for client connections on 0.0.0.0:4222
Server is ready

The nats server side mirror started working, listening to port 4222.

Communication between local device and real disk policy

We need to create two policies (real disk) and for now, let's call them Policy A and Policy B, both of which are basically the same code.

  • Strategy A

    var connPub = null 
    var connSub = null
    
    function main() {
        var robotId = _G()
        Log("当前实盘robotId:", robotId)
    
        connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
        if (!connPub) {
            Log("通信失败!")
            return 
        }
    
        connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
        if (!connSub) {
            Log("通信失败!")
            return 
        }
    
        while (true) {
            connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
            var msgRead = connSub.read(10000)
            if (msgRead) {
                Log("msgRead:", msgRead)
            }
    
            LogStatus(_D())
            Sleep(10000)
        }
    }
    
    function onexit() {
        connPub.close()
        connSub.close()
    }
    
  • Strategy B

    var connPub = null 
    var connSub = null
    
    function main() {
        var robotId = _G()
        Log("当前实盘robotId:", robotId)
    
        connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
        if (!connPub) {
            Log("通信失败!")
            return 
        }
    
        connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
        if (!connSub) {
            Log("通信失败!")
            return 
        }
    
        while (true) {
            connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
            var msgRead = connSub.read(10000)
            if (msgRead) {
                Log("msgRead:", msgRead)
            }
    
            LogStatus(_D())
            Sleep(10000)
        }
    }
    
    function onexit() {
        connPub.close()
        connSub.close()
    }
    

The two strategies are basically the same, except that they are different in terms of posting, subscribing, subscribing topics, posting topics, posting information.

The following is an example of strategy B:

  • 1, useDial()A function that creates client-to-server objectsconnPubIn the meantime, I'm going to share some of my thoughts on this topic:

    var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)

    The parametric string of the Dial function, starting withnats://It means that the Nats protocol is used for communication, and thenadminis a simple checkout message set when deploying a docker mirrorauth admin, using the character "@" spaced with the content after it, followed by the service address and port127.0.0.1:4222And finally, the topic of publishing/subscribe:topic=pubRobotBNote the spacing between the previous addresses with the "What?" symbol.

  • 2, useDial()A function that creates client-to-server objectsconnSubThis is the first time I've seen this post.

    var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)

    The only difference istopic=pubRobotAThis is because it requires a subscription to Strategy A to send messages.pubRobotA

The creation and use of subscription, publishing and connection objects in Policy A is the same as described above.

  • Strategy A is running.

    img

  • Strategy B is running.

    img

This enables a simple Nats protocol application where disk A and disk B subscribe to each other, send messages, and communicate.


The amqp protocol

queue for the amqp protocol

In asynchronous communication, the message does not reach the recipient immediately, but is stored in a container, which, when certain conditions are met, is sent by the container to the recipient, the container is the message queue, and to accomplish this function requires both parties and the container and its various components to adhere to uniform agreements and rules. AMQP is a protocol that allows for asynchronous communication between the sending and receiving parties. This protocol determines the format and functioning of the message.

Each agreement has its own characteristics, and you can consult specific documents and information, which are not described here.

Deploying the amqp protocol server:

docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management

When you deploy the docker mirror, it automatically downloads the deployment and displays:

2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms

After the server side mirror is deployed, write a test example test:

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 读取
        Log("read:", conn.read(1000), "#FF0000")
        
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

It is important to note that the queue using the amqp protocol will have a message that will remain in the queue after the release, for example, if we run the example code above once; 10 messages will be written to the queue; and then the second time we run it, the message that was written the first time will be read again when it is found to be read.

img

You can see in the screenshot that the two log messages pointed to by the red arrow are not timed the same, because the red one is the read message that was written to the queue when the policy code was first run.

Some requirements can be realized based on this feature, for example: after the policy disk is restarted, it is still possible to retrieve the recorded transaction data from the queue for operations such as initialization calculations.


The Kafka agreement

Apache Kafka is a distributed data store that is optimized for real-time extraction and processing of stream data. Stream data refers to data that is continuously generated from thousands of data sources, and data records can often be sent at the same time. The stream platform needs to process this continuous flow of data, in a step-by-step order.

Kafka offers three main features to its users:

  • Post and subscribe to the record stream
  • Stores the record stream efficiently in the order in which the record is generated
  • Real-time processing of record streams

Kafka is primarily used to build real-time data stream pipelines and applications that adapt to data streams. It combines message forwarding, storage and stream processing capabilities to store historical and real-time data.

News and subscriptions

Docker image of the Kafka agent deployed:

docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
        -e KAFKA_CFG_NODE_ID=0 \
        -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
        -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
        -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
        -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
        -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
        -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
        bitnami/kafka:latest

Testing with test code:

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("kafka://localhost:9092/test_topic")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        // 读取
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

Let's see how to use the Kafka protocol to publish and subscribe to messages in Dial functions.

Dial("kafka://localhost:9092/test_topic")

As with several other protocols, the first part is the protocol name; then follow the listening address:localhost:9092Use the symbol "/" as an interval and write the Subscribe/Post topic next, where the test topic is set totest_topic

The test results:

img


More