[TOC]
金融市場の急速な発展と量化取引の普及により,ますます多くのトレーダーは自動化戦略に頼り始めています. この過程で,戦略間のコミュニケーションと調整が特に重要になります. FMZ (量化取引プラットフォーム) は,効率的な取引戦略のリアルディスク通信プロトコルを提供することで,トレーダーが戦略のシームレスペアリングとリアルタイムデータ共有を実現するのを助けています.
この記事では,FMZプラットフォームにおける取引戦略実地間通信プロトコルを深く検討し,その設計理念,機能特性,そして実用的な応用上の利点について説明します.詳細な事例分析を通じて,このプロトコルを活用して効率的で安定した戦略通信を実現し,取引戦略の実行力と収益性の向上を図る方法を示します.
FMZの量化取引に初心者であろうと,経験豊富なプロのプログラマーであろうと,この記事では貴重な洞察と実践的な操作ガイドを提供します. FMZの強力な機能を一緒に探検し,効率的な通信プロトコルによって戦略間の協働を実現し,取引効率を向上させ,市場機会を把握する方法について学びましょう.
これらの需要シナリオは,FMZ取引戦略の実用的な応用における実用的な通信協定の可能性と利点を示している.効果的な戦略間通信によって,トレーダーは複雑な市場環境に対応し,取引戦略を最適化し,取引効率と収益性を向上させることができる.
リアルディスク間の通信のニーズを理解すると,それらのニーズを実現する方法について考える. リアルディスクAはリアルディスクBと通信を希望するだけで,要求は単純に見える. しかし,さまざまな詳細は通信プロトコルのセットを使用して合意する必要があります. FMZはいくつかの一般的な通信プロトコルをパッケージしています.
mqtt / nats / amqp / カフカ
通信アーキテクチャは:
FMZプラットフォームでは,これらのプロトコルを mqtt / nats / amqp / kafka と簡単に理解することができます.Dial()
函数で使うDial()
関数はメッセージ,サブスクリプションなどの操作を行います. これらのメッセージは,プロトコルのサーバー端代理 (リュール) によりサブスクリプションの実盤に送信されます. そのため,まずプロトコルのサーバーを起動する必要があります. デモの便宜のために,以下の例では,様々なプロトコルのサーバー端ミラーデプロイメントを使用します.
Dial 関数があるAPI ドキュメントのセクション:https://www.fmz.com/syntax-guide#fun_dial
Dockerのミラーを展開する前に,dockerソフトウェアをインストールすることを忘れないでください.
この記事へのトラックバック一覧です: FMZがサポートする通信プロトコルの応用を一緒に探してみましょう.
MQTT (Message Queuing Telemetry Transport) は,特に低帯域幅,高遅延,または信頼性の低いネットワーク環境で使用される軽量なメッセージ通信プロトコルである.これはIBMのAndy Stanford-ClarkとArlen Nipperによって1999年に提案され,後にISO標準 (ISO/IEC PRF 20922) になった.
MQTT プロトコルの主要特徴: 公開/サブスクリプション モジュール
MQTTのプログマは,MQTTプロトコルをサポートするソフトウェア (eclipse-mosquitto mirror) を使って展開されているため,Dockerを事前にインストールし,その後は説明しません.
代理サーバーのプロフィールを書く必要があります.mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
部署命令を実行します:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
代理サーバーのミラーが起動すると,以下のように表示されます.
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
戦略をテストして実践してみましょう.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
策略コードは主にDial関数を使用しています.
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
ダイアル関数の文字列参数から始まるmqtt://
試行錯誤は,プロトコルの名前,監視アドレス,ポート、符号"?"に続いて,サブスクリプション/リリース主題名,ここでテスト主題名は:test_topic
。
投稿,購読,テストの際には,以下の手順が適用されます.
また,2つの実態台を使って相互にサブスクリプションしたり,主題情報を公開したりすることもできます.この例はnatsプロトコルの実践章で使いますが,他のプロトコルはこれ以上この方法について語っていません.
NATSのプロトコルは,シンプルな,テキストベースのリリース/サブスクリプションスタイルのプロトコルである.クライアントはgnatsd (NATSサーバー) に接続され,gnatsdと通信し,通信は通常のTCP/IP接字に基づいており,非常に小さな操作セットを定義し,交代表示を終了する.従来の,二次メッセージ形式を使用するメッセージ通信システムとは異なり,テキストベースのNATSプロトコルは,クライアントが簡単に実装し,複数のプログラミング言語またはスクリプト言語を簡単に選択することができます.
各協定にはそれぞれの特徴があり,具体的な文書や情報については,こちらに掲載していません.
Nats プロトコルのサーバーを展開する:
ドッカーラン
名前ナッツ rm -p 4222:4222 -p 8222:8222 ナッツ http_port 8222 auth admin
このdocker命令は,自動的に nats 画像をダウンロードし,実行します. 4222 ポートはクライアントがアクセスするポートです. 画像を展開すると, 8222 ポートで開いている http モニターも表示されます.
Listening for client connections on 0.0.0.0:4222
Server is ready
nats 端末ミラーが起動し,ポート4222を監視します.
基本的には同じコードで作成されている. FMZプラットフォーム上で最も使いやすいJavaScriptを使って作成されている.
戦略A
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
戦略B
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
この2つの戦略は基本的には同じですが,相互投稿,サブスクリプション,サブスクリプションテーマ,投稿テーマ,投稿情報は異なります.
戦略Bの例として:
1、使用Dial()
関数 クライアントに接続するサーバー端オブジェクトを作成するconnPub
メディアは,この記事の内容について,
ワルコンパブ = ダイヤル127.0.0.1:4222?topic=pubRobotB”)
ダイアル関数の参数文字列は,nats://
通信はNATSプロトコルでadmin
Docker Mirror をデプロイするときに設定する簡単なチェックメッセージですauth admin
,文字"@127.0.0.1:4222
投稿/サブスクリプション:topic=pubRobotB
前の住所との間には"
2、使用Dial()
関数 クライアントに接続するサーバー端オブジェクトを作成するconnSub
この記事へのトラックバック一覧です.
変数で表示する127.0.0.1:4222?topic=pubRobotA”)
違うのはtopic=pubRobotA
メッセージを送信するテーマは,A戦略のサブスクリプションが必要だからです.pubRobotA
。
策略Aのサブスクリプション,公開接続オブジェクトの作成と使用は,上記と同義である.
戦略Aが実行される
戦略Bが実行される
これは,ディスクAとディスクBの間の相互サブスクリプション,メッセージ発信,通信のためのシンプルなnatsプロトコルのアプリケーション例を実現する.
異動通信では,メッセージは即座に受信者に届かないが,容器に保存され,特定の条件が満たされると,メッセージは容器によって受信者に送信される.この容器はメッセージキュープである.この機能を達成するには,両者と容器とその各構成要素が統一された協定と規則に従う必要がある.AMQPは,メッセージを送信する側と受信する側がこのプロトコルに従うことを遵守するプロトコルである.このプロトコルは,メッセージの形式と動作を合意する.
各協定にはそれぞれの特徴があり,具体的な文書や情報については,こちらに掲載していません.
amqp プロトコルのサーバーを展開する:
docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-管理
Docker Mirror は,Docker Mirror をインストールするときに,自動的にダウンロードされ,完了すると,以下のように表示されます.
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
サーバ端ミラーが配備された後,テストの例を記述します.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
amqp プロトコルを使用するキューでは,リリースされたメッセージがキューに永続的に存在することを注意する必要があります. 例えば,上記の例コードを最初に実行すると,10 つのメッセージがキューに書き込まれます. 次に,読み取りが発見されるときに2 回目に実行すると,最初の書き込みが再び読み取れます.図のように:
スクリーンショットでは,赤い矢印が指している2つのログのメッセージが,この赤い文字が,最初の実行時に列に記入された,読み取られたメッセージであるため,タイムが一致していないことがわかります.
この特性によって,いくつかの要求が実現される.例えば: 戦略本格ディスクを再起動した後に,始動計算などの操作のために,キューから記録された市場データを取得することが可能である.
Apache Kafkaは,流のデータをリアルタイムに抽出し処理するために最適化された分散データストレージである. 流のデータは,数千のデータ源から継続的に生成されるデータであり,通常は同時にデータレコーディングを送ることができます. 流のプラットフォームは,これらの継続的な流のデータを処理し,順序順に段階的に処理する必要があります.
カフカは,ユーザーに3つの主要機能を提供しています.
カフカは,データ流に対応するリアルタイムストリームデータパイプラインとアプリケーションを構築するために使用される.メッセージ送信,保存,ストリーム処理機能を組み合わせ,歴史的データとリアルタイムデータを保存することができます.
カフカ代理を展開するドッカー画像:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
テストコードでテスト:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の Dial の
Dial("kafka://localhost:9092/test_topic")
他のいくつかのプロトコルのように,最初の部分はプロトコルの名前です.localhost:9092
◎次に"/"を間隔として使用し,その後にサブスクリプション/投稿のトピックを書き,テストトピックを設定します.test_topic
。
テスト結果: