संसाधन लोड हो रहा है... लोड करना...

एफएमजेड का अन्वेषण करेंः लेनदेन की रणनीति और वास्तविक डिस्क के बीच संचार प्रोटोकॉल का अभ्यास

लेखक:आविष्कारक मात्रा - छोटे सपने, बनाया गयाः 2024-08-06 14:13:40, अद्यतनः 2024-08-07 15:30:13

[TOC]

img

वित्तीय बाजारों के तेजी से विकास और मात्रात्मक लेनदेन की लोकप्रियता के साथ, अधिक से अधिक व्यापारी स्वचालित रणनीतियों पर भरोसा करना शुरू कर रहे हैं। इस प्रक्रिया में, रणनीतियों के बीच संचार और समन्वय विशेष रूप से महत्वपूर्ण है। FMZ (quantitative trading platform) कुशल व्यापार रणनीतियों को प्रदान करके वास्तविक डिस्क के बीच संचार प्रोटोकॉल प्रदान करता है, जिससे व्यापारियों को रणनीतियों के निर्बाध मिलान और वास्तविक समय डेटा साझा करने में मदद मिलती है।

इस लेख में एफएमजेड प्लेटफॉर्म में ट्रेडिंग रणनीतियों के लिए वास्तविक डिस्क संचार प्रोटोकॉल पर गहराई से विचार किया जाएगा, इसके डिजाइन विचार, कार्यात्मक विशेषताओं और वास्तविक अनुप्रयोगों में लाभों के बारे में बताया जाएगा। हम विस्तृत मामले विश्लेषण के माध्यम से दिखाएंगे कि इस प्रोटोकॉल का उपयोग कैसे किया जा सकता है ताकि कुशल, स्थिर रणनीतिक संचार प्राप्त किया जा सके, और ट्रेडिंग रणनीतियों के निष्पादन और लाभ प्रदर्शन में सुधार किया जा सके।

चाहे आप एफएमजेड के लिए नए हैं या अनुभवी पेशेवर प्रोग्रामर हैं, यह लेख आपको मूल्यवान अंतर्दृष्टि और व्यावहारिक संचालन गाइड प्रदान करेगा। आइए हम एफएमजेड के शक्तिशाली कार्यों का पता लगाएं और जानें कि कुशल संचार प्रोटोकॉल के माध्यम से रणनीतियों के बीच सहकार्य कैसे किया जाए, लेनदेन की दक्षता में सुधार और बाजार के अवसरों को कैसे पकड़ें।


मांग का परिदृश्य

    1. बहु-रणनीतिक सहयोग जरूरत के परिदृश्यः जटिल बाजार परिवेश में, एक एकल रणनीति विभिन्न प्रकार की आकस्मिकताओं और बाजार परिवर्तनों का सामना करने में असमर्थ हो सकती है। व्यापारी कई रणनीतियों को एक साथ चलाना चाहते हैं, जैसे कि ट्रेंड ट्रैकिंग रणनीति, औसत रिटर्न रणनीति और सूट रणनीति, और इन रणनीतियों के बीच वास्तविक समय में संचार करना चाहते हैं ताकि बाजार की जानकारी और व्यापार संकेत साझा किए जा सकें, जिससे समग्र व्यापार दक्षता और स्थिरता में सुधार हो सके।
    1. पार बाजार लाभ जरूरत के परिदृश्यः व्यापारी विभिन्न व्यापारिक बाजारों के बीच लाभप्रद व्यापार करना चाहते हैं; उदाहरण के लिए, ए शेयर बाजार और हांगकांग शेयर बाजार के बीच मूल्य अंतर का लाभ उठाते हैं; जब किसी बाजार में मूल्य असामान्य होता है, तो रणनीतिकारों को लाभप्रद अवसरों को पकड़ने के लिए अन्य बाजारों की रणनीतियों को समय पर सूचित करने की आवश्यकता होती है।
    1. जोखिम प्रबंधन और प्रतिभूति जरूरत के परिदृश्यः एक रणनीति बाजार में उच्च जोखिम वाले और उच्च रिटर्न वाले ट्रेडों को खोजने और निष्पादित करने के लिए जिम्मेदार होती है, जबकि दूसरी रणनीति समग्र जोखिम की निगरानी और हेजिंग ऑपरेशन करने पर केंद्रित होती है। उच्च जोखिम वाले ट्रेडिंग के दौरान अत्यधिक नुकसान नहीं होने के लिए, दोनों रणनीतियों को वास्तविक समय में संचार और डेटा साझा करने की आवश्यकता होती है ताकि समय पर स्थिति और हेजिंग जोखिम को समायोजित किया जा सके।
    1. वितरित लेनदेन प्रणाली मांग का परिदृश्यः बड़े ट्रेडिंग संस्थानों को कई भौतिक सर्वरों पर वितरित ट्रेडिंग सिस्टम चलाने की इच्छा होती है ताकि ट्रेडिंग सिस्टम की त्रुटिशीलता और प्रदर्शन में सुधार हो सके। इन सर्वरों पर रणनीतियों को संचार प्रोटोकॉल के माध्यम से डेटा सिंक्रनाइज़ और समन्वयित संचालन की आवश्यकता होती है, जिससे समग्र ट्रेडिंग सिस्टम का स्थिर और कुशल संचालन सुनिश्चित होता है।
    1. बाजार की निगरानी और चेतावनी जरूरत के परिदृश्यः एक रणनीति वास्तविक समय में बाजार की गतिशीलता की निगरानी करने के लिए विशेष रूप से जिम्मेदार है, और जब बाजार में महत्वपूर्ण परिवर्तन होते हैं (जैसे अचानक मूल्य गिरना या बढ़ना) तो रणनीति को अन्य रणनीतियों को तुरंत सूचित करने की आवश्यकता होती है ताकि जोखिम को कम करने या व्यापार के अवसरों को पकड़ने के लिए उचित प्रतिक्रिया कार्रवाई की जा सके, जैसे कि समतल, स्थानांतरित या बढ़ाना।
    1. संयोजन रणनीति प्रबंधन जरूरत के परिदृश्यः व्यापारी विभिन्न प्रकार के परिसंपत्ति वर्गों में निवेश का प्रबंधन करने के लिए रणनीतियों के एक संयोजन का उपयोग करते हैं, जिनमें से प्रत्येक एक विशिष्ट परिसंपत्ति वर्ग (जैसे स्टॉक, बॉन्ड, फ्यूचर्स आदि) पर केंद्रित होता है। इन रणनीतियों को समग्र निवेश अनुकूलन और अधिकतम लाभ प्राप्त करने के लिए संचार और समन्वय की आवश्यकता होती है।

इन मांग परिदृश्यों ने वास्तविक अनुप्रयोगों में एफएमजेड ट्रेडिंग रणनीतियों के वास्तविक डिस्क संचार प्रोटोकॉल की कई संभावनाओं और लाभों को दिखाया है। प्रभावी रणनीतिक संचार के माध्यम से, व्यापारी जटिल बाजार वातावरण का बेहतर सामना कर सकते हैं, व्यापार रणनीतियों को अनुकूलित कर सकते हैं, और व्यापार दक्षता और आय को बढ़ा सकते हैं।


FMZ में समाहित संचार प्रोटोकॉल और डायल फ़ंक्शन

एक बार जब आप डिस्क के बीच संचार की आवश्यकताओं को समझ लेते हैं, तो आप उन आवश्यकताओं को कैसे पूरा कर सकते हैं, इस पर विचार करना शुरू कर देते हैं। डिस्क ए केवल डिस्क बी के साथ बातचीत करना चाहता है, हालांकि यह बहुत सरल दिखता है। हालांकि, इसके लिए विभिन्न विवरणों को एक संचार प्रोटोकॉल का उपयोग करने की आवश्यकता होती है। एफएमजेड ने कुछ लोकप्रिय संचार प्रोटोकॉल को शामिल किया है।

mqtt / nats / amqp / kafka

संचार संरचना

संचार ढांचाः

  • सर्वर (एजेंट) । एक संचार प्रोटोकॉल के साथ एक सर्वर को चलाने की आवश्यकता होती है, जिसका उपयोग संदेशों को रिले करने के लिए किया जाता है जो ग्राहकों और प्रकाशकों के बीच भेजे जाते हैं। यह सर्वर होस्ट के सिस्टम में स्थानीय ("स्थानीय डिस्क के बीच संचार") या दूरस्थ सेवा ("सर्वर के बीच संचार") पर तैनात किया जा सकता है।
  • ग्राहक (सदस्य, प्रकाशक) । एफएमजेड पर नीतिगत डिस्क प्रोग्राम को एक संचार प्रोटोकॉल के ग्राहक के रूप में समझा जा सकता है, नीतिगत डिस्क एक प्रकाशक (pub) या एक ग्राहक (sub) हो सकता है।

डायल फ़ंक्शन

एफएमजेड प्लेटफॉर्म पर इन प्रोटोकॉल को लागू करते समय, यह आसानी से समझा जा सकता है कि mqtt / nats / amqp / kafka इन प्रोटोकॉल को एकीकृत किया गया हैDial()फ़ंक्शन में, उपयोगDial()फ़ंक्शन संदेश भेजता है, सदस्यता लेता है, आदि। ये संदेश प्रोटोकॉल के सर्वर-टर्म एजेंटों द्वारा रिले किए जाते हैं, इसलिए पहले एक प्रोटोकॉल सर्वर को चलाना पड़ता है। उदाहरण के लिए, हम विभिन्न प्रोटोकॉल सर्वर-टर्म दर्पण तैनाती का उपयोग करते हैं।

डायल फ़ंक्शन के लिए एपीआई दस्तावेज़ अनुभागःhttps://www.fmz.com/syntax-guide#fun_dial

डॉकर मिरर को तैनात करने से पहले, डॉकर सॉफ़्टवेयर स्थापित करना याद रखें।

img

अब हम FMZ द्वारा समर्थित संचार प्रोटोकॉल के अनुप्रयोगों का अन्वेषण और अभ्यास करेंगे।


एफएमजेड प्लेटफॉर्म पर वास्तविक संचार प्रोटोकॉल का अभ्यास

mqtt प्रोटोकॉल

MQTT (Message Queuing Telemetry Transport) एक हल्का संदेश परिवहन प्रोटोकॉल है, जो विशेष रूप से कम बैंडविड्थ, उच्च विलंबता या अविश्वसनीय नेटवर्क वातावरण के लिए उपयुक्त है। यह 1999 में आईबीएम के एंडी स्टैनफोर्ड-क्लार्क और अर्लेन निपर द्वारा प्रस्तावित किया गया था, जो बाद में आईएसओ मानक (आईएसओ/आईईसी पीआरएफ 20922) बन गया।

एमक्यूटीटी प्रोटोकॉल की मुख्य विशेषताएंः प्रकाशन/सदस्यता मोड

  • पोस्टिंगः समाचार निर्माताओं ने विषय पर संदेश भेजा।
  • Subscribe: संदेश उपभोक्ताओं को उन विषयों के लिए Subscribe करना चाहिए जिनकी वे रुचि रखते हैं ताकि वे उस विषय पर प्रकाशित संदेश प्राप्त कर सकें।
  • मध्यस्थः MQTT संदेशों को प्रसारित करने के लिए मध्यस्थ के रूप में एक संदेश एजेंट (ब्रोकर) का उपयोग करता है, जिससे प्रकाशक और ग्राहक के बीच की खाई को सुनिश्चित किया जा सकता है।

समाचार और सदस्यता

चूंकि हम एमक्यूटीटी प्रोक्सी सर्वर को एमक्यूटीटी प्रोटोकॉल का समर्थन करने वाले सॉफ़्टवेयर का उपयोग करते हैं, इसलिए हम पहले से डॉकर स्थापित करते हैं।

हम एक प्रोक्सी सर्वर प्रोफाइल लिखने की जरूरत है कि हम आदेश चलाने से पहले एक दर्पण तैनात करने के लिएmosquitto.conf

# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true

फिर, तैनाती कमांड निष्पादित करेंः

docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

प्रोक्सी सर्वर का आईडिया काम करने के बाद इस तरह दिखता हैः

1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running

और फिर हम रणनीति का परीक्षण कर सकते हैं और अभ्यास कर सकते हैं।

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        // 读取
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

रणनीति कोड में मुख्य रूप से डायल फ़ंक्शन का उपयोग किया जाता हैः

Dial("mqtt://127.0.0.1:1883?topic=test_topic")

डायल फ़ंक्शन के स्ट्रिंग पैरामीटर की शुरुआतmqtt://प्रोटोकॉल का नाम है, इसके बाद मॉनिटरिंग पता, पोर्ट, "? प्रतीक" के बाद सब्सक्राइब / रिलीज विषय का नाम है, जहां परीक्षण विषय का नाम हैःtest_topic

एक विषय के लिए पोस्टिंग, सदस्यता और परीक्षण के लिए उपरोक्त रणनीतियाँ इस प्रकार हैंः

img

यह भी संभव है कि दो वास्तविक डिस्क एक दूसरे को सब्सक्राइब करें, विषय पर जानकारी प्रकाशित करें, जैसा कि हम नेट्स प्रोटोकॉल के अभ्यास अनुभाग में उपयोग करते हैं, अन्य प्रोटोकॉल में इस तरह से नहीं बताया गया है।


nats समझौता

NATS का प्रोटोकॉल एक सरल, पाठ-आधारित प्रकाशन/सदस्यता शैली का प्रोटोकॉल है। क्लाइंट gnatsd (NATS सर्वर) से जुड़ा होता है और gnatsd के साथ संवाद करता है। यह सामान्य TCP/IP इंटरफेस पर आधारित होता है और बहुत छोटे ऑपरेशन सेट को परिभाषित करता है।

प्रत्येक समझौते की अपनी विशेषताएं हैं, और आप विशिष्ट दस्तावेजों, सूचनाओं को देख सकते हैं।

नट्स प्रोटोकॉल सर्वर पर तैनात करेंः

docker run name nats rm -p 4222:4222 -p 8222:8222 nats http_port 8222 auth admin

यह डॉकर कमांड स्वचालित रूप से nats छवि को डाउनलोड करता है और चलाता है, पोर्ट 4222 को उस पोर्ट के लिए चुना जाता है जिसे क्लाइंट एक्सेस करना चाहता है।

Listening for client connections on 0.0.0.0:4222
Server is ready

nats सर्वर टर्मिनल मिरर चालू हो जाता है, पोर्ट 42222 पर निगरानी करता है।

स्थानीय डिवाइस वास्तविक डिस्क रणनीति के बीच संचार

हमें दो नीतियां बनाने की आवश्यकता है (रियल डिस्क) और अभी के लिए नीति ए और नीति बी का नाम दें, दोनों नीतियां मूल रूप से एक ही कोड में हैं।

  • रणनीति ए

    var connPub = null 
    var connSub = null
    
    function main() {
        var robotId = _G()
        Log("当前实盘robotId:", robotId)
    
        connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
        if (!connPub) {
            Log("通信失败!")
            return 
        }
    
        connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
        if (!connSub) {
            Log("通信失败!")
            return 
        }
    
        while (true) {
            connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
            var msgRead = connSub.read(10000)
            if (msgRead) {
                Log("msgRead:", msgRead)
            }
    
            LogStatus(_D())
            Sleep(10000)
        }
    }
    
    function onexit() {
        connPub.close()
        connSub.close()
    }
    
  • रणनीति बी

    var connPub = null 
    var connSub = null
    
    function main() {
        var robotId = _G()
        Log("当前实盘robotId:", robotId)
    
        connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
        if (!connPub) {
            Log("通信失败!")
            return 
        }
    
        connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
        if (!connSub) {
            Log("通信失败!")
            return 
        }
    
        while (true) {
            connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
            var msgRead = connSub.read(10000)
            if (msgRead) {
                Log("msgRead:", msgRead)
            }
    
            LogStatus(_D())
            Sleep(10000)
        }
    }
    
    function onexit() {
        connPub.close()
        connSub.close()
    }
    

दोनों रणनीतियाँ मूल रूप से समान हैं, केवल एक-दूसरे को पोस्ट करने, सब्सक्राइब करने, सब्सक्राइब करने, पोस्ट करने, पोस्ट करने के लिए अलग-अलग हैं।

उदाहरण के लिए, रणनीति बीः

  • १, उपयोगDial()फ़ंक्शन क्लाइंट से कनेक्ट सर्वर ऑब्जेक्ट्स बनाता हैconnPubइस लेख के माध्यम से, हम आपको कुछ महत्वपूर्ण जानकारी प्रदान करते हैंः

    var connPub = डायल करें127.0.0.1:4222?topic=pubRobotB”)

    डायल फ़ंक्शन के लिए पैरामीटर संख्या स्ट्रिंग,nats://यह दर्शाता है कि हम NATS प्रोटोकॉल का उपयोग कर रहे हैं, और फिरadminDocker Mirror को तैनात करते समय सेट किए गए सरल सत्यापन संदेशauth admin, "@" वर्ण का उपयोग करके इसके बाद सामग्री के साथ अंतराल, और फिर सेवा का पता और पोर्ट127.0.0.1:4222और अंत में, पोस्ट / सब्सक्राइब करेंःtopic=pubRobotBकृपया ध्यान दें कि पहले के पते के बीच में "क्या" संकेत का अंतराल है।

  • 2। उपयोगDial()फ़ंक्शन क्लाइंट से कनेक्ट सर्वर ऑब्जेक्ट्स बनाता हैconnSubइस लेख में, हम आपको कुछ महत्वपूर्ण समाचारों के बारे में बताएंगे।

    var connSub = डायल करें127.0.0.1:4222?topic=pubRobotA”)

    फर्क सिर्फ इतना हैtopic=pubRobotAयह एक अलग विषय है, क्योंकि यह सब्सक्राइब करने के लिए आवश्यक है।pubRobotA

नीति A में सदस्यता, प्रकाशन और कनेक्शन ऑब्जेक्ट के निर्माण और उपयोग के लिए ऊपर वर्णित के समान है।

  • रणनीति ए चल रही है

    img

  • रणनीति बी चल रही है

    img

इस प्रकार एक सरल NATS प्रोटोकॉल अनुप्रयोग का उदाहरण प्राप्त होता है जिसमें डिस्क A और डिस्क B के बीच एक-दूसरे को सब्सक्राइब करने, संदेश भेजने और संवाद करने के लिए एक-दूसरे को भेजने का कार्य होता है।


amqp प्रोटोकॉल

amqp प्रोटोकॉल की कतार

असिंक्रोनस संचार में, संदेश तुरंत प्राप्तकर्ता तक नहीं पहुंचता है, बल्कि एक कंटेनर में संग्रहीत किया जाता है, जब कुछ शर्तें पूरी हो जाती हैं, तो संदेश को कंटेनर द्वारा प्राप्तकर्ता को भेजा जाता है, यह कंटेनर संदेश कतार है, और इस कार्य को पूरा करने के लिए दोनों पक्षों और कंटेनर और इसके विभिन्न घटकों को एक समान समझौते और नियमों का पालन करने की आवश्यकता होती है, एएमक्यूपी एक ऐसा प्रोटोकॉल है, जो संदेश भेजने वाले और प्राप्त करने वाले दोनों पक्षों का पालन करता है। यह प्रोटोकॉल संदेश के प्रारूप और काम करने के तरीके को निर्धारित करता है।

प्रत्येक समझौते की अपनी विशेषताएं हैं, और आप विशिष्ट दस्तावेजों, सूचनाओं को देख सकते हैं।

amqp प्रोटोकॉल सर्वर पर तैनात करेंः

docker run rm hostname my-rabbit name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-प्रबंधन

Docker Mirror को तैनात करते समय, यह स्वचालित रूप से डाउनलोड हो जाता है और जब यह पूरा हो जाता है तो यह प्रदर्शित होता हैः

2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms

एक बार जब सर्वर परमिट की छवि अच्छी तरह से तैनात हो जाती है, तो एक परीक्षण उदाहरण लिखेंः

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 读取
        Log("read:", conn.read(1000), "#FF0000")
        
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

यह ध्यान रखना महत्वपूर्ण है कि एएमक्यूपी प्रोटोकॉल का उपयोग करने वाली कतारों में संदेश जारी होने के बाद कतारों में स्थायी रूप से मौजूद होते हैं, उदाहरण के लिए, हम ऊपर दिए गए उदाहरण कोड को एक बार चलाते हैं। हम कतार में 10 संदेश लिखते हैं। फिर हम दूसरी बार पढ़ सकते हैं कि जब हम पढ़ते हैं तो पहली बार लिखे गए संदेश को फिर से पढ़ते हैं। जैसा कि चित्र में दिखाया गया हैः

img

आप देख सकते हैं कि स्क्रीनशॉट में लाल तीर से इंगित किए गए दो लॉग संदेश समय के साथ मेल नहीं खाते हैं, क्योंकि लाल रंग में यह पढ़ा गया संदेश है जो नीति कोड को पहली बार चलाने पर कतार में लिखा गया था।

इस विशेषता के आधार पर कुछ आवश्यकताओं को पूरा किया जा सकता है, उदाहरण के लिएः नीति वास्तविक डिस्क को फिर से शुरू करने के बाद, क्यू से रिकॉर्ड किए गए बाजार डेटा को अभी भी प्राप्त किया जा सकता है, जिसे प्रारंभिक गणना जैसे कार्यों के लिए उपयोग किया जा सकता है।


काफ्का समझौता

अपाचे काफ्का एक वितरित डेटा भंडारण है जो वास्तविक समय में स्ट्रीम डेटा को निकालने और संसाधित करने के लिए अनुकूलित है। स्ट्रीम डेटा का अर्थ है हजारों डेटा स्रोतों से लगातार उत्पन्न डेटा, जो आमतौर पर एक साथ डेटा रिकॉर्ड भेज सकते हैं। स्ट्रीम प्लेटफार्मों को इन लगातार आने वाले डेटा को संसाधित करने की आवश्यकता होती है, जो क्रम में चरणबद्ध तरीके से संसाधित होते हैं।

Kafka अपने उपयोगकर्ताओं के लिए तीन मुख्य सुविधाएँ प्रदान करता हैः

  • प्रकाशन और सदस्यता रिकॉर्ड स्ट्रीम
  • रिकॉर्ड के निर्माण क्रम के अनुसार रिकॉर्ड स्ट्रीम को कुशलता से संग्रहीत करें
  • रीयलटाइम प्रसंस्करण

काफ्का मुख्य रूप से डेटा प्रवाह के अनुकूल वास्तविक समय में प्रवाह डेटा पाइपलाइनों और अनुप्रयोगों के निर्माण के लिए उपयोग किया जाता है। यह संदेश रिसीवर, भंडारण और प्रवाह प्रसंस्करण कार्यक्षमताओं को जोड़ता है, जो ऐतिहासिक और वास्तविक समय डेटा को संग्रहीत कर सकता है।

समाचार और सदस्यता

काफ्का एजेंट को तैनात करने वाले डॉकर का चित्रः

docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
        -e KAFKA_CFG_NODE_ID=0 \
        -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
        -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
        -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
        -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
        -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
        -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
        bitnami/kafka:latest

परीक्षण कोड का उपयोग करके परीक्षण करेंः

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("kafka://localhost:9092/test_topic")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        // 读取
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

आइए देखें कि डायल फ़ंक्शन में Kafka प्रोटोकॉल का उपयोग करके संदेश पोस्ट करने और सदस्यता लेने के लिए कैसे करें।

Dial("kafka://localhost:9092/test_topic")

कुछ अन्य प्रोटोकॉल की तरह, प्रोटोकॉल का नाम शुरू होता है; इसके बाद एक निगरानी पते के साथःlocalhost:9092◎ फिर "/" प्रतीक का उपयोग करके अंतराल के रूप में, इसके बाद सदस्यता / प्रकाशन विषय लिखें, जहां परीक्षण विषय सेट हैtest_topic

परीक्षण के परिणामः

img


अधिक