Untuk primitifSocket
akses, mendukungtcp
, udp
, tls
, unix
Mendukung 4 protokol komunikasi populer:mqtt
, nats
, amqp
, kafka
. Dukungan untuk menghubungkan ke database:sqlite3
, mysql
, postgres
, clickhouse
.
PeraturanDial()
fungsi mengembalikan null jika waktu habis. panggilan normal mengembalikan objek koneksi yang memiliki tiga metode:read
, write
danclose
.read
metode digunakan untuk membaca data,write
metode digunakan untuk mengirim data danclose
metode digunakan untuk menutup koneksi.
Peraturanread
Metode ini mendukung parameter berikut:
ws.read()
.ws.read(2000)
menentukan timeout dua detik (2000 millisecond).-1
berarti bahwa fungsi mengembalikan langsung, terlepas dari kehadiran atau tidak adanya pesan, misalnya:ws.read(-1)
Aku tidak tahu.
Mengirim parameter-2
berarti bahwa fungsi mengembalikan segera dengan atau tanpa pesan, tetapi hanya pesan terbaru yang dikembalikan, dan pesan yang disimpen dibuang.ws.read(-2)
.read()
deskripsi buffer fungsi:
Data masuk yang didorong oleh protokol WebSocket dapat menyebabkan akumulasi data jika interval waktu antara strategiread()
Data ini disimpan di buffer, yang memiliki struktur data antrian dengan maksimum 2000. Setelah 2000 dilampaui, data terbaru dimasukkan ke buffer dan data tertua dihapus.
Skenario | Tidak ada parameter | Parameter: -1 | Parameter: -2 | Parameter: 2000, dalam milidetik |
---|---|---|---|---|
Data sudah di buffer | Kembalikan data tertua segera | Kembalikan data tertua segera | Kembali data terbaru segera | Kembalikan data tertua segera |
Tidak ada data di buffer | Kembali saat diblokir ke data | Kembali null segera | Kembali null segera | Tunggu 2000 ms, kembali null jika tidak ada data, kembali null jika ada data |
Koneksi WebSocket terputus atau dihubungkan kembali oleh yang mendasarinya | read() function mengembalikan string kosong, yaitu: |
objek
Pilih (Alamat) Pilih (alamat, waktu)
Alamat permintaan. alamat benar string detik timeout, timeout palsu Nomor
function main(){
// Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}
Contoh panggilan fungsi Dial:
function main() {
LogStatus("Connecting...")
// Accessing WebSocket interface of Binance
var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
// read returns only the data retrieved after the read call
var buf = client.read()
if (!buf) {
break
}
var table = {
type: 'table',
title: 'Ticker Chart',
cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
rows: []
}
var obj = JSON.parse(buf)
_.each(obj, function(ticker) {
table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
})
LogStatus('`' + JSON.stringify(table) + '`')
}
client.close()
}
import json
def main():
LogStatus("Connecting...")
client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if not client:
Log("Connection failed, program exited")
return
while True:
buf = client.read()
if not buf:
break
table = {
"type" : "table",
"title" : "Ticker Chart",
"cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
"rows" : []
}
obj = json.loads(buf)
for i in range(len(obj)):
table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
LogStatus('`' + json.dumps(table) + '`')
client.close()
void main() {
LogStatus("Connecting...");
auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
if(!client.Valid) {
Log("Connection failed, program exited");
return;
}
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
json table = R"({
"type" : "table",
"title" : "Ticker Chart",
"cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"],
"rows" : []
})"_json;
json obj = json::parse(buf);
for(auto& ele : obj.items()) {
table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
ele.value()["q"], _D(ele.value()["E"])});
}
LogStatus("`" + table.dump() + "`");
}
client.close();
}
Untuk mengakses antarmuka ticker WebSocket Binance:
var ws = null
function main(){
var param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
// When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
var pingCyc = 1000 * 20
var lastPingTime = new Date().getTime()
while(true){
var nowTime = new Date().getTime()
var ret = ws.read()
Log("ret:", ret)
if(nowTime - lastPingTime > pingCyc){
var retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send : ping", "#FF0000")
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("exit")
}
import json
import time
ws = None
def main():
global ws
param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
pingCyc = 1000 * 20
lastPingTime = time.time() * 1000
while True:
nowTime = time.time() * 1000
ret = ws.read()
Log("ret:", ret)
if nowTime - lastPingTime > pingCyc:
retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send: ping", "#FF0000")
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");
void main() {
json param = R"({
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
})"_json;
objWS.write(param.dump());
if(objWS.Valid) {
uint64_t pingCyc = 1000 * 20;
uint64_t lastPingTime = Unix() * 1000;
while(true) {
uint64_t nowTime = Unix() * 1000;
auto ret = objWS.read();
Log("ret:", ret);
if(nowTime - lastPingTime > pingCyc) {
auto retPing = objWS.write("ping");
lastPingTime = nowTime;
Log("Send: ping", "#FF0000");
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
objWS.close();
Log("exit");
}
Akses ke antarmuka ticker WebSocket OKX
var ws = null
function main(){
var param = {"sub": "market.btcusdt.detail", "id": "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
while(1){
var ret = ws.read()
Log("ret:", ret)
// Respond to heartbeat packet operations
try {
var jsonRet = JSON.parse(ret)
if(typeof(jsonRet.ping) == "number") {
var strPong = JSON.stringify({"pong" : jsonRet.ping})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
}
} catch(e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("Execute the ws.close() function")
}
import json
ws = None
def main():
global ws
param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
while True:
ret = ws.read()
Log("ret:", ret)
# Respond to heartbeat packet operations
try:
jsonRet = json.loads(ret)
if "ping" in jsonRet and type(jsonRet["ping"]) == int:
strPong = json.dumps({"pong" : jsonRet["ping"]})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
except Exception as e:
Log("e:", e)
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("Execute the ws.close() function")
using namespace std;
void main() {
json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
if(ws.Valid) {
while(true) {
auto ret = ws.read();
Log("ret:", ret);
// Respond to heartbeat packet operations
try
{
auto jsonRet = json::parse(ret);
if(jsonRet["ping"].is_number()) {
json pong = R"({"pong" : 0})"_json;
pong["pong"] = jsonRet["ping"];
auto strPong = pong.dump();
ws.write(strPong);
Log("Respond to ping, send pong:", strPong, "#FF0000");
}
} catch(exception &e)
{
Log("e:", e.what());
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
// ws.close();
Log("Execute the ws.close() function");
}
Akses ke antarmuka ticker WebSocket Huobi
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
// Signature function for login
var ts = (new Date().getTime() / 1000).toString()
var login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey) // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
}]
}
return login
}
var client_private = null
function main() {
// Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
SetErrorFilter("timeout")
// Position channel subscription information
var posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
var accessKey = "xxx"
var secretKey = "xxx"
var passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000) // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
client_private.write(JSON.stringify(posSubscribe))
if (client_private) {
var lastPingTS = new Date().getTime()
while (true) {
var buf = client_private.read(-1)
if (buf) {
Log(buf)
}
// Detect disconnection, reconnect
if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
Log("Disconnection detected, close connection, reconnect")
client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(JSON.stringify(posSubscribe))
}
// Send heartbeat packets
var nowPingTS = new Date().getTime()
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping")
lastPingTS = nowPingTS
}
}
}
}
function onexit() {
var ret = client_private.close()
Log("Close the connection!", ret)
}
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
ts = str(time.time())
login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
}]
}
return login
client_private = None
def main():
global client_private
SetErrorFilter("timeout")
posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
accessKey = "xxx"
secretKey = "xxx"
passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
if client_private:
lastPingTS = time.time() * 1000
while True:
buf = client_private.read(-1)
if buf:
Log(buf)
if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
Log("Disconnection detected, close connection, reconnect")
ret = client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
nowPingTS = time.time() * 1000
if nowPingTS - lastPingTS > 10 * 1000:
client_private.write("ping")
lastPingTS = nowPingTS
def onexit():
ret = client_private.close()
Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
auto ts = std::to_string(Unix());
json login = R"({
"op": "login",
"args": [{
"apiKey": "",
"passphrase": "",
"timestamp": "",
"sign": ""
}]
})"_json;
login["args"][0]["apiKey"] = pAccessKey;
login["args"][0]["passphrase"] = pPassphrase;
login["args"][0]["timestamp"] = ts;
login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
return login;
}
void main() {
SetErrorFilter("timeout");
json posSubscribe = R"({
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
})"_json;
auto accessKey = "xxx";
auto secretKey = "xxx";
auto passphrase = "xxx";
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
if (client_private.Valid) {
uint64_t lastPingTS = Unix() * 1000;
while (true) {
auto buf = client_private.read(-1);
if (buf != "") {
Log(buf);
}
if (buf == "") {
if (client_private.write(posSubscribe.dump()) == 0) {
Log("Disconnection detected, close connection, reconnect");
client_private.close();
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
}
}
uint64_t nowPingTS = Unix() * 1000;
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping");
lastPingTS = nowPingTS;
}
}
}
}
void onexit() {
client_private.close();
Log("exit");
}
Untuk mengakses OKX
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // Using an in-memory database
client = Dial("sqlite3://test1.db") // Open/connect to the database file in the docker's directory
// record handle
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// Querying tables in the database
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("Execute client.close()")
client.close()
}
// Not supported
// Not supported
Objek koneksi yang dikembalikan oleh fungsi Dial saat terhubung ke database memiliki dua fungsi metode yang unik untuknya:
exec(sqlString)
: Digunakan untuk mengeksekusi pernyataan SQL dengan cara yang mirip denganDBExec()
function.fd()
: Thefd()
fungsi mengembalikan pegangan (misalnya, variabel pegangan adalah pegangan) yang akan digunakan oleh thread lain untuk terhubung kembali (bahkan jika objek yang dibuat oleh Dial telah ditutup oleh pelaksanaanclose()
fungsi untuk menutup koneksi) dengan melewati pegangan keDial()
fungsi, misalnya,Dial(handle)
Gunakan kembali koneksi.
Berikut ini adalah contoh dari fungsi Dial menghubungkan kesqlite3
database.Rincian dariaddress
parameter, dipisahkan oleh|
simbol setelah alamat normal:wss://ws.okx.com:8443/ws/v5/public
Jika ada.|
karakter dalam string parameter, kemudian||
Bagian setelah itu adalah beberapa pengaturan parameter fungsi, dan setiap parameter terhubung dengan&
karakter. Misalnya,ss5
Parameter proxy dan kompresi dapat disatukan sebagai berikut:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")
Fungsi yang didukung oleh parameter alamat dari fungsi Dial | Deskripsi parameter |
---|---|
Parameter yang terkait dengan kompresi data protokol WebSocket: compress=parameter value | compress adalah metode kompresi, pilihan parameter compress adalah: gzip_raw, gzip, dll. Jika metode gzip tidak standar gzip, Anda dapat menggunakan metode diperpanjang: gzip_raw |
Parameter yang terkait dengan kompresi data protokol WebSocket: mode=nilai parameter | mode adalah mode kompresi, parameter mode dapat dual, send, recv. dual adalah kompresi dua arah, mengirim data terkompresi, menerima data terkompresi. send adalah mengirim data terkompresi. recv adalah menerima data terkompresi, dekompresi lokal. |
Protokol WebSocket menetapkan parameter terkait auto-reconnect yang mendasari: reconnect=parameter value | reconnect adalah apakah untuk mengatur reconnect, reconnect=true adalah untuk mengaktifkan reconnect. |
Protokol WebSocket menetapkan parameter terkait auto-reconnect yang mendasari: interval=nilai parameter | interval adalah interval percobaan ulang, dalam milidetik, interval=10000 adalah interval percobaan ulang 10 detik, default adalah 1 detik ketika tidak diatur, yaitu interval=1000. |
Protokol WebSocket menetapkan parameter terkait auto-reconnect yang mendasari: payload=parameter value | payload adalah pesan langganan yang perlu dikirim saat WebSocket dihubungkan kembali, misalnya: payload=okokok. |
Parameter yang terkait dengan kaus kaki5 proxy: proxy=nilai parameter | Proxy adalah pengaturan proxy ss5, format nilai parameter: socks5://name:pwd@192.168.0.1:1080, nama adalah nama pengguna server ss5, pwd adalah password login server ss5, 1080 adalah port layanan ss5. |
PeraturanDial()
Fungsi hanya didukung untuk perdagangan langsung.
Saat terhubung ke database menggunakan fungsi Dial, string koneksi ditulis dengan referensi ke proyek driver bahasa go untuk setiap database.
Basis data yang didukung | Proyek Penggerak | Senar koneksi | Pengamatan |
---|---|---|---|
Sqlite3 | github.com/mattn/go-sqlite3 | sqlite3://file:test.db?cache=shared&mode=memory | Peraturansqlite3:// Prefiks menunjukkan bahwa database sqlite3 sedang digunakan, contoh panggilan:Dial("sqlite3://test1.db") |
MySQL | github.com/go-sql-driver/mysql | mysql://username:yourpassword@tcp(localhost:3306) /database Anda?charset=utf8mb4 | – |
Tumbuh | github.com/lib/pq | postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432 | – |
rumah klik | github.com/ClickHouse/clickhouse-go | klikhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase | – |
Harap dicatat bahwa ketikapayload
konten yang ditetapkan dalamaddress
parameter berisi karakter=
atau karakter khusus lainnya, dapat mempengaruhi analisis dariaddress
parameter dariDial
fungsi, seperti contoh berikut.
contoh panggilan backPack Exchange websocket private interface:
var client = null
function main() {
// Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
var base64ApiKey = "xxx"
var ts = String(new Date().getTime())
var data = "instruction=subscribe×tamp=" + ts + "&window=5000"
// Since signEd25519 returns a base64 encoding, it contains the character "="
var signature = signEd25519(data)
// The payload may contain the character "=" after being encoded by JSON
payload = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [base64ApiKey, signature, ts, "5000"]
}
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
var buf = client.read()
Log(buf)
}
}
function onexit() {
client.close()
}
function signEd25519(data) {
return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}
Panggilan berikut dalam kode bekerja dengan baik:
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
Jika Anda menulisnya langsung dipayload
, tidak akan berfungsi dengan baik, misalnya:
client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))
Saat ini, hanya JavaScript mendukung penggunaanmqtt
, nats
, amqp
, dankafka
kode strategi bahasa JavaScript digunakan sebagai contoh untuk menunjukkan penggunaan empat protokol:mqtt
, nats
, amqp
, dankafka
:
// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// Write data
conn.write(name + ", time: " + _D() + ", test msg.")
// Read data
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("close", arrName[i], "connect")
}
}
Referensi dokumentasi rinci:Menjelajahi FMZ: Praktik Protokol Komunikasi Antara Strategi Perdagangan Langsung
GetMeta HttpQuery