[TOC]
Dengan perkembangan pesat pasar keuangan dan popularitas perdagangan kuantitatif, semakin banyak pedagang yang mulai mengandalkan strategi otomatis untuk berdagang. Dalam proses ini, komunikasi dan koordinasi antara strategi menjadi sangat penting. FMZ (Quantitative Trading Platform) membantu pedagang mencapai perpaduan seamless strategi dan berbagi data real-time dengan menyediakan protokol komunikasi antar piringan strategi perdagangan yang efisien.
Artikel ini akan membahas secara mendalam protokol komunikasi real-time antara strategi perdagangan di platform FMZ, memperkenalkan konsep desain, fitur fungsional, dan keunggulan dalam aplikasi praktis. Kami akan menunjukkan melalui analisis kasus rinci bagaimana protokol ini dapat digunakan untuk mencapai komunikasi strategis yang efisien dan stabil, meningkatkan pelaksanaan dan kinerja pendapatan strategi perdagangan.
Apakah Anda seorang penggemar perdagangan kuantitatif baru FMZ atau seorang programmer profesional yang berpengalaman, artikel ini akan memberi Anda wawasan yang berharga dan panduan operasional yang praktis. Mari kita bersama-sama mengeksplorasi fungsi FMZ yang kuat dan belajar bagaimana melalui protokol komunikasi yang efisien, mencapai kolaborasi sinergis antar strategi, meningkatkan efisiensi perdagangan, dan menangkap peluang pasar.
Skenario permintaan ini menunjukkan berbagai kemungkinan dan keuntungan dari protokol komunikasi antar piringan dalam aplikasi praktis dari strategi perdagangan FMZ. Dengan komunikasi antar piringan strategis yang efektif, pedagang dapat lebih baik menangani lingkungan pasar yang kompleks, mengoptimalkan strategi perdagangan, meningkatkan efisiensi dan keuntungan perdagangan.
Setelah memahami kebutuhan komunikasi antara disk, maka perlu untuk mempertimbangkan bagaimana untuk mencapai kebutuhan tersebut. Tidak ada yang lain selain disk A yang ingin berinteraksi dengan disk B, meskipun kebutuhannya tampak sederhana. Namun, ada berbagai detail yang perlu disepakati dengan menggunakan seperangkat protokol komunikasi, FMZ telah mengemas beberapa protokol komunikasi yang lebih populer.
mqtt / nats / amqp / kafka
Arsitektur komunikasi adalah:
Ketika menggunakan protokol ini pada platform FMZ, yang dapat dipahami dengan mudah sebagai mqtt / nats / amqp / kafka.Dial()
Dalam fungsi, gunakanDial()
Fungsi ini melakukan pengiriman pesan, berlangganan, dan lain-lain. Pesan yang dikirimkan melalui agen server protokol (relay) ke disk langganan, jadi pertama-tama harus menjalankan server protokol. Untuk kemudahan demonstrasi, dalam contoh berikut kita menggunakan berbagai penyebaran mirror server protokol.
Bagian dari dokumen API di mana fungsi dial berada:https://www.fmz.com/syntax-guide#fun_dial
Sebelum Anda menginstal Docker Mirror, ingat untuk menginstal perangkat lunak Docker terlebih dahulu.
Setelah itu, mari kita bersama-sama mengeksplorasi dan menerapkan protokol komunikasi yang didukung FMZ.
MQTT (Message Queuing Telemetry Transport) adalah protokol pengiriman pesan ringan yang sangat cocok untuk lingkungan jaringan dengan bandwidth rendah, keterlambatan tinggi, atau tidak dapat diandalkan. Ini diusulkan oleh Andy Stanford-Clark dan Arlen Nipper dari IBM pada tahun 1999 dan kemudian menjadi standar ISO (ISO/IEC PRF 20922);
Fitur utama protokol MQTT: model penerbitan/langganan
Karena kami menggunakan Docker Mirror (eclipse-mosquitto mirror) yang mendukung protokol MQTT untuk mendistribusikan server proxy MQTT, maka docker harus diinstal terlebih dahulu dan tidak ada keterangan lebih lanjut.
Sebelum menjalankan perintah untuk mengimplementasikan mirror, kita harus menulis profil proxy server terlebih dahulu.mosquitto.conf
。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Setelah itu, lakukan perintah penyebaran:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Setelah server proxy berjalan, gambar berikut ini muncul:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Setelah itu, kita bisa menguji strategi dan mempraktikkannya.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Dalam kode strategi, fungsi dial digunakan terutama:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Dial memiliki string yang dimulai dengan parametermqtt://
Ini adalah nama protokol, diikuti dengan alamat pemantauan, port, tanda "?" dan kemudian nama subjek berlangganan / rilis, di mana nama subjek pengujian disebut:test_topic
。
Kebijakan di atas adalah untuk menjalankan tes pada saat posting, berlangganan, dan menjalankan sebuah topik:
Anda juga dapat menggunakan dua real disk untuk saling berlangganan, memposting informasi topik, contoh seperti yang kita gunakan di bagian praktik protokol nats, yang tidak lagi dijelaskan dalam protokol lainnya.
Protokol NATS adalah protokol yang sederhana, berbasis teks yang diterbitkan / gaya berlangganan. Klien terhubung ke gnatsd (NATS server) dan berkomunikasi dengan gnatsd, komunikasi didasarkan pada antarmuka TCP / IP biasa, dan mendefinisikan satu set operasi yang sangat kecil, mengubah baris yang berarti berakhir. Tidak seperti sistem komunikasi pesan tradisional, yang menggunakan format pesan biner, protokol NATS berbasis teks digunakan, yang memungkinkan klien untuk menerapkannya dengan sangat sederhana dan dapat dengan mudah memilih beberapa bahasa pemrograman atau bahasa skrip untuk menerapkannya.
Setiap protokol memiliki karakteristiknya sendiri, dan dokumen dan informasi spesifik dapat dilihat, tetapi tidak diuraikan di sini.
Menginstal server protokol nats:
Docker run
name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin
Perintah docker ini akan secara otomatis men-download dan menjalankan nats mirror pada port 4222 sebagai port yang akan diakses klien.
Listening for client connections on 0.0.0.0:4222
Server is ready
Saat ini, server-side mirror nats sudah berjalan dan memata-matai port 4222.
Kami harus membuat dua kebijakan (disk nyata) yang akan kami sebutkan sebagai kebijakan A dan kebijakan B, yang pada dasarnya memiliki kode yang sama.
Strategi A
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Strategi B
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Kedua strategi ini pada dasarnya sama, hanya saja posting, berlangganan, topik berlangganan, topik posting, posting informasi berbeda.
Sebagai contoh, strategi B:
1. PenggunaanDial()
Fungsi untuk membuat objek server yang terhubung ke klienconnPub
Di sini ada beberapa artikel yang membahas hal-hal penting yang harus dilakukan.
Var connPub = Digit127.0.0.1:4222?topic=pubRobotB”)
String parameter dari fungsi dial, dimulai dengannats://
Ini menunjukkan bahwa komunikasi menggunakan protokol Nats, dan kemudianadmin
Ini adalah pesan verifikasi sederhana yang disetel saat Anda menerapkan Docker Mirror.auth admin
, menggunakan karakter "@127.0.0.1:4222
Dan terakhir, posting/langganan:topic=pubRobotB
Perhatikan bahwa alamat di depan Anda harus diisi dengan tanda "
2, PenggunaanDial()
Fungsi untuk membuat objek server yang terhubung ke klienconnSub
Untuk berlangganan berita tentang topik ini:
var connSub = Digit127.0.0.1:4222?topic=pubRobotA”)
Perbedaannya hanyatopic=pubRobotA
Strategi A adalah untuk mengirim pesan dengan topik yang berbeda karena Anda harus berlangganan.pubRobotA
。
Untuk membuat dan menggunakan Subscribe, Publish Connection Object dalam Kebijakan A adalah sama seperti yang dijelaskan di atas.
Strategi A berjalan
Strategi B berjalan
Dengan demikian, sebuah contoh penggunaan protokol nats sederhana yang memungkinkan untuk saling berlangganan, mengirim pesan, dan berkomunikasi antara disk A dan disk B.
Dalam komunikasi asinkron, pesan tidak langsung sampai ke penerima, tetapi disimpan ke dalam sebuah wadah, setelah memenuhi kondisi tertentu, pesan akan dikirim oleh wadah ke penerima, wadah ini adalah antrian pesan, dan untuk mencapai fungsi ini memerlukan kedua belah pihak dan wadah serta komponen-komponen yang ada di dalamnya mematuhi perjanjian dan aturan yang seragam, AMQP adalah protokol seperti itu, pesan mengirim dan menerima pihak mematuhi protokol ini dapat mencapai komunikasi asinkron. Protokol ini menyetujui format dan cara kerja pesan.
Setiap protokol memiliki karakteristiknya sendiri, dan dokumen dan informasi spesifik dapat dilihat, tetapi tidak diuraikan di sini.
Mengimplementasikan server protokol amqp:
docker run
rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Ketika Anda menginstal Docker Mirror, aplikasi tersebut akan diunduh secara otomatis dan akan menampilkan:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Setelah server mirror sudah terpasang, tulis contoh tes:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Perlu dicatat bahwa queue yang menggunakan protokol amqp memiliki pesan yang permanen di queue setelah dipublikasikan, misalnya kita menjalankan kode contoh di atas sekali; kita akan menulis 10 pesan ke queue; kemudian kita akan membaca kembali pesan yang ditulis pertama kali saat kita menjalankan kedua kali.
Anda dapat melihat dua pesan log yang ditunjuk panah merah di layar, waktu yang tidak sama, karena warna merah adalah pesan yang dibaca adalah pesan yang ditulis di antrian saat kode kebijakan pertama kali dijalankan.
Fitur ini dapat memenuhi beberapa kebutuhan, misalnya: setelah kebijakan disk restart, Anda masih dapat mengambil data pasar yang tercatat dari antrian untuk operasi seperti perhitungan inisialisasi.
Apache Kafka adalah penyimpanan data terdistribusi yang dioptimalkan untuk mengekstrak dan memproses data aliran secara real-time. Data aliran adalah data yang terus-menerus dihasilkan oleh ribuan sumber data, yang biasanya dapat mengirim catatan data secara bersamaan. Platform aliran perlu memproses data yang terus-menerus masuk ini, diproses secara bertahap dalam urutan.
Kafka menawarkan tiga fitur utama kepada para penggunanya:
Kafka terutama digunakan untuk membangun pipa dan aplikasi data streaming real-time yang sesuai dengan aliran data. Ini menggabungkan fungsi pengiriman, penyimpanan, dan pemrosesan aliran untuk menyimpan data historis dan real-time.
Gambar Docker yang digunakan oleh pro Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Percobaan menggunakan kode tes:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Mari kita lihat bagaimana menggunakan protokol kafka untuk mengirim dan berlangganan pesan di fungsi dial.
Dial("kafka://localhost:9092/test_topic")
Seperti beberapa protokol lainnya, bagian awal adalah nama protokol.localhost:9092
Kemudian gunakan simbol "/" sebagai interval, kemudian tulis Subscribe/Publish topik, di mana topik uji diatur sebagaitest_topic
。
Hasil tes: