Para el primitivoSocket
acceso, apoyotcp
, udp
, tls
, unix
Soporte para 4 protocolos de comunicación populares:mqtt
, nats
, amqp
, kafka
Apoyo para conectar a bases de datos:sqlite3
, mysql
, postgres
, clickhouse
.
ElDial()
Una llamada normal devuelve un objeto de conexión que tiene tres métodos:read
, write
yclose
El.read
El método se utiliza para leer los datos, elwrite
El método se utiliza para enviar datos y elclose
El método se utiliza para cerrar la conexión.
Elread
el método admite los siguientes parámetros:
ws.read()
.ws.read(2000)
especifica un tiempo de espera de dos segundos (2000 milisegundos).-1
significa que la función devuelve inmediatamente, independientemente de la presencia o ausencia de mensajes, por ejemplo:ws.read(-1)
- ¿ Por qué?
Pasando el parámetro-2
significa que la función devuelve inmediatamente con o sin un mensaje, pero sólo el último mensaje se devuelve, y el mensaje tamponado se descarta.ws.read(-2)
.read()
Descripción del búfer de función:
Los datos entrantes empujados por el protocolo WebSocket pueden causar acumulación de datos si el intervalo de tiempo entre la estrategiaread()
Estos datos se almacenan en el búfer, que tiene una estructura de datos de una cola con un máximo de 2000.
Escenario | No hay parámetro | Parámetro: -1 | Parámetro: -2 | Parámetro: 2000, en milisegundos |
---|---|---|---|---|
Datos ya en el búfer | Devuelva los datos más antiguos inmediatamente | Devuelva los datos más antiguos inmediatamente | Devuelva los últimos datos inmediatamente. | Devuelva los datos más antiguos inmediatamente |
No hay datos en el búfer | Regresar cuando se bloquea a los datos | Devuelva nulo inmediatamente | Devuelva nulo inmediatamente | Espera 2000 ms, devuelve nulo si no hay datos, devuelve nulo si hay datos |
La conexión WebSocket se desconecta o se vuelve a conectar por el subyacente | read() devuelve la cadena vacía, es decir: |
objetos
Número de teléfono y dirección Número de teléfono (dirección, tiempo límite)
Solicita la dirección. Dirección verdadero la cuerda segundos de tiempo de espera, tiempo de espera falsos Número
function main(){
// Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}
Ejemplo de llamada de la función Dial:
function main() {
LogStatus("Connecting...")
// Accessing WebSocket interface of Binance
var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
// read returns only the data retrieved after the read call
var buf = client.read()
if (!buf) {
break
}
var table = {
type: 'table',
title: 'Ticker Chart',
cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
rows: []
}
var obj = JSON.parse(buf)
_.each(obj, function(ticker) {
table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
})
LogStatus('`' + JSON.stringify(table) + '`')
}
client.close()
}
import json
def main():
LogStatus("Connecting...")
client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if not client:
Log("Connection failed, program exited")
return
while True:
buf = client.read()
if not buf:
break
table = {
"type" : "table",
"title" : "Ticker Chart",
"cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
"rows" : []
}
obj = json.loads(buf)
for i in range(len(obj)):
table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
LogStatus('`' + json.dumps(table) + '`')
client.close()
void main() {
LogStatus("Connecting...");
auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
if(!client.Valid) {
Log("Connection failed, program exited");
return;
}
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
json table = R"({
"type" : "table",
"title" : "Ticker Chart",
"cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"],
"rows" : []
})"_json;
json obj = json::parse(buf);
for(auto& ele : obj.items()) {
table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
ele.value()["q"], _D(ele.value()["E"])});
}
LogStatus("`" + table.dump() + "`");
}
client.close();
}
Para acceder a la interfaz WebSocket de Binance:
var ws = null
function main(){
var param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
// When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
var pingCyc = 1000 * 20
var lastPingTime = new Date().getTime()
while(true){
var nowTime = new Date().getTime()
var ret = ws.read()
Log("ret:", ret)
if(nowTime - lastPingTime > pingCyc){
var retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send : ping", "#FF0000")
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("exit")
}
import json
import time
ws = None
def main():
global ws
param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
pingCyc = 1000 * 20
lastPingTime = time.time() * 1000
while True:
nowTime = time.time() * 1000
ret = ws.read()
Log("ret:", ret)
if nowTime - lastPingTime > pingCyc:
retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send: ping", "#FF0000")
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");
void main() {
json param = R"({
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
})"_json;
objWS.write(param.dump());
if(objWS.Valid) {
uint64_t pingCyc = 1000 * 20;
uint64_t lastPingTime = Unix() * 1000;
while(true) {
uint64_t nowTime = Unix() * 1000;
auto ret = objWS.read();
Log("ret:", ret);
if(nowTime - lastPingTime > pingCyc) {
auto retPing = objWS.write("ping");
lastPingTime = nowTime;
Log("Send: ping", "#FF0000");
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
objWS.close();
Log("exit");
}
Acceso a la interfaz del ticker de WebSocket de OKX
var ws = null
function main(){
var param = {"sub": "market.btcusdt.detail", "id": "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
while(1){
var ret = ws.read()
Log("ret:", ret)
// Respond to heartbeat packet operations
try {
var jsonRet = JSON.parse(ret)
if(typeof(jsonRet.ping) == "number") {
var strPong = JSON.stringify({"pong" : jsonRet.ping})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
}
} catch(e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("Execute the ws.close() function")
}
import json
ws = None
def main():
global ws
param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
while True:
ret = ws.read()
Log("ret:", ret)
# Respond to heartbeat packet operations
try:
jsonRet = json.loads(ret)
if "ping" in jsonRet and type(jsonRet["ping"]) == int:
strPong = json.dumps({"pong" : jsonRet["ping"]})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
except Exception as e:
Log("e:", e)
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("Execute the ws.close() function")
using namespace std;
void main() {
json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
if(ws.Valid) {
while(true) {
auto ret = ws.read();
Log("ret:", ret);
// Respond to heartbeat packet operations
try
{
auto jsonRet = json::parse(ret);
if(jsonRet["ping"].is_number()) {
json pong = R"({"pong" : 0})"_json;
pong["pong"] = jsonRet["ping"];
auto strPong = pong.dump();
ws.write(strPong);
Log("Respond to ping, send pong:", strPong, "#FF0000");
}
} catch(exception &e)
{
Log("e:", e.what());
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
// ws.close();
Log("Execute the ws.close() function");
}
Acceso a la interfaz de ticker de WebSocket de Huobi:
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
// Signature function for login
var ts = (new Date().getTime() / 1000).toString()
var login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey) // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
}]
}
return login
}
var client_private = null
function main() {
// Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
SetErrorFilter("timeout")
// Position channel subscription information
var posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
var accessKey = "xxx"
var secretKey = "xxx"
var passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000) // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
client_private.write(JSON.stringify(posSubscribe))
if (client_private) {
var lastPingTS = new Date().getTime()
while (true) {
var buf = client_private.read(-1)
if (buf) {
Log(buf)
}
// Detect disconnection, reconnect
if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
Log("Disconnection detected, close connection, reconnect")
client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(JSON.stringify(posSubscribe))
}
// Send heartbeat packets
var nowPingTS = new Date().getTime()
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping")
lastPingTS = nowPingTS
}
}
}
}
function onexit() {
var ret = client_private.close()
Log("Close the connection!", ret)
}
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
ts = str(time.time())
login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
}]
}
return login
client_private = None
def main():
global client_private
SetErrorFilter("timeout")
posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
accessKey = "xxx"
secretKey = "xxx"
passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
if client_private:
lastPingTS = time.time() * 1000
while True:
buf = client_private.read(-1)
if buf:
Log(buf)
if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
Log("Disconnection detected, close connection, reconnect")
ret = client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
nowPingTS = time.time() * 1000
if nowPingTS - lastPingTS > 10 * 1000:
client_private.write("ping")
lastPingTS = nowPingTS
def onexit():
ret = client_private.close()
Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
auto ts = std::to_string(Unix());
json login = R"({
"op": "login",
"args": [{
"apiKey": "",
"passphrase": "",
"timestamp": "",
"sign": ""
}]
})"_json;
login["args"][0]["apiKey"] = pAccessKey;
login["args"][0]["passphrase"] = pPassphrase;
login["args"][0]["timestamp"] = ts;
login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
return login;
}
void main() {
SetErrorFilter("timeout");
json posSubscribe = R"({
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
})"_json;
auto accessKey = "xxx";
auto secretKey = "xxx";
auto passphrase = "xxx";
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
if (client_private.Valid) {
uint64_t lastPingTS = Unix() * 1000;
while (true) {
auto buf = client_private.read(-1);
if (buf != "") {
Log(buf);
}
if (buf == "") {
if (client_private.write(posSubscribe.dump()) == 0) {
Log("Disconnection detected, close connection, reconnect");
client_private.close();
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
}
}
uint64_t nowPingTS = Unix() * 1000;
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping");
lastPingTS = nowPingTS;
}
}
}
}
void onexit() {
client_private.close();
Log("exit");
}
Para acceder a la interfaz de autenticación de WebSocket de OKX
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // Using an in-memory database
client = Dial("sqlite3://test1.db") // Open/connect to the database file in the docker's directory
// record handle
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// Querying tables in the database
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("Execute client.close()")
client.close()
}
// Not supported
// Not supported
El objeto de conexión devuelto por la función Dial al conectarse a una base de datos tiene dos funciones de método que son únicas para él:
exec(sqlString)
: Se utiliza para ejecutar instrucciones SQL de una manera similar a laDBExec()
function.fd()
Elfd()
función devuelve un mango (por ejemplo, la variable mango es mango) para ser utilizado por otros hilos para reconectar (incluso si el objeto creado por Dial ya ha sido cerrado por la ejecución de laclose()
La función de cierre de la conexión) mediante el paso de la manija en elDial()
función, por ejemplo,Dial(handle)
Conexión de reutilización.
El siguiente es un ejemplo de la función Dial que se conecta a unsqlite3
database.Detalles de laaddress
Parámetro, separado por el|
símbolo después de la dirección normal:wss://ws.okx.com:8443/ws/v5/public
Si hay|
caracteres en la cadena de parámetros, entonces||
La parte después de eso es algunas configuraciones de parámetros de función, y cada parámetro está conectado con&
Por ejemplo, elss5
Los parámetros de sustitución y compresión se pueden establecer de la siguiente manera:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")
Funciones soportadas por el parámetro de dirección de la función Dial | Descripción del parámetro |
---|---|
Parámetros relacionados con la compresión de datos del protocolo WebSocket: compress=valor del parámetro | comprimir es el método de compresión, las opciones de parámetros comprimir son: gzip_raw, gzip, etc. Si el método gzip no es gzip estándar, se puede utilizar el método extendido: gzip_raw |
Parámetros relacionados con la compresión de datos del protocolo WebSocket: modo=valor del parámetro | el modo es el modo de compresión, el parámetro de modo puede ser dual, enviar, recv. dual es compresión bidireccional, enviar datos comprimidos, recibir datos comprimidos. enviar es enviar datos comprimidos. recv es recibir datos comprimidos, descompresión local. |
El protocolo WebSocket establece los parámetros relacionados con la auto-reconexión subyacente: reconnect=valor del parámetro | reconnect es si se debe configurar reconnect, reconnect=true es para habilitar reconnect. |
El protocolo WebSocket establece los parámetros subyacentes relacionados con la reconexión automática: intervalo=valor del parámetro | el intervalo es el intervalo de reintentos, en milisegundos, el intervalo=10000 es el intervalo de reintentos de 10 segundos, el valor predeterminado es 1 segundo cuando no está establecido, es decir, el intervalo=1000. |
El protocolo WebSocket establece los parámetros subyacentes relacionados con la reconexión automática: carga útil=valor del parámetro | carga útil es el mensaje de suscripción que debe enviarse cuando se vuelve a conectar el WebSocket, por ejemplo: carga útil=okokok. |
Parámetros relacionados con los calcetines5 proxy: proxy=valor del parámetro | Proxy es el ajuste de proxy ss5, formato de valor del parámetro: socks5://name:pwd@192.168.0.1:1080, el nombre es el nombre de usuario del servidor ss5, pwd es la contraseña de inicio de sesión del servidor ss5, 1080 es el puerto de servicio ss5. |
ElDial()
La función solo es compatible con el comercio en vivo.
Al conectarse a una base de datos utilizando la función Dial, la cadena de conexión se escribe con referencia al proyecto de controlador de lenguaje go para cada base de datos.
Bases de datos soportadas | Proyectos impulsores | Cuadrícula de conexión | Las observaciones |
---|---|---|---|
el mismo | github.com/mattn/go-sqlite3 | sqlite3://file:test.db?cache=compartido y modo=memoria | Elsqlite3:// Prefijo indica que se está utilizando una base de datos sqlite3, ejemplo de llamada:Dial("sqlite3://test1.db") |
¿ Qué quieres decir con eso? | github.com/go-sql-driver/mysql | mysql://nombre de usuario:su contraseña@tcp(localhost:3306) / tu base de datos? | – |
las plantas herbáceas | github.com/lib/pq | el nombre de la base de datos sslmode=disable password=yourpassword host=localhost port=5432 | – |
La casa de click | github.com/ClickHouse/clickhouse-go | clickhouse://tcp://host:9000?nombre de usuario=nombre de usuario& contraseña=tu contraseña& base de datos=youdatabase | – |
Por favor, tenga en cuenta que cuando elpayload
contenido establecido en eladdress
Parámetro contiene caracteres=
o otros caracteres especiales, puede afectar al análisis de laaddress
Parámetro delDial
Función, como el siguiente ejemplo.
Ejemplo de llamada de interfaz privada del websocket de backPack Exchange:
var client = null
function main() {
// Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
var base64ApiKey = "xxx"
var ts = String(new Date().getTime())
var data = "instruction=subscribe×tamp=" + ts + "&window=5000"
// Since signEd25519 returns a base64 encoding, it contains the character "="
var signature = signEd25519(data)
// The payload may contain the character "=" after being encoded by JSON
payload = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [base64ApiKey, signature, ts, "5000"]
}
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
var buf = client.read()
Log(buf)
}
}
function onexit() {
client.close()
}
function signEd25519(data) {
return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}
La siguiente llamada en el código funciona bien:
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
Si lo escribe directamente enpayload
, no funcionará correctamente, por ejemplo:
client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))
Actualmente, sólo JavaScript admite el uso de lamqtt
, nats
, amqp
, ykafka
El código de estrategia del lenguaje JavaScript se utiliza como ejemplo para mostrar el uso de los cuatro protocolos:mqtt
, nats
, amqp
, ykafka
:
// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// Write data
conn.write(name + ", time: " + _D() + ", test msg.")
// Read data
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("close", arrName[i], "connect")
}
}
Referencia de la documentación detallada:Explorando FMZ: Práctica del protocolo de comunicación entre las estrategias de negociación en vivo
ObtenerMeta HttpQuery (cuestionario de búsqueda)