En la carga de los recursos... Cargando...

Explorar métodos de prueba de estrategias basados en generadores de mercado aleatorios

El autor:Los inventores cuantifican - sueños pequeños, Creado: 2024-11-29 16:35:44, Actualizado: 2024-11-29 17:47:24

[TOC] ¿Qué quieres decir?

img

Prólogo

El sistema de retrospección de la plataforma de negociación cuantitativa de los inventores es un sistema de retrospección que se actualiza continuamente y se actualiza, aumentando gradualmente las funciones y optimizando el rendimiento de las funciones de retrospección básicas iniciales. Con el desarrollo de la plataforma, el sistema de retrospección se optimiza continuamente.

Las necesidades

En el campo de la cuantificación de operaciones, el desarrollo de estrategias y la optimización de la verificación de datos de mercado reales son inseparables. Sin embargo, en las aplicaciones prácticas, debido a la complejidad del entorno del mercado, puede haber insuficiencia para la retroalimentación basada en datos históricos, como la falta de cobertura de mercados extremos o escenarios especiales. Por lo tanto, el diseño de un generador de tránsito aleatorio eficiente es una herramienta efectiva para los desarrolladores de estrategias de cuantificación.

Cuando necesitamos que las estrategias se remontan a datos históricos en un intercambio, una moneda, podemos usar la fuente oficial de datos de la plataforma FMZ para volver a probarlas. A veces también queremos ver cómo se desempeñan las estrategias en un mercado completamente desconocido, en este momento podemos fabricar algunos datos para probar las estrategias.

El uso de datos de mercado aleatorios tiene el siguiente significado:

    1. Evaluar la robustez de las estrategias Los generadores de mercado aleatorios pueden crear una variedad de escenarios de mercado posibles, incluyendo extremos, bajos, tendencias y mercados turbulentos. Probar estrategias en estos entornos simulados puede ayudar a evaluar si funcionan de manera estable en diferentes condiciones de mercado.

    ¿Puede la estrategia adaptarse a las tendencias y cambios de conmoción? ¿Es una estrategia que puede generar grandes pérdidas en el mercado extremo?

    1. Identificar las potenciales debilidades de las estrategias Se pueden detectar posibles debilidades de las estrategias y hacer mejoras mediante la simulación de algunas situaciones de mercado inusuales (por ejemplo, un supuesto evento de cisne negro).

    ¿La estrategia depende demasiado de alguna estructura de mercado? ¿Hay algún riesgo de que los parámetros sean demasiado ajustados?

    1. Optimización de los parámetros de la estrategia Los datos generados al azar ofrecen un entorno de prueba más diversificado para ajustar los parámetros de la estrategia, sin tener que depender completamente de los datos históricos. Esto permite encontrar un rango más amplio de parámetros de la estrategia, evitando limitarse a patrones de mercado específicos en los datos históricos.
    1. La falta de información histórica En algunos mercados (por ejemplo, mercados emergentes o mercados de operaciones de monedas pequeñas), los datos históricos pueden no ser suficientes para cubrir todas las condiciones posibles del mercado. Los generadores de mercados aleatorios pueden proporcionar una gran cantidad de datos complementarios que ayudan a realizar pruebas más completas.
    1. Desarrollo iterativo rápido Las pruebas rápidas con datos aleatorios permiten acelerar la velocidad de los iterativos de desarrollo de estrategias, sin depender de mercados en tiempo real o de la limpieza y clasificación de datos.

Sin embargo, también se necesita una estrategia de evaluación racional, y para los datos de mercado generados al azar, hay que tener en cuenta:

  • 1, aunque los generadores de transacciones aleatorias son útiles, su significado depende de la calidad de los datos generados y del diseño de los escenarios objetivos:
  • La lógica de generación debe ser cercana al mercado real: los resultados de las pruebas pueden carecer de un valor de referencia si el mercado generado al azar está completamente fuera de la realidad. Por ejemplo, se puede diseñar un generador que combine características estadísticas reales del mercado (como distribución de fluctuaciones, proporción de tendencias).
  • 3. No puede ser un reemplazo completo de la prueba de datos reales: los datos aleatorios solo pueden complementar el desarrollo y la optimización de las estrategias, y las estrategias finales aún deben verificar su eficacia en datos reales del mercado.

Hablando de todo esto, ¿cómo podemos manipular los datos? ¿Cómo podemos manipular los datos de forma fácil, rápida y fácil de usar para que los sistemas de retroevaluación los utilicen?

La idea del diseño

Este artículo está diseñado para analizar las cuentas, para dar un cálculo de generación de mercado aleatorio más sencillo, y en realidad hay una gran variedad de algoritmos de simulación, modelos de datos y otras técnicas que se pueden aplicar, ya que el alcance de la discusión es limitado.

Combinando la funcionalidad de fuente de datos personalizada del sistema de recuperación de plataformas, escribimos un programa en el lenguaje Python.

  • 1, generar al azar un conjunto de datos de K líneas escritos en registros de persistencia de archivos CSV, de modo que los datos generados puedan ser guardados.
  • 2, y luego crear un servicio para que el sistema de retrospección tenga soporte de fuentes de datos.
  • 3, muestra los datos generados de la línea K en el gráfico.

Para algunos parámetros de generación de datos de línea K, almacenamiento de archivos, etc., se pueden definir los siguientes controles de parámetros:

img

  • Modelo de generación aleatoria de datos Para el tipo de fluctuación de los datos de la línea K, solo se utiliza un simple diseño de un número aleatorio con probabilidades negativas positivas diferentes, que puede no reflejar el patrón de mercado deseado cuando no se genera una gran cantidad de datos. Si hay un mejor método, se puede reemplazar esta parte del código. Basándose en este simple diseño, ajustar el rango de generación de números aleatorios en el código y algunos coeficientes puede afectar el efecto de los datos generados.

  • Verificación de datos También se requiere una prueba de racionalidad para los datos de línea K generados, para comprobar si los precios altos y bajos de cobro violan la definición, para comprobar la continuidad de los datos de línea K, etc.

Generador de tránsito aleatorio del sistema de repetición

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("CSV文件格式有误,请检查!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("数据data.detail:", data["detail"], "响应回测系统请求。")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility
            else:
                change = random.uniform(-0.5, 0.5) * volatility            

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 5
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("当前路径:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("文件写入成功,以下是文件内容的一部分:")
                Log("".join(lines[:5]))
            else:
                Log("文件写入失败,文件为空!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

Prácticas en el sistema de retrospección

Crear un ejemplo de la política anterior, configurar los parámetros y ejecutar. 2, el disco real (instancia de la política) debe ejecutarse en el host que se ha implementado en el servidor, ya que se requiere un IP de red pública para que el sistema de retrospección pueda acceder a él para obtener datos. En el tercer caso, al hacer clic en el botón de interacción, la estrategia generará automáticamente datos de mercado aleatorios.

img img

4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件

img

Ahora podemos usar los datos generados al azar para hacer un análisis aleatorio con una estrategia.

img

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

Los cambios en la configuración se realizan de acuerdo con la información anterior.http://xxx.xxx.xxx.xxx:9090Es la dirección IP del servidor y el puerto abierto que generan la política al azar. Este es el origen de datos personalizados, y se puede consultar el capítulo de origen de datos personalizados en la documentación de la API de la plataforma.

6. Con el sistema de retroalimentación configurado, las fuentes de datos pueden ser probadas con datos aleatorios.

img

img

En este momento, el sistema de retroceso se ha probado con datos analógicos de nuestra máquina de fabricar bombas. Según los datos en el gráfico de mercado en el momento del retroceso, se compara con los datos en el gráfico de mercado generado al azar en el plato real, el tiempo: 17 horas enteras del 16 de octubre de 2024, los datos son los mismos.

7. ¡Sí, casi lo olvidé! Este programa de Python para generadores de transacciones aleatorios crea un disco real para facilitar la demostración, la operación y la visualización de los datos generados por K-lines. En la aplicación real, se puede escribir un script Python independiente sin necesidad de ejecutar el disco real.

El código fuente de la estrategia:Generador de tránsito aleatorio del sistema de repetición

Gracias por su apoyo y lectura.


Más.