Pour le primitifSocket
l'accès, le soutientcp
, udp
, tls
, unix
Prise en charge de 4 protocoles de communication populaires:mqtt
, nats
, amqp
, kafka
. Prise en charge de la connexion aux bases de données:sqlite3
, mysql
, postgres
, clickhouse
.
LeDial()
une fonction renvoie null s'il s'éteint. Un appel normal renvoie un objet de connexion qui a trois méthodes:read
, write
etclose
Leread
La méthode est utilisée pour lire les données,write
La méthode est utilisée pour envoyer des données etclose
méthode est utilisée pour fermer la connexion.
Leread
la méthode prend en charge les paramètres suivants:
ws.read()
.ws.read(2000)
spécifie un temps d'arrêt de deux secondes (2000 millisecondes).-1
signifie que la fonction renvoie immédiatement, indépendamment de la présence ou de l'absence de messages, par exemple:ws.read(-1)
- Je ne sais pas.
Passer le paramètre-2
signifie que la fonction renvoie immédiatement avec ou sans message, mais que seul le dernier message est renvoyé, et le message tamponné est écarté.ws.read(-2)
.read()
Description du tampon de fonction:
Les données entrantes poussées par le protocole WebSocket peuvent entraîner une accumulation de données si l'intervalle de temps entre la stratégieread()
Ces données sont stockées dans le tampon, qui a une structure de données d'une file d'attente avec un maximum de 2000.
Scénario | Aucun paramètre | Paramètre: -1 | Paramètre: -2 | Paramètre: 2000, en millisecondes |
---|---|---|---|---|
Données déjà dans le tampon | Retournez les données les plus anciennes immédiatement | Retournez les données les plus anciennes immédiatement | Retournez les dernières données immédiatement | Retournez les données les plus anciennes immédiatement |
Aucune donnée dans le tampon | Retourner lorsque les données sont bloquées | Retourner nul immédiatement | Retourner nul immédiatement | Attendre 2000 ms, retourner nul si aucune donnée, retourner nul si des données sont disponibles |
La connexion WebSocket est déconnectée ou reconnectée par le sous-jacent | read() fonction renvoie la chaîne vide, c.-à-d.: |
objet
Numéro de téléphone (adresse) Numéro (adresse, délai d'attente)
Demandez une adresse. l'adresse vrai chaîne secondes de temps d'arrêt, temps d'arrêt faux Numéro
function main(){
// Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}
Exemple d'appel à la fonction Dial:
function main() {
LogStatus("Connecting...")
// Accessing WebSocket interface of Binance
var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
// read returns only the data retrieved after the read call
var buf = client.read()
if (!buf) {
break
}
var table = {
type: 'table',
title: 'Ticker Chart',
cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
rows: []
}
var obj = JSON.parse(buf)
_.each(obj, function(ticker) {
table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
})
LogStatus('`' + JSON.stringify(table) + '`')
}
client.close()
}
import json
def main():
LogStatus("Connecting...")
client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if not client:
Log("Connection failed, program exited")
return
while True:
buf = client.read()
if not buf:
break
table = {
"type" : "table",
"title" : "Ticker Chart",
"cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
"rows" : []
}
obj = json.loads(buf)
for i in range(len(obj)):
table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
LogStatus('`' + json.dumps(table) + '`')
client.close()
void main() {
LogStatus("Connecting...");
auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
if(!client.Valid) {
Log("Connection failed, program exited");
return;
}
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
json table = R"({
"type" : "table",
"title" : "Ticker Chart",
"cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"],
"rows" : []
})"_json;
json obj = json::parse(buf);
for(auto& ele : obj.items()) {
table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
ele.value()["q"], _D(ele.value()["E"])});
}
LogStatus("`" + table.dump() + "`");
}
client.close();
}
Pour accéder à l'interface WebSocket de Binance:
var ws = null
function main(){
var param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
// When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
var pingCyc = 1000 * 20
var lastPingTime = new Date().getTime()
while(true){
var nowTime = new Date().getTime()
var ret = ws.read()
Log("ret:", ret)
if(nowTime - lastPingTime > pingCyc){
var retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send : ping", "#FF0000")
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("exit")
}
import json
import time
ws = None
def main():
global ws
param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
pingCyc = 1000 * 20
lastPingTime = time.time() * 1000
while True:
nowTime = time.time() * 1000
ret = ws.read()
Log("ret:", ret)
if nowTime - lastPingTime > pingCyc:
retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send: ping", "#FF0000")
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");
void main() {
json param = R"({
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
})"_json;
objWS.write(param.dump());
if(objWS.Valid) {
uint64_t pingCyc = 1000 * 20;
uint64_t lastPingTime = Unix() * 1000;
while(true) {
uint64_t nowTime = Unix() * 1000;
auto ret = objWS.read();
Log("ret:", ret);
if(nowTime - lastPingTime > pingCyc) {
auto retPing = objWS.write("ping");
lastPingTime = nowTime;
Log("Send: ping", "#FF0000");
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
objWS.close();
Log("exit");
}
Accès à l'interface du ticker WebSocket OKX
var ws = null
function main(){
var param = {"sub": "market.btcusdt.detail", "id": "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
while(1){
var ret = ws.read()
Log("ret:", ret)
// Respond to heartbeat packet operations
try {
var jsonRet = JSON.parse(ret)
if(typeof(jsonRet.ping) == "number") {
var strPong = JSON.stringify({"pong" : jsonRet.ping})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
}
} catch(e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("Execute the ws.close() function")
}
import json
ws = None
def main():
global ws
param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
while True:
ret = ws.read()
Log("ret:", ret)
# Respond to heartbeat packet operations
try:
jsonRet = json.loads(ret)
if "ping" in jsonRet and type(jsonRet["ping"]) == int:
strPong = json.dumps({"pong" : jsonRet["ping"]})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
except Exception as e:
Log("e:", e)
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("Execute the ws.close() function")
using namespace std;
void main() {
json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
if(ws.Valid) {
while(true) {
auto ret = ws.read();
Log("ret:", ret);
// Respond to heartbeat packet operations
try
{
auto jsonRet = json::parse(ret);
if(jsonRet["ping"].is_number()) {
json pong = R"({"pong" : 0})"_json;
pong["pong"] = jsonRet["ping"];
auto strPong = pong.dump();
ws.write(strPong);
Log("Respond to ping, send pong:", strPong, "#FF0000");
}
} catch(exception &e)
{
Log("e:", e.what());
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
// ws.close();
Log("Execute the ws.close() function");
}
Accès à l'interface du ticker WebSocket de Huobi:
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
// Signature function for login
var ts = (new Date().getTime() / 1000).toString()
var login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey) // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
}]
}
return login
}
var client_private = null
function main() {
// Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
SetErrorFilter("timeout")
// Position channel subscription information
var posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
var accessKey = "xxx"
var secretKey = "xxx"
var passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000) // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
client_private.write(JSON.stringify(posSubscribe))
if (client_private) {
var lastPingTS = new Date().getTime()
while (true) {
var buf = client_private.read(-1)
if (buf) {
Log(buf)
}
// Detect disconnection, reconnect
if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
Log("Disconnection detected, close connection, reconnect")
client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(JSON.stringify(posSubscribe))
}
// Send heartbeat packets
var nowPingTS = new Date().getTime()
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping")
lastPingTS = nowPingTS
}
}
}
}
function onexit() {
var ret = client_private.close()
Log("Close the connection!", ret)
}
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
ts = str(time.time())
login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
}]
}
return login
client_private = None
def main():
global client_private
SetErrorFilter("timeout")
posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
accessKey = "xxx"
secretKey = "xxx"
passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
if client_private:
lastPingTS = time.time() * 1000
while True:
buf = client_private.read(-1)
if buf:
Log(buf)
if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
Log("Disconnection detected, close connection, reconnect")
ret = client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
nowPingTS = time.time() * 1000
if nowPingTS - lastPingTS > 10 * 1000:
client_private.write("ping")
lastPingTS = nowPingTS
def onexit():
ret = client_private.close()
Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
auto ts = std::to_string(Unix());
json login = R"({
"op": "login",
"args": [{
"apiKey": "",
"passphrase": "",
"timestamp": "",
"sign": ""
}]
})"_json;
login["args"][0]["apiKey"] = pAccessKey;
login["args"][0]["passphrase"] = pPassphrase;
login["args"][0]["timestamp"] = ts;
login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
return login;
}
void main() {
SetErrorFilter("timeout");
json posSubscribe = R"({
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
})"_json;
auto accessKey = "xxx";
auto secretKey = "xxx";
auto passphrase = "xxx";
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
if (client_private.Valid) {
uint64_t lastPingTS = Unix() * 1000;
while (true) {
auto buf = client_private.read(-1);
if (buf != "") {
Log(buf);
}
if (buf == "") {
if (client_private.write(posSubscribe.dump()) == 0) {
Log("Disconnection detected, close connection, reconnect");
client_private.close();
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
}
}
uint64_t nowPingTS = Unix() * 1000;
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping");
lastPingTS = nowPingTS;
}
}
}
}
void onexit() {
client_private.close();
Log("exit");
}
Pour accéder à l'interface d'authentification WebSocket OKX
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // Using an in-memory database
client = Dial("sqlite3://test1.db") // Open/connect to the database file in the docker's directory
// record handle
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// Querying tables in the database
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("Execute client.close()")
client.close()
}
// Not supported
// Not supported
L'objet de connexion renvoyé par la fonction Dial lors de la connexion à une base de données possède deux fonctions de méthode qui lui sont propres:
exec(sqlString)
: Utilisé pour exécuter des instructions SQL de manière similaire à laDBExec()
function.fd()
Lefd()
fonction renvoie une poignée (par exemple, la variable poignée est poignée) à utiliser par d'autres threads pour se reconnecter (même si l'objet créé par Dial a déjà été fermé par l'exécution duclose()
La fonction de fermeture de la connexion) en passant la poignée dans leDial()
la fonction, par exemple,Dial(handle)
connexion de réutilisation.
Ce qui suit est un exemple de la fonction Dial se connectant à unsqlite3
database.Détails de laaddress
paramètre, séparé par le|
le symbole suivant l'adresse normale:wss://ws.okx.com:8443/ws/v5/public
S' il y en a|
les caractères dans la chaîne de paramètres, puis||
La partie après que ce sont quelques paramètres de paramètres de fonction, et chaque paramètre est connecté avec&
Il s'agit par exemple dess5
Les paramètres de substitution et de compression peuvent être définis ensemble comme suit:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")
Fonctions prises en charge par le paramètre d'adresse de la fonction Dial | Description du paramètre |
---|---|
Paramètres liés à la compression des données du protocole WebSocket: compress=valeur du paramètre | compresser est la méthode de compression, les options de paramètres compresser sont: gzip_raw, gzip, etc. Si la méthode gzip n'est pas gzip standard, vous pouvez utiliser la méthode étendue: gzip_raw |
Paramètres liés à la compression des données du protocole WebSocket: mode=valeur du paramètre | mode est le mode de compression, le paramètre de mode peut être double, envoyer, recv. double est la compression bidirectionnelle, envoyer des données compressées, recevoir des données compressées. envoyer est envoyer des données compressées. recv est recevoir des données compressées, décompression locale. |
Le protocole WebSocket définit les paramètres sous-jacents liés à la reconnexion automatique: reconnect=valeur du paramètre | reconnect est la définition de reconnect, reconnect=true est la définition de reconnect. |
Le protocole WebSocket définit les paramètres sous-jacents liés à la reconnexion automatique: interval=valeur du paramètre | l'intervalle est l'intervalle de réessayage, en millisecondes, l'intervalle=10000 est l'intervalle de réessayage de 10 secondes, la valeur par défaut est de 1 seconde lorsqu'il n'est pas réglé, c'est-à-dire l'intervalle=1000. |
Le protocole WebSocket définit les paramètres sous-jacents liés à la reconnexion automatique: charge utile=valeur du paramètre | payload est le message d'abonnement qui doit être envoyé lorsque le WebSocket est reconnecté, par exemple: payload=okokok. |
Paramètres liés aux chaussettes5 proxy: proxy=valeur du paramètre | Proxy est le paramètre de paramètre de configuration de proxy ss5: socks5://name:pwd@192.168.0.1:1080, le nom est le nom d'utilisateur du serveur ss5, pwd est le mot de passe de connexion du serveur ss5, 1080 est le port de service ss5. |
LeDial()
La fonction n'est prise en charge que pour le trading en direct.
Lors de la connexion à une base de données à l'aide de la fonction Dial, la chaîne de connexion est écrite en référence au projet de pilote de langage go pour chaque base de données.
Base de données prise en charge | Projets moteurs | Chaîne de connexion | Les commentaires |
---|---|---|---|
- Je ne sais pas. | github.com/mattn/go-sqlite3 | sqlite3://fichier:test.db?cache=partagé et mode=mémoire | Lesqlite3:// le préfixe indique qu'une base de données sqlite3 est utilisée, exemple d'appel:Dial("sqlite3://test1.db") |
Je vais essayer. | github.com/go-sql-driver/mysql | Je suis désolée.localhost:3306) /votre base de données?charset=utf8mb4 | – |
les produits de postgres | github.com/lib/pq | le nom de l'utilisateur et le nom de la base de données sslmode | – |
maison de jeu | github.com/ClickHouse/clickhouse-go | Cliquez sur le lien suivant: | – |
Veuillez noter que lorsque lepayload
contenus définis dans leaddress
paramètre contient des caractères=
ou d'autres caractères spéciaux, il peut affecter l'analyse de laaddress
paramètre duDial
fonction, comme dans l'exemple suivant.
Exemple d'appel à l'interface privée de backPack Exchange:
var client = null
function main() {
// Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
var base64ApiKey = "xxx"
var ts = String(new Date().getTime())
var data = "instruction=subscribe×tamp=" + ts + "&window=5000"
// Since signEd25519 returns a base64 encoding, it contains the character "="
var signature = signEd25519(data)
// The payload may contain the character "=" after being encoded by JSON
payload = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [base64ApiKey, signature, ts, "5000"]
}
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
var buf = client.read()
Log(buf)
}
}
function onexit() {
client.close()
}
function signEd25519(data) {
return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}
L'appel suivant dans le code fonctionne bien:
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
Si vous l'écrivez directement danspayload
, il ne fonctionnera pas correctement, par exemple:
client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))
À l'heure actuelle, seul JavaScript prend en charge l'utilisation demqtt
, nats
, amqp
, etkafka
Le code de stratégie du langage JavaScript est utilisé comme exemple pour montrer l'utilisation des quatre protocoles:mqtt
, nats
, amqp
, etkafka
:
// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// Write data
conn.write(name + ", time: " + _D() + ", test msg.")
// Read data
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("close", arrName[i], "connect")
}
}
Référence de la documentation détaillée:Exploration de FMZ: pratique du protocole de communication entre les stratégies de négociation en direct
Obtenez Meta HttpQuery est une requête