[TOC]
随着金融市场的快速发展和量化交易的普及,越来越多的交易者开始依赖自动化策略进行交易。在这个过程中,策略之间的通信和协调显得尤为重要。FMZ(量化交易平台)通过提供高效的交易策略实盘间通信协议,帮助交易者实现策略的无缝对接和实时数据共享。
本篇文章将深入探讨FMZ平台中的交易策略实盘间通信协议,介绍其设计理念、功能特点以及在实际应用中的优势。我们将通过详细的案例分析,展示如何利用这一协议实现高效、稳定的策略通信,提升交易策略的执行力和收益表现。
无论你是刚入门FMZ的量化交易爱好者,还是经验丰富的专业程序员老手,本篇文章都将为你提供有价值的见解和实用的操作指南。让我们一同探索FMZ的强大功能,了解如何通过高效的通信协议,实现策略间的协同合作,提升交易效率,捕捉市场机遇。
这些需求场景展示了FMZ交易策略实盘间通信协议在实际应用中的多种可能性和优势。通过有效的策略间通信,交易者可以更好地应对复杂的市场环境,优化交易策略,提升交易效率和收益。
了解过实盘之间的通信需求后,就是要考虑如何实现这些需求了。无非就是实盘A希望可以和实盘B交互信息,虽然看上去需求很简单。不过其中有各种细节就需要使用一套通信协议来约定,FMZ已经封装了几种比较流行的通信协议。
mqtt / nats / amqp / kafka
通信架构为:
在FMZ平台上应用这些协议时,可以简单理解为 mqtt / nats / amqp / kafka 这些协议都集成在了Dial()
函数中,使用Dial()
函数进行消息发布、订阅等操作。这些发布的消息通过协议的服务端代理(中继)给订阅的实盘,所以首先要运行起来一个协议的服务端。为了便于演示,以下例子中我们都使用各种协议服务端镜像部署。
Dial函数所在API文档章节: https://www.fmz.com/syntax-guide#fun_dial
在部署docker镜像之前,要记得先安装docker软件。
接下来就让我们一起来探索、实践FMZ支持的通信协议应用。
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,特别适用于低带宽、高延迟或不可靠的网络环境。它由 IBM 的 Andy Stanford-Clark 和 Arlen Nipper 在 1999 年提出,后来成为 ISO 标准(ISO/IEC PRF 20922)。
MQTT 协议的主要特点:发布/订阅模式
因为我们使用支持MQTT协议的软件的docker镜像(eclipse-mosquitto镜像)部署MQTT代理服务器,所以提前安装好docker,后续不在赘述。
在运行部署镜像的命令之前,我们需要先写好一个代理服务器配置文件mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
然后执行部署命令:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
代理服务器镜像运行起来后有如下显示:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
然后我们可以测试策略,来实践一下。
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
策略代码中主要是Dial函数的使用:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Dial函数的字符串参数开头的mqtt://
是协议名称,然后紧跟监听地址、端口。符号”?“后是订阅/发布主题名称,这里测试的主题名称为:test_topic
。
以上策略针对一个主题一边发布、一边订阅,运行测试如图:
也可以使用2个实盘相互订阅、发布主题信息,在nats协议实践章节我们使用这样的例子,其它的协议中不再赘述该方式。
NATS的协议是一个简单的、基于文本的发布/订阅风格的协议。客户端连接到 gnatsd(NATS服务器),并与 gnatsd 进行通信,通信基于普通的 TCP/IP 套接字,并定义了很小的操作集,换行表示终止。与传统的、使用了二进制消息格式的消息通信系统不同,使用了基于文本的 NATS 协议,使得客户端实现很简单,可以方便地选择多种编程语言或脚本语言来实现。
每个协议都有各自的特点,可以查阅具体文档、资料,这里不在赘述。
部署 nats 协议服务端:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
这条docker命令会自动下载、运行 nats 镜像,4222端口为客户端所要访问的端口。镜像部署好之后也会有一个http monitor在8222端口开放。
Listening for client connections on 0.0.0.0:4222
Server is ready
nats服务端镜像就开始运行了,监听4222端口。
我们需要创建两个策略(实盘),暂且就命名策略A、策略B吧,这两个策略代码基本一样。使用FMZ平台上最易上手的Javascript语言编写。
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
这两个策略基本上一样,只不过是相互发布、订阅,订阅主题、发布主题、发布信息不一样而已。
以策略B举例:
Dial()
函数创建客户端连接服务端对象connPub
,用于主题消息发布:var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)
Dial函数的参数字符串,开头的nats://
表示使用的是nats协议通信,然后admin
是在部署docker镜像时设置的简单校验信息auth admin
,使用字符”@“与后面的内容间隔,然后是服务地址和端口127.0.0.1:4222
,最后是发布/订阅主题:topic=pubRobotB
注意与前面的地址之间用”?“符号间隔。
Dial()
函数创建客户端连接服务端对象connSub
,用于主题消息订阅:var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)
差别只是topic=pubRobotA
不同,因为需要订阅策略A发送信息的主题pubRobotA
。
对于策略A中的订阅、发布连接对象的创建和使用与以上描述同理。
这样就实现了一个简单的实盘A与实盘B之间相互订阅、发布消息进行通信的nats协议应用例子。
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
每个协议都有各自的特点,可以查阅具体文档、资料,这里不在赘述。
部署 amqp 协议服务端:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
部署docker镜像时,会自动下载部署,完成以后会显示:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
服务端镜像部署好之后,编写测试例子测试:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
使用amqp协议的queue需要注意,发布后的消息会持久存在于queue中,例如我们先运行一次上面的例子代码。会向queue写入10条信息。然后我们第二次运行可以发现读取的时候会再次读取第一次写入的信息。如图所示:
可以看到截图中红色箭头指向的两条日志信息,时间并不一致,原因是红色的这条是读取出的消息是策略代码第一次运行时写入queue的。
基于此特性可以实现一些需求,例如:策略实盘重启后,依然可以从queue中获取已记录的行情数据,用于初始化计算等操作。
Apache Kafka 是一种分布式数据存储,经过优化以实时提取和处理流数据。流数据是指由数千个数据源持续生成的数据,通常可同时发送数据记录。流平台需要处理这些持续流入的数据,按照顺序逐步处理。
Kafka 为其用户提供三项主要功能: - 发布和订阅记录流 - 按照记录的生成顺序高效地存储记录流 - 实时处理记录流
Kafka 主要用于构建适应数据流的实时流数据管道和应用程序。它结合了消息收发、存储和流处理功能,能够存储历史和实时数据。
部署Kafka代理的docker镜像:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
使用测试代码测试:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
我们来看一下如何在Dial函数中使用kafka协议进行消息发布与订阅。
Dial("kafka://localhost:9092/test_topic")
与其它几种协议一样,开头部分是协议名称。然后紧跟监听地址:localhost:9092
。接着使用符号”/“作为间隔,后面写订阅/发布的主题,这里测试主题设置为test_topic
。
测试结果: