原始語Socket
アクセス,サポートtcp
, udp
, tls
, unix
4つの一般的な通信プロトコルをサポートします.mqtt
, nats
, amqp
, kafka
データベースに接続するサポート:sqlite3
, mysql
, postgres
, clickhouse
.
についてDial()
標準呼び出しでは,接続オブジェクトが3つのメソッドを持つ.read
, write
そしてclose
.....read
データを読み取るために使用されます.write
データを送信するために使用されます.close
接続を閉めるために使用されます.
についてread
この方法は次のパラメータをサポートします.
ws.read()
.ws.read(2000)
2秒 (2000ミリ秒) のタイムアウトを指定する.-1
機能がメッセージの有無に関わらず即座に返信することを意味します.例えば:ws.read(-1)
- わかった
パラメータを転送する-2
つまり,この関数はメッセージを含むか,ないか,すぐに返信しますが,最新のメッセージのみが返信され,バッファ化されたメッセージは捨てられます.例えば,ws.read(-2)
.read()
機能バッファの記述:
WebSocket プロトコルによって押された受信データは,戦略の間の時間間隔がread()
函数呼び出しが長すぎる.これらのデータはバッファーに格納され,そのデータ構造は最大2000のキューである.2000を超えると,最新のデータがバッファーに入力され,最も古いデータは削除される.
シナリオ | パラメーターなし | パラメータ: -1 | パラメーター: -2 | パラメータ: 2000 ミリ秒 |
---|---|---|---|---|
すでにバッファにあるデータ | 最古のデータをすぐに返します. | 最古のデータをすぐに返します. | 最新のデータをすぐに返します | 最古のデータをすぐに返します. |
バッファーにデータがない | データにブロックされたときに戻す | すぐに null を返します | すぐに null を返します | 2000 ms を待って,データがない場合は null を返します.データがある場合は null を返します. |
WebSocket 接続は,底辺の接続によって切断または再接続されます. | read() 機能は空の文字列を返します.つまり: |
オブジェクト
ダイヤル (住所) ダイヤル (住所,タイムアウト)
アドレスを要求します アドレス 本当 文字列 タイムアウト秒 タイムアウト 偽り 番号
function main(){
// Dial supports tcp://,udp://,tls://,unix://protocol, you can add a parameter to specify the number of seconds for the timeout
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write can be followed by a numeric parameter to specify the timeout, write returns the number of bytes successfully sent
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read can be followed by a numeric parameter specifying the timeout in milliseconds. Returning null indicates an error or timeout or that the socket has been closed
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}
ダイヤル関数呼び出しの例:
function main() {
LogStatus("Connecting...")
// Accessing WebSocket interface of Binance
var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
// read returns only the data retrieved after the read call
var buf = client.read()
if (!buf) {
break
}
var table = {
type: 'table',
title: 'Ticker Chart',
cols: ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
rows: []
}
var obj = JSON.parse(buf)
_.each(obj, function(ticker) {
table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
})
LogStatus('`' + JSON.stringify(table) + '`')
}
client.close()
}
import json
def main():
LogStatus("Connecting...")
client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if not client:
Log("Connection failed, program exited")
return
while True:
buf = client.read()
if not buf:
break
table = {
"type" : "table",
"title" : "Ticker Chart",
"cols" : ['Currency', 'Highest', 'Lowest', 'Buy 1', 'Sell 1', 'Last traded price', 'Volume', 'Update time'],
"rows" : []
}
obj = json.loads(buf)
for i in range(len(obj)):
table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
LogStatus('`' + json.dumps(table) + '`')
client.close()
void main() {
LogStatus("Connecting...");
auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
if(!client.Valid) {
Log("Connection failed, program exited");
return;
}
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
json table = R"({
"type" : "table",
"title" : "Ticker Chart",
"cols" : ["Currency", "Highest", "Lowest", "Buy 1", "Sell 1", "Last traded price", "Volume", "Update time"],
"rows" : []
})"_json;
json obj = json::parse(buf);
for(auto& ele : obj.items()) {
table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
ele.value()["q"], _D(ele.value()["E"])});
}
LogStatus("`" + table.dump() + "`");
}
client.close();
}
バイナンス WebSocket ticker インターフェースにアクセスするには:
var ws = null
function main(){
var param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
// When calling Dial function, specify reconnect=true to set reconnection mode and payload to be the message sent when reconnecting. When the WebSocket connection is disconnected, it will reconnect and send messages automatically.
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
var pingCyc = 1000 * 20
var lastPingTime = new Date().getTime()
while(true){
var nowTime = new Date().getTime()
var ret = ws.read()
Log("ret:", ret)
if(nowTime - lastPingTime > pingCyc){
var retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send : ping", "#FF0000")
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("exit")
}
import json
import time
ws = None
def main():
global ws
param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
pingCyc = 1000 * 20
lastPingTime = time.time() * 1000
while True:
nowTime = time.time() * 1000
ret = ws.read()
Log("ret:", ret)
if nowTime - lastPingTime > pingCyc:
retPing = ws.write("ping")
lastPingTime = nowTime
Log("Send: ping", "#FF0000")
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("exit")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");
void main() {
json param = R"({
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
})"_json;
objWS.write(param.dump());
if(objWS.Valid) {
uint64_t pingCyc = 1000 * 20;
uint64_t lastPingTime = Unix() * 1000;
while(true) {
uint64_t nowTime = Unix() * 1000;
auto ret = objWS.read();
Log("ret:", ret);
if(nowTime - lastPingTime > pingCyc) {
auto retPing = objWS.write("ping");
lastPingTime = nowTime;
Log("Send: ping", "#FF0000");
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
objWS.close();
Log("exit");
}
OKX
var ws = null
function main(){
var param = {"sub": "market.btcusdt.detail", "id": "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
while(1){
var ret = ws.read()
Log("ret:", ret)
// Respond to heartbeat packet operations
try {
var jsonRet = JSON.parse(ret)
if(typeof(jsonRet.ping) == "number") {
var strPong = JSON.stringify({"pong" : jsonRet.ping})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
}
} catch(e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus("Current time:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("Execute the ws.close() function")
}
import json
ws = None
def main():
global ws
param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
while True:
ret = ws.read()
Log("ret:", ret)
# Respond to heartbeat packet operations
try:
jsonRet = json.loads(ret)
if "ping" in jsonRet and type(jsonRet["ping"]) == int:
strPong = json.dumps({"pong" : jsonRet["ping"]})
ws.write(strPong)
Log("Respond to ping, send pong:", strPong, "#FF0000")
except Exception as e:
Log("e:", e)
LogStatus("Current time:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("Execute the ws.close() function")
using namespace std;
void main() {
json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
if(ws.Valid) {
while(true) {
auto ret = ws.read();
Log("ret:", ret);
// Respond to heartbeat packet operations
try
{
auto jsonRet = json::parse(ret);
if(jsonRet["ping"].is_number()) {
json pong = R"({"pong" : 0})"_json;
pong["pong"] = jsonRet["ping"];
auto strPong = pong.dump();
ws.write(strPong);
Log("Respond to ping, send pong:", strPong, "#FF0000");
}
} catch(exception &e)
{
Log("e:", e.what());
}
LogStatus("Current time:", _D());
Sleep(1000);
}
}
}
void onexit() {
// ws.close();
Log("Execute the ws.close() function");
}
Huobiの WebSocket ティッカーインターフェイスへのアクセス:
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
// Signature function for login
var ts = (new Date().getTime() / 1000).toString()
var login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey) // exchange.HMAC has been deprecated and is temporarily supported. Please use the latest exchange.Encode function instead.
}]
}
return login
}
var client_private = null
function main() {
// Because the read function uses a timeout setting, filtering the timeout reports errors that would otherwise be output with redundant errors
SetErrorFilter("timeout")
// Position channel subscription information
var posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
var accessKey = "xxx"
var secretKey = "xxx"
var passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000) // When logging in, you cannot subscribe to private channels immediately, you need to wait for server response
client_private.write(JSON.stringify(posSubscribe))
if (client_private) {
var lastPingTS = new Date().getTime()
while (true) {
var buf = client_private.read(-1)
if (buf) {
Log(buf)
}
// Detect disconnection, reconnect
if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
Log("Disconnection detected, close connection, reconnect")
client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(JSON.stringify(posSubscribe))
}
// Send heartbeat packets
var nowPingTS = new Date().getTime()
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping")
lastPingTS = nowPingTS
}
}
}
}
function onexit() {
var ret = client_private.close()
Log("Close the connection!", ret)
}
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
ts = str(time.time())
login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
}]
}
return login
client_private = None
def main():
global client_private
SetErrorFilter("timeout")
posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
accessKey = "xxx"
secretKey = "xxx"
passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
if client_private:
lastPingTS = time.time() * 1000
while True:
buf = client_private.read(-1)
if buf:
Log(buf)
if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
Log("Disconnection detected, close connection, reconnect")
ret = client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
nowPingTS = time.time() * 1000
if nowPingTS - lastPingTS > 10 * 1000:
client_private.write("ping")
lastPingTS = nowPingTS
def onexit():
ret = client_private.close()
Log("Close the connection!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
auto ts = std::to_string(Unix());
json login = R"({
"op": "login",
"args": [{
"apiKey": "",
"passphrase": "",
"timestamp": "",
"sign": ""
}]
})"_json;
login["args"][0]["apiKey"] = pAccessKey;
login["args"][0]["passphrase"] = pPassphrase;
login["args"][0]["timestamp"] = ts;
login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
return login;
}
void main() {
SetErrorFilter("timeout");
json posSubscribe = R"({
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
})"_json;
auto accessKey = "xxx";
auto secretKey = "xxx";
auto passphrase = "xxx";
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
if (client_private.Valid) {
uint64_t lastPingTS = Unix() * 1000;
while (true) {
auto buf = client_private.read(-1);
if (buf != "") {
Log(buf);
}
if (buf == "") {
if (client_private.write(posSubscribe.dump()) == 0) {
Log("Disconnection detected, close connection, reconnect");
client_private.close();
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
}
}
uint64_t nowPingTS = Unix() * 1000;
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping");
lastPingTS = nowPingTS;
}
}
}
}
void onexit() {
client_private.close();
Log("exit");
}
OKX
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // Using an in-memory database
client = Dial("sqlite3://test1.db") // Open/connect to the database file in the docker's directory
// record handle
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// Querying tables in the database
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("Execute client.close()")
client.close()
}
// Not supported
// Not supported
データベースに接続する際に Dial 関数によって返される接続 オブジェクトには 2 つの独自のメソッド 関数があります.
exec(sqlString)
: SQL 文を SQL 文と類似した方法で実行するために使用されます.DBExec()
function.fd()
についてfd()
この関数は,他のスレッドが再接続するために使用するハンドル (例えば,ハンドル変数はハンドル) を返します (Dial によって作成されたオブジェクトが既に実行によって閉じられた場合でも).close()
接続を閉じる機能) を使って,ハンドルをハンドルにDial()
機能などDial(handle)
再利用接続
ダイヤル関数と接続する例ですsqlite3
database.詳細についてaddress
パラメータ,|
通常の住所の後ろの記号:wss://ws.okx.com:8443/ws/v5/public
もしそうなら|
パラメータ文字列の文字列,その後||
機能のパラメータの設定です. それぞれのパラメータは,&
文字は,例えば,ss5
プロキシと圧縮パラメータは次のように組み合わせることができます.Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")
ダイヤル機能のアドレスパラメータによってサポートされる機能 | パラメータ説明 |
---|---|
WebSocket プロトコルのデータ圧縮に関連するパラメータ: compress=パラメータ値 | gzip は標準 gzip でない場合は,拡張メソッド: gzip_raw を使用できます. |
WebSocket プロトコルデータ圧縮に関連するパラメータ: mode=パラメータ値 | モードは圧縮モードで,モードパラメータは二重で,送信,recv. dualは二方向圧縮,圧縮データを送信,圧縮データ受信.送信は圧縮データを送信する.recvは圧縮データを受け取る,ローカルデコンプレッション. |
WebSocket プロトコルは,元となる自動再接続関連パラメータを設定します: reconnect=パラメータ値 | reconnect は reconnect を設定するかどうか, reconnect=true は reconnect を有効にするかどうかです.このパラメータが設定されていない場合,デフォルトでは reconnect はありません. |
WebSocket プロトコルは,基礎となる自動再接続関連パラメータを設定します: interval=パラメータ値 | intervalは再試行間隔,ミリ秒で,interval=10000は再試行間隔10秒,デフォルトは1秒で,設定されていない場合,つまりinterval=1000です. |
WebSocket プロトコルは,基礎となる自動再接続関連パラメータを設定します: payload=パラメータ値 | payload は WebSocket が再接続されたときに送信されるサブスクリプションメッセージです.例えば: payload=okokok. |
靴下5のパラメータに関するプロキシ:プロキシ=パラメータ値 | プロキシは ss5 プロキシ設定です パラメータ値形式: socks5://name:pwd@192.168.0.1:1080名前はSS5サーバーのユーザー名 pwdはSS5サーバーのログインパスワード 1080はSS5サービスポートです |
についてDial()
この機能はライブ取引のみに対応しています
ダイヤル関数を使用してデータベースに接続するときは,接続文字列は各データベースのgo言語ドライバープロジェクトを参照して記述されます.
サポートされているデータベース | 推進プロジェクト | 接続文字列 | コメント |
---|---|---|---|
スクライト3 | github.com/mattn/go-sqlite3 | sqlite3://ファイル:test.db?キャッシュ=共有&モード=メモリ | についてsqlite3:// 前項は sqlite3 データベースが使用されていることを示します.Dial("sqlite3://test1.db") |
mysql | github.com/go-sql-driver/mysql | mysql://username:yourpassword@tcp (あなたのパスワード@tcp)localhost:3306) /あなたのデータベース?チャルセット=utf8mb4 | – |
産後 | github.com/lib/pq | postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword ホスト=ローカルホスト ポート=5432 | – |
クリックハウス | github.com/ClickHouse/clickhouse-go | クリックハウス://tcp://host:9000?ユーザー名=ユーザー名&パスワード=あなたのパスワード&データベース=youdatabase | – |
確認してください.payload
設定された内容address
パラメータには文字が含まれています.=
解析に影響を与える可能性があります.address
パラメータDial
この例のようなものです
backPack Exchange websocket のプライベートインターフェースの呼び出し例:
var client = null
function main() {
// Base64-encoded public key of the key pair, i.e. the access key configured on FMZ
var base64ApiKey = "xxx"
var ts = String(new Date().getTime())
var data = "instruction=subscribe×tamp=" + ts + "&window=5000"
// Since signEd25519 returns a base64 encoding, it contains the character "="
var signature = signEd25519(data)
// The payload may contain the character "=" after being encoded by JSON
payload = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [base64ApiKey, signature, ts, "5000"]
}
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
if (!client) {
Log("Connection failed, program exited")
return
}
while (true) {
var buf = client.read()
Log(buf)
}
}
function onexit() {
client.close()
}
function signEd25519(data) {
return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}
次の呼び出しはうまく動作します.
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
文字を書き込むとpayload
機能しない場合,例えば
client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))
JavaScriptだけが JavaScript の使用をサポートしています.mqtt
, nats
, amqp
そしてkafka
ダイヤル関数における通信プロトコル. JavaScript 言語戦略コードは,4つのプロトコルの使用を示す例として使用されます.mqtt
, nats
, amqp
そしてkafka
:
// We need to configure and deploy proxy servers for each protocol first.
// For the sake of demonstration, the subscription (read operation) and publishing (write operation) of the topic test_topic are all performed in the current strategy.
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// Write data
conn.write(name + ", time: " + _D() + ", test msg.")
// Read data
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("close", arrName[i], "connect")
}
}
詳細なドキュメント参照:FMZを調査する: ライブ取引戦略間の通信プロトコルの実践
GetMeta を取得する HttpQuery をインストールする