[TOC]
С быстрым развитием финансовых рынков и популярностью количественной торговли все больше трейдеров начинают полагаться на автоматизированные стратегии торговли. В этом процессе особенно важны коммуникация и координация между стратегиями. FMZ (платформа количественной торговли) помогает трейдерам добиться бесшовной интеграции стратегий и обмена данными в реальном времени, предоставляя эффективный протокол связи между реальными торговыми стратегиями.
В этой статье подробно рассматривается протокол обмена данными в реальном времени для торговых стратегий на платформе FMZ, а также представлена концепция его дизайна, функциональные особенности и преимущества в практическом применении. С помощью подробного анализа случаев мы покажем, как использовать этот протокол для достижения эффективной и стабильной стратегической коммуникации, а также улучшения исполнения и прибыльности торговых стратегий.
Независимо от того, являетесь ли вы энтузиастом количественной торговли, который только начинает работать с FMZ, или опытным профессиональным программистом, эта статья предоставит вам ценную информацию и практические руководства по работе. Давайте рассмотрим мощные функции FMZ и узнаем, как добиться взаимодействия между стратегиями с помощью эффективных протоколов связи, повысить эффективность торговли и использовать рыночные возможности.
Эти сценарии спроса демонстрируют различные возможности и преимущества протокола связи в реальном времени торговой стратегии FMZ в практических приложениях. Благодаря эффективной коммуникации между стратегиями трейдеры могут лучше справляться со сложными рыночными условиями, оптимизировать торговые стратегии и повышать эффективность торговли и прибыль.
Поняв требования к связи между реальными дисками, нам необходимо рассмотреть, как реализовать эти требования. Это не более чем то, что реальный случай А надеется обменяться информацией с реальным случаем Б, хотя требование кажется простым. Однако необходимо согласовать различные детали с использованием набора протоколов связи. FMZ инкапсулировал несколько популярных протоколов связи.
mqtt / nats / amqp / kafka
Архитектура коммуникации:
При применении этих протоколов на платформе FMZ, это можно просто понимать как mqtt/nats/amqp/kafka. Эти протоколы интегрированы вDial()
В функции используйтеDial()
Функции используются для публикации и подписки сообщений. Эти опубликованные сообщения передаются (ретранслируются) на подписанный реальный диск через сервер протокола, поэтому сначала необходимо запустить сервер протокола. Для простоты демонстрации в следующих примерах мы используем различные развертывания образов серверов протоколов.
Раздел документации API функции набора номера: https://www.fmz.com/syntax-guide#fun_dial
Перед развертыванием образа Docker не забудьте сначала установить программное обеспечение Docker.
Далее давайте рассмотрим и попрактикуем приложения протокола связи, поддерживаемые FMZ.
MQTT (Message Queuing Telemetry Transport) — это облегченный протокол передачи сообщений, который особенно подходит для сетевых сред с низкой пропускной способностью, большой задержкой или ненадежностью. Он был предложен Энди Стэнфорд-Кларком и Арленом Ниппером из IBM в 1999 году и позднее стал стандартом ISO (ISO/IEC PRF 20922).
Основные возможности протокола MQTT: режим публикации/подписки
Поскольку для развертывания прокси-сервера MQTT мы используем образ docker (образ eclipse-mosquitto) программного обеспечения, поддерживающего протокол MQTT, мы установили docker заранее и не будем вдаваться в подробности позже.
Перед запуском команды по развертыванию образа нам необходимо написать файл конфигурации прокси-сервераmosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Затем выполните команду развертывания:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
После запуска образа прокси-сервера отображается следующий экран:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Затем мы можем протестировать стратегию и применить ее на практике.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Основное применение функции Dial в коде стратегии:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Строковый параметр функции Dial начинается сmqtt://
Это имя протокола, за которым следует адрес прослушивания и порт. За символом “?” следует название темы подписки/публикации. Название темы, протестированное здесь, следующее:test_topic
。
Вышеуказанная стратегия публикует и подписывается на тему одновременно. Тест выполнения показан на рисунке:
Вы также можете использовать два реальных диска для подписки друг на друга и публикации информации о теме. Мы используем такой пример в разделе практики протокола NATS и не будем повторять этот метод в других протоколах.
Протокол NATS — это простой текстовый протокол в стиле публикации/подписки. Клиент подключается к gnatsd (серверу NATS) и взаимодействует с gnatsd. Связь основана на обычных сокетах TCP/IP и определяет очень небольшой набор операций. Новая строка указывает на завершение. В отличие от традиционных систем обмена сообщениями, использующих двоичные форматы сообщений, текстовый протокол NATS упрощает реализацию клиента и может быть легко реализован на различных языках программирования или языках сценариев.
Каждый протокол имеет свои особенности. Вы можете обратиться к конкретным документам и материалам, которые не будут здесь подробно рассматриваться.
Разверните сервер протокола NATS:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Эта команда docker автоматически загрузит и запустит образ nats. Порт 4222 — это порт, к которому клиент хочет получить доступ. После развертывания образа на порту 8222 будет открыт http-монитор.
Listening for client connections on 0.0.0.0:4222
Server is ready
Образ сервера NATS запускается, прослушивая порт 4222.
Нам нужно создать две стратегии (реальная торговля), назовем их пока Стратегия А и Стратегия Б. Коды этих двух стратегий в основном одинаковы. Написано на Javascript — самом простом языке для использования на платформе FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Эти две стратегии по сути одинаковы, за исключением того, что они публикуют и подписываются друг на друга, а подписываемые темы, публикуемые темы и публикуемая информация различаются.
Возьмем в качестве примера стратегию Б:
Dial()
Функция создает объект сервера клиентского соединенияconnPub
, используется для публикации тематических сообщений:var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)
Строка параметров функции Dial начинается сnats://
Указывает, что для связи используется протокол NATS, затемadmin
Это простая проверочная информация, которая устанавливается при развертывании образа Docker.auth admin
, используйте символ «@» для разделения следующего содержимого, а затем адрес службы и порт127.0.0.1:4222
и, наконец, тема публикации/подписки:topic=pubRobotB
Обратите внимание, что между предыдущим адресом стоит символ «?».
Dial()
Функция создает объект сервера клиентского соединенияconnSub
, используется для подписки на тематические сообщения:var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)
Единственное отличиеtopic=pubRobotA
Отличается, потому что вам нужно подписаться на тему, где стратегия А отправляет информациюpubRobotA
。
Создание и использование объектов подписки и публикации подключений в стратегии А аналогичны описанным выше.
Таким образом, реализуется простой пример применения протокола NATS, в котором реальный диск A и реальный диск B подписываются и публикуют сообщения для связи друг с другом.
В асинхронной коммуникации сообщение не достигнет получателя немедленно, а будет сохранено в контейнере. При выполнении определенных условий сообщение будет отправлено получателю контейнером. Этот контейнер является очередью сообщений. Для выполнения этой функции , обе стороны должны Контейнер и его компоненты соответствовать единым соглашениям и правилам. AMQP — это такой протокол. И отправитель, и получатель сообщений могут достичь асинхронной связи, соблюдая этот протокол. Этот протокол определяет формат сообщений и принцип их работы.
Каждый протокол имеет свои особенности. Вы можете обратиться к конкретным документам и материалам, которые не будут здесь подробно рассматриваться.
Разверните сервер протокола amqp:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
При развертывании образа Docker он автоматически загрузится и развернется, а по завершении отобразится следующее:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
После развертывания образа сервера напишите тестовый пример:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
При использовании очереди протокола AMQP, обратите внимание, что опубликованные сообщения будут сохраняться в очереди. Например, давайте запустим пример кода выше один раз. В очередь будет записано 10 сообщений. Затем, когда мы запускаем его во второй раз, мы можем обнаружить, что при чтении информация, записанная в первый раз, будет прочитана снова. Как показано на рисунке:
Вы можете видеть, что два сообщения журнала, на которые указывают красные стрелки на снимке экрана, имеют несогласованное время. Причина в том, что красное сообщение — это то, которое было прочитано и записано в очередь, когда код стратегии был запущен в первый раз.
На основе этой функции могут быть удовлетворены некоторые требования. Например, после перезапуска стратегии записанные рыночные данные по-прежнему могут быть получены из очереди для расчета инициализации и других операций.
Apache Kafka — это распределенное хранилище данных, оптимизированное для приема и обработки потоковых данных в режиме реального времени. Потоковые данные — это данные, которые непрерывно генерируются тысячами источников данных, часто отправляющих записи данных одновременно. Стриминговая платформа должна обрабатывать эти непрерывно текущие данные и обрабатывать их шаг за шагом.
Kafka предоставляет своим пользователям три основные функции:
Kafka в основном используется для создания конвейеров потоковых данных в реальном времени и приложений, которые адаптируются к потокам данных. Он объединяет возможности обмена сообщениями, хранения и потоковой обработки для хранения как исторических, так и данных в реальном времени.
Разверните образ docker прокси-сервера Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Проверьте, используя тестовый код:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Давайте рассмотрим, как использовать протокол Kafka для публикации и подписки на сообщения в функции набора номера.
Dial("kafka://localhost:9092/test_topic")
Как и у некоторых других протоколов, первая часть — это имя протокола. Затем следуйте адресу прослушивания:localhost:9092
. Затем используйте символ “/” в качестве разделителя, а затем тему подписки/публикации. Здесь тестовая тема установлена наtest_topic
。
Результаты теста: