[TOC]
金融市場の急速な発展と定量取引の普及により、ますます多くのトレーダーが取引の自動化戦略に頼り始めています。このプロセスでは、戦略間のコミュニケーションと調整が特に重要です。 FMZ (定量取引プラットフォーム) は、実際の取引戦略間の効率的な通信プロトコルを提供することで、トレーダーがシームレスな戦略統合とリアルタイムのデータ共有を実現できるように支援します。
この記事では、FMZ プラットフォームの取引戦略のリアルタイム通信プロトコルを詳細に検討し、その設計コンセプト、機能的特徴、および実際のアプリケーションにおける利点を紹介します。詳細なケース分析を通じて、このプロトコルを使用して効率的で安定した戦略コミュニケーションを実現し、取引戦略の実行と収益性を向上させる方法を説明します。
FMZ を使い始めたばかりの定量取引愛好家であっても、経験豊富なプロのプログラマーであっても、この記事は貴重な洞察と実用的な操作ガイドを提供します。 FMZ の強力な機能を探り、効率的な通信プロトコルを通じて戦略間の連携を実現し、取引効率を向上させ、市場機会を獲得する方法を学びましょう。
これらの需要シナリオは、実際のアプリケーションにおける FMZ 取引戦略のリアルタイム通信プロトコルのさまざまな可能性と利点を示しています。戦略間の効果的なコミュニケーションを通じて、トレーダーは複雑な市場環境にうまく対応し、取引戦略を最適化し、取引の効率と利益を向上させることができます。
実際のディスク間の通信要件を理解した後、これらの要件をどのように実現するかを検討する必要があります。要求は単純に見えますが、それは実市場 A が実市場 B と情報交換することを望んでいるに過ぎません。ただし、一連の通信プロトコルを使用してさまざまな詳細について合意する必要があります。FMZ は、いくつかの一般的な通信プロトコルをカプセル化しています。
mqtt / nats / amqp / kafka
通信アーキテクチャは次のとおりです。
これらのプロトコルをFMZプラットフォームに適用する場合、それは単にmqtt/nats/amqp/kafkaとして理解することができます。これらのプロトコルは、Dial()
関数では、Dial()
関数は、メッセージの公開やサブスクリプションなどの操作を実行します。これらの公開されたメッセージは、プロトコル サーバーを介してサブスクライブされた実際のディスクにプロキシ (中継) されるため、最初にプロトコル サーバーを実行する必要があります。デモンストレーションを簡単にするために、次の例ではさまざまなプロトコル サーバー イメージの展開を使用します。
ダイヤル機能 API ドキュメント セクション: https://www.fmz.com/syntax-guide#fun_dial
Docker イメージを展開する前に、まず Docker ソフトウェアをインストールすることを忘れないでください。
次に、FMZ でサポートされている通信プロトコル アプリケーションを調べて実践してみましょう。
MQTT (Message Queuing Telemetry Transport) は、低帯域幅、高遅延、または信頼性の低いネットワーク環境に特に適した軽量のメッセージ伝送プロトコルです。これは 1999 年に IBM の Andy Stanford-Clark 氏と Arlen Nipper 氏によって提案され、後に ISO 標準 (ISO/IEC PRF 20922) になりました。
MQTTプロトコルの主な機能:パブリッシュ/サブスクライブモード
MQTT プロキシ サーバーの導入には、MQTT プロトコルに対応したソフトウェアの docker イメージ (eclipse-mosquitto イメージ) を使用するため、事前に docker をインストールしておき、後ほど詳しく説明しません。
イメージを展開するコマンドを実行する前に、プロキシサーバーの設定ファイルを作成する必要があります。mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
次に、デプロイメント コマンドを実行します。
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
プロキシ サーバー イメージが実行されると、次の画面が表示されます。
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
その後、戦略をテストして実践することができます。
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
戦略コードにおける Dial 関数の主な用途は次のとおりです。
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Dial関数の文字列パラメータはmqtt://
プロトコル名の後にリスニング アドレスとポートが続きます。 「?」記号の後にサブスクリプション/公開トピック名が続きます。ここでテストされるトピック名は次のとおりです。test_topic
。
上記の戦略では、トピックの公開とサブスクライブを同時に行います。実行中のテストは図のようになります。
2 つの実際のディスクを使用して相互にサブスクライブし、トピック情報を公開することもできます。NATS プロトコルの実践セクションではこのような例を使用しますが、他のプロトコルではこの方法を繰り返すことはありません。
NATS プロトコルは、シンプルなテキストベースのパブリッシュ/サブスクライブ スタイルのプロトコルです。クライアントは gnatsd (NATS サーバー) に接続し、gnatsd と通信します。通信は通常の TCP/IP ソケットに基づいており、非常に小さな操作セットを定義します。改行は終了を示します。バイナリ メッセージ形式を使用する従来のメッセージング システムとは異なり、テキストベースの NATS プロトコルではクライアントの実装が簡単になり、さまざまなプログラミング言語やスクリプト言語で簡単に実装できます。
各プロトコルには独自の特徴があります。ここでは詳しく説明しませんので、具体的な文書や資料を参照してください。
NATS プロトコル サーバーを展開します。
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
この docker コマンドは、nats イメージを自動的にダウンロードして実行します。ポート 4222 は、クライアントがアクセスするポートです。イメージがデプロイされると、ポート 8222 で http モニターが開かれます。
Listening for client connections on 0.0.0.0:4222
Server is ready
NATS サーバー イメージが実行を開始し、ポート 4222 でリッスンします。
2 つの戦略 (実際の取引) を作成する必要があります。ここでは、戦略 A と戦略 B と名付けます。これら 2 つの戦略のコードは基本的に同じです。 FMZ プラットフォームで最も使いやすい言語である JavaScript で書かれています。
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
これら 2 つの戦略は、相互に公開およびサブスクライブすることと、サブスクライブされたトピック、公開されたトピック、および公開された情報が異なることを除いて、基本的に同じです。
戦略 B を例に挙げます。
Dial()
関数はクライアント接続サーバーオブジェクトを作成しますconnPub
トピックメッセージの公開に使用されます:var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)
Dial関数のパラメータ文字列はnats://
NATSプロトコルが通信に使用されていることを示します。admin
Dockerイメージをデプロイする際に設定される簡単な検証情報ですauth admin
、文字「@」を使用して次のコンテンツを区切り、その後にサービスアドレスとポートを入力します。127.0.0.1:4222
最後に、パブリッシュ/サブスクライブトピックです。topic=pubRobotB
前のアドレスの間に「?」記号があることに注意してください。
Dial()
関数はクライアント接続サーバーオブジェクトを作成しますconnSub
トピックメッセージのサブスクリプションに使用されます:var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)
唯一の違いはtopic=pubRobotA
異なるのは、戦略Aが情報を送信するトピックを購読する必要があるためです。pubRobotA
。
戦略 A でのサブスクリプションおよびパブリッシング接続オブジェクトの作成と使用は、上記で説明したものと同じです。
このようにして、実ディスク A と実ディスク B がメッセージをサブスクライブおよび公開して相互に通信する、NATS プロトコル アプリケーションの簡単な例が実装されます。
非同期通信では、メッセージはすぐに受信者に届くのではなく、コンテナに保存されます。特定の条件が満たされると、メッセージはコンテナによって受信者に送信されます。このコンテナがメッセージキューです。この機能を完了するには、双方ともコンテナとそのコンポーネントが統一された規則とルールに準拠する必要があります。AMQP はそのようなプロトコルです。メッセージの送信者と受信者の両方がこのプロトコルに準拠することで非同期通信を実現できます。このプロトコルは、メッセージの形式とその動作方法を指定します。
各プロトコルには独自の特徴があります。ここでは詳しく説明しませんので、具体的な文書や資料を参照してください。
amqp プロトコル サーバーをデプロイします。
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Docker イメージをデプロイすると、自動的にダウンロードおよびデプロイされ、完了すると次のように表示されます。
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
サーバー イメージがデプロイされたら、テスト例を記述します。
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
AMQP プロトコル キューを使用する場合、公開されたメッセージはキューに保持されることに注意してください。たとえば、上記のサンプル コードを 1 回実行してみましょう。 10 件のメッセージがキューに書き込まれます。次に 2 回目に実行すると、読み取り時に最初に書き込まれた情報が再度読み取られることがわかります。図に示すように:
スクリーンショットの赤い矢印で示されている 2 つのログ メッセージの時間が一致していないことがわかります。その理由は、赤いメッセージは、戦略コードが最初に実行されたときに読み取られ、キューに書き込まれたメッセージだからです。
この機能に基づいて、いくつかの要件を満たすことができます。たとえば、戦略を再起動した後でも、記録された市場データをキューから取得して、初期化計算やその他の操作を行うことができます。
Apache Kafka は、ストリーミング データをリアルタイムで取り込んで処理するために最適化された分散データ ストアです。ストリーミング データとは、何千ものデータ ソースによって継続的に生成され、多くの場合データ レコードが同時に送信されるデータのことです。ストリーミング プラットフォームは、この継続的に流れるデータを処理し、順番に段階的に処理する必要があります。
Kafka はユーザーに 3 つの主な機能を提供します。
Kafka は主に、データ ストリームに適応するリアルタイム ストリーミング データ パイプラインとアプリケーションの構築に使用されます。メッセージング、ストレージ、ストリーム処理機能を組み合わせて、履歴データとリアルタイムデータの両方を保存します。
Kafka プロキシの Docker イメージをデプロイします。
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
テストコードを使用してテストします。
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Dial 関数で Kafka プロトコルを使用してメッセージを公開およびサブスクライブする方法を見てみましょう。
Dial("kafka://localhost:9092/test_topic")
他のいくつかのプロトコルと同様に、最初の部分はプロトコル名です。次に、リスニング アドレスに従います。localhost:9092
。次に、記号「/」を区切りとして使用し、その後に購読/出版のトピックを続けます。ここでは、テストトピックは次のように設定されています。test_topic
。
テスト結果: