[TOC]
С быстрым развитием финансовых рынков и распространением количественных сделок все больше и больше трейдеров начинают полагаться на автоматизированные стратегии для торговли. В этом процессе коммуникация и координация между стратегиями становятся особенно важными. FMZ (Quantitative Trading Platform) помогает трейдерам реализовать стратегию беспроблемной паровки и обмена данными в режиме реального времени, предоставляя эффективные протоколы междисциплинарной связи между торговыми стратегиями.
В данной статье мы подробно рассмотрим протоколы межрельсовой коммуникации между стратегиями торговли на платформе FMZ, а также их концепцию, функциональные характеристики и преимущества в практическом применении. Мы продемонстрируем, как использовать протокол для эффективной, стабильной стратегии коммуникации, повышения эффективности и эффективности стратегии торговли с помощью детального анализа случаев.
Независимо от того, новичок в области количественных операций FMZ или опытный профессиональный программист, эта статья даст вам ценные идеи и практические руководства. Давайте вместе рассмотрим мощные возможности FMZ и узнаем, как с помощью эффективных коммуникационных протоколов достичь синхронного сотрудничества между стратегиями, повысить эффективность транзакций и захватить рыночные возможности.
Эти сценарии потребности демонстрируют многочисленные возможности и преимущества ППД в практическом применении. Благодаря эффективной стратегической коммуникации трейдеры могут лучше реагировать на сложную рыночную среду, оптимизировать торговые стратегии, повышать эффективность и прибыль торговли.
После понимания потребностей в связи между дисками необходимо подумать о том, как их реализовать. Это не что иное, как то, что диска А хочет взаимодействовать с диском Б, хотя это кажется простым. Однако есть различные детали, которые требуют согласования с помощью набора протоколов связи, FMZ уже упаковывает несколько более популярных протоколов связи.
mqtt / nats / amqp / kafka
Архитектура связи:
При использовании этих протоколов на платформе FMZ, их можно просто понять как mqtt / nats / amqp / kafka.Dial()
Использование функцийDial()
Функции выполняют такие действия, как публикация сообщений, подписка и т. д. Эти сообщения передаются через протокольные серверные агенты (ретрансляции) на дисковый диск подписки, поэтому сначала нужно запустить серверного протокола. Для удобства демонстрации в следующих примерах мы используем различные протокольные развертывания зеркальных серверных окон.
Раздел API-документации, в котором расположена функция Dial:https://www.fmz.com/syntax-guide#fun_dial
Перед тем, как развернуть Docker Mirror, помните, что сначала нужно установить Docker Software.
В следующей статье мы рассмотрим практические применения протоколов связи, поддерживаемых FMZ.
MQTT (Message Queuing Telemetry Transport) - это легкий протокол передачи сообщений, особенно подходящий для сетевых условий с низкой пропускной способностью, высокой задержкой или ненадежностью. Он был предложен в 1999 году Энди Стэнфорд-Кларком и Арленом Ниппером из IBM и позже стал стандартом ISO (ISO/IEC PRF 20922).
Основные особенности протокола MQTT: модель публикации/подписки
Поскольку мы развертываем прокси-сервер MQTT с помощью программного обеспечения, поддерживающего протокол MQTT (eclipse-mosquitto mirror), мы заранее устанавливаем docker, после чего ничего не обсуждается.
Перед тем, как запустить команду развертывать зеркало, нам нужно написать профиль прокси-сервера.mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Затем выполните команду развертывания:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
После запуска прокси-сервер показывает следующее:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
И тогда мы сможем проверить стратегию и применить её.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
В стратегическом коде в основном используется функция Dial:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Начало параметров строки функции Dialmqtt://
Название темы тестирования называется: Subscribe/Publish.test_topic
。
Показаны следующие стратегии, которые используются для тестирования темы, когда она публикуется, подписывается и работает:
Также можно использовать два диска для подписки друг на друга, публиковать тематические сообщения, примеры которых мы используем в разделе практики протокола nats, но не упоминаются в других протоколах.
Протокол NATS - это простой, текстовый протокол в стиле публикации/подписки. Клиент подключается к gnatsd (NATS-сервер) и общается с gnatsd, общаясь на основе обычного TCP/IP-приложения, и определяет очень маленький набор операций, переменный знак окончания. В отличие от традиционных систем обмена сообщениями, использующих двоичный формат сообщений, используется текстовый протокол NATS, который позволяет клиенту легко реализовывать и легко выбирать несколько языков программирования или языков сценариев для реализации.
Каждое соглашение имеет свои особенности, и вы можете ознакомиться с конкретными документами и материалами, которые не описаны здесь.
Направление сервиса протокола nats:
Docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Команда docker автоматически загружает и запускает нац-образ, порт 4222 - это порт, к которому клиент хочет получить доступ.
Listening for client connections on 0.0.0.0:4222
Server is ready
На портах 4222 можно прослушивать.
Нам нужно создать две стратегии (включая реальные диски) и назвать их на данный момент A и B. Эти две стратегии имеют в основном одинаковый код.
Стратегия А
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Стратегия Б
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Эти две стратегии, по сути, одинаковы, но они отличаются друг от друга: публикация, подписка, подписка на тему, публикация на тему, публикация информации.
Приведем пример с стратегии Б:
1. ИспользованиеDial()
Функция создания объекта для подключения к клиентуconnPub
Например, вы можете посмотреть на видео, сделанное в Интернете.
var connPub = набрать номер127.0.0.1:4222?topic=pubRobotB”)
Параметровые строки функции Dial, начинающиеся сnats://
Это означает, что используется протокол Nats для общения, а затемadmin
Это простая проверка, которая устанавливается при развертывании Docker Mirror.auth admin
, с использованием символа "@127.0.0.1:4222
В конце концов, публикация/подписка:topic=pubRobotB
Обратите внимание, что между предыдущим адресом и предыдущим адресом помещены промежутки между символами "Как?" и "Как?"
2. ИспользованиеDial()
Функция создания объекта для подключения к клиентуconnSub
Например, вы можете подписаться на Facebook или Twitter.
var connSub = набрать номер127.0.0.1:4222?topic=pubRobotA”)
Разница лишь в том,topic=pubRobotA
Поскольку мы должны подписаться на стратегию A, мы отправляем сообщения на темы, которые нам не нужны.pubRobotA
。
Создание и использование объектов подписки, публикации и подключения в политике А аналогично описанию выше.
Стратегия А работает.
Стратегия B работает
Таким образом, реализуется простой пример применения протокола Nats для обмена сообщениями между дисками A и B.
В асинхронной коммуникации сообщение не сразу доходит до получателя, а хранится в контейнере, после выполнения определенных условий сообщение отправляется контейнером получателю, контейнеру, который является последовательностью сообщений, а для выполнения этой функции требуется, чтобы стороны и контейнер и его компоненты соблюдали единые соглашения и правила. AMQP - это такой протокол, который позволяет осуществлять асинхронную коммуникацию между сторонами отправления и получения. Этот протокол определяет формат и работу сообщения.
Каждое соглашение имеет свои особенности, и вы можете ознакомиться с конкретными документами и материалами, которые не описаны здесь.
Для развертывания amqp протокольного сервера:
Docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
При развертывании Docker Mirror автоматически загружается развертывание, после чего отображается:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
После того, как зеркало на сервере будет хорошо развернуто, напишите пример тестирования:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
При использовании протокола amqp следует иметь в виду, что сообщения после публикации остаются в очереди, например, мы сначала выполняем примерный код выше. Мы записываем в очередь 10 сообщений. Затем мы вторым раундом обнаруживаем, что в момент чтения мы снова читаем сообщения, которые были написаны впервые.
Как вы можете видеть в скриншоте, два сообщения о журнале, на которые указывают красные стрелки, не совпадают во времени, поскольку красный цвет означает, что сообщение, которое было прочитано, было написано в очередь при первом запуске кода стратегии.
На основе этой характеристики могут быть реализованы некоторые потребности, например: после перезагрузки диска политики, все еще можно получить зарегистрированные данные рынка из очереди для таких операций, как инициализация вычислений.
Apache Kafka - это распределенное хранилище данных, оптимизированное для извлечения и обработки потоковых данных в режиме реального времени. Потоковые данные - это данные, непрерывно генерируемые из тысяч источников данных, которые обычно могут быть отправлены одновременно.
Кафка предоставляет своим пользователям три основных возможности:
Kafka используется в основном для создания каналов и приложений для реального времени потока данных, адаптированных к потоку данных. Он сочетает в себе функции передачи сообщений, хранения и потоковой обработки, которые позволяют хранить исторические и реальные данные.
Снимок докера, в котором развернуты агенты Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Проверка с помощью тестового кода:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Давайте посмотрим, как можно использовать протокол Kafka для публикации и подписки на сообщения в Dial-функции.
Dial("kafka://localhost:9092/test_topic")
Как и в случае с некоторыми другими протоколами, вначале протокол называется именем; затем следует адрес прослушки:localhost:9092
После этого используйте символ "/" в качестве интервала, а затем напишите тему подписки / публикации, где тестовая тема настроена наtest_topic
。
Результаты теста: