Tài nguyên đang được tải lên... tải...

Nhập

Đối với nguyên thủySockettiếp cận, hỗ trợtcp, udp, tls, unixHỗ trợ 4 giao thức truyền thông phổ biến:mqtt, nats, amqp, kafka. Hỗ trợ kết nối với cơ sở dữ liệu:sqlite3, mysql, postgres, clickhouse.

CácDial()hàm trả về null nếu nó hết thời gian. Một cuộc gọi bình thường trả về một đối tượng kết nối có ba phương thức:read, writeclose.readphương pháp được sử dụng để đọc dữ liệu,writephương pháp được sử dụng để gửi dữ liệu vàclosephương pháp được sử dụng để đóng kết nối. Cácreadphương pháp hỗ trợ các thông số sau: - Khi không có tham số được truyền, nó chặn cho đến khi một thông điệp có sẵn và trả về, chẳng hạn nhưws.read(). - Khi được chuyển vào như một tham số, đơn vị là milliseconds, xác định thời gian chờ tin nhắn.ws.read(2000)chỉ định thời gian nghỉ là hai giây (2000 millisecond). - Hai tham số sau đây chỉ có giá trị cho WebSocket: Chuyển parameter-1có nghĩa là chức năng trả về ngay lập tức, bất kể có hoặc không có tin nhắn, ví dụ:ws.read(-1). Chuyển parameter-2có nghĩa là hàm trả về ngay lập tức với hoặc không có tin nhắn, nhưng chỉ có tin nhắn mới nhất được trả về, và tin nhắn đệm được loại bỏ. Ví dụ:ws.read(-2).

The incoming data pushed by the WebSocket protocol may cause data accumulation if the time interval between strategy ```read()``` function calls is too long. These data are stored in the buffer, which has a data structure of a queue with a maximum of 2000. After 2000 is exceeded, the newest data enters the buffer and the oldest data is cleared out.
|Scenario|No parameter|Parameter: -1|Parameter: -2|Parameter: 2000, in milliseconds|
| - | - | - | - | - |
|Data already in the buffer|Return oldest data immediately|Return oldest data immediately|Return latest data immediately|Return oldest data immediately|
|No data in the buffer|Return when blocked to data|Return null immediately|Return null immediately|Wait 2000 ms, return null if no data, return null if there is data|
|WebSocket connection is disconnected or reconnected by the underlying |read() function returns the empty string, i.e.: "", and write() function returns 0. The situation is detected. You can close the connection using the close() function, or if you have set up automatic reconnection, you don't need to close it, the system underlying will reconnect it automatically.||||

object

Dial(address)
Dial(address, timeout)

Request address.
address
true
string
timeout seconds,
timeout
false
number

```javascript
function main(){
    // Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
    var client = Dial("tls://www.baidu.com:443")  
    if (client) {
        // write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while (true) {
            // read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
            var buf = client.read()
            if (!buf) {
                 break
            }
            Log(buf)
        }
        client.close()
    }
}
def main():
    client = Dial("tls://www.baidu.com:443")
    if client:
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while True:
            buf = client.read()
            if not buf:
                break
            Log(buf)
        client.close()
void main() {
    auto client = Dial("tls://www.baidu.com:443");
    if(client.Valid) {
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
        while(true) {
            auto buf = client.read();
            if(buf == "") {
                break;
            }
            Log(buf);
        }
        client.close();
    }
}

Ví dụ về một cuộc gọi hàm Dial:

function main() {
    LogStatus("Connecting...")
    // Accessing WebSocket interface of Binance
    var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if (!client) {
        Log("Connection failed, program exited")
        return
    }
    
    while (true) {
        // read returns only the data retrieved after the read call
        var buf = client.read()      
        if (!buf) {
            break
        }
        var table = {
            type: 'table',
            title: 'Ticker Chart',
            cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
            rows: []
        }
        var obj = JSON.parse(buf)
        _.each(obj, function(ticker) {
            table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
        })
        LogStatus('`' + JSON.stringify(table) + '`')
    }
    client.close()
}
import json
def main():
    LogStatus("Connecting...")
    client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if not client:
        Log("Connection failed, program exited")
        return 
    
    while True:
        buf = client.read()
        if not buf:
            break
        table = {
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'], 
            "rows" : [] 
        }
        obj = json.loads(buf)
        for i in range(len(obj)):
            table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
        LogStatus('`' + json.dumps(table) + '`')
    client.close()
void main() {
    LogStatus("Connecting...");
    auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
    if(!client.Valid) {
        Log("Connection failed, program exited");
        return;
    }
    
    while(true) {
        auto buf = client.read();
        if(buf == "") {
            break;
        }
        json table = R"({
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"], 
            "rows" : []
        })"_json;
        json obj = json::parse(buf);
        for(auto& ele : obj.items()) {
            table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"], 
                ele.value()["q"], _D(ele.value()["E"])});
        }
        LogStatus("`" + table.dump() + "`");
    }
    client.close();
}

Để truy cập giao diện WebSocket ticker của Binance:

var ws = null 
function main(){
    var param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    // When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
    if(ws){
        var pingCyc = 1000 * 20
        var lastPingTime = new Date().getTime()
        while(true){
            var nowTime = new Date().getTime()
            var ret = ws.read()
            Log("ret:", ret)
            if(nowTime - lastPingTime > pingCyc){
                var retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send : ping", "#FF0000")
            }
            LogStatus("Current time:", _D())
            Sleep(1000)
        }
    }
}              

function onexit() {
    ws.close() 
    Log("exit")
}
import json
import time              

ws = None
def main():
    global ws 
    param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        pingCyc = 1000 * 20
        lastPingTime = time.time() * 1000
        while True:
            nowTime = time.time() * 1000
            ret = ws.read()
            Log("ret:", ret)
            if nowTime - lastPingTime > pingCyc:
                retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send: ping", "#FF0000")
            LogStatus("Current time:", _D())
            Sleep(1000)              

def onexit():
    ws.close()
    Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");              

void main() {
    json param = R"({
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    })"_json;
    
    objWS.write(param.dump());
    if(objWS.Valid) {
        uint64_t pingCyc = 1000 * 20;
        uint64_t lastPingTime = Unix() * 1000;
        while(true) {
            uint64_t nowTime = Unix() * 1000;
            auto ret = objWS.read();
            Log("ret:", ret);
            if(nowTime - lastPingTime > pingCyc) {
                auto retPing = objWS.write("ping");
                lastPingTime = nowTime;
                Log("Send: ping", "#FF0000");
            }
            LogStatus("Current time:", _D());
            Sleep(1000);
        }
    }
}              

void onexit() {
    objWS.close();
    Log("exit");
}

Truy cập vào giao diện ticker WebSocket OKX:

var ws = null               

function main(){
    var param = {"sub": "market.btcusdt.detail", "id": "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
    if(ws){
        while(1){
            var ret = ws.read()
            Log("ret:", ret)
            // Respond to heartbeat packet operations
            try {
                var jsonRet = JSON.parse(ret)
                if(typeof(jsonRet.ping) == "number") {
                    var strPong = JSON.stringify({"pong" : jsonRet.ping})
                    ws.write(strPong)
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
                }
            } catch(e) {
                Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
            }
            
            LogStatus("Current time:", _D())
            Sleep(1000)
        }
    }
}              

function onexit() {
    ws.close() 
    Log("Execute the ws.close() function")
}
import json
ws = None              

def main():
    global ws
    param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        while True:
            ret = ws.read()
            Log("ret:", ret)              
            # Respond to heartbeat packet operations
            try:
                jsonRet = json.loads(ret)
                if "ping" in jsonRet and type(jsonRet["ping"]) == int:
                    strPong = json.dumps({"pong" : jsonRet["ping"]})
                    ws.write(strPong)
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
            except Exception as e:
                Log("e:", e)
                
            LogStatus("Current time:", _D())
            Sleep(1000)
    
def onexit():
    ws.close()
    Log("Execute the ws.close() function")  
using namespace std;
void main() {
    json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
    auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
    if(ws.Valid) {
        while(true) {
            auto ret = ws.read();
            Log("ret:", ret);              
            // Respond to heartbeat packet operations
            try 
            {
                auto jsonRet = json::parse(ret);
                if(jsonRet["ping"].is_number()) {
                    json pong = R"({"pong" : 0})"_json;
                    pong["pong"] = jsonRet["ping"];
                    auto strPong = pong.dump();
                    ws.write(strPong);
                    Log("Respond to ping, send pong:", strPong, "#FF0000");
                }
            } catch(exception &e) 
            {
                Log("e:", e.what());
            }
            
            LogStatus("Current time:", _D());
            Sleep(1000);
        }
    }
}              

void onexit() {
    // ws.close();
    Log("Execute the ws.close() function");
}

Truy cập vào giao diện ticker WebSocket của Huobi:

function getLogin(pAccessKey, pSecretKey, pPassphrase) {
    // Signature function for login
    var ts = (new Date().getTime() / 1000).toString()
    var login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)   // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
        }]
    }    
    return login
}                

var client_private = null 
function main() {
    // Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
    SetErrorFilter("timeout")
    
    // Position channel subscription information
    var posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }                

    var accessKey = "xxx"
    var secretKey = "xxx"
    var passphrase = "xxx"            

    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)  // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
    client_private.write(JSON.stringify(posSubscribe))
    if (client_private) {
        var lastPingTS = new Date().getTime()
        while (true) {
            var buf = client_private.read(-1)
            if (buf) {
                Log(buf)
            }
            
            // Detect disconnection, reconnect
            if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
                Log("Disconnection detected, close connection, reconnect")
                client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(JSON.stringify(posSubscribe))
            }
            
            // Send heartbeat packets
            var nowPingTS = new Date().getTime()
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping")
                lastPingTS = nowPingTS
            }            
        }        
    }
}                

function onexit() {    
    var ret = client_private.close()
    Log("Close the connection!", ret)
}
import json
import time
  
def getLogin(pAccessKey, pSecretKey, pPassphrase):
    ts = str(time.time())
    login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
        }]
    }
    return login                 

client_private = None 
def main():
    global client_private
    SetErrorFilter("timeout")
    
    posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }                  

    accessKey = "xxx"
    secretKey = "xxx"
    passphrase = "xxx"
    
    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)
    client_private.write(json.dumps(posSubscribe))
    if client_private:
        lastPingTS = time.time() * 1000
        while True:
            buf = client_private.read(-1)
            if buf:
                Log(buf)
            
            if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
                Log("Disconnection detected, close connection, reconnect")
                ret = client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(json.dumps(posSubscribe))
            
            nowPingTS = time.time() * 1000
            if nowPingTS - lastPingTS > 10 * 1000:
                client_private.write("ping")
                lastPingTS = nowPingTS                

def onexit():
    ret = client_private.close()
    Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");                  

json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
    auto ts = std::to_string(Unix());
    json login = R"({
        "op": "login",
        "args": [{
            "apiKey": "",
            "passphrase": "",
            "timestamp": "",
            "sign": ""
        }]
    })"_json;
    login["args"][0]["apiKey"] = pAccessKey;
    login["args"][0]["passphrase"] = pPassphrase;
    login["args"][0]["timestamp"] = ts;
    login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
    return login;
}                  

void main() {
    SetErrorFilter("timeout");
    json posSubscribe = R"({
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    })"_json;
    
    auto accessKey = "xxx";
    auto secretKey = "xxx";
    auto passphrase = "xxx";
    
    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
    Sleep(3000);
    client_private.write(posSubscribe.dump());                

    if (client_private.Valid) {
        uint64_t lastPingTS = Unix() * 1000;                  

        while (true) {
            auto buf = client_private.read(-1);
            if (buf != "") {
                Log(buf);
            }
            if (buf == "") {
                if (client_private.write(posSubscribe.dump()) == 0) {
                    Log("Disconnection detected, close connection, reconnect");
                    client_private.close();
                    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
                    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
                    Sleep(3000);
                    client_private.write(posSubscribe.dump());
                }
            }
            
            uint64_t nowPingTS = Unix() * 1000;
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping");
                lastPingTS = nowPingTS;
            }
        }
    }
}                  

void onexit() {
    client_private.close();
    Log("exit");
}

Để truy cập giao diện xác thực WebSocket OKX:

var client = null 
function main() {
    // client = Dial("sqlite3://:memory:")   // Using an in-memory database
    client = Dial("sqlite3://test1.db")      // Open/connect to the database file in the docker's directory
    
    // record handle
    var sqlite3Handle = client.fd()
    Log("sqlite3Handle:", sqlite3Handle)
    
    // Querying tables in the database
    var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
    Log(ret)
}

function onexit() {
    Log("Execute client.close()")
    client.close()
}
// Not supported
// Not supported

Đối tượng kết nối được trả về bởi hàm Dial khi kết nối với cơ sở dữ liệu có hai hàm phương thức độc đáo cho nó: - Không.exec(sqlString): Được sử dụng để thực thi lệnh SQL theo cách tương tự nhưDBExec()chức năng. - Không.fd(): Cácfd()hàm trả về một xử lý (ví dụ, biến xử lý là xử lý) để được sử dụng bởi các luồng khác để kết nối lại (ngay cả khi đối tượng được tạo bởi Dial đã được đóng bởi việc thực hiệnclose()chức năng để đóng kết nối) bằng cách đi tay cầm vàoDial()chức năng, ví dụ,Dial(handle)Kết nối tái sử dụng. Sau đây là một ví dụ về chức năng Dial kết nối với mộtsqlite3 database.

Thông tin chi tiếtaddresstham số, tách biệt với|ký hiệu sau địa chỉ thông thường:wss://ws.okx.com:8443/ws/v5/publicNếu có.|ký tự trong chuỗi tham số, sau đó||Phần sau đó là một số cài đặt tham số chức năng, và mỗi tham số được kết nối với&Ví dụ, cácss5Các thông số thay thế và nén có thể được đặt cùng nhau như sau:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")

Các chức năng được hỗ trợ bởi tham số địa chỉ của chức năng Dial Mô tả tham số
Các tham số liên quan đến nén dữ liệu giao thức WebSocket: compress=parameter value compress là phương pháp nén, các tùy chọn tham số nén là: gzip_raw, gzip, vv Nếu phương pháp gzip không phải là gzip tiêu chuẩn, bạn có thể sử dụng phương pháp mở rộng: gzip_raw
Các tham số liên quan đến nén dữ liệu giao thức WebSocket: mode=parameter value chế độ là chế độ nén, tham số chế độ có thể là kép, gửi, recv. kép là nén hai chiều, gửi dữ liệu nén, nhận dữ liệu nén. gửi là gửi dữ liệu nén. recv là nhận dữ liệu nén, giải nén cục bộ.
Giao thức WebSocket thiết lập các thông số liên quan đến tự động kết nối: reconnect=parameter value reconnect là đặt reconnect, reconnect=true là bật reconnect. mặc định là không reconnect khi tham số này không được đặt.
Giao thức WebSocket thiết lập các thông số liên quan đến tự động kết nối: interval=parameter value interval là khoảng thời gian thử lại, trong milliseconds, interval=10000 là khoảng thời gian thử lại 10 giây, mặc định là 1 giây khi nó không được thiết lập, tức là interval=1000.
Giao thức WebSocket thiết lập các thông số liên quan đến tự động kết nối: payload=parameter value payload là thông báo đăng ký cần được gửi khi WebSocket được kết nối lại, ví dụ: payload=okokok.
Các tham số liên quan đến vớ5 proxy: proxy=giá trị tham số proxy là cài đặt proxy ss5, định dạng giá trị tham số: socks5://name:pwd@192.168.0.1:1080, tên là tên người dùng máy chủ ss5, pwd là mật khẩu đăng nhập máy chủ ss5, 1080 là cổng dịch vụ ss5.

CácDial()chức năng chỉ được hỗ trợ cho giao dịch trực tiếp. Khi kết nối với cơ sở dữ liệu bằng chức năng Dial, chuỗi kết nối được viết với tham chiếu đến dự án trình điều khiển ngôn ngữ go cho mỗi cơ sở dữ liệu.

Cơ sở dữ liệu được hỗ trợ Các dự án thúc đẩy Dây kết nối Nhận xét
sqlite3 github.com/mattn/go-sqlite3 sqlite3://file:test.db?cache=shared&mode=memory Cácsqlite3://tiền tố chỉ ra rằng một cơ sở dữ liệu sqlite3 đang được sử dụng, ví dụ gọi:Dial("sqlite3://test1.db")
MySQL github.com/go-sql-driver/mysql mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4
sau github.com/lib/pq postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432
clickhouse github.com/ClickHouse/clickhouse-go clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase

Xin lưu ý rằng khipayloadnội dung được đặt trongaddresstham số chứa ký tự=hoặc các ký tự đặc biệt khác, nó có thể ảnh hưởng đến việc phân tích củaaddresstham số củaDialchức năng, chẳng hạn như ví dụ sau.

ví dụ gọi backPack Exchange websocket giao diện riêng:

var client = null

function main() {
    // Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
    var base64ApiKey = "xxx"

    var ts = String(new Date().getTime())
    var data = "instruction=subscribe&timestamp=" + ts + "&window=5000"

    // Since signEd25519 returns a base64 encoding, it contains the character "="
    var signature = signEd25519(data)
    
    // The payload may contain the character "=" after being encoded by JSON
    payload = {
        "method": "SUBSCRIBE",
        "params": ["account.orderUpdate"],
        "signature": [base64ApiKey, signature, ts, "5000"]
    }

    client = Dial("wss://ws.backpack.exchange")
    client.write(JSON.stringify(payload))
    if (!client) {
        Log("Connection failed, program exited")
        return
    }
    
    while (true) {
        var buf = client.read()      
        Log(buf)
    }    
}

function onexit() {
    client.close()
}

function signEd25519(data) {
    return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}

Các cuộc gọi sau trong mã hoạt động tốt:

client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))

Nếu bạn viết nó trực tiếp vàopayload, nó sẽ không hoạt động đúng cách, ví dụ:

client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))

Hiện tại, chỉ có JavaScript hỗ trợ việc sử dụngmqtt, nats, amqp, vàkafkaMã chiến lược ngôn ngữ JavaScript được sử dụng như một ví dụ để hiển thị việc sử dụng bốn giao thức:mqtt, nats, amqp, vàkafka:

// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []

function main() {
    LogReset(1)
    conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
    conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
    conn_kafka = Dial("kafka://localhost:9092/test_topic")
    arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
    arrName = ["nats", "amqp", "mqtt", "kafka"]

    while (true) {
        for (var i in arrConn) {
            var conn = arrConn[i]
            var name = arrName[i]

            // Write data
            conn.write(name + ", time: " + _D() + ", test msg.")
            
            // Read data
            var readMsg = conn.read(1000)
            Log(name + " readMsg: ", readMsg, "#FF0000")
        }

        Sleep(1000)
    }
}

function onexit() {
    for (var i in arrConn) {
        arrConn[i].close()
        Log("close", arrName[i], "connect")
    }
}

Tài liệu chi tiết:Khám phá FMZ: Thực hành giao thức giao tiếp giữa các chiến lược giao dịch trực tiếp

GetMeta HttpQuery