Tài nguyên đang được tải lên... tải...

Dạy bạn nâng cấp các nhà thu thập thị trường backtest nguồn dữ liệu tùy chỉnh

Tác giả:Tốt, Tạo: 2020-06-06 08:53:02, Cập nhật: 2023-11-01 20:28:58

img

Bài trướcDạy bạn thực hiện một bộ sưu tập báo giá thị trườngChúng tôi đã thực hiện một chương trình robot thu thập giá cả thị trường cùng nhau.

Làm thế nào để sử dụng dữ liệu thị trường sau khi chúng tôi thu thập nó? nó sẽ được sử dụng cho hệ thống backtest. dựa trên chức năng nguồn dữ liệu tùy chỉnh của hệ thống backtest nền tảng FMZ, chúng tôi có thể trực tiếp sử dụng dữ liệu thu thập được như là nguồn dữ liệu của hệ thống backtest, để chúng tôi có thể để hệ thống backtest sử dụng trong bất kỳ thị trường nào mà chúng tôi muốn backtest dữ liệu lịch sử.

Do đó, chúng ta có thể cung cấp cho Market Quote Collector một nâng cấp! cho phép người thu thập thị trường cũng có thể phục vụ như một nguồn dữ liệu tùy chỉnh để cung cấp dữ liệu cho hệ thống backtest.

Chuẩn bị đi

Nó khác với công việc chuẩn bị trong bài viết trước. Lần trước là một chương trình docker chạy trên máy tính MAC địa phương của tôi, cài đặt cơ sở dữ liệu mongodb để bắt đầu dịch vụ cơ sở dữ liệu. Lần này chúng tôi thay đổi môi trường hoạt động thành VPS và sử dụng máy chủ Alibaba Cloud Linux để chạy bộ chương trình của chúng tôi.

  • Cơ sở dữ liệu Mongodb

Như trong bài viết trước, chúng ta cần cài đặt cơ sở dữ liệu mongodb trên thiết bị mà chương trình thu thập thị trường đang chạy và bắt đầu dịch vụ. Nó về cơ bản giống như cài đặt mongodb trên máy tính MAC. Có rất nhiều hướng dẫn trên Internet, bạn có thể google nó, rất đơn giản.

  • Cài đặt Python 3

Chương trình sử dụng python3, hãy chú ý đến việc sử dụng một số thư viện python, nếu chúng không được cài đặt, bạn cần cài đặt chúng trước.

bông hoa http urllib

  • Docker

Một FMZ docker chạy sẽ là đủ.

Chuyển đổi Market Quote Collector

Người thu thập báo giá thị trường là:https://www.fmz.com/strategy/199120(RecordsCollector) chiến lược.

Chúng ta hãy sửa đổi nó:

Trước khi chương trình nhập vào vòng lặp while để thu thập dữ liệu, một thư viện đa luồng được sử dụng và thực thi đồng thời bắt đầu một dịch vụ để theo dõi yêu cầu dữ liệu của hệ thống backtest nền tảng FMZ. (Các chi tiết khác có thể bị bỏ qua)

RecordsCollector (cải tiến để cung cấp chức năng nguồn dữ liệu tùy chỉnh)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtesting system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # Construct query condition: greater than a certain value{'age': {'$gt': 20}} Less than a certain value{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions:", dbQuery, "Number of inquiries:", exRecords.find(dbQuery).count(), "Total number of databases:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    
    # Connect to the database service, service address mongodb://127.0.0.1:27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write line by line, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write in
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # adding drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

Kiểm tra

Thiết lập robot

img

Chạy robot, chạy bộ sưu tập báo giá thị trường.

img

Mở một chiến lược thử nghiệm cho backtest. Ví dụ:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Thiết lập tùy chọn backtest, thiết lập sàn giao dịch cho Binance vì nguồn dữ liệu tùy chỉnh tạm thời chưa thể tự xây dựng tên sàn giao dịch, bạn chỉ có thể mượn một trong các cấu hình sàn giao dịch trong danh sách, backtest cho thấy Binance, thực tế Đó là dữ liệu của thị trường mô phỏng của WexApp.

img

So sánh xem biểu đồ được tạo ra bởi hệ thống backtest dựa trên bộ sưu tập báo giá thị trường như một nguồn dữ liệu tùy chỉnh có giống với biểu đồ đường K 1 giờ trên trang trao đổi wexApp không.

img img

Bằng cách này, robot trên VPS có thể tự thu thập dữ liệu đường K, và chúng tôi có thể thu thập dữ liệu thu thập được bất cứ lúc nào và kiểm tra lại trực tiếp trong hệ thống kiểm tra lại.

Bạn có thể tiếp tục mở rộng, ví dụ, thử các nguồn dữ liệu tùy chỉnh backtest cấp thực, và thu thập dữ liệu đa dạng, đa thị trường và các chức năng khác.


Có liên quan

Thêm nữa