资源加载中... loading...

Dial

For primitive Socket access, supporting tcp, udp, tls, unix protocols. Support 4 popular communication protocols: mqtt, nats, amqp, kafka. Support for connecting to databases: sqlite3, mysql, postgres, clickhouse.

The Dial() function returns null if it times out. A normal call returns a connection object that has three methods: read, write and close. The read method is used to read data, the write method is used to send data and the close method is used to close the connection. The read method supports the following parameters:

  • When no parameters are passed, it blocks until a message is available and returns, such as ws.read().
  • When passed in as a parameter, the unit is milliseconds, specifying the message wait timeout period. For example: ws.read(2000) specifies a timeout of two seconds (2000 milliseconds).
  • The following two parameters are valid only for WebSocket: Passing the parameter -1 means that the function returns immediately, regardless of the presence or absence of messages, for example: ws.read(-1). Passing the parameter -2 means that the function returns immediately with or without a message, but only the latest message is returned, and the buffered message is discarded. For example, ws.read(-2).

read() function buffer description: The incoming data pushed by the WebSocket protocol may cause data accumulation if the time interval between strategy read() function calls is too long. These data are stored in the buffer, which has a data structure of a queue with a maximum of 2000. After 2000 is exceeded, the newest data enters the buffer and the oldest data is cleared out.

Scenario No parameter Parameter: -1 Parameter: -2 Parameter: 2000, in milliseconds
Data already in the buffer Return oldest data immediately Return oldest data immediately Return latest data immediately Return oldest data immediately
No data in the buffer Return when blocked to data Return null immediately Return null immediately Wait 2000 ms, return null if no data, return null if there is data
WebSocket connection is disconnected or reconnected by the underlying read() function returns the empty string, i.e.: “”, and write() function returns 0. The situation is detected. You can close the connection using the close() function, or if you have set up automatic reconnection, you don’t need to close it, the system underlying will reconnect it automatically.

object

Dial(address) Dial(address, timeout)

Request address. address true string timeout seconds, timeout false number

function main(){
    // Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
    var client = Dial("tls://www.baidu.com:443")  
    if (client) {
        // write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while (true) {
            // read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
            var buf = client.read()
            if (!buf) {
                 break
            }
            Log(buf)
        }
        client.close()
    }
}
def main():
    client = Dial("tls://www.baidu.com:443")
    if client:
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while True:
            buf = client.read()
            if not buf:
                break
            Log(buf)
        client.close()
void main() {
    auto client = Dial("tls://www.baidu.com:443");
    if(client.Valid) {
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
        while(true) {
            auto buf = client.read();
            if(buf == "") {
                break;
            }
            Log(buf);
        }
        client.close();
    }
}

Example of a Dial function call:

function main() {
    LogStatus("Connecting...")
    // Accessing WebSocket interface of Binance
    var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if (!client) {
        Log("Connection failed, program exited")
        return
    }
    
    while (true) {
        // read returns only the data retrieved after the read call
        var buf = client.read()      
        if (!buf) {
            break
        }
        var table = {
            type: 'table',
            title: 'Ticker Chart',
            cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
            rows: []
        }
        var obj = JSON.parse(buf)
        _.each(obj, function(ticker) {
            table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
        })
        LogStatus('`' + JSON.stringify(table) + '`')
    }
    client.close()
}
import json
def main():
    LogStatus("Connecting...")
    client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if not client:
        Log("Connection failed, program exited")
        return 
    
    while True:
        buf = client.read()
        if not buf:
            break
        table = {
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'], 
            "rows" : [] 
        }
        obj = json.loads(buf)
        for i in range(len(obj)):
            table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
        LogStatus('`' + json.dumps(table) + '`')
    client.close()
void main() {
    LogStatus("Connecting...");
    auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
    if(!client.Valid) {
        Log("Connection failed, program exited");
        return;
    }
    
    while(true) {
        auto buf = client.read();
        if(buf == "") {
            break;
        }
        json table = R"({
            "type" : "table", 
            "title" : "Ticker Chart", 
            "cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"], 
            "rows" : []
        })"_json;
        json obj = json::parse(buf);
        for(auto& ele : obj.items()) {
            table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"], 
                ele.value()["q"], _D(ele.value()["E"])});
        }
        LogStatus("`" + table.dump() + "`");
    }
    client.close();
}

To access the WebSocket ticker interface of Binance:

var ws = null 
function main(){
    var param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    // When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
    if(ws){
        var pingCyc = 1000 * 20
        var lastPingTime = new Date().getTime()
        while(true){
            var nowTime = new Date().getTime()
            var ret = ws.read()
            Log("ret:", ret)
            if(nowTime - lastPingTime > pingCyc){
                var retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send : ping", "#FF0000")
            }
            LogStatus("Current time:", _D())
            Sleep(1000)
        }
    }
}              

function onexit() {
    ws.close() 
    Log("exit")
}
import json
import time              

ws = None
def main():
    global ws 
    param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        pingCyc = 1000 * 20
        lastPingTime = time.time() * 1000
        while True:
            nowTime = time.time() * 1000
            ret = ws.read()
            Log("ret:", ret)
            if nowTime - lastPingTime > pingCyc:
                retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Send: ping", "#FF0000")
            LogStatus("Current time:", _D())
            Sleep(1000)              

def onexit():
    ws.close()
    Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");              

void main() {
    json param = R"({
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    })"_json;
    
    objWS.write(param.dump());
    if(objWS.Valid) {
        uint64_t pingCyc = 1000 * 20;
        uint64_t lastPingTime = Unix() * 1000;
        while(true) {
            uint64_t nowTime = Unix() * 1000;
            auto ret = objWS.read();
            Log("ret:", ret);
            if(nowTime - lastPingTime > pingCyc) {
                auto retPing = objWS.write("ping");
                lastPingTime = nowTime;
                Log("Send: ping", "#FF0000");
            }
            LogStatus("Current time:", _D());
            Sleep(1000);
        }
    }
}              

void onexit() {
    objWS.close();
    Log("exit");
}

Access to OKX’s WebSocket ticker interface:

var ws = null               

function main(){
    var param = {"sub": "market.btcusdt.detail", "id": "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
    if(ws){
        while(1){
            var ret = ws.read()
            Log("ret:", ret)
            // Respond to heartbeat packet operations
            try {
                var jsonRet = JSON.parse(ret)
                if(typeof(jsonRet.ping) == "number") {
                    var strPong = JSON.stringify({"pong" : jsonRet.ping})
                    ws.write(strPong)
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
                }
            } catch(e) {
                Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
            }
            
            LogStatus("Current time:", _D())
            Sleep(1000)
        }
    }
}              

function onexit() {
    ws.close() 
    Log("Execute the ws.close() function")
}
import json
ws = None              

def main():
    global ws
    param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        while True:
            ret = ws.read()
            Log("ret:", ret)              
            # Respond to heartbeat packet operations
            try:
                jsonRet = json.loads(ret)
                if "ping" in jsonRet and type(jsonRet["ping"]) == int:
                    strPong = json.dumps({"pong" : jsonRet["ping"]})
                    ws.write(strPong)
                    Log("Respond to ping, send pong:", strPong, "#FF0000")
            except Exception as e:
                Log("e:", e)
                
            LogStatus("Current time:", _D())
            Sleep(1000)
    
def onexit():
    ws.close()
    Log("Execute the ws.close() function")  
using namespace std;
void main() {
    json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
    auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
    if(ws.Valid) {
        while(true) {
            auto ret = ws.read();
            Log("ret:", ret);              
            // Respond to heartbeat packet operations
            try 
            {
                auto jsonRet = json::parse(ret);
                if(jsonRet["ping"].is_number()) {
                    json pong = R"({"pong" : 0})"_json;
                    pong["pong"] = jsonRet["ping"];
                    auto strPong = pong.dump();
                    ws.write(strPong);
                    Log("Respond to ping, send pong:", strPong, "#FF0000");
                }
            } catch(exception &e) 
            {
                Log("e:", e.what());
            }
            
            LogStatus("Current time:", _D());
            Sleep(1000);
        }
    }
}              

void onexit() {
    // ws.close();
    Log("Execute the ws.close() function");
}

Access to Huobi’s WebSocket ticker interface:

function getLogin(pAccessKey, pSecretKey, pPassphrase) {
    // Signature function for login
    var ts = (new Date().getTime() / 1000).toString()
    var login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)   // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
        }]
    }    
    return login
}                

var client_private = null 
function main() {
    // Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
    SetErrorFilter("timeout")
    
    // Position channel subscription information
    var posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }                

    var accessKey = "xxx"
    var secretKey = "xxx"
    var passphrase = "xxx"            

    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)  // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
    client_private.write(JSON.stringify(posSubscribe))
    if (client_private) {
        var lastPingTS = new Date().getTime()
        while (true) {
            var buf = client_private.read(-1)
            if (buf) {
                Log(buf)
            }
            
            // Detect disconnection, reconnect
            if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
                Log("Disconnection detected, close connection, reconnect")
                client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(JSON.stringify(posSubscribe))
            }
            
            // Send heartbeat packets
            var nowPingTS = new Date().getTime()
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping")
                lastPingTS = nowPingTS
            }            
        }        
    }
}                

function onexit() {    
    var ret = client_private.close()
    Log("Close the connection!", ret)
}
import json
import time
  
def getLogin(pAccessKey, pSecretKey, pPassphrase):
    ts = str(time.time())
    login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
        }]
    }
    return login                 

client_private = None 
def main():
    global client_private
    SetErrorFilter("timeout")
    
    posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }                  

    accessKey = "xxx"
    secretKey = "xxx"
    passphrase = "xxx"
    
    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)
    client_private.write(json.dumps(posSubscribe))
    if client_private:
        lastPingTS = time.time() * 1000
        while True:
            buf = client_private.read(-1)
            if buf:
                Log(buf)
            
            if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
                Log("Disconnection detected, close connection, reconnect")
                ret = client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(json.dumps(posSubscribe))
            
            nowPingTS = time.time() * 1000
            if nowPingTS - lastPingTS > 10 * 1000:
                client_private.write("ping")
                lastPingTS = nowPingTS                

def onexit():
    ret = client_private.close()
    Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");                  

json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
    auto ts = std::to_string(Unix());
    json login = R"({
        "op": "login",
        "args": [{
            "apiKey": "",
            "passphrase": "",
            "timestamp": "",
            "sign": ""
        }]
    })"_json;
    login["args"][0]["apiKey"] = pAccessKey;
    login["args"][0]["passphrase"] = pPassphrase;
    login["args"][0]["timestamp"] = ts;
    login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
    return login;
}                  

void main() {
    SetErrorFilter("timeout");
    json posSubscribe = R"({
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    })"_json;
    
    auto accessKey = "xxx";
    auto secretKey = "xxx";
    auto passphrase = "xxx";
    
    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
    Sleep(3000);
    client_private.write(posSubscribe.dump());                

    if (client_private.Valid) {
        uint64_t lastPingTS = Unix() * 1000;                  

        while (true) {
            auto buf = client_private.read(-1);
            if (buf != "") {
                Log(buf);
            }
            if (buf == "") {
                if (client_private.write(posSubscribe.dump()) == 0) {
                    Log("Disconnection detected, close connection, reconnect");
                    client_private.close();
                    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
                    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
                    Sleep(3000);
                    client_private.write(posSubscribe.dump());
                }
            }
            
            uint64_t nowPingTS = Unix() * 1000;
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping");
                lastPingTS = nowPingTS;
            }
        }
    }
}                  

void onexit() {
    client_private.close();
    Log("exit");
}

To access OKX’s WebSocket authentication interface:

var client = null 
function main() {
    // client = Dial("sqlite3://:memory:")   // Using an in-memory database
    client = Dial("sqlite3://test1.db")      // Open/connect to the database file in the docker's directory
    
    // record handle
    var sqlite3Handle = client.fd()
    Log("sqlite3Handle:", sqlite3Handle)
    
    // Querying tables in the database
    var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
    Log(ret)
}

function onexit() {
    Log("Execute client.close()")
    client.close()
}
// Not supported
// Not supported

The connection object returned by the Dial function when connecting to a database has two method functions that are unique to it:

  • exec(sqlString): Used to execute SQL statements in a manner similar to the DBExec() function.
  • fd(): The fd() function returns a handle (e.g., the handle variable is handle) to be used by other threads to reconnect (even if the object created by Dial has already been closed by the execution of the close() function to close the connection) by passing the handle into the Dial() function, for example, Dial(handle) reuse connection. The following is an example of the Dial function connecting to a sqlite3 database.

Details of the address parameter, separated by the | symbol after the normal address: wss://ws.okx.com:8443/ws/v5/public. If there are | characters in the parameter string, then || is used as the separator sign. The part after that is some function parameter settings, and each parameter is connected with & characters. For example, the ss5 proxy and compression parameters can be set together as follows: Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")

Functions supported by the address parameter of the Dial function Parameter description
Parameters related to WebSocket protocol data compression: compress=parameter value compress is the compression method, compress parameter options are: gzip_raw, gzip, etc. If the gzip method is not standard gzip, you can use the extended method: gzip_raw
Parameters related to WebSocket protocol data compression: mode=parameter value mode is the compression mode, mode parameter can be dual, send, recv. dual is two-way compression, send compressed data, receive compressed data. send is send compressed data. recv is receive compressed data, local decompression.
WebSocket protocol sets the underlying auto-reconnect related parameters: reconnect=parameter value reconnect is whether to set reconnect, reconnect=true is to enable reconnect. The default is no reconnect when this parameter is not set.
WebSocket protocol sets the underlying auto-reconnect related parameters: interval=parameter value interval is the retry interval, in milliseconds, interval=10000 is the retry interval of 10 seconds, the default is 1 second when it’s not set, that is, interval=1000.
WebSocket protocol sets the underlying auto-reconnect related parameters: payload=parameter value payload is the subscription message that needs to be sent when the WebSocket is reconnected, e.g.: payload=okokok.
Parameters related to socks5 proxy: proxy=parameter value proxy is ss5 proxy setting, parameter value format: socks5://name:pwd@192.168.0.1:1080, name is ss5 server username, pwd is ss5 server login password, 1080 is ss5 service port.

The Dial() function is only supported for live trading. When connecting to a database using the Dial function, the connection string is written with reference to the go language driver project for each database.

Databases supported Driving projects Connection string Remarks
sqlite3 github.com/mattn/go-sqlite3 sqlite3://file:test.db?cache=shared&mode=memory The sqlite3:// prefix indicates that a sqlite3 database is being used, example call: Dial("sqlite3://test1.db")
mysql github.com/go-sql-driver/mysql mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4
postgres github.com/lib/pq postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432
clickhouse github.com/ClickHouse/clickhouse-go clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase

Please note that when the payload content set in the address parameter contains characters = or other special characters, it may affect the parsing of the address parameter of the Dial function, such as the following example.

backPack Exchange websocket private interface call example:

var client = null

function main() {
    // Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
    var base64ApiKey = "xxx"

    var ts = String(new Date().getTime())
    var data = "instruction=subscribe&timestamp=" + ts + "&window=5000"

    // Since signEd25519 returns a base64 encoding, it contains the character "="
    var signature = signEd25519(data)
    
    // The payload may contain the character "=" after being encoded by JSON
    payload = {
        "method": "SUBSCRIBE",
        "params": ["account.orderUpdate"],
        "signature": [base64ApiKey, signature, ts, "5000"]
    }

    client = Dial("wss://ws.backpack.exchange")
    client.write(JSON.stringify(payload))
    if (!client) {
        Log("Connection failed, program exited")
        return
    }
    
    while (true) {
        var buf = client.read()      
        Log(buf)
    }    
}

function onexit() {
    client.close()
}

function signEd25519(data) {
    return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}

The following call in the code works fine:

client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))

If you write it directly in payload, it will not work properly, for example:

client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))

Currently, only JavaScript supports the use of the mqtt, nats, amqp, and kafka communication protocols in the Dial function. The JavaScript language strategy code is used as an example to show the use of the four protocols: mqtt, nats, amqp, and kafka:

// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []

function main() {
    LogReset(1)
    conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
    conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
    conn_kafka = Dial("kafka://localhost:9092/test_topic")
    arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
    arrName = ["nats", "amqp", "mqtt", "kafka"]

    while (true) {
        for (var i in arrConn) {
            var conn = arrConn[i]
            var name = arrName[i]

            // Write data
            conn.write(name + ", time: " + _D() + ", test msg.")
            
            // Read data
            var readMsg = conn.read(1000)
            Log(name + " readMsg: ", readMsg, "#FF0000")
        }

        Sleep(1000)
    }
}

function onexit() {
    for (var i in arrConn) {
        arrConn[i].close()
        Log("close", arrName[i], "connect")
    }
}

Detailed documentation reference: Exploring FMZ: Practice of Communication Protocol Between Live Trading Strategies

GetMeta HttpQuery