[TOC]
Le système de backtesting de la plateforme de trading quantitative Inventor est un système de backtesting qui est en constante itération, mise à jour et mise à niveau. À partir des fonctions de backtesting de base initiales, il ajoute progressivement des fonctions et optimise les performances. Au fur et à mesure du développement de la plateforme, le système de backtesting continuera d’être optimisé et mis à niveau. Aujourd’hui, nous allons aborder un sujet basé sur le système de backtesting : « Tests de stratégie basés sur des conditions de marché aléatoires ».
Dans le domaine du trading quantitatif, le développement et l’optimisation des stratégies ne peuvent être séparés de la vérification des données réelles du marché. Cependant, dans les applications réelles, en raison de l’environnement de marché complexe et changeant, le recours aux données historiques pour les tests rétrospectifs peut présenter des lacunes, telles que le manque de couverture des conditions de marché extrêmes ou de scénarios particuliers. Par conséquent, la conception d’un générateur de marché aléatoire efficace devient un outil efficace pour les développeurs de stratégies quantitatives.
Lorsque nous devons tester la stratégie sur une certaine bourse ou devise en utilisant des données historiques, nous pouvons utiliser la source de données officielle de la plateforme FMZ pour le test rétrospectif. Parfois, nous souhaitons également voir comment une stratégie se comporte sur un marché totalement « inconnu ». À ce moment-là, nous pouvons « fabriquer » des données pour tester la stratégie.
L’importance de l’utilisation de données de marché aléatoires est la suivante :
La stratégie peut-elle s’adapter aux tendances et aux chocs ? La stratégie entraînera-t-elle des pertes substantielles dans des conditions de marché extrêmes ?
La stratégie s’appuie-t-elle trop sur une structure de marché particulière ? Existe-t-il un risque de surajustement des paramètres ?
Il est toutefois nécessaire d’évaluer la stratégie de manière rationnelle. Pour les données de marché générées de manière aléatoire, veuillez noter :
Ceci étant dit, comment peut-on « fabriquer » des données ? Comment pouvons-nous « fabriquer » de manière pratique, rapide et facile des données à utiliser dans un système de backtesting ?
Cet article est conçu pour fournir un point de départ à la discussion et propose un calcul de génération de marché aléatoire relativement simple. En fait, il existe une variété d’algorithmes de simulation, de modèles de données et d’autres technologies qui peuvent être appliqués. En raison de l’espace limité de la discussion , nous n’utiliserons pas de méthodes de simulation de données particulièrement complexes.
En combinant la fonction de source de données personnalisée du système de backtesting de la plateforme, nous avons écrit un programme en Python.
Pour certaines normes de génération et de stockage de fichiers de données de ligne K, les contrôles de paramètres suivants peuvent être définis :
Modèle de données généré aléatoirement Pour simuler le type de fluctuation des données K-line, une conception simple est simplement réalisée en utilisant les différentes probabilités de nombres aléatoires positifs et négatifs. Lorsque les données générées ne sont pas volumineuses, le modèle de marché requis peut ne pas être reflété. S’il existe un meilleur moyen, vous pouvez remplacer cette partie du code. Sur la base de cette conception simple, l’ajustement de la plage de génération de nombres aléatoires et de certains coefficients dans le code peut affecter l’effet des données générées.
Vérification des données Les données de la ligne K générées doivent également être vérifiées pour leur rationalité, pour vérifier si les prix d’ouverture élevés et les prix de clôture bas violent la définition, pour vérifier la continuité des données de la ligne K, etc.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据data.detail:", data["detail"], "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("当前路径:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("文件写入成功,以下是文件内容的一部分:")
Log("".join(lines[:5]))
else:
Log("文件写入失败,文件为空!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
Configurez selon les informations ci-dessus et effectuez des ajustements spécifiques.http://xxx.xxx.xxx.xxx:9090
Il s’agit de l’adresse IP du serveur et du port ouvert du disque réel de la stratégie de génération de marché aléatoire.
Il s’agit d’une source de données personnalisée. Vous pouvez vous référer à la section Source de données personnalisée dans la documentation API de la plateforme pour plus d’informations.
À ce stade, le système de backtesting est testé à l’aide de nos données simulées « fabriquées ». Selon les données du graphique du marché pendant le backtest, comparez les données du graphique en temps réel généré par des conditions de marché aléatoires. L’heure est 17h00 le 16 octobre 2024. Les données sont les mêmes.
Code source de la stratégie :Générateur de citations aléatoires pour système de backtesting
Merci pour votre soutien et votre lecture.