Das Backtesting-System der FMZ Quant Trading Platform ist ein Backtesting-System, das sich ständig wiederholt, aktualisiert und aktualisiert. Es fügt Funktionen hinzu und optimiert die Leistung nach und nach von der ursprünglichen grundlegenden Backtesting-Funktion. Mit der Entwicklung der Plattform wird das Backtesting-System weiterhin optimiert und aktualisiert. Heute werden wir ein Thema diskutieren, das auf dem Backtesting-System basiert:
Im Bereich des quantitativen Handels können die Entwicklung und Optimierung von Strategien nicht von der Verifizierung realer Marktdaten getrennt werden. In tatsächlichen Anwendungen kann es jedoch aufgrund des komplexen und sich verändernden Marktumfelds unzureichend sein, sich auf historische Daten für das Backtesting zu verlassen, z. B. fehlende Abdeckung extremer Marktbedingungen oder spezieller Szenarien. Daher ist die Entwicklung eines effizienten Zufallsmarktgenerators zu einem effektiven Werkzeug für quantitative Strategieentwickler geworden.
Wenn wir die Strategie historische Daten auf einer bestimmten Börse oder Währung zurückverfolgen lassen müssen, können wir die offizielle Datenquelle der FMZ-Plattform für Backtesting verwenden.
Die Bedeutung der Verwendung zufälliger Tickerdaten besteht darin:
Kann sich die Strategie an den Trend- und Volatilitätswechsel anpassen? Wird die Strategie unter extremen Marktbedingungen einen großen Verlust verursachen?
Stützt sich die Strategie zu sehr auf eine bestimmte Marktstruktur? Gibt es eine Gefahr einer Überanpassung der Parameter?
Es ist jedoch auch notwendig, die Strategie rational zu bewerten.
Wie können wir Daten für das Backtesting-System "fabrizieren", damit es sie bequem, schnell und einfach nutzen kann?
Dieser Artikel soll einen Ausgangspunkt für die Diskussion bieten und eine relativ einfache zufällige Tickererzeugung berechnen. In der Tat gibt es eine Vielzahl von Simulationsalgorithmen, Datenmodellen und anderen Technologien, die angewendet werden können. Aufgrund des begrenzten Raums der Diskussion werden wir keine komplexen Datensimulationsmethoden verwenden.
Wir haben die benutzerdefinierte Datenquelle des Backtesting-Systems der Plattform kombiniert und ein Programm in Python geschrieben.
Für einige Generationsstandards und Datei-Speicherung von K-Line-Daten können folgende Parametersteuerungen definiert werden:
Zufällig generierter Datenmodus Für die Simulation der Schwankungsart von K-Liniendaten wird einfach ein einfaches Design mit der Wahrscheinlichkeit von positiven und negativen Zufallszahlen erstellt. Wenn die erzeugten Daten nicht viel sind, spiegeln sie möglicherweise nicht das erforderliche Marktmuster wider. Wenn es eine bessere Methode gibt, kann dieser Teil des Codes ersetzt werden. Auf der Grundlage dieses einfachen Designs kann die Anpassung des Zufallszahlengenerationsbereichs und einiger Koeffizienten im Code den erzeugten Dateneffekt beeinflussen.
Überprüfung der Daten Die erzeugten K-Liniendaten müssen auch auf Rationalität geprüft werden, um zu überprüfen, ob die hohen Eröffnungs- und niedrigen Schlusskosten gegen die Definition verstoßen, und um die Kontinuität der K-Liniendaten zu überprüfen.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("the custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("The CSV file format is incorrect, the number of columns is different, please check!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("The CSV file format is incorrect, please check!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("data.detail: ", data["detail"], "Respond to backtesting system requests.")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("Unsupported K-line period, please use 'm', 'h', or 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("Abnormal data:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("Current path:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("The file was written successfully. The following is part of the file content:")
Log("".join(lines[:5]))
else:
Log("Failed to write the file, the file is empty!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("Start the custom data source service thread, and the data is provided by the CSV file.", ", Address/Port: 0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("Failed to start custom data source service!")
Log("error message:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("Generator parameters:", "Start time:", startTime, "End time:", endTime, "K-line period:", KLinePeriod, "Initial price:", firstPrice, "Type of volatility:", arrTrendType[trendType], "Volatility coefficient:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
Nach den oben genannten Angaben konfigurieren und anpassen.http://xxx.xxx.xxx.xxx:9090
ist die IP-Adresse des Servers und der offene Port der Strategie zur Erstellung zufälliger Tickers.
Dies ist die benutzerdefinierte Datenquelle, die im Abschnitt benutzerdefinierte Datenquelle des Plattform-API-Dokumentes zu finden ist.
Zu diesem Zeitpunkt wird das Backtest-System mit unseren
Strategie-Quellcode:Backtest-System zufälliger Tickergenerator
Vielen Dank für Ihre Unterstützung und Lesung.