Tài nguyên đang được tải lên... tải...

Một mẫu chính sách cho phép bạn sử dụng WebSocket một cách liền mạch

Tác giả:Cỏ nhỏ, Tạo: 2024-10-30 09:49:20, Cập nhật: 2024-11-05 17:45:31

img

Đây là một mẫu thị trường WebSocket được phát triển chính thức bởi FMZ, được sao chép và lưu lại như một mẫu, bạn có thể chọn mẫu này trong chính sách mới:https://www.fmz.com/strategy/470349

Tại sao cần WebSocket?

Chiến lược hiện tại của FMZ chủ yếu là gói REST API truyền thống, với mỗi bước truy cập API thiết lập một kết nối mạng để lấy dữ liệu thị trường bằng cách tham khảo ý kiến. Phương pháp này đơn giản và dễ sử dụng, và cho hầu hết các nhu cầu, làm như vậy là hoàn toàn đủ.

Tuy nhiên, giao thức REST có những vấn đề về sự chậm trễ vốn có, khi cần nhiều cặp giao dịch, nhiều chính sách giao dịch, các vấn đề về sự chậm trễ sẽ được tăng cường. Mặc dù các chức năng Go của nền tảng có thể được thực hiện song song, các vấn đề về sự chậm trễ vẫn tồn tại, khó đáp ứng nhu cầu giao dịch chiến lược với tần suất tương đối cao, và giao dịch quá nhiều, tần suất thăm dò quá nhanh và sẽ gặp phải hạn chế tần suất truy cập nền tảng giao dịch.

Các sàn giao dịch hiện nay cũng có khối lượng máy chủ rất nặng, tất cả đều cung cấp giao thức WebSocket hoàn chỉnh và khuyến cáo người dùng sử dụng API. So với giao thức REST, WebSocket cung cấp một kết nối hai chiều lâu dài, cho phép sàn giao dịch đẩy dữ liệu đến khách hàng trong thời gian thực, tránh các yêu cầu và phản hồi thường xuyên, giảm đáng kể độ trễ. Nói chung, nếu truy cập REST API bị trì hoãn khoảng 20 ms, thì WebSocket sẽ có khoảng 2 ms độ trễ dữ liệu đẩy. Và liên kết với giao thức WebSocket không bị giới hạn về tần suất truy cập nền tảng, về cơ bản có thể đăng ký vài chục cặp giao dịch cùng một lúc.

Mẫu WebSocket thị trường

FMZ Quantified Trading Platform đã sớm hỗ trợ giao thức WebSocket và tương đối dễ sử dụng, nhưng đối với người dùng mới, việc xử lý nhiều đăng ký, đăng ký nhiều sàn giao dịch và tích hợp hiệu quả và dễ dàng trong toàn bộ quy trình chiến lược là quá phức tạp. Mẫu tăng tốc dữ liệu thị trường thời gian thực của WebSocket mở rộng giải quyết vấn đề này, rất dễ sử dụng, hoàn toàn tương thích với các cuộc gọi API được đóng gói hiện tại, thay đổi đơn giản cho hầu hết các chính sách chính sách REST ban đầu, sử dụng trực tiếp để tăng tốc chính sách của bạn.

Các đặc điểm chính:

  • Nhiều sàn giao dịch hỗ trợ: Chính sách này hỗ trợ kết nối WebSocket cho nhiều sàn giao dịch như Binance, OKX, ByBit, Bitget, và người dùng có thể bắt chước phương pháp đóng gói của mẫu này để hỗ trợ nhiều sàn giao dịch hơn.
  • Đăng ký tùy chỉnh: Cho phép đăng ký các kênh thị trường cụ thể (ví dụ: sâu, giao dịch, vv) và xử lý dữ liệu thu được một cách hiệu quả để sử dụng ngay lập tức cho chiến lược giao dịch.
  • Xử lý lỗi cao cấp: Đánh dấu lỗi tích hợp và cơ chế nối lại WebSocket để đảm bảo độ tin cậy và tính liên tục của luồng dữ liệu.

Tiến trình thực hiện đơn giản

Lưu ý rằng chính sách này sử dụng TypeScript, có vẻ hơi lạ nếu bạn chỉ quen thuộc với JavaScript. TypeScript đã giới thiệu hệ thống kiểu và các tính năng ngôn ngữ phong phú hơn dựa trên JavaScript, cho các ứng dụng cần xử lý logic phức tạp như giao dịch định lượng, sử dụng TypeScript có thể giảm thiểu các lỗi tiềm ẩn, cải thiện khả năng đọc và duy trì mã. Do đó, nên học đơn giản.

Ngoài ra, phương pháp này cũng sử dụng các cơ chế không đồng bộ của nền tảng FMZ để các subthreads có thể gửi tin nhắn đến các chủ thread thông qua chức năng __threadPostMessage. Cách này là không đồng bộ và phù hợp với việc thông báo cho chủ thread thông báo cập nhật dữ liệu được tạo ra trong các subthreads. Dữ liệu có thể được chia sẻ giữa chủ thread và các subthreads thông qua __threadGetData và __threadSetData. Cách này cho phép truy cập và sửa đổi trạng thái chia sẻ của chủ thread.

Nguyên tắc chính của chiến lược này là kết nối các sàn giao dịch tiền kỹ thuật số chính thống thông qua WebSocket để nhận dữ liệu thị trường (như thông tin sâu và thông tin giao dịch) trong thời gian thực để hỗ trợ dữ liệu cho các quyết định giao dịch định lượng.

1. WebSocket thiết lập kết nối

setupWebsocketChức năng được sử dụng để khởi tạo kết nối WebSocket, nhận dữ liệu thị trường. Nó nhận một tham sốmain_exchangesTrong khi đó, các nhà đầu tư khác cũng có thể tham gia vào dịch vụ này.

  • MyDialChức năng: Tạo kết nối WebSocket, ghi lại thời gian kết nối, và xuất thời gian kết nối khi kết nối kết thúc.
  • updateSymbolsChức năng: Kiểm tra thường xuyên xem có yêu cầu đăng ký mới hay không và cập nhật danh sách các giao dịch hiện tại nếu cần thiết.

2. Xử lý dữ liệu

supportsĐối tượng xác định sàn giao dịch được hỗ trợ và chức năng xử lý của nó (ví dụ:BinanceCác chức năng xử lý của mỗi sàn giao dịch chịu trách nhiệm phân tích các tin nhắn nhận được và trích xuất dữ liệu liên quan.

  • processMsgChức năng: xử lý tin nhắn từ sàn giao dịch, xác định các loại dữ liệu khác nhau (như cập nhật sâu, giao dịch, vv) và định dạng thành các đối tượng sự kiện thống nhất.

3. Dữ liệu đăng ký

Trong mỗi lần kết nối, hệ thống sẽ phân phối các kênh dữ liệu thị trường liên quan đến đăng ký dựa trên giao dịch hiện tại.

  • getFunctionChức năng: Nhận các chức năng xử lý tương ứng theo tên sàn giao dịch.
  • this.wssPublicChức năng: khởi động kết nối WebSocket và bắt đầu nhận dữ liệu.

4. Quản lý chủ đề

Bắt đầu một chuỗi cho mỗi sàn giao dịch, nhận dữ liệu trong thời gian thực và xử lý dữ liệu bằng các hàm gọi lại.

  • threadMarketChức năng: Nhận dữ liệu, phân tích và lưu trữ thông tin sâu và giao dịch mới nhất trong các chuỗi con.

5. Viết lại phương pháp thu thập dữ liệu

Cách viết lại để có được thông tin về độ sâu và giao dịch cho mỗi sàn giao dịch, ưu tiên trả lại dữ liệu được cập nhật trong thời gian thực.

Cách sử dụng mẫu

  1. Khởi độngSử dụng:$.setupWebsocket()Khởi động kết nối WebSocket của sàn giao dịch mục tiêu.
  2. Đăng kýHệ thống sẽ tự động đăng ký các kênh có liên quan đến loại giao dịch của bạn (ví dụ: sâu, giao dịch, v.v.).
  3. Thu thập dữ liệuĐịa chỉ:GetDepth()GetTrades()Chức năng, tự động sử dụng dữ liệu WebSocket thời gian thực để trả lại độ sâu thị trường và hồ sơ giao dịch.
  4. Xử lý sai: Các chính sách bao gồm một cơ chế theo dõi để ghi lại các lỗi kết nối và dữ liệu và tự động cố gắng kết nối lại khi kết nối bị gián đoạn.

Nếu thêm hàm EventLoop (), sẽ được chuyển thành một cơ chế kích hoạt, tự động lấy ngay lập tức khi có dữ liệu được cập nhật, thay vì chờ đợi khi không có dữ liệu mới nhất. Tương đương với chức năng Sleep thông minh, bạn cũng có thể sử dụng Sleep trực tiếp.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

Hãy tham khảo hướng dẫn chiến lược giao dịch đa đồng tiền của tôi.https://www.fmz.com/digest-topic/10506Bạn có thể dễ dàng thay đổi nó để hỗ trợ WebSocket ở đây:

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

Làm thế nào để tự mình thêm các sàn giao dịch mới

Chỉ cần làm theo các mẫu của chính sách, bạn có thể bắt chước các định dạng dưới đây và tham khảo tài liệu API của sàn giao dịch:

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }
    

Thêm nữa