用于原始的Socket
访问,支持tcp
,udp
,tls
,unix
协议。 支持4种流行的通信协议:mqtt
、nats
、amqp
、kafka
。 支持连接数据库,支持的数据库有:sqlite3
、mysql
、postgres
、clickhouse
。
如果超时Dial()
函数返回空值。 正常调用时返回一个连接对象,该对象有三个方法:read
、write
、close
。read
方法用于读取数据,write
方法用于发送数据。 close
方法用于关闭连接。
- 不传参数时,阻塞到有消息时就返回。例如:```ws.read()```。
- 传入参数时,单位为毫秒,指定消息等待超时时间。例如:```ws.read(2000)```指定超时时间为两秒(2000毫秒)。
- 以下两个参数只对WebSocket有效:
传入参数```-1```指不管有无消息,函数立即返回,例如:```ws.read(-1)```。
传入参数```-2```指不管有无消息,函数立即返回,但只返回最新的消息,缓冲区的消息会被丢弃。例如```ws.read(-2)```。
```read()```函数缓冲区说明:
WebSocket协议推送的来的数据,如果在策略```read()```函数调用之间时间间隔过长,就可能造成数据累积。这些数据储存在缓冲区,缓冲区数据结构为队列,上限2000个。超出2000后最新的数据进入缓冲区,最旧的数据清除掉。
|场景|无参数|参数:-1|参数:-2|参数:2000,单位是毫秒|
| - | - | - | - | - |
|缓冲区已有数据|立即返回最旧数据|立即返回最旧数据|立即返回最新数据|立即返回最旧数据|
|缓冲区无数据|阻塞到有数据时返回|立即返回空值|立即返回空值|等待2000毫秒,无数据返回空值,有数据则返回|
|WebSocket连接断开或者底层重连时|read()函数返回空字符串,即:"",write()函数返回0,检测到该情况。可以使用close()函数关闭连接,如果设置了自动重连则不用关闭,系统底层会自动重连。||||
object
Dial(address)
Dial(address, timeout)
请求地址。
address
true
string
超时秒数,
timeout
false
number
```javascript
function main(){
// Dial支持tcp://,udp://,tls://,unix://协议,可加一个参数指定超时的秒数
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write可再跟一个数字参数指定超时,write返回成功发送的字节数
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read可再跟一个数字参数指定超时,单位:毫秒。返回null指出错或者超时或者socket已经关闭
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}
Dial函数调用例子:
function main() {
LogStatus("正在连接...")
// 访问币安的WebSocket接口
var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if (!client) {
Log("连接失败, 程序退出")
return
}
while (true) {
// read只返回调用read之后获取的数据
var buf = client.read()
if (!buf) {
break
}
var table = {
type: 'table',
title: '行情图表',
cols: ['币种', '最高', '最低', '买一', '卖一', '最后成交价', '成交量', '更新时间'],
rows: []
}
var obj = JSON.parse(buf)
_.each(obj, function(ticker) {
table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
})
LogStatus('`' + JSON.stringify(table) + '`')
}
client.close()
}
import json
def main():
LogStatus("正在连接...")
client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
if not client:
Log("连接失败, 程序退出")
return
while True:
buf = client.read()
if not buf:
break
table = {
"type" : "table",
"title" : "行情图表",
"cols" : ["币种", "最高", "最低", "买一", "卖一", "最后成交价", "成交量", "更新时间"],
"rows" : []
}
obj = json.loads(buf)
for i in range(len(obj)):
table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
LogStatus('`' + json.dumps(table) + '`')
client.close()
void main() {
LogStatus("正在连接...");
auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
if(!client.Valid) {
Log("连接失败, 程序退出");
return;
}
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
json table = R"({
"type" : "table",
"title" : "行情图表",
"cols" : ["币种", "最高", "最低", "买一", "卖一", "最后成交价", "成交量", "更新时间"],
"rows" : []
})"_json;
json obj = json::parse(buf);
for(auto& ele : obj.items()) {
table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
ele.value()["q"], _D(ele.value()["E"])});
}
LogStatus("`" + table.dump() + "`");
}
client.close();
}
访问币安(binance)的WebSocket行情接口:
var ws = null
function main(){
var param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
// 在调用Dial函数时,指定reconnect=true即设置为重连模式,指定payload即为重连时发送的消息。在WebSocket连接断开后,会自动重连,自动发送消息
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
var pingCyc = 1000 * 20
var lastPingTime = new Date().getTime()
while(true){
var nowTime = new Date().getTime()
var ret = ws.read()
Log("ret:", ret)
if(nowTime - lastPingTime > pingCyc){
var retPing = ws.write("ping")
lastPingTime = nowTime
Log("发送 :ping", "#FF0000")
}
LogStatus("当前时间:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("退出")
}
import json
import time
ws = None
def main():
global ws
param = {
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
}
ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
pingCyc = 1000 * 20
lastPingTime = time.time() * 1000
while True:
nowTime = time.time() * 1000
ret = ws.read()
Log("ret:", ret)
if nowTime - lastPingTime > pingCyc:
retPing = ws.write("ping")
lastPingTime = nowTime
Log("发送:ping", "#FF0000")
LogStatus("当前时间:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("退出")
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");
void main() {
json param = R"({
"op": "subscribe",
"args": [{
"channel": "tickers",
"instId": "BTC-USDT"
}]
})"_json;
objWS.write(param.dump());
if(objWS.Valid) {
uint64_t pingCyc = 1000 * 20;
uint64_t lastPingTime = Unix() * 1000;
while(true) {
uint64_t nowTime = Unix() * 1000;
auto ret = objWS.read();
Log("ret:", ret);
if(nowTime - lastPingTime > pingCyc) {
auto retPing = objWS.write("ping");
lastPingTime = nowTime;
Log("发送:ping", "#FF0000");
}
LogStatus("当前时间:", _D());
Sleep(1000);
}
}
}
void onexit() {
objWS.close();
Log("退出");
}
访问OKX的WebSocket行情接口:
var ws = null
function main(){
var param = {"sub": "market.btcusdt.detail", "id": "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload="+ JSON.stringify(param))
if(ws){
while(1){
var ret = ws.read()
Log("ret:", ret)
// 响应心跳包操作
try {
var jsonRet = JSON.parse(ret)
if(typeof(jsonRet.ping) == "number") {
var strPong = JSON.stringify({"pong" : jsonRet.ping})
ws.write(strPong)
Log("响应ping,发送pong:", strPong, "#FF0000")
}
} catch(e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus("当前时间:", _D())
Sleep(1000)
}
}
}
function onexit() {
ws.close()
Log("执行ws.close()函数")
}
import json
ws = None
def main():
global ws
param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
if ws:
while True:
ret = ws.read()
Log("ret:", ret)
# 响应心跳包操作
try:
jsonRet = json.loads(ret)
if "ping" in jsonRet and type(jsonRet["ping"]) == int:
strPong = json.dumps({"pong" : jsonRet["ping"]})
ws.write(strPong)
Log("响应ping,发送pong:", strPong, "#FF0000")
except Exception as e:
Log("e:", e)
LogStatus("当前时间:", _D())
Sleep(1000)
def onexit():
ws.close()
Log("执行ws.close()函数")
using namespace std;
void main() {
json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
if(ws.Valid) {
while(true) {
auto ret = ws.read();
Log("ret:", ret);
// 响应心跳包操作
try
{
auto jsonRet = json::parse(ret);
if(jsonRet["ping"].is_number()) {
json pong = R"({"pong" : 0})"_json;
pong["pong"] = jsonRet["ping"];
auto strPong = pong.dump();
ws.write(strPong);
Log("响应ping,发送pong:", strPong, "#FF0000");
}
} catch(exception &e)
{
Log("e:", e.what());
}
LogStatus("当前时间:", _D());
Sleep(1000);
}
}
}
void onexit() {
// ws.close();
Log("执行ws.close()函数");
}
访问火币的WebSocket行情接口:
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
// 签名函数,用于登录
var ts = (new Date().getTime() / 1000).toString()
var login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey) // exchange.HMAC已经废弃,暂时支持。请替换使用最新的exchange.Encode函数
}]
}
return login
}
var client_private = null
function main() {
// 因为read函数使用了超时设置,过滤超时的报错,否则会有冗余错误输出
SetErrorFilter("timeout")
// 持仓频道订阅信息
var posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
var accessKey = "xxx"
var secretKey = "xxx"
var passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000) // 登录时,不能立即订阅私有频道,需要等待服务器反应
client_private.write(JSON.stringify(posSubscribe))
if (client_private) {
var lastPingTS = new Date().getTime()
while (true) {
var buf = client_private.read(-1)
if (buf) {
Log(buf)
}
// 检测断开,重连
if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
Log("检测到断开,关闭连接,重连")
client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(JSON.stringify(posSubscribe))
}
// 发送心跳包
var nowPingTS = new Date().getTime()
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping")
lastPingTS = nowPingTS
}
}
}
}
function onexit() {
var ret = client_private.close()
Log("关闭连接!", ret)
}
import json
import time
def getLogin(pAccessKey, pSecretKey, pPassphrase):
ts = str(time.time())
login = {
"op": "login",
"args":[{
"apiKey" : pAccessKey,
"passphrase" : pPassphrase,
"timestamp" : ts,
"sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
}]
}
return login
client_private = None
def main():
global client_private
SetErrorFilter("timeout")
posSubscribe = {
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
}
accessKey = "xxx"
secretKey = "xxx"
passphrase = "xxx"
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
if client_private:
lastPingTS = time.time() * 1000
while True:
buf = client_private.read(-1)
if buf:
Log(buf)
if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
Log("检测到断开,关闭连接,重连")
ret = client_private.close()
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
Sleep(3000)
client_private.write(json.dumps(posSubscribe))
nowPingTS = time.time() * 1000
if nowPingTS - lastPingTS > 10 * 1000:
client_private.write("ping")
lastPingTS = nowPingTS
def onexit():
ret = client_private.close()
Log("关闭连接!", ret)
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
json getLogin(string pAccessKey, string pSecretKey, string pPassphrase) {
auto ts = std::to_string(Unix());
json login = R"({
"op": "login",
"args": [{
"apiKey": "",
"passphrase": "",
"timestamp": "",
"sign": ""
}]
})"_json;
login["args"][0]["apiKey"] = pAccessKey;
login["args"][0]["passphrase"] = pPassphrase;
login["args"][0]["timestamp"] = ts;
login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
return login;
}
void main() {
SetErrorFilter("timeout");
json posSubscribe = R"({
"op": "subscribe",
"args": [{
"channel": "positions",
"instType": "ANY"
}]
})"_json;
auto accessKey = "xxx";
auto secretKey = "xxx";
auto passphrase = "xxx";
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
if (client_private.Valid) {
uint64_t lastPingTS = Unix() * 1000;
while (true) {
auto buf = client_private.read(-1);
if (buf != "") {
Log(buf);
}
if (buf == "") {
if (client_private.write(posSubscribe.dump()) == 0) {
Log("检测到断开,关闭连接,重连");
client_private.close();
client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
Sleep(3000);
client_private.write(posSubscribe.dump());
}
}
uint64_t nowPingTS = Unix() * 1000;
if (nowPingTS - lastPingTS > 10 * 1000) {
client_private.write("ping");
lastPingTS = nowPingTS;
}
}
}
}
void onexit() {
client_private.close();
Log("退出");
}
访问OKX的WebSocket验证接口:
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // 使用内存数据库
client = Dial("sqlite3://test1.db") // 打开/连接托管者所在目录的数据库文件
// 记录句柄
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// 查询数据库中的表
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("执行client.close()")
client.close()
}
// 不支持
// 不支持
Dial函数连接数据库时返回的连接对象有2个其独有的方法函数:
- exec(sqlString)
: 用于执行SQL语句,使用方式类似于DBExec()
函数。
- fd()
: fd()
函数返回一个句柄(例如:句柄变量为handle),用于其它线程重连(即使Dial创建的对象已经被执行close()
函数关闭连接),将句柄传入Dial()
函数,例如:Dial(handle)
重用连接。
以下是Dial函数连接sqlite3
数据库的例子。
|Dial函数的address参数支持的功能|参数说明|
| - | - |
|WebSocket协议数据压缩相关的参数:compress=参数值|compress为压缩方式,compress参数可选gzip_raw、gzip等。如果gzip方式非标准gzip,可以使用扩展的方式:gzip_raw|
|WebSocket协议数据压缩相关的参数:mode=参数值|mode为压缩模式,mode参数可选dual,send,recv三种。dual为双向压缩,发送压缩数据,接收压缩数据。send为发送压缩数据。recv为接收压缩数据,本地解压缩。|
|WebSocket协议设置底层自动重连相关的参数:reconnect=参数值|reconnect为是否设置重连,reconnect=true为启用重连。不设置该参数时默认不重连。|
|WebSocket协议设置底层自动重连相关的参数:interval=参数值|interval为重试时间间隔,单位毫秒,interval=10000为重试间隔10秒,不设置默认1秒,即interval=1000。|
|WebSocket协议设置底层自动重连相关的参数:payload=参数值|payload为WebSocket重连时需要发送的订阅消息,例如:payload=okok。|
|socks5代理的相关参数:proxy=参数值|proxy为ss5代理设置,参数值格式:socks5://name:pwd@192.168.0.1:1080,name为ss5服务端用户名,pwd为ss5服务端登录密码,1080为ss5服务的端口。|
```Dial()```函数仅支持实盘。
使用Dial函数连接数据库时,编写的连接字符串参考各数据库的go语言驱动项目。
| 支持的数据库 | 驱动项目 | 连接字符串(Connection String) | 备注 |
| - | - | - | - |
| sqlite3 | github.com/mattn/go-sqlite3 | sqlite3://file:test.db?cache=shared&mode=memory | ```sqlite3://```前缀表示使用的是sqlite3数据库,调用例子:```Dial("sqlite3://test1.db")``` |
| mysql | github.com/go-sql-driver/mysql | mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4 | -- |
| postgres | github.com/lib/pq | postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432 | -- |
| clickhouse | github.com/ClickHouse/clickhouse-go | clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase | -- |
需要注意当```address```参数中设置的```payload```内容中有字符```=```或者其它特殊字符时,可能影响```Dial```函数的```address```参数解析,例如以下例子。
backPack交易所websocket私有接口调用范例:
```js
var client = null
function main() {
// base64编码的秘钥对公钥,即在FMZ上配置的access key
var base64ApiKey = "xxx"
var ts = String(new Date().getTime())
var data = "instruction=subscribe×tamp=" + ts + "&window=5000"
// 由于signEd25519最终返回的是base64编码,其中会有字符"="
var signature = signEd25519(data)
// payload 被JSON编码后可能包含字符"="
payload = {
"method": "SUBSCRIBE",
"params": ["account.orderUpdate"],
"signature": [base64ApiKey, signature, ts, "5000"]
}
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
if (!client) {
Log("连接失败, 程序退出")
return
}
while (true) {
var buf = client.read()
Log(buf)
}
}
function onexit() {
client.close()
}
function signEd25519(data) {
return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}
代码中写成以下调用方式是可以正常工作的:
client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))
如果直接写在payload
中则无法正常工作,例如:
client = Dial("wss://ws.backpack.exchange|payload=" + JSON.stringify(payload))
目前仅JavaScript语言支持Dial函数中使用mqtt
、nats
、amqp
、kafka
通信协议,以JavaScript语言策略代码为例展示mqtt
、nats
、amqp
、kafka
四种协议使用例子:
// 需要先配置、部署完成各个协议的代理服务器
// 为了便于演示,主题test_topic的订阅(read操作)、发布(write操作)都在当前这个策略中进行
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// 写数据
conn.write(name + ", time: " + _D() + ", test msg.")
// 读数据
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("关闭", arrName[i], "连接")
}
}
详细介绍文档参考:探索FMZ:交易策略实盘间通信协议实践