[TOC]
С быстрым развитием финансовых рынков и популярностью количественной торговли все больше и больше трейдеров начинают полагаться на автоматизированные стратегии для торговли.
В этой статье мы рассмотрим протокол прямой торговли для коммуникации торговых стратегий на платформе FMZ, познакомимся с концепцией его проектирования, функциональными особенностями и преимуществами в практическом применении.
Независимо от того, являетесь ли вы энтузиастом по количественному трейдингу, который только начинает с FMZ или опытным профессиональным программистом, эта статья предоставит вам ценные идеи и практические руководства по работе.
Эти сценарии спроса демонстрируют различные возможности и преимущества протокола прямой торговли в практическом применении.
После понимания требований к коммуникации между живыми трейдерами, мы должны рассмотреть, как реализовать эти требования. Это не что иное, как живая торговля A, надеясь обменяться информацией с живой торговлей B. Хотя требования кажутся простыми, есть различные детали, которые необходимо согласовать при использовании набора протоколов связи. FMZ включил несколько популярных протоколов связи.
mqtt / nats / amqp / kafka
Архитектура связи:
Сервер (прокси).
Для передачи сообщений между подписчиками и издателями необходим сервер, который выполняет протокол связи. Этот сервер может быть развернут локально в системе docker
Клиент (подписчик, издатель). Программа стратегии в режиме реального времени на FMZ может быть понята как клиент протокола связи.
При применении этих протоколов на платформе FMZ можно просто понять, что протоколы mqtt / nats / amqp / kafkap интегрированы вDial()
Функция иDial()
Для демонстрации мы используем различные развертывания изображений протокольного сервера в следующих примерах.
Функция набора в разделе документации API:https://www.fmz.com/syntax-guide#fun_dial
Прежде чем развернуть изображение докера, помните, чтобы сначала установить программное обеспечение докера.
Далее, давайте изучим и практикуем приложения протокола связи, поддерживаемые FMZ.
MQTT (Message Queuing Telemetry Transport) - это легкий протокол передачи сообщений, который особенно подходит для сетей с низкой пропускной способностью, высокой задержкой или ненадежной сетью.
Основные особенности протокола MQTT: режим публикации/подписки
Публикация: производитель сообщения отправляет сообщение на тему.
Подписка: потребитель сообщений подписывается на интересующую тему, тем самым получая сообщения, опубликованные на эту тему.
Брокер: MQTT использует брокера сообщений в качестве посредника для пересылки сообщений, обеспечивая разъединение между издателями и подписчиками.
Поскольку для развертывания прокси-сервера MQTT мы используем докер-изображение (eclipse-mosquitto image) программного обеспечения, поддерживающего протокол MQTT, мы заранее установили докер и не будем вдаваться в подробности позже.
Прежде чем запустить команду развернуть изображение, мы должны написать файл конфигурации прокси-сервераmosquitto.conf
.
# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true
Затем выполните команду развертывания:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
После запуска изображения прокси-сервера отображается следующее:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Затем мы можем проверить стратегию, чтобы воплотить ее в жизнь.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Основное использование функции Dial в коде стратегии:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Параметр строки функции Dial начинается сmqtt://
, что является названием протокола, за которым следует адрес прослушивания и порт. Символ test_topic
.
Вышеприведенная стратегия публикует и подписывается на тему одновременно.
Мы также можем использовать два живых трейдинга для подписки друг на друга и публикации тематической информации. Мы используем такой пример в разделе практики протокола nats и не будем повторять этот метод в других протоколах.
Протокол NATS представляет собой простой, текстовый протокол стиля publish/subscribe. Клиент подключается к gnatsd (сервер NATS) и общается с gnatsd. Связь основана на обычных сокетах TCP/IP и определяет очень небольшой набор операций. Newline указывает на завершение. В отличие от традиционных систем связи сообщений, которые используют двоичные форматы сообщений, текстовый протокол NATS делает реализацию клиента очень простой и может быть легко реализована в различных языках программирования или языках сценариев.
Каждый протокол имеет свои особенности. Вы можете ознакомиться с конкретными документами и материалами, которые не будут подробно описаны здесь.
Разверните сервер протокола NATS:
Docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Эта команда автоматически загружает и запускает изображение nats, а порт 4222 является портом, к которому клиент должен получить доступ. После развертывания изображения также будет открыт http-монитор на порту 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
Изображение сервера НАТС начинает работать, прослушивая по порту 4222.
Нам нужно создать две стратегии (живая торговля), давайте назовем их Стратегия A и Стратегия B. Коды этих двух стратегий в основном одинаковы. Они написаны на Javascript, который является самым простым языком для использования на платформе FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Эти две стратегии практически одинаковы, за исключением того, что они публикуют и подписываются друг на друга, а подписанные темы, опубликованные темы и опубликованная информация отличаются.
Возьмем, к примеру, стратегию Б:
Dial()
функция для создания объекта сервера клиентского соединенияconnPub
для публикации тематических сообщений:var connPub = Dial ((
nats://admin@127.0.0.1:4222?topic=pubRobotB )
Параметрная строка функции Dial начинается сnats://
указывая, что протокол NATS используется для связи.admin
является простой информацией о проверкеauth admin
вставляется при развертывании изображения Docker. Символ 127.0.0.1:4222
Наконец, есть тема публикации/подписки:topic=pubRobotB
Обратите внимание, что символ
Dial()
функция для создания объекта сервера клиентского соединенияconnSub
для подписки на сообщение по теме:var connSub = Dial ((
nats://admin@127.0.0.1:4222?topic=pubRobotA )
Разница только в том,topic=pubRobotA
, потому что мы должны подписаться на темуpubRobotA
где стратегия A отправляет информацию.
Создание и использование объектов подписных и публикующих подключений в стратегии А совпадают с описанными выше.
Таким образом, реализуется простой пример применения протокола NATS, в котором прямая торговля A и прямая торговля B подписываются и публикуют сообщения для общения друг с другом.
В асинхронной связи сообщение не достигает получателя сразу, но хранится в контейнере. Когда выполняются определенные условия, сообщение будет отправлено получателю контейнером. Этот контейнер является очередью сообщения. Для выполнения этой функции обе стороны и контейнер и его компоненты должны соблюдать единые соглашения и правила. AMQP - такой протокол. И отправитель, и получатель сообщения могут реализовать асинхронную связь, соблюдая этот протокол. Этот протокол устанавливает формат и способ работы сообщения.
Каждый протокол имеет свои особенности. Вы можете ознакомиться с конкретными документами и материалами, которые не будут подробно описаны здесь.
Разверните сервер протокола amqp:
Docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
При развертывании изображения докера оно будет загружаться и развертываться автоматически, и когда оно будет завершено, оно будет отображать:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
После развертывания изображения сервера напишите тестовый пример:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Read
Log("read:", conn.read(1000), "#FF0000")
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
При использовании очереди протокола AMQP обратите внимание, что опубликованные сообщения будут сохраняться в очереди. Например, если мы запустим вышеприведенный примерный код, 10 сообщений будут записаны в очередь. Затем, когда мы запустим его во второй раз, мы можем обнаружить, что первое написанное сообщение будет прочитано снова при чтении. Как показано на рисунке:
Мы можем видеть, что два сообщения журнала, на которые указывают красные стрелки на скриншоте, имеют несоответствующее время. Причина в том, что красное сообщение - это то, что было прочитано и записано в очередь, когда код стратегии был впервые запущен.
На основе этой особенности могут быть выполнены некоторые требования. Например, после перезагрузки стратегии, записанные данные рынка все еще могут быть получены из очереди для расчета инициализации и других операций.
Apache Kafka - это распределенное хранилище данных, оптимизированное для приема и обработки потоковых данных в режиме реального времени.
Кафка предоставляет своим пользователям три основные функции:
Kafka используется в основном для создания потоковых каналов передачи данных в реальном времени и приложений, которые адаптируются к потокам данных.
Развернуть докерный образ прокси Кафки:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Испытание с использованием кода испытания:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Давайте посмотрим, как использовать протокол Кафки для публикации и подписки сообщений в функции Dial.
Dial("kafka://localhost:9092/test_topic")
Как и другие протоколы, первая часть - это название протокола. За ним следует адрес прослушивания:localhost:9092
. Затем используйте символ test_topic
.
Результаты испытаний: