В процессе загрузки ресурсов... загрузка...

Назначьте

Для примитивныхSocketДоступ, поддержкаtcp, udp, tls, unixПоддержка 4 популярных протоколов связи:mqtt, nats, amqp, kafkaПоддержка подключения к базам данных:sqlite3, mysql, postgres, clickhouse.

ВDial()Обычный вызов возвращает объект соединения, который имеет три метода:read, writeиclose.readДля считывания данных используетсяwriteС помощью этого метода передаются данные иcloseметод используется для закрытия соединения. Вreadметод поддерживает следующие параметры:

  • Когда параметры не передаются, он блокирует до тех пор, пока сообщение не будет доступно и вернется, например:ws.read().
  • При передаче в качестве параметра единица составляет миллисекунды, указывая время ожидания сообщения.ws.read(2000)указывает время задержки в две секунды (2000 миллисекунд).
  • Следующие два параметра действительны только для WebSocket: Передача параметра-1означает, что функция возвращает сообщения немедленно, независимо от наличия или отсутствия сообщений, например:ws.read(-1)- Да. Передача параметра-2означает, что функция возвращает сразу с сообщением или без него, но возвращается только последнее сообщение, а буферизированное сообщение отбрасывается.ws.read(-2).

read()описание буфера функции: Входящие данные, отправляемые протоколом WebSocket, могут вызывать накопление данных, если временной интервал между стратегиейread()Эти данные хранятся в буфере, который имеет структуру данных очереди с максимальным числом 2000.

Сценарий Нет параметра Параметр: -1 Параметр: -2 Параметр: 2000, в миллисекундах
Данные уже в буфере Немедленно верните старые данные Немедленно верните старые данные Немедленно верните последние данные Немедленно верните старые данные
Нет данных в буфере Возвращение при блокировке данных Немедленно верните нуль Немедленно верните нуль Подождите 2000 мс, верните null, если нет данных, верните null, если есть данные
Соединение WebSocket отключено или повторно подключено базовым read() функция возвращает пустую строку, т.е.: , и write() функция возвращает 0. Ситуация обнаружена. Вы можете закрыть соединение с помощью функции close(), или если вы настроили автоматическое воссоединение, вам не нужно его закрывать, основной система автоматически воссоединит его.

объект

Выбрать адрес Назови (адрес, время выхода)

Адрес запроса. Адрес неправда строка секунды таймаута, Тайм-аут ложное Номер

function main(){
    // Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
    var client = Dial("tls://www.baidu.com:443")  
    if (client) {
        // write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while (true) {
            // read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
            var buf = client.read()
            if (!buf) {
                 break
            }
            Log(buf)
        }
        client.close()
    }
}
def main():
    client = Dial("tls://www.baidu.com:443")
    if client:
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while True:
            buf = client.read()
            if not buf:
                break
            Log(buf)
        client.close()
void main() {
    auto client = Dial("tls://www.baidu.com:443");
    if(client.Valid) {
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
        while(true) {
            auto buf = client.read();
            if(buf == "") {
                break;
            }
            Log(buf);
        }
        client.close();
    }
}

Пример вызова функции Dial:

function main() {
    LogStatus("Connecting...")
    // Accessing WebSocket interface of Binance
    var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if (!client) {
        Log("Connection failed, program exited")
        return
    }
    
    while (true) {
        // read returns only the data retrieved after the read call
        var buf = client.read()      
        if (!buf) {
            break
        }
        var table = {
            type: 'table',
            title: 'Ticker Chart',
            cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
            rows: []
        }
        var obj = JSON.parse(buf)
        _.each(obj, function(ticker) {
            table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
        })
        LogStatus('`' + JSON.stringify(table) + '`')
    }
    client.close()
}
import json
def main():
    LogStatus("Connecting...")
    client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if not client:
        Log("Connection failed, program exited")
        return 
    
    while True:
        buf = client.read()
        if not buf:
            break
        table = {
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'], 
            "rows" : [] 
        }
        obj = json.loads(buf)
        for i in range(len(obj)):
            table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
        LogStatus('`' + json.dumps(table) + '`')
    client.close()
void main() {
    LogStatus("Connecting...");
    auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
    if(!client.Valid) {
        Log("Connection failed, program exited");
        return;
    }
    
    while(true) {
        auto buf = client.read();
        if(buf == "") {
            break;
        }
        json table = R"({
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"], 
            "rows" : []
        })"_json;
        json obj = json::parse(buf);
        for(auto& ele : obj.items()) {
            table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"], 
                ele.value()["q"], _D(ele.value()["E"])});
        }
        LogStatus("`" + table.dump() + "`");
    }
    client.close();
}

Для доступа к интерфейсу WebSocket Binance:

var ws = null 
function main(){
    var param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    // When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
    if(ws){
        var pingCyc = 1000 * 20
        var lastPingTime = new Date().getTime()
        while(true){
            var nowTime = new Date().getTime()
            var ret = ws.read()
            Log("ret:", ret)
            if(nowTime - lastPingTime > pingCyc){
                var retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send : ping", "#FF0000")
            }
            LogStatus("Current time:", _D())
            Sleep(1000)
        }
    }
}              

function onexit() {
    ws.close() 
    Log("exit")
}
import json
import time              

ws = None
def main():
    global ws 
    param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        pingCyc = 1000 * 20
        lastPingTime = time.time() * 1000
        while True:
            nowTime = time.time() * 1000
            ret = ws.read()
            Log("ret:", ret)
            if nowTime - lastPingTime > pingCyc:
                retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send: ping", "#FF0000")
            LogStatus("Current time:", _D())
            Sleep(1000)              

def onexit():
    ws.close()
    Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");              

void main() {
    json param = R"({
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    })"_json;
    
    objWS.write(param.dump());
    if(objWS.Valid) {
        uint64_t pingCyc = 1000 * 20;
        uint64_t lastPingTime = Unix() * 1000;
        while(true) {
            uint64_t nowTime = Unix() * 1000;
            auto ret = objWS.read();
            Log("ret:", ret);
            if(nowTime - lastPingTime > pingCyc) {
                auto retPing = objWS.write("ping");
                lastPingTime = nowTime;
                Log("Send: ping", "#FF0000");
            }
            LogStatus("Current time:", _D());
            Sleep(1000);
        }
    }
}              

void onexit() {
    objWS.close();
    Log("exit");
}

Доступ к интерфейсу OKX WebSocket:

var ws = null               

function main(){
    var param = {"sub": "market.btcusdt.detail", "id": "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
    if(ws){
        while(1){
            var ret = ws.read()
            Log("ret:", ret)
            // Respond to heartbeat packet operations
            try {
                var jsonRet = JSON.parse(ret)
                if(typeof(jsonRet.ping) == "number") {
                    var strPong = JSON.stringify({"pong" : jsonRet.ping})
                    ws.write(strPong)
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
                }
            } catch(e) {
                Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
            }
            
            LogStatus("Current time:", _D())
            Sleep(1000)
        }
    }
}              

function onexit() {
    ws.close() 
    Log("Execute the ws.close() function")
}
import json
ws = None              

def main():
    global ws
    param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        while True:
            ret = ws.read()
            Log("ret:", ret)              
            # Respond to heartbeat packet operations
            try:
                jsonRet = json.loads(ret)
                if "ping" in jsonRet and type(jsonRet["ping"]) == int:
                    strPong = json.dumps({"pong" : jsonRet["ping"]})
                    ws.write(strPong)
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
            except Exception as e:
                Log("e:", e)
                
            LogStatus("Current time:", _D())
            Sleep(1000)
    
def onexit():
    ws.close()
    Log("Execute the ws.close() function")  
using namespace std;
void main() {
    json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
    auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
    if(ws.Valid) {
        while(true) {
            auto ret = ws.read();
            Log("ret:", ret);              
            // Respond to heartbeat packet operations
            try 
            {
                auto jsonRet = json::parse(ret);
                if(jsonRet["ping"].is_number()) {
                    json pong = R"({"pong" : 0})"_json;
                    pong["pong"] = jsonRet["ping"];
                    auto strPong = pong.dump();
                    ws.write(strPong);
                    Log("Respond to ping, send pong:", strPong, "#FF0000");
                }
            } catch(exception &e) 
            {
                Log("e:", e.what());
            }
            
            LogStatus("Current time:", _D());
            Sleep(1000);
        }
    }
}              

void onexit() {
    // ws.close();
    Log("Execute the ws.close() function");
}

Доступ к интерфейсу Huobi's WebSocket:

function getLogin(pAccessKey, pSecretKey, pPassphrase) {
    // Signature function for login
    var ts = (new Date().getTime() / 1000).toString()
    var login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)   // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
        }]
    }    
    return login
}                

var client_private = null 
function main() {
    // Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
    SetErrorFilter("timeout")
    
    // Position channel subscription information
    var posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }                

    var accessKey = "xxx"
    var secretKey = "xxx"
    var passphrase = "xxx"            

    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)  // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
    client_private.write(JSON.stringify(posSubscribe))
    if (client_private) {
        var lastPingTS = new Date().getTime()
        while (true) {
            var buf = client_private.read(-1)
            if (buf) {
                Log(buf)
            }
            
            // Detect disconnection, reconnect
            if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
                Log("Disconnection detected, close connection, reconnect")
                client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(JSON.stringify(posSubscribe))
            }
            
            // Send heartbeat packets
            var nowPingTS = new Date().getTime()
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping")
                lastPingTS = nowPingTS
            }            
        }        
    }
}                

function onexit() {    
    var ret = client_private.close()
    Log("Close the connection!", ret)
}
import json
import time
  
def getLogin(pAccessKey, pSecretKey, pPassphrase):
    ts = str(time.time())
    login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
        }]
    }
    return login                 

client_private = None 
def main():
    global client_private
    SetErrorFilter("timeout")
    
    posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }                  

    accessKey = "xxx"
    secretKey = "xxx"
    passphrase = "xxx"
    
    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)
    client_private.write(json.dumps(posSubscribe))
    if client_private:
        lastPingTS = time.time() * 1000
        while True:
            buf = client_private.read(-1)
            if buf:
                Log(buf)
            
            if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
                Log("Disconnection detected, close connection, reconnect")
                ret = client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(json.dumps(posSubscribe))
            
            nowPingTS = time.time() * 1000
            if nowPingTS - lastPingTS > 10 * 1000:
                client_private.write("ping")
                lastPingTS = nowPingTS                

def onexit():
    ret = client_private.close()
    Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");                  

json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
    auto ts = std::to_string(Unix());
    json login = R"({
        "op": "login",
        "args": [{
            "apiKey": "",
            "passphrase": "",
            "timestamp": "",
            "sign": ""
        }]
    })"_json;
    login["args"][0]["apiKey"] = pAccessKey;
    login["args"][0]["passphrase"] = pPassphrase;
    login["args"][0]["timestamp"] = ts;
    login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
    return login;
}                  

void main() {
    SetErrorFilter("timeout");
    json posSubscribe = R"({
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    })"_json;
    
    auto accessKey = "xxx";
    auto secretKey = "xxx";
    auto passphrase = "xxx";
    
    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
    Sleep(3000);
    client_private.write(posSubscribe.dump());                

    if (client_private.Valid) {
        uint64_t lastPingTS = Unix() * 1000;                  

        while (true) {
            auto buf = client_private.read(-1);
            if (buf != "") {
                Log(buf);
            }
            if (buf == "") {
                if (client_private.write(posSubscribe.dump()) == 0) {
                    Log("Disconnection detected, close connection, reconnect");
                    client_private.close();
                    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
                    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
                    Sleep(3000);
                    client_private.write(posSubscribe.dump());
                }
            }
            
            uint64_t nowPingTS = Unix() * 1000;
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping");
                lastPingTS = nowPingTS;
            }
        }
    }
}                  

void onexit() {
    client_private.close();
    Log("exit");
}

Для доступа к интерфейсу аутентификации WebSocket OKX:

var client = null 
function main() {
    // client = Dial("sqlite3://:memory:")   // Using an in-memory database
    client = Dial("sqlite3://test1.db")      // Open/connect to the database file in the docker's directory
    
    // record handle
    var sqlite3Handle = client.fd()
    Log("sqlite3Handle:", sqlite3Handle)
    
    // Querying tables in the database
    var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
    Log(ret)
}

function onexit() {
    Log("Execute client.close()")
    client.close()
}
// Not supported
// Not supported

Объект соединения, возвращаемый функцией Dial при подключении к базе данных, имеет две уникальные для него функции метода:

  • exec(sqlString): Используется для выполнения SQL заявлений таким же образом, какDBExec() function.
  • fd():fd()функция возвращает ручку (например, переменная ручка является рукой), которая будет использоваться другими потоками для повторного подключения (даже если объект, созданный Dial, уже был закрыт при исполненииclose()с функцией закрытия соединения) путем прохождения ручки вDial()функция, например,Dial(handle)Подключение для повторного использования. Ниже приведен пример функции Dial, соединяющейsqlite3 database.

Подробная информацияaddressпараметр, разделенный|символ после обычного адреса:wss://ws.okx.com:8443/ws/v5/publicЕсли есть.|символы в строке параметров, затем||Часть после этого - некоторые параметры параметров функции, и каждый параметр связан с&Например,ss5параметры прокси и сжатия могут быть установлены вместе следующим образом:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")

Функции, поддерживаемые параметром адреса функции Dial Описание параметров
Параметры, связанные с сжатием данных протокола WebSocket: compress=value parameter compress - это метод сжатия, параметры сжатия: gzip_raw, gzip и т. д. Если метод gzip не является стандартным gzip, вы можете использовать расширенный метод: gzip_raw
Параметры, относящиеся к сжатию данных протокола WebSocket: mode=value parameter режим - это режим сжатия, параметр режима может быть двойным, отправка, recv. dual - это двустороннее сжатие, отправка сжатых данных, прием сжатых данных. отправка - это отправка сжатых данных. recv - это прием сжатых данных, локальная декомпрессия.
Протокол WebSocket устанавливает базовые параметры, связанные с автоматическим воссоединением: reconnect=значение параметра reconnect означает установить reconnect, reconnect=true означает включить reconnect. По умолчанию reconnect отсутствует, если этот параметр не установлен.
Протокол WebSocket устанавливает базовые параметры, связанные с автоматическим воссоединением: interval=value parameter интервал - это интервал повторных попыток, в миллисекундах, интервал=10000 - это интервал повторных попыток 10 секунд, по умолчанию 1 секунда, когда он не установлен, то есть интервал=1000.
Протокол WebSocket устанавливает базовые параметры, связанные с автоматическим воссоединением: полезная нагрузка=значение параметра полезная нагрузка - это сообщение подписки, которое необходимо отправить при повторном подключении WebSocket, например: полезная нагрузка=okokok.
Параметры, относящиеся к носкам5 прокси: proxy=значение параметра Прокси - это настройка прокси ss5, формат значения параметра: socks5://name:pwd@192.168.0.1:1080, имя - это имя пользователя сервера ss5, pwd - это пароль входа на сервер ss5, 1080 - это порт службы ss5.

ВDial()Функция поддерживается только для торговли в реальном времени. При подключении к базе данных с помощью функции Dial строка подключения записывается со ссылкой на проект драйвера языка go для каждой базы данных.

Базы данных поддерживаются Движущие проекты Соединительная строка Примечания
Склайт3 github.com/mattn/go-sqlite3 sqlite3://файл:test.db?cache=shared&mode=memory Вsqlite3://Префикс указывает, что используется база данных sqlite3, пример вызова:Dial("sqlite3://test1.db")
mysql github.com/go-sql-driver/mysql mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4
послерост github.com/lib/pq postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432
Кликхаус github.com/ClickHouse/clickhouse-go clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase

Пожалуйста, обратите внимание, что когдаpayloadсодержание, установленное вaddressпараметр содержит символы=или других специальных символов, это может повлиять на анализaddressпараметрDialфункция, например следующий пример.

Пример вызова частного интерфейса websocket backPack Exchange:

var client = null

function main() {
    // Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
    var base64ApiKey = "xxx"

    var ts = String(new Date().getTime())
    var data = "instruction=subscribe&timestamp=" + ts + "&window=5000"

    // Since signEd25519 returns a base64 encoding, it contains the character "="
    var signature = signEd25519(data)
    
    // The payload may contain the character "=" after being encoded by JSON
    payload = {
        "method": "SUBSCRIBE",
        "params": ["account.orderUpdate"],
        "signature": [base64ApiKey, signature, ts, "5000"]
    }

    client = Dial("wss://ws.backpack.exchange")
    client.write(JSON.stringify(payload))
    if (!client) {
        Log("Connection failed, program exited")
        return
    }
    
    while (true) {
        var buf = client.read()      
        Log(buf)
    }    
}

function onexit() {
    client.close()
}

function signEd25519(data) {
    return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}

Следующий вызов в коде работает нормально:

client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))

Если вы напишете это прямо вpayload, он будет работать неправильно, например:

client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))

В настоящее время только JavaScript поддерживает использованиеmqtt, nats, amqp, иkafkaКод стратегии языка JavaScript используется в качестве примера, чтобы показать использование четырех протоколов:mqtt, nats, amqp, иkafka:

// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []

function main() {
    LogReset(1)
    conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
    conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
    conn_kafka = Dial("kafka://localhost:9092/test_topic")
    arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
    arrName = ["nats", "amqp", "mqtt", "kafka"]

    while (true) {
        for (var i in arrConn) {
            var conn = arrConn[i]
            var name = arrName[i]

            // Write data
            conn.write(name + ", time: " + _D() + ", test msg.")
            
            // Read data
            var readMsg = conn.read(1000)
            Log(name + " readMsg: ", readMsg, "#FF0000")
        }

        Sleep(1000)
    }
}

function onexit() {
    for (var i in arrConn) {
        arrConn[i].close()
        Log("close", arrName[i], "connect")
    }
}

Подробная документация:Исследование FMZ: Практика протокола связи между стратегиями торговли в режиме реального времени

GetMeta HttpQuery