[TOC]
금융 시장의 급속한 발전과 양적 거래의 인기와 함께 점점 더 많은 거래자가 거래에 대한 자동화된 전략에 의존하기 시작합니다. 이 과정에서 전략 간의 통신과 조정은 특히 중요합니다. FMZ 양적 거래 플랫폼은 거래 전략 간의 효율적인 통신 프로토콜을 제공함으로써 거래자가 원활한 전략 도킹과 실시간 데이터 공유를 달성하는 데 도움이됩니다.
이 문서에서는 FMZ 플랫폼에서 거래 전략의 라이브 거래 통신 프로토콜을 탐구하고, 실제 응용 프로그램에서 설계 개념, 기능 기능 및 장점을 소개합니다. 자세한 사례 분석을 통해 효율적이고 안정적인 전략 통신을 달성하고 거래 전략의 실행 및 수익 성능을 향상시키기 위해이 프로토콜을 사용하는 방법을 보여줍니다.
당신이 FMZ를 시작하는 양적 거래 애호가 또는 경험이 많은 전문 프로그래머이든, 이 기사는 당신에게 귀중한 통찰력과 실용적인 운영 가이드를 제공합니다. FMZ 플랫폼의 강력한 기능을 탐구하고 효율적인 통신 프로토콜을 통해 전략 간의 협력을 달성하고 거래 효율성을 향상시키고 시장 기회를 포착하는 방법을 배우십시오.
이러한 수요 시나리오는 실제 응용 분야에서 FMZ 거래 전략 라이브 거래 통신 프로토콜의 다양한 가능성과 장점을 보여줍니다. 효과적인 전략 간 커뮤니케이션을 통해 거래자는 복잡한 시장 환경에 더 잘 대처하고 거래 전략을 최적화하고 거래 효율성과 수익을 향상시킬 수 있습니다.
라이브 트레이딩 사이의 통신 요구 사항을 이해 한 후, 이러한 요구 사항을 구현하는 방법을 고려해야합니다. 라이브 트레이딩 A와 라이브 트레이딩 B와 정보를 교환하기를 희망하는 라이브 트레이딩 A 이상의 것이 아닙니다. 요구 사항이 간단해 보이지만 일련의 통신 프로토콜을 사용하는 데 동의해야 할 다양한 세부 사항이 있습니다. FMZ는 여러 인기있는 통신 프로토콜을 포괄했습니다.
mqtt / nats / amqp / kafka
통신 구조는 다음과 같습니다.
FMZ 플랫폼에서 이러한 프로토콜을 적용 할 때 mqtt / nats / amqp / kafkap 프로토콜이Dial()
기능, 그리고Dial()
이 함수는 메시지를 게시하고 구독하는 데 사용됩니다. 이러한 게시 된 메시지는 프로토콜 서버를 통해 구독 된 라이브 트레이딩에 프록시 (전송) 됩니다. 따라서 프로토콜 서버가 먼저 실행되어야 합니다. 예시를 위해 다음과 같은 예제에서 다양한 프로토콜 서버 이미지 배포를 사용합니다.
API 문서 섹션에서 다이얼 함수:https://www.fmz.com/syntax-guide#fun_dial
도커 이미지를 배포하기 전에 먼저 도커 소프트웨어를 설치하는 것을 기억하세요.
다음으로 FMZ가 지원하는 통신 프로토콜 애플리케이션을 탐구하고 연습해보겠습니다.
MQTT (Message Queuing Telemetry Transport) 는 낮은 대역폭, 높은 지연 시간 또는 신뢰할 수없는 네트워크 환경에 특히 적합한 가벼운 메시지 전송 프로토콜이다. 1999년 IBM의 앤디 스탠퍼드 클라크와 아르렌 닛퍼가 제안했으며 나중에 ISO 표준 (ISO/IEC PRF 20922) 이되었습니다.
MQTT 프로토콜의 주요 특징: 게시/구독 모드
우리는 MQTT 프록시 서버를 배포하기 위해 MQTT 프로토콜을 지원하는 소프트웨어의 도커 이미지 (eclipse-mosquitto image) 를 사용하기 때문에 우리는 미리 도커를 설치했으며 나중에 세부 사항에 대해 설명하지 않을 것입니다.
이미지를 배포하기 위해 명령을 실행하기 전에, 우리는 프록시 서버 구성 파일을 작성해야합니다mosquitto.conf
.
# Configure port number and remote access IP
listener 1883 0.0.0.0
# Setting up anonymous access
allow_anonymous true
다음 배포 명령어를 실행합니다:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
프록시 서버 이미지가 실행되면 다음이 표시됩니다.
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
그 다음에는 전략을 시험해서 실행에 옮길 수 있습니다.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
전략 코드에서 다이얼 함수의 주요 사용은:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
다이얼 함수의 문자열 매개 변수는mqtt://
, 즉 프로토콜 이름, 그 다음 듣기 주소와 포트. 기호 test_topic
.
위의 전략은 동시에 주제에 게재하고 가입합니다. 실행 테스트는 그림에서 표시됩니다.
우리는 또한 두 라이브 트레이드를 사용하여 서로 가입하고 주제 정보를 게시 할 수 있습니다. 우리는 nats 프로토콜 연습 섹션에서 그러한 예를 사용합니다. 다른 프로토콜에서이 방법을 반복하지 않을 것입니다.
NATS 프로토콜은 간단한 텍스트 기반 게시 / 구독 스타일 프로토콜입니다. 클라이언트는 gnatsd (NATS 서버) 에 연결되고 gnatsd와 통신합니다. 통신은 일반적인 TCP / IP 소켓에 기반하고 매우 작은 일련의 작업을 정의합니다. 뉴 라인은 종료를 나타냅니다. 바이너리 메시지 형식을 사용하는 전통적인 메시지 통신 시스템과 달리 텍스트 기반 NATS 프로토콜은 클라이언트 구현을 매우 간단하게하고 다양한 프로그래밍 언어 또는 스크립트 언어로 쉽게 구현 할 수 있습니다.
각 프로토콜은 각자의 특성을 가지고 있습니다.
NATS 프로토콜 서버를 배포합니다.
docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
이 도커 명령어는 자동으로 nats 이미지를 다운로드 하 고 실행 하 고, 포트 4222는 클라이언트가 액세스 해야 하는 포트 이다. 이미지가 배포 된 후, http 모니터도 포트 8222에 열릴 것이다.
Listening for client connections on 0.0.0.0:4222
Server is ready
Nats 서버 이미지가 4222 포트에서 듣고 실행됩니다.
우리는 두 가지 전략 (라이브 트레이딩) 을 만들어야 합니다. 전략 A와 전략 B라고 부르겠습니다. 이 두 가지 전략의 코드는 기본적으로 동일합니다. 그들은 FMZ 플랫폼에서 사용하는 가장 쉬운 언어인 자바스크립트로 작성되었습니다.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotA, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("Current live trading robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("Communication failure!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("Communication failure!")
return
}
while (true) {
connPub.write("Message posted by robotB, robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
이 두 가지 전략은 거의 같지만, 서로 게시하고 구독하고 있으며, 구독된 주제, 게시된 주제 및 게시된 정보는 다릅니다.
전략 B를 예로 들어보죠.
Dial()
클라이언트 연결 서버 객체를 생성하는 함수connPub
주제 메시지의 게시물:var connPub = 다이얼127.0.0.1:4222?topic=pubRobotB”)
다이얼 함수의 매개 변수 문자열은nats://
NATS 프로토콜이 통신에 사용된다는 것을 나타냅니다.admin
간단한 확인 정보입니다.auth admin
도커 이미지를 배포할 때 설정됩니다. 다음 콘텐츠를 분리하기 위해 127.0.0.1:4222
마지막으로, 게시/구독 주제가 있습니다.topic=pubRobotB
Dial()
클라이언트 연결 서버 객체를 생성하는 함수connSub
주제 메시지 구독:var connSub = 다이얼127.0.0.1:4222?topic=pubRobotA”)
유일한 차이점은topic=pubRobotA
, 우리는 주제에 가입해야 하기 때문에pubRobotA
전략 A가 정보를 전송하는 곳.
전략 A에서 구독 및 출판 연결 객체의 생성 및 사용은 위에서 설명한 것과 동일합니다.
이 방법으로, 라이브 트레이딩 A와 라이브 트레이딩 B가 서로 통신하기 위해 메시지를 구독하고 게시하는 간단한 NATS 프로토콜 응용 예가 구현됩니다.
비동기 통신에서는 메시지가 수신자에게 즉시 도달하지 않지만 컨테이너에 저장됩니다. 특정 조건이 충족되면 컨테이너에 의해 수신자에게 메시지가 전송됩니다. 이 컨테이너는 메시지 대기열입니다. 이 기능을 완료하기 위해 당사자와 컨테이너 및 구성 요소 모두 통일된 계약과 규칙을 준수해야합니다. AMQP는 그러한 프로토콜입니다. 메시지의 발신자와 수신자 모두이 프로토콜을 준수하여 비동기 통신을 구현 할 수 있습니다. 이 프로토콜은 메시지의 형식 및 작업 방법을 규정합니다.
각 프로토콜은 각자의 특성을 가지고 있습니다.
amqp 프로토콜 서버를 배포합니다.
docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-관리
도커 이미지를 배포할 때, 자동으로 다운로드하고 배포하고, 완료되면 표시됩니다:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
서버 이미지가 배포된 후 테스트 예제를 작성합니다.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Read
Log("read:", conn.read(1000), "#FF0000")
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
AMQP 프로토콜 큐를 사용할 때, 게시 된 메시지가 큐에 지속 될 것을 유의하십시오. 예를 들어 위의 예제 코드를 실행하면 10 개의 메시지가 큐에 기록됩니다. 두 번째 실행하면 첫 번째 작성 된 메시지가 읽었을 때 다시 읽히는 것을 볼 수 있습니다. 그림에서 보이는 것처럼:
우리는 스크린 샷의 빨간 화살표로 표시된 두 로그 메시지가 일치하지 않는 시간을 가지고 있음을 볼 수 있습니다. 이유는 빨간 메시지가 전략 코드가 처음 실행되었을 때 대기열에 읽고 작성 된 것입니다.
이 특징을 기반으로, 몇 가지 요구 사항이 충족 될 수 있습니다. 예를 들어, 전략이 다시 시작 된 후, 기록 된 시장 데이터는 초기화 계산 및 기타 작업을위한 대기열에서 여전히 얻을 수 있습니다.
아파치 카프카 (Apache Kafka) 는 실시간으로 스트리밍 데이터를 섭취하고 처리하는 데 최적화된 분산 데이터 저장소이다. 스트리밍 데이터는 수천 개의 데이터 소스에 의해 지속적으로 생성되는 데이터이며, 종종 데이터 레코드를 동시에 전송한다. 스트리밍 플랫폼은 이러한 지속적인 데이터 유입을 처리하고 순차적으로 처리해야 한다.
카프카는 사용자에게 세 가지 주요 기능을 제공합니다.
카프카는 주로 실시간 스트리밍 데이터 파이프라인과 데이터 스트림에 적응하는 응용 프로그램을 구축하는 데 사용됩니다. 메시징, 저장 및 스트림 처리 기능을 결합하고 역사적 및 실시간 데이터를 저장할 수 있습니다.
카프카 프록시의 도커 이미지를 배포:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
테스트 코드를 사용하여 테스트:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("Current live trading robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("Communication failure!")
return
}
for (var i = 0; i < 10; i++) {
// Write
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("Write a message to testQueue:", msg)
// Read
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("close conn")
}
Dial("kafka://localhost:9092/test_topic")
다른 프로토콜과 마찬가지로, 첫 번째 부분은 프로토콜 이름입니다. 그 다음에는 듣기 주소가 있습니다.localhost:9092
. 다음 test_topic
.
검사 결과: