Tài nguyên đang được tải lên... tải...

Khám phá phương pháp thử nghiệm chiến lược dựa trên trình tạo thị trường ngẫu nhiên

Tác giả:Những nhà phát minh định lượng - những giấc mơ nhỏ, Tạo: 2024-11-29 16:35:44, Cập nhật: 2024-12-02 09:12:43

[TOC]

img

Lời mở đầu

Các nhà phát minh của nền tảng giao dịch định lượng hệ thống kiểm tra lại là một hệ thống kiểm tra liên tục lặp đi lặp lại nâng cấp, từ các chức năng kiểm tra cơ bản ban đầu, tăng dần tính năng, tối ưu hóa hiệu suất. Với sự phát triển của nền tảng kiểm tra lại hệ thống sẽ tiếp tục tối ưu hóa nâng cấp, hôm nay chúng tôi sẽ đi đến một chủ đề dựa trên hệ thống kiểm tra lại: "kiểm tra chiến lược dựa trên tình huống ngẫu nhiên".

Nhu cầu

Trong lĩnh vực giao dịch định lượng, việc phát triển chiến lược và tối ưu hóa chứng minh dữ liệu thị trường thực không tách rời. Tuy nhiên, trong các ứng dụng thực tế, việc dựa vào dữ liệu lịch sử để tái khảo sát có thể không đủ, chẳng hạn như thiếu bảo hiểm cho các thị trường cực hoặc các kịch bản đặc biệt. Do đó, việc thiết kế một trình tạo tình huống ngẫu nhiên hiệu quả trở thành một công cụ hiệu quả cho các nhà phát triển chiến lược định lượng.

Khi chúng ta cần một chiến lược có dữ liệu lịch sử trên một sàn giao dịch, một loại tiền tệ, chúng ta có thể sử dụng nguồn dữ liệu chính thức của nền tảng FMZ để kiểm tra lại. Đôi khi chúng ta cũng muốn xem chiến lược sẽ hoạt động như thế nào trong một thị trường hoàn toàn xa lạ.

Sử dụng dữ liệu thị trường ngẫu nhiên có nghĩa là:

    1. Đánh giá tính mạnh mẽ của chiến lược Các trình tạo thị trường ngẫu nhiên có thể tạo ra nhiều tình huống thị trường có thể xảy ra, bao gồm biến động cực, biến động thấp, thị trường xu hướng và thị trường lung lay. Kiểm tra chiến lược trong các môi trường giả lập này có thể giúp đánh giá liệu nó có ổn định trong các điều kiện thị trường khác nhau hay không. Ví dụ:

    Liệu chiến lược có phù hợp với xu hướng và biến động chuyển đổi không? Có phải chiến lược này sẽ gây ra tổn thất lớn trong thị trường cực đoan?

    1. Xác định điểm yếu tiềm tàng của chiến lược Bằng cách mô phỏng một số tình huống thị trường bất thường (ví dụ như một sự kiện bạch tuộc giả định), các điểm yếu tiềm ẩn của chiến lược có thể được phát hiện và cải thiện; ví dụ:

    Liệu chiến lược của bạn có phụ thuộc quá nhiều vào một cấu trúc thị trường nào đó không? Có nguy cơ các tham số quá phù hợp không?

    1. Tối ưu hóa các tham số chiến lược Dữ liệu được tạo ngẫu nhiên cung cấp môi trường thử nghiệm đa dạng hơn để điều chỉnh các tham số chiến lược, không phải hoàn toàn dựa vào dữ liệu lịch sử. Điều này giúp tìm thấy phạm vi tham số của chiến lược một cách toàn diện hơn, tránh bị giới hạn trong các mô hình thị trường cụ thể trong dữ liệu lịch sử.
    1. Không đủ dữ liệu lịch sử Trong một số thị trường (ví dụ như thị trường mới nổi hoặc thị trường giao dịch tiền tệ nhỏ), dữ liệu lịch sử có thể không đủ để bao gồm tất cả các tình huống thị trường có thể xảy ra. Máy tạo thị trường ngẫu nhiên có thể cung cấp một lượng lớn dữ liệu bổ sung để giúp kiểm tra toàn diện hơn.
    1. Phát triển lặp lại nhanh Sử dụng dữ liệu ngẫu nhiên để thử nghiệm nhanh, có thể tăng tốc độ lặp lại phát triển chiến lược mà không cần phải dựa vào thị trường thực tế hoặc việc dọn dẹp và sắp xếp dữ liệu tốn thời gian.

Tuy nhiên, cũng cần một chiến lược đánh giá hợp lý, với dữ liệu thị trường được tạo ra ngẫu nhiên, hãy lưu ý:

  • 1, Mặc dù các bộ tạo ra thị trường ngẫu nhiên rất hữu ích, nhưng ý nghĩa của nó phụ thuộc vào chất lượng dữ liệu được tạo và thiết kế các kịch bản mục tiêu:
  • 2, tạo ra logic gần với thị trường thực: nếu thị trường được tạo ra ngẫu nhiên hoàn toàn tách khỏi thực tế, kết quả thử nghiệm có thể thiếu giá trị tham chiếu. Ví dụ, có thể kết hợp các đặc điểm thống kê thị trường thực (như phân bố tỷ lệ biến động, tỷ lệ xu hướng) để thiết kế trình tạo.
  • 3, Không thể thay thế hoàn toàn thử nghiệm dữ liệu thực: Dữ liệu ngẫu nhiên chỉ bổ sung cho việc phát triển và tối ưu hóa chiến lược, và chiến lược cuối cùng vẫn cần phải được chứng minh hiệu quả trên dữ liệu thị trường thực.

Như vậy, làm thế nào chúng ta có thể tạo ra một số dữ liệu dễ dàng, nhanh chóng, dễ sử dụng để tạo ra dữ liệu để sử dụng hệ thống kiểm tra lại?

Ý tưởng thiết kế

Bài viết này được thiết kế để loại bỏ các quả cầu dẫn để cung cấp tính toán tạo ra các thị trường ngẫu nhiên tương đối đơn giản, thực tế có một loạt các thuật toán mô phỏng đa dạng, mô hình dữ liệu và các kỹ thuật khác có thể được áp dụng, vì thảo luận giới hạn không sử dụng các phương pháp mô phỏng dữ liệu đặc biệt phức tạp.

Kết hợp với tính năng nguồn dữ liệu tùy chỉnh của hệ thống tra cứu nền tảng, chúng tôi đã viết một chương trình bằng ngôn ngữ Python.

  • 1, Tự nhiên tạo ra một tập hợp dữ liệu K-line để ghi vào hồ sơ CSV để lưu trữ dữ liệu.
  • 2, sau đó tạo một dịch vụ hỗ trợ nguồn dữ liệu cho hệ thống kiểm tra lại.
  • 3, trình bày dữ liệu đường K được tạo ra trong biểu đồ.

Đối với một số tiêu chuẩn tạo dữ liệu đường K, lưu trữ tệp, v.v., các điều khiển tham số sau đây có thể được định nghĩa:

img

  • Mô hình dữ liệu được tạo ngẫu nhiên Đối với kiểu biến động của dữ liệu K-line, chỉ cần sử dụng đơn giản các số ngẫu nhiên khác với xác suất âm dương để thiết kế đơn giản, có thể không thể thể hiện các mô hình hành vi cần thiết khi dữ liệu được tạo ra không nhiều. Nếu có phương pháp tốt hơn, bạn có thể thay thế phần mã này. Dựa trên thiết kế đơn giản này, điều chỉnh phạm vi tạo số ngẫu nhiên trong mã và một số hệ số có thể ảnh hưởng đến hiệu ứng dữ liệu được tạo.

  • Kiểm tra dữ liệu Đối với dữ liệu K-line được tạo ra, cũng cần kiểm tra tính hợp lý, kiểm tra liệu giá trả cao hay thấp có vi phạm định nghĩa hay không, kiểm tra tính liên tục của dữ liệu K-line, v.v.

Hệ thống kiểm tra máy phát hành tình huống ngẫu nhiên

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("CSV文件格式有误,请检查!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("数据data.detail:", data["detail"], "响应回测系统请求。")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle % 360)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
            else:
                change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 1
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("当前路径:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("文件写入成功,以下是文件内容的一部分:")
                Log("".join(lines[:5]))
            else:
                Log("文件写入失败,文件为空!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

Thực hành trong hệ thống kiểm tra lại

1, Tạo các trường hợp chính sách trên, cấu hình các tham số, chạy. 2, ổ đĩa thực (đối với trường hợp chính sách) cần phải chạy trên máy chủ được triển khai trên máy chủ, vì cần có IP mạng công cộng để hệ thống truy cập có thể lấy dữ liệu. Khi bạn nhấp vào nút tương tác, chiến lược sẽ tự động bắt đầu tạo dữ liệu thị trường ngẫu nhiên.

img img

4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件

img

Và sau đó, chúng ta có thể sử dụng dữ liệu được tạo ngẫu nhiên này để kiểm tra lại một cách ngẫu nhiên bằng một chiến lược.

img

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

Theo thông tin trên, cấu hình được điều chỉnh.http://xxx.xxx.xxx.xxx:9090Đây là địa chỉ IP của máy chủ và cổng mở của ổ đĩa chính sách được tạo ngẫu nhiên. Đây là nguồn dữ liệu tùy chỉnh, bạn có thể truy vấn mục nguồn dữ liệu tùy chỉnh trong tài liệu API nền tảng.

6, hệ thống truy cập lại được thiết lập để kiểm tra dữ liệu tình huống ngẫu nhiên.

img

img

Trong thời điểm này, hệ thống kiểm tra lại được thử nghiệm bằng dữ liệu mô phỏng của chúng tôi. Dựa trên dữ liệu trong biểu đồ thị trường khi kiểm tra lại, so sánh với dữ liệu trong biểu đồ thị trường thực được tạo ra ngẫu nhiên, thời gian: 17 giờ hoàn chỉnh ngày 16 tháng 10 năm 2024, dữ liệu giống nhau.

7, ôi đúng, gần như quên nói! Chương trình python của trình tạo trường hợp ngẫu nhiên này tạo ra một ổ đĩa thực để dễ dàng trình bày, vận hành, hiển thị dữ liệu K-line được tạo ra. Trong ứng dụng thực tế, bạn hoàn toàn có thể viết một kịch bản python độc lập mà không cần phải chạy ổ đĩa thực.

Nguồn mã chiến lược:Hệ thống kiểm tra máy phát hành tình huống ngẫu nhiên

Cảm ơn bạn đã ủng hộ và đọc.


Thêm nữa