Для примитивныхSocket
Доступ, поддержкаtcp
, udp
, tls
, unix
Поддержка 4 популярных протоколов связи:mqtt
, nats
, amqp
, kafka
Поддержка подключения к базам данных:sqlite3
, mysql
, postgres
, clickhouse
.
ВDial()
Обычный вызов возвращает объект соединения, который имеет три метода:read
, write
иclose
.read
Для считывания данных используетсяwrite
С помощью этого метода передаются данные иclose
метод используется для закрытия соединения.
Вread
метод поддерживает следующие параметры:
ws.read()
.ws.read(2000)
указывает время задержки в две секунды (2000 миллисекунд).-1
означает, что функция возвращает сообщения немедленно, независимо от наличия или отсутствия сообщений, например:ws.read(-1)
- Да.
Передача параметра-2
означает, что функция возвращает сразу с сообщением или без него, но возвращается только последнее сообщение, а буферизированное сообщение отбрасывается.ws.read(-2)
.read()
описание буфера функции:
Входящие данные, отправляемые протоколом WebSocket, могут вызывать накопление данных, если временной интервал между стратегиейread()
Эти данные хранятся в буфере, который имеет структуру данных очереди с максимальным числом 2000.
Сценарий | Нет параметра | Параметр: -1 | Параметр: -2 | Параметр: 2000, в миллисекундах |
---|---|---|---|---|
Данные уже в буфере | Немедленно верните старые данные | Немедленно верните старые данные | Немедленно верните последние данные | Немедленно верните старые данные |
Нет данных в буфере | Возвращение при блокировке данных | Немедленно верните нуль | Немедленно верните нуль | Подождите 2000 мс, верните null, если нет данных, верните null, если есть данные |
Соединение WebSocket отключено или повторно подключено базовым | read() функция возвращает пустую строку, т.е.: |
объект
Выбрать адрес Назови (адрес, время выхода)
Адрес запроса. Адрес неправда строка секунды таймаута, Тайм-аут ложное Номер
function main(){
// Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}
Пример вызова функции Dial:
function main() {
LogStatus("Connecting...")
// Accessing WebSocket interface of Binance
var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
// read returns only the data retrieved after the read call
var buf = client.read()
if (!buf) {
break
}
var table = {
type: 'table',
title: 'Ticker Chart',
cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
rows: []
}
var obj = JSON.parse(buf)
_.each(obj, function(ticker) {
table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
})
LogStatus('`' + JSON.stringify(table) + '`')
}
client.close()
}
import json
def main():
LogStatus("Connecting...")
client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if not client:
Log("Connection failed, program exited")
return
while True:
buf = client.read()
if not buf:
break
table = {
"type" : "table",
"title" : "Ticker Chart",
"cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
"rows" : []
}
obj = json.loads(buf)
for i in range(len(obj)):
table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
LogStatus('`' + json.dumps(table) + '`')
client.close()
void main() {
LogStatus("Connecting...");
auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
if(!client.Valid) {
Log("Connection failed, program exited");
return;
}
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
json table = R"({
"type" : "table",
"title" : "Ticker Chart",
"cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"],
"rows" : []
})"_json;
json obj = json::parse(buf);
for(auto& ele : obj.items()) {
table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
ele.value()["q"], _D(ele.value()["E"])});
}
LogStatus("`" + table.dump() + "`");
}
client.close();
}
Для доступа к интерфейсу WebSocket Binance:
var ws = null
function main(){
var param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
// When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
var pingCyc = 1000 * 20
var lastPingTime = new Date().getTime()
while(true){
var nowTime = new Date().getTime()
var ret = ws.read()
Log("ret:", ret)
if(nowTime - lastPingTime > pingCyc){
var retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send : ping", "#FF0000")
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("exit")
}
import json
import time
ws = None
def main():
global ws
param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
pingCyc = 1000 * 20
lastPingTime = time.time() * 1000
while True:
nowTime = time.time() * 1000
ret = ws.read()
Log("ret:", ret)
if nowTime - lastPingTime > pingCyc:
retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send: ping", "#FF0000")
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");
void main() {
json param = R"({
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
})"_json;
objWS.write(param.dump());
if(objWS.Valid) {
uint64_t pingCyc = 1000 * 20;
uint64_t lastPingTime = Unix() * 1000;
while(true) {
uint64_t nowTime = Unix() * 1000;
auto ret = objWS.read();
Log("ret:", ret);
if(nowTime - lastPingTime > pingCyc) {
auto retPing = objWS.write("ping");
lastPingTime = nowTime;
Log("Send: ping", "#FF0000");
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
objWS.close();
Log("exit");
}
Доступ к интерфейсу OKX
var ws = null
function main(){
var param = {"sub": "market.btcusdt.detail", "id": "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
while(1){
var ret = ws.read()
Log("ret:", ret)
// Respond to heartbeat packet operations
try {
var jsonRet = JSON.parse(ret)
if(typeof(jsonRet.ping) == "number") {
var strPong = JSON.stringify({"pong" : jsonRet.ping})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
}
} catch(e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("Execute the ws.close() function")
}
import json
ws = None
def main():
global ws
param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
while True:
ret = ws.read()
Log("ret:", ret)
# Respond to heartbeat packet operations
try:
jsonRet = json.loads(ret)
if "ping" in jsonRet and type(jsonRet["ping"]) == int:
strPong = json.dumps({"pong" : jsonRet["ping"]})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
except Exception as e:
Log("e:", e)
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("Execute the ws.close() function")
using namespace std;
void main() {
json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
if(ws.Valid) {
while(true) {
auto ret = ws.read();
Log("ret:", ret);
// Respond to heartbeat packet operations
try
{
auto jsonRet = json::parse(ret);
if(jsonRet["ping"].is_number()) {
json pong = R"({"pong" : 0})"_json;
pong["pong"] = jsonRet["ping"];
auto strPong = pong.dump();
ws.write(strPong);
Log("Respond to ping, send pong:", strPong, "#FF0000");
}
} catch(exception &e)
{
Log("e:", e.what());
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
// ws.close();
Log("Execute the ws.close() function");
}
Доступ к интерфейсу Huobi's WebSocket:
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
// Signature function for login
var ts = (new Date().getTime() / 1000).toString()
var login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey) // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
}]
}
return login
}
var client_private = null
function main() {
// Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
SetErrorFilter("timeout")
// Position channel subscription information
var posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
var accessKey = "xxx"
var secretKey = "xxx"
var passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000) // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
client_private.write(JSON.stringify(posSubscribe))
if (client_private) {
var lastPingTS = new Date().getTime()
while (true) {
var buf = client_private.read(-1)
if (buf) {
Log(buf)
}
// Detect disconnection, reconnect
if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
Log("Disconnection detected, close connection, reconnect")
client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(JSON.stringify(posSubscribe))
}
// Send heartbeat packets
var nowPingTS = new Date().getTime()
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping")
lastPingTS = nowPingTS
}
}
}
}
function onexit() {
var ret = client_private.close()
Log("Close the connection!", ret)
}
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
ts = str(time.time())
login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
}]
}
return login
client_private = None
def main():
global client_private
SetErrorFilter("timeout")
posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
accessKey = "xxx"
secretKey = "xxx"
passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
if client_private:
lastPingTS = time.time() * 1000
while True:
buf = client_private.read(-1)
if buf:
Log(buf)
if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
Log("Disconnection detected, close connection, reconnect")
ret = client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
nowPingTS = time.time() * 1000
if nowPingTS - lastPingTS > 10 * 1000:
client_private.write("ping")
lastPingTS = nowPingTS
def onexit():
ret = client_private.close()
Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
auto ts = std::to_string(Unix());
json login = R"({
"op": "login",
"args": [{
"apiKey": "",
"passphrase": "",
"timestamp": "",
"sign": ""
}]
})"_json;
login["args"][0]["apiKey"] = pAccessKey;
login["args"][0]["passphrase"] = pPassphrase;
login["args"][0]["timestamp"] = ts;
login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
return login;
}
void main() {
SetErrorFilter("timeout");
json posSubscribe = R"({
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
})"_json;
auto accessKey = "xxx";
auto secretKey = "xxx";
auto passphrase = "xxx";
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
if (client_private.Valid) {
uint64_t lastPingTS = Unix() * 1000;
while (true) {
auto buf = client_private.read(-1);
if (buf != "") {
Log(buf);
}
if (buf == "") {
if (client_private.write(posSubscribe.dump()) == 0) {
Log("Disconnection detected, close connection, reconnect");
client_private.close();
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
}
}
uint64_t nowPingTS = Unix() * 1000;
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping");
lastPingTS = nowPingTS;
}
}
}
}
void onexit() {
client_private.close();
Log("exit");
}
Для доступа к интерфейсу аутентификации WebSocket OKX
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // Using an in-memory database
client = Dial("sqlite3://test1.db") // Open/connect to the database file in the docker's directory
// record handle
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// Querying tables in the database
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("Execute client.close()")
client.close()
}
// Not supported
// Not supported
Объект соединения, возвращаемый функцией Dial при подключении к базе данных, имеет две уникальные для него функции метода:
exec(sqlString)
: Используется для выполнения SQL заявлений таким же образом, какDBExec()
function.fd()
:fd()
функция возвращает ручку (например, переменная ручка является рукой), которая будет использоваться другими потоками для повторного подключения (даже если объект, созданный Dial, уже был закрыт при исполненииclose()
с функцией закрытия соединения) путем прохождения ручки вDial()
функция, например,Dial(handle)
Подключение для повторного использования.
Ниже приведен пример функции Dial, соединяющейsqlite3
database.Подробная информацияaddress
параметр, разделенный|
символ после обычного адреса:wss://ws.okx.com:8443/ws/v5/public
Если есть.|
символы в строке параметров, затем||
Часть после этого - некоторые параметры параметров функции, и каждый параметр связан с&
Например,ss5
параметры прокси и сжатия могут быть установлены вместе следующим образом:Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")
Функции, поддерживаемые параметром адреса функции Dial | Описание параметров |
---|---|
Параметры, связанные с сжатием данных протокола WebSocket: compress=value parameter | compress - это метод сжатия, параметры сжатия: gzip_raw, gzip и т. д. Если метод gzip не является стандартным gzip, вы можете использовать расширенный метод: gzip_raw |
Параметры, относящиеся к сжатию данных протокола WebSocket: mode=value parameter | режим - это режим сжатия, параметр режима может быть двойным, отправка, recv. dual - это двустороннее сжатие, отправка сжатых данных, прием сжатых данных. отправка - это отправка сжатых данных. recv - это прием сжатых данных, локальная декомпрессия. |
Протокол WebSocket устанавливает базовые параметры, связанные с автоматическим воссоединением: reconnect=значение параметра | reconnect означает установить reconnect, reconnect=true означает включить reconnect. По умолчанию reconnect отсутствует, если этот параметр не установлен. |
Протокол WebSocket устанавливает базовые параметры, связанные с автоматическим воссоединением: interval=value parameter | интервал - это интервал повторных попыток, в миллисекундах, интервал=10000 - это интервал повторных попыток 10 секунд, по умолчанию 1 секунда, когда он не установлен, то есть интервал=1000. |
Протокол WebSocket устанавливает базовые параметры, связанные с автоматическим воссоединением: полезная нагрузка=значение параметра | полезная нагрузка - это сообщение подписки, которое необходимо отправить при повторном подключении WebSocket, например: полезная нагрузка=okokok. |
Параметры, относящиеся к носкам5 прокси: proxy=значение параметра | Прокси - это настройка прокси ss5, формат значения параметра: socks5://name:pwd@192.168.0.1:1080, имя - это имя пользователя сервера ss5, pwd - это пароль входа на сервер ss5, 1080 - это порт службы ss5. |
ВDial()
Функция поддерживается только для торговли в реальном времени.
При подключении к базе данных с помощью функции Dial строка подключения записывается со ссылкой на проект драйвера языка go для каждой базы данных.
Базы данных поддерживаются | Движущие проекты | Соединительная строка | Примечания |
---|---|---|---|
Склайт3 | github.com/mattn/go-sqlite3 | sqlite3://файл:test.db?cache=shared&mode=memory | Вsqlite3:// Префикс указывает, что используется база данных sqlite3, пример вызова:Dial("sqlite3://test1.db") |
mysql | github.com/go-sql-driver/mysql | mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4 | – |
послерост | github.com/lib/pq | postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432 | – |
Кликхаус | github.com/ClickHouse/clickhouse-go | clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase | – |
Пожалуйста, обратите внимание, что когдаpayload
содержание, установленное вaddress
параметр содержит символы=
или других специальных символов, это может повлиять на анализaddress
параметрDial
функция, например следующий пример.
Пример вызова частного интерфейса websocket backPack Exchange:
var client = null
function main() {
// Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
var base64ApiKey = "xxx"
var ts = String(new Date().getTime())
var data = "instruction=subscribe×tamp=" + ts + "&window=5000"
// Since signEd25519 returns a base64 encoding, it contains the character "="
var signature = signEd25519(data)
// The payload may contain the character "=" after being encoded by JSON
payload = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [base64ApiKey, signature, ts, "5000"]
}
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
var buf = client.read()
Log(buf)
}
}
function onexit() {
client.close()
}
function signEd25519(data) {
return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}
Следующий вызов в коде работает нормально:
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
Если вы напишете это прямо вpayload
, он будет работать неправильно, например:
client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))
В настоящее время только JavaScript поддерживает использованиеmqtt
, nats
, amqp
, иkafka
Код стратегии языка JavaScript используется в качестве примера, чтобы показать использование четырех протоколов:mqtt
, nats
, amqp
, иkafka
:
// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// Write data
conn.write(name + ", time: " + _D() + ", test msg.")
// Read data
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("close", arrName[i], "connect")
}
}
Подробная документация:Исследование FMZ: Практика протокола связи между стратегиями торговли в режиме реального времени
GetMeta HttpQuery