This is a WebSocket Marketplace template developed by FMZ, which can be copied and used as a template by ticking this template in the new policy:https://www.fmz.com/strategy/470349
The current FMZ strategy is mainly traditional REST API wrapping, whereby a network connection is established at each API access step to obtain market data through consultation. This method is simple and easy to use, and for most needs, this is quite sufficient.
However, the REST protocol has an inherent delay problem, which is amplified when multiple trading pairs, multi-exchange strategies are needed. Although the Go function of the platform can be executed in parallel, the delay problem still exists, which is difficult to meet the needs of a relatively high frequency strategy trading, and too many trades, too fast routing frequency and also encounter access frequency limitations of the trading platform.
Currently, the server load of the exchanges is also heavy, and they all provide a complete WebSocket protocol and recommend API users to use. Compared to the REST protocol, WebSocket provides a durable two-way connection that allows the exchange to push data to the client in real time, avoiding frequent requests and responses, greatly reducing the latency. Generally, if the REST API access delay is around 20 ms, WebSocket's push data delay is around 2 ms.
FMZ's quantitative trading platform has been supporting the WebSocket protocol since early days and is relatively easy to call, but for newcomers, it is too complex to handle multiple subscriptions, subscribe to multiple exchanges, and efficiently and easily embed into the entire strategy process. This open WebSocket Real-time Market Data Accelerator template solves this problem by being very easy to use, fully compatible with the current wrapped API calls, and for most of the original REST policy, a simple change directly to use to speed up your policy.
The main features:
Note that this policy uses TypeScript, which may seem a bit unfamiliar if you're just familiar with JavaScript. TypeScript introduces a type system and richer language features based on JavaScript, for applications that require processing complex logic such as quantitative transactions. Using TypeScript can reduce potential errors and improve code readability and maintainability.
The policy also uses the asynchronous mechanism of the FMZ platform, whereby a subthread can send a message to the main thread via the __threadPostMessage function. This is asynchronous and is used to notify the main thread of data updates generated in the subthread. Data can be shared between the main thread and the subthread via the __threadGetData and __threadSetData functions. This allows the thread to access and modify the shared state.
The main principle of this strategy is to connect mainstream digital currency exchanges via WebSocket to receive real-time market data (such as in-depth information and transaction information) in order to provide data support for quantitative transaction decisions.
1. WebSocket connection settings
setupWebsocket
The function is used to initialize WebSocket connections, receiving market data. It receives a parametermain_exchanges
The exchange is also known as the "Connected Exchange".
MyDial
Function: Create WebSocket connections, record connection times, and output shutdown times when connections are closed.updateSymbols
Function: Check regularly for new subscription requests and update the list of current transaction pairs as needed.2. Processing of data
supports
Objects define supported exchanges and their processing functions (e.g.Binance
Each exchange's processing function is responsible for analyzing the received messages and extracting relevant data.
processMsg
FunctionProcessing messages from exchanges, identifying different types of data (e.g. deep updates, transactions, etc.) and formatting them into a unified event object.3. Subscription data
At each connection, the system tracks subscription-related market data channels based on the current transaction.
getFunction
Function: Obtain the corresponding processing function according to the name of the exchange.this.wssPublic
FunctionInitialize the WebSocket connection and start receiving data.4. Thread management
Start a thread for each exchange, receive data in real-time and process the data through callback functions.
threadMarket
Function: Receives data in sub-threads, parses and stores the latest depth and transaction information.5. Rewrite the data acquisition method
For each exchange, rewrite the method of obtaining depth and transaction information, giving priority to returning real-time updated data.
$.setupWebsocket()
Initialize the WebSocket connection to the target exchange.GetDepth()
andGetTrades()
Function that automatically returns market depth and transaction records using WebSocket real-time data.If you add the EventLoop () function to the policy, it will be converted into a trigger mechanism, which will automatically be immediately retrieved when there is a wss data update, without waiting for any updated data.
function main() {
$.setupWebsocket()
while (true) {
exchanges.map(e=>{
Log(e.GetName(), e.GetDepth())
Log(e.GetName(), e.GetTrades())
})
EventLoop(100) // trigger by websocket
}
}
Refer to my previous guide to multi-currency trading strategieshttps://www.fmz.com/digest-topic/10506Here's a very convenient way to modify it to support WebSocket:
function MakeOrder() {
for (let i in Info.trade_symbols) {
let symbol = Info.trade_symbols[i];
let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
let buy_amount = 50 / buy_price;
if (Info.position[symbol].value < 2000){
Trade(symbol, "buy", buy_price, buy_amount, symbol);
}
}
}
function OnTick() {
try {
UpdatePosition();
MakeOrder();
UpdateStatus();
} catch (error) {
Log("循环出错: " + error);
}
}
function main() {
$.setupWebsocket()
InitInfo();
while (true) {
let loop_start_time = Date.now();
if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
OnTick();
Info.time.last_loop_time = Date.now();
Info.time.loop_delay = Date.now() - loop_start_time;
}
Sleep(5);
}
}
If you follow the template in the policy and copy the format below, you can refer to the API documentation:
supports["Binance"] = function (ctx:ICtx) {
let processMsg = function (obj) {
let event = {
ts: obj.E,
instId: obj.s,
depth: null,
trades: [],
}
if (obj.e == "depthUpdate") {
let depth = {
asks: [],
bids: []
}
obj.b.forEach(function (item) {
depth.bids.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
obj.a.forEach(function (item) {
depth.asks.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
event.depth = depth
} else if (obj.e == 'bookTicker') {
event.depth = {
asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
}
} else if (obj.e == 'aggTrade') {
event.ts = obj.E
event.trades = [{
price: Number(obj.p),
qty: Number(obj.q),
ts: obj.T,
side: obj.m ? "sell" : "buy"
}]
} else if (typeof (obj.asks) !== 'undefined') {
event.ts = obj.E || new Date().getTime()
let depth = {
asks: [],
bids: []
}
obj.bids.forEach(function (item) {
depth.bids.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
obj.asks.forEach(function (item) {
depth.asks.push({
price: Number(item[0]),
qty: Number(item[1])
})
})
event.depth = depth
} else {
return
}
return event
}
let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
let ws = null
let endPoint = "wss://stream.binance.com/stream"
if (ctx.name == "Futures_Binance") {
endPoint = "wss://fstream.binance.com/stream"
}
while (true) {
if (!ws) {
let subscribes = []
ctx.symbols.forEach(function (symbol) {
channels.forEach(function (channel) {
subscribes.push(symbol.toLowerCase() + "@" + channel)
})
})
ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
}
if (!ws) {
Sleep(1000)
continue
}
updateSymbols(ctx, function(symbol:string, method:string) {
ws.write(JSON.stringify({
"method": method.toUpperCase(),
"params": channels.map(c=>symbol.toLowerCase()+'@'+c),
"id": 2
}))
})
let msg = ws.read(1000)
if (!msg) {
if (msg == "") {
trace("websocket is closed")
ws.close()
ws = null
}
continue
}
if (msg == 'ping') {
ws.write('pong')
} else if (msg == 'pong') {
} else {
let obj = JSON.parse(msg)
if (obj.error) {
trace(obj.error.msg, "#ff0000")
continue
}
if (!obj.stream) {
continue
}
if (obj.stream.indexOf("depth") != -1) {
if (typeof(obj.data.s) !== 'string') {
// patch
obj.data.s = obj.stream.split('@')[0].toUpperCase()
}
}
let event = processMsg(obj.data)
if (event) {
ctx.callback(event)
}
}
}
}