Für primitiveSocketZugang, Unterstützungtcp, udp, tls, unixUnterstützung von 4 gängigen Kommunikationsprotokollen:mqtt, nats, amqp, kafkaUnterstützung für die Verbindung zu Datenbanken:sqlite3, mysql, postgres, clickhouse.

DieDial()Ein normaler Aufruf gibt ein Verbindungsobjekt zurück, das drei Methoden hat:read, writeundclose. DiereadDie Methode wird zur Ablesung der Daten verwendet.writeDaten zu senden und diecloseMethode verwendet wird, um die Verbindung zu schließen. DiereadDas Verfahren unterstützt folgende Parameter: - Wenn keine Parameter übergeben werden, blockiert es, bis eine Nachricht verfügbar ist und zurückkehrt, z. B.ws.read()- Ich weiß. - Wenn als Parameter eingegeben, ist die Einheit Millisekunden, die die Wartezeit der Nachricht angibt.ws.read(2000)eine Zeitraffer von zwei Sekunden (2000 Millisekunden) angibt. - Die folgenden beiden Parameter sind nur für WebSocket gültig: Übergabe des Parameters-1bedeutet, dass die Funktion unabhängig vom Vorhandensein oder Fehlen von Nachrichten sofort zurückgibt, z. B.:ws.read(-1)- Ich weiß. Übergabe des Parameters-2bedeutet, dass die Funktion sofort mit oder ohne eine Nachricht zurückgibt, aber nur die letzte Nachricht zurückgegeben wird, und die pufferte Nachricht wird entsorgt.ws.read(-2).

The incoming data pushed by the WebSocket protocol may cause data accumulation if the time interval between strategy ```read()``` function calls is too long. These data are stored in the buffer, which has a data structure of a queue with a maximum of 2000. After 2000 is exceeded, the newest data enters the buffer and the oldest data is cleared out.
|Scenario|No parameter|Parameter: -1|Parameter: -2|Parameter: 2000, in milliseconds|
| - | - | - | - | - |
|Data already in the buffer|Return oldest data immediately|Return oldest data immediately|Return latest data immediately|Return oldest data immediately|
|No data in the buffer|Return when blocked to data|Return null immediately|Return null immediately|Wait 2000 ms, return null if no data, return null if there is data|
|WebSocket connection is disconnected or reconnected by the underlying |read() function returns the empty string, i.e.: "", and write() function returns 0. The situation is detected. You can close the connection using the close() function, or if you have set up automatic reconnection, you don't need to close it, the system underlying will reconnect it automatically.||||


Dial(address, timeout)

Request address.
timeout seconds,

function main(){
    // Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
    var client = Dial("tls://www.baidu.com:443")  
    if (client) {
        // write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while (true) {
            // read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
            var buf = client.read()
            if (!buf) {
def main():
    client = Dial("tls://www.baidu.com:443")
    if client:
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while True:
            buf = client.read()
            if not buf:
void main() {
    auto client = Dial("tls://www.baidu.com:443");
    if(client.Valid) {
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
        while(true) {
            auto buf = client.read();
            if(buf == "") {

Beispiel für einen Aufruf der Funktion "Dial":

function main() {
    // Accessing WebSocket interface of Binance
    var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if (!client) {
        Log("Connection failed, program exited")
    while (true) {
        // read returns only the data retrieved after the read call
        var buf = client.read()      
        if (!buf) {
        var table = {
            type: 'table',
            title: 'Ticker Chart',
            cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
            rows: []
        var obj = JSON.parse(buf)
        _.each(obj, function(ticker) {
            table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
        LogStatus('`' + JSON.stringify(table) + '`')
import json
def main():
    client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if not client:
        Log("Connection failed, program exited")
    while True:
        buf = client.read()
        if not buf:
        table = {
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'], 
            "rows" : [] 
        obj = json.loads(buf)
        for i in range(len(obj)):
            table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
        LogStatus('`' + json.dumps(table) + '`')
void main() {
    auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
    if(!client.Valid) {
        Log("Connection failed, program exited");
    while(true) {
        auto buf = client.read();
        if(buf == "") {
        json table = R"({
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"], 
            "rows" : []
        json obj = json::parse(buf);
        for(auto& ele : obj.items()) {
            table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"], 
                ele.value()["q"], _D(ele.value()["E"])});
        LogStatus("`" + table.dump() + "`");

Zugriff auf die WebSocket-Ticker-Schnittstelle von Binance:

var ws = null 
function main(){
    var param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
    // When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
        var pingCyc = 1000 * 20
        var lastPingTime = new Date().getTime()
            var nowTime = new Date().getTime()
            var ret = ws.read()
            Log("ret:", ret)
            if(nowTime - lastPingTime > pingCyc){
                var retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send : ping", "#FF0000")
            LogStatus("Current time:", _D())

function onexit() {
import json
import time              

ws = None
def main():
    global ws 
    param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        pingCyc = 1000 * 20
        lastPingTime = time.time() * 1000
        while True:
            nowTime = time.time() * 1000
            ret = ws.read()
            Log("ret:", ret)
            if nowTime - lastPingTime > pingCyc:
                retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send: ping", "#FF0000")
            LogStatus("Current time:", _D())

def onexit():
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");              

void main() {
    json param = R"({
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
    if(objWS.Valid) {
        uint64_t pingCyc = 1000 * 20;
        uint64_t lastPingTime = Unix() * 1000;
        while(true) {
            uint64_t nowTime = Unix() * 1000;
            auto ret = objWS.read();
            Log("ret:", ret);
            if(nowTime - lastPingTime > pingCyc) {
                auto retPing = objWS.write("ping");
                lastPingTime = nowTime;
                Log("Send: ping", "#FF0000");
            LogStatus("Current time:", _D());

void onexit() {

Zugriff auf die OKX WebSocket-Ticker-Schnittstelle:

var ws = null               

function main(){
    var param = {"sub": "market.btcusdt.detail", "id": "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
            var ret = ws.read()
            Log("ret:", ret)
            // Respond to heartbeat packet operations
            try {
                var jsonRet = JSON.parse(ret)
                if(typeof(jsonRet.ping) == "number") {
                    var strPong = JSON.stringify({"pong" : jsonRet.ping})
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
            } catch(e) {
                Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
            LogStatus("Current time:", _D())

function onexit() {
    Log("Execute the ws.close() function")
import json
ws = None              

def main():
    global ws
    param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        while True:
            ret = ws.read()
            Log("ret:", ret)              
            # Respond to heartbeat packet operations
                jsonRet = json.loads(ret)
                if "ping" in jsonRet and type(jsonRet["ping"]) == int:
                    strPong = json.dumps({"pong" : jsonRet["ping"]})
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
            except Exception as e:
                Log("e:", e)
            LogStatus("Current time:", _D())
def onexit():
    Log("Execute the ws.close() function")  
using namespace std;
void main() {
    json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
    auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
    if(ws.Valid) {
        while(true) {
            auto ret = ws.read();
            Log("ret:", ret);              
            // Respond to heartbeat packet operations
                auto jsonRet = json::parse(ret);
                if(jsonRet["ping"].is_number()) {
                    json pong = R"({"pong" : 0})"_json;
                    pong["pong"] = jsonRet["ping"];
                    auto strPong = pong.dump();
                    Log("Respond to ping, send pong:", strPong, "#FF0000");
            } catch(exception &e) 
                Log("e:", e.what());
            LogStatus("Current time:", _D());

void onexit() {
    // ws.close();
    Log("Execute the ws.close() function");

Zugriff auf Huobi's WebSocket Tickerschnittstelle:

function getLogin(pAccessKey, pSecretKey, pPassphrase) {
    // Signature function for login
    var ts = (new Date().getTime() / 1000).toString()
    var login = {
        "op": "login",
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)   // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
    return login

var client_private = null 
function main() {
    // Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
    // Position channel subscription information
    var posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"

    var accessKey = "xxx"
    var secretKey = "xxx"
    var passphrase = "xxx"            

    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)  // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
    if (client_private) {
        var lastPingTS = new Date().getTime()
        while (true) {
            var buf = client_private.read(-1)
            if (buf) {
            // Detect disconnection, reconnect
            if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
                Log("Disconnection detected, close connection, reconnect")
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
            // Send heartbeat packets
            var nowPingTS = new Date().getTime()
            if (nowPingTS - lastPingTS > 10 * 1000) {
                lastPingTS = nowPingTS

function onexit() {    
    var ret = client_private.close()
    Log("Close the connection!", ret)
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
    ts = str(time.time())
    login = {
        "op": "login",
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
    return login                 

client_private = None 
def main():
    global client_private
    posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"

    accessKey = "xxx"
    secretKey = "xxx"
    passphrase = "xxx"
    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
    if client_private:
        lastPingTS = time.time() * 1000
        while True:
            buf = client_private.read(-1)
            if buf:
            if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
                Log("Disconnection detected, close connection, reconnect")
                ret = client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
            nowPingTS = time.time() * 1000
            if nowPingTS - lastPingTS > 10 * 1000:
                lastPingTS = nowPingTS                

def onexit():
    ret = client_private.close()
    Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");                  

json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
    auto ts = std::to_string(Unix());
    json login = R"({
        "op": "login",
        "args": [{
            "apiKey": "",
            "passphrase": "",
            "timestamp": "",
            "sign": ""
    login["args"][0]["apiKey"] = pAccessKey;
    login["args"][0]["passphrase"] = pPassphrase;
    login["args"][0]["timestamp"] = ts;
    login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
    return login;

void main() {
    json posSubscribe = R"({
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
    auto accessKey = "xxx";
    auto secretKey = "xxx";
    auto passphrase = "xxx";
    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());

    if (client_private.Valid) {
        uint64_t lastPingTS = Unix() * 1000;                  

        while (true) {
            auto buf = client_private.read(-1);
            if (buf != "") {
            if (buf == "") {
                if (client_private.write(posSubscribe.dump()) == 0) {
                    Log("Disconnection detected, close connection, reconnect");
                    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
                    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
            uint64_t nowPingTS = Unix() * 1000;
            if (nowPingTS - lastPingTS > 10 * 1000) {
                lastPingTS = nowPingTS;

void onexit() {

Zugriff auf die WebSocket-Authentifizierungsoberfläche von OKX:

var client = null 
function main() {
    // client = Dial("sqlite3://:memory:")   // Using an in-memory database
    client = Dial("sqlite3://test1.db")      // Open/connect to the database file in the docker's directory
    // record handle
    var sqlite3Handle = client.fd()
    Log("sqlite3Handle:", sqlite3Handle)
    // Querying tables in the database
    var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")

function onexit() {
    Log("Execute client.close()")
// Not supported
// Not supported

Das Verbindungsobjekt, das bei der Verbindung zu einer Datenbank durch die Funktion Dial zurückgegeben wird, hat zwei Methodenfunktionen, die einzigartig sind: - Was ist los?exec(sqlString): Wird verwendet, um SQL-Anweisungen ähnlich wie dieDBExec()Funktion. - Was ist los?fd()Diefd()Funktion gibt einen Handler zurück (z. B. die Handlervariable ist Handler), der von anderen Threads zur Wiederverbindung verwendet wird (auch wenn das von Dial erstellte Objekt bereits durch die Ausführung desclose()Funktion, um die Verbindung zu schließen) durch dasDial()Funktion, zum Beispiel,Dial(handle)Wiederverwendungsanschluss. Das folgende ist ein Beispiel für die Dial-Funktion, die eine Verbindung zu einemsqlite3 database.

Einzelheiten deraddressParameter, getrennt durch die|Symbol nach der normalen Adresse:wss://ws.okx.com:8443/ws/v5/publicWenn es welche gibt.|Zeichen in der Parameterzeile, dann||Der Teil danach sind einige Funktionsparameter-Einstellungen, und jeder Parameter ist mit&Siehe auch:ss5Proxy- und Kompressionsparameter können wie folgt zusammengestellt werden:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")

Funktionen, die durch den Adressparameter der Dial-Funktion unterstützt werden Beschreibung der Parameter
Parameter im Zusammenhang mit der Datenkomprimierung des WebSocket-Protokolls: compress=Parameterwert compress ist die Komprimierungsmethode, die Komprimierungsparameteroptionen sind: gzip_raw, gzip usw. Wenn die gzip-Methode nicht gzip ist, können Sie die erweiterte Methode verwenden: gzip_raw
Parameter im Zusammenhang mit der Datenkomprimierung des WebSocket-Protokolls: Mode=Parameterwert Modus ist der Komprimierungsmodus, Modusparameter kann dual sein, send, recv. dual ist Zwei-Wege-Komprimierung, send komprimierte Daten, empfangen komprimierte Daten. send send komprimierte Daten. recv empfangen komprimierte Daten, lokale Dekomprimierung.
Das WebSocket-Protokoll legt die zugrunde liegenden Parameter für die automatische Wiederverbindung fest: reconnect=Parameterwert reconnect ist, ob reconnect gesetzt wird, reconnect=true ist, ob reconnect aktiviert wird.
Das WebSocket-Protokoll setzt die zugrunde liegenden Parameter für die automatische Wiederverbindung: interval=Parameterwert Intervall ist das Wiederversuchsintervall, in Millisekunden, interval=10000 ist das Wiederversuchsintervall von 10 Sekunden, der Standardwert ist 1 Sekunde, wenn es nicht eingestellt ist, d. h. interval=1000.
Das WebSocket-Protokoll legt die zugrunde liegenden Parameter für die automatische Wiederverbindung fest: Nutzlast=Parameterwert Payload ist die Abonnementnachricht, die gesendet werden muss, wenn die WebSocket wieder verbunden wird, z. B.: payload=okokok.
Parameter im Zusammenhang mit socks5 proxy: proxy=Parameterwert Proxy ist die SS5 Proxy-Einstellung, Parameterwertformat: socks5://name:pwd@, Name ist der SS5 Server Benutzername, pwd ist das SS5 Server Login Passwort, 1080 ist der SS5 Service-Port.

DieDial()Die Funktion wird nur für den Live-Handel unterstützt. Bei der Verbindung mit einer Datenbank mit der Dial-Funktion wird die Verbindungszeile mit Bezug auf das Go-Sprachtreiberprojekt für jede Datenbank geschrieben.

Unterstützte Datenbanken Antriebsprojekte Verbindungsstring Anmerkungen
Schnitt 3 github.com/mattn/go-sqlite3 sqlite3://Datei:test.db?Cache=shared&mode=Speicher Diesqlite3://Prefix zeigt an, dass eine sqlite3-Datenbank verwendet wird, Beispielruf:Dial("sqlite3://test1.db")
MySQL github.com/go-sql-driver/mysql Das ist ein sehr schwieriger Fall.
Nachwuchs github.com/lib/pq Postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432
Klickhaus github.com/ClickHouse/clickhouse-go Sie müssen sich an die Datenbank wenden, die Sie benötigen.

Bitte beachten Sie, daß diepayloadInhalte in deraddressParameter enthält Zeichen=oder andere Sonderzeichen, kann es die Parsierung deraddressParameter derDialFunktion, wie zum Beispiel im folgenden Beispiel.

Beispiel für den Aufruf der privaten Schnittstelle von backPack Exchange websocket:

var client = null

function main() {
    // Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
    var base64ApiKey = "xxx"

    var ts = String(new Date().getTime())
    var data = "instruction=subscribe&timestamp=" + ts + "&window=5000"

    // Since signEd25519 returns a base64 encoding, it contains the character "="
    var signature = signEd25519(data)
    // The payload may contain the character "=" after being encoded by JSON
    payload = {
        "method": "SUBSCRIBE",
        "params": ["account.orderUpdate"],
        "signature": [base64ApiKey, signature, ts, "5000"]

    client = Dial("wss://ws.backpack.exchange")
    if (!client) {
        Log("Connection failed, program exited")
    while (true) {
        var buf = client.read()      

function onexit() {

function signEd25519(data) {
    return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")

Der folgende Anruf im Code funktioniert:

client = Dial("wss://ws.backpack.exchange")

Wenn Sie es direkt inpayload, wird es nicht richtig funktionieren, zum Beispiel:

client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))

Derzeit unterstützt nur JavaScript die Verwendung dermqtt, nats, amqp, undkafkaDer JavaScript-Strategiekode wird als Beispiel verwendet, um die Verwendung der vier Protokolle zu zeigen:mqtt, nats, amqp, undkafka:

// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []

function main() {
    conn_nats = Dial("nats://admin@")
    conn_mqtt = Dial("mqtt://")
    conn_amqp = Dial("amqp://q:admin@")
    conn_kafka = Dial("kafka://localhost:9092/test_topic")
    arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
    arrName = ["nats", "amqp", "mqtt", "kafka"]

    while (true) {
        for (var i in arrConn) {
            var conn = arrConn[i]
            var name = arrName[i]

            // Write data
            conn.write(name + ", time: " + _D() + ", test msg.")
            // Read data
            var readMsg = conn.read(1000)
            Log(name + " readMsg: ", readMsg, "#FF0000")


function onexit() {
    for (var i in arrConn) {
        Log("close", arrName[i], "connect")

Ausführliche Dokumentation:Erforschung von FMZ: Praxis des Kommunikationsprotokolls zwischen Live-Handelsstrategien

