[TOC] ¿Qué quieres decir?
El sistema de retrospección de la plataforma de negociación cuantitativa de los inventores es un sistema de retrospección que se actualiza continuamente y se actualiza, aumentando gradualmente las funciones y optimizando el rendimiento de las funciones de retrospección básicas iniciales. Con el desarrollo de la plataforma, el sistema de retrospección se optimiza continuamente.
En el campo de la cuantificación de operaciones, el desarrollo de estrategias y la optimización de la verificación de datos de mercado reales son inseparables. Sin embargo, en las aplicaciones prácticas, debido a la complejidad del entorno del mercado, puede haber insuficiencia para la retroalimentación basada en datos históricos, como la falta de cobertura de mercados extremos o escenarios especiales. Por lo tanto, el diseño de un generador de tránsito aleatorio eficiente es una herramienta efectiva para los desarrolladores de estrategias de cuantificación.
Cuando necesitamos que las estrategias se remontan a datos históricos en un intercambio, una moneda, podemos usar la fuente oficial de datos de la plataforma FMZ para volver a probarlas. A veces también queremos ver cómo se desempeñan las estrategias en un mercado completamente desconocido, en este momento podemos fabricar algunos datos para probar las estrategias.
El uso de datos de mercado aleatorios tiene el siguiente significado:
¿Puede la estrategia adaptarse a las tendencias y cambios de conmoción? ¿Es una estrategia que puede generar grandes pérdidas en el mercado extremo?
¿La estrategia depende demasiado de alguna estructura de mercado? ¿Hay algún riesgo de que los parámetros sean demasiado ajustados?
Sin embargo, también se necesita una estrategia de evaluación racional, y para los datos de mercado generados al azar, hay que tener en cuenta:
Hablando de todo esto, ¿cómo podemos manipular los datos? ¿Cómo podemos manipular los datos de forma fácil, rápida y fácil de usar para que los sistemas de retroevaluación los utilicen?
Este artículo está diseñado para analizar las cuentas, para dar un cálculo de generación de mercado aleatorio más sencillo, y en realidad hay una gran variedad de algoritmos de simulación, modelos de datos y otras técnicas que se pueden aplicar, ya que el alcance de la discusión es limitado.
Combinando la funcionalidad de fuente de datos personalizada del sistema de recuperación de plataformas, escribimos un programa en el lenguaje Python.
Para algunos parámetros de generación de datos de línea K, almacenamiento de archivos, etc., se pueden definir los siguientes controles de parámetros:
Modelo de generación aleatoria de datos Para el tipo de fluctuación de los datos de la línea K, solo se utiliza un simple diseño de un número aleatorio con probabilidades negativas positivas diferentes, que puede no reflejar el patrón de mercado deseado cuando no se genera una gran cantidad de datos. Si hay un mejor método, se puede reemplazar esta parte del código. Basándose en este simple diseño, ajustar el rango de generación de números aleatorios en el código y algunos coeficientes puede afectar el efecto de los datos generados.
Verificación de datos También se requiere una prueba de racionalidad para los datos de línea K generados, para comprobar si los precios altos y bajos de cobro violan la definición, para comprobar la continuidad de los datos de línea K, etc.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据data.detail:", data["detail"], "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility
else:
change = random.uniform(-0.5, 0.5) * volatility
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 5
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("当前路径:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("文件写入成功,以下是文件内容的一部分:")
Log("".join(lines[:5]))
else:
Log("文件写入失败,文件为空!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
Crear un ejemplo de la política anterior, configurar los parámetros y ejecutar. 2, el disco real (instancia de la política) debe ejecutarse en el host que se ha implementado en el servidor, ya que se requiere un IP de red pública para que el sistema de retrospección pueda acceder a él para obtener datos. En el tercer caso, al hacer clic en el botón de interacción, la estrategia generará automáticamente datos de mercado aleatorios.
4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件
Ahora podemos usar los datos generados al azar para hacer un análisis aleatorio con una estrategia.
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
Los cambios en la configuración se realizan de acuerdo con la información anterior.http://xxx.xxx.xxx.xxx:9090
Es la dirección IP del servidor y el puerto abierto que generan la política al azar.
Este es el origen de datos personalizados, y se puede consultar el capítulo de origen de datos personalizados en la documentación de la API de la plataforma.
6. Con el sistema de retroalimentación configurado, las fuentes de datos pueden ser probadas con datos aleatorios.
En este momento, el sistema de retroceso se ha probado con datos analógicos de nuestra máquina de fabricar bombas. Según los datos en el gráfico de mercado en el momento del retroceso, se compara con los datos en el gráfico de mercado generado al azar en el plato real, el tiempo: 17 horas enteras del 16 de octubre de 2024, los datos son los mismos.
7. ¡Sí, casi lo olvidé! Este programa de Python para generadores de transacciones aleatorios crea un disco real para facilitar la demostración, la operación y la visualización de los datos generados por K-lines. En la aplicación real, se puede escribir un script Python independiente sin necesidad de ejecutar el disco real.
El código fuente de la estrategia:Generador de tránsito aleatorio del sistema de repetición
Gracias por su apoyo y lectura.