[TOC] Je vous en prie.
Le système de retouche de la plate-forme de négociation quantitative de l'inventeur est un système de retouche qui est constamment mis à jour et mis à niveau, augmentant progressivement les fonctionnalités et optimisant les performances de la fonction de retouche de base initiale. Au fur et à mesure que la plate-forme évolue, le système de retouche est constamment optimisé.
Dans le domaine de la quantification des transactions, le développement et l'optimisation des stratégies sont indissociables de la vérification des données du marché réel. Cependant, dans les applications pratiques, il peut y avoir une insuffisance de la retrospection basée sur les données historiques, par exemple une absence de couverture des marchés extrêmes ou des scénarios particuliers, en raison de la complexité et de la variabilité de l'environnement du marché.
Lorsque nous avons besoin d'une stratégie qui renvoie à des données historiques sur un échange ou une devise, nous pouvons utiliser la source officielle de la plate-forme FMZ pour la revoir. Parfois, nous voulons voir comment la stratégie se comporte dans un marché totalement inconnu.
L'utilisation de données de marché aléatoires signifie:
Les stratégies sont-elles adaptées aux tendances et aux changements? La stratégie pourrait-elle entraîner de lourdes pertes dans les marchés extrêmes?
La stratégie dépend-elle trop d'une certaine structure de marché? Y a-t-il un risque que les paramètres soient trop adaptés?
Il faut aussi une stratégie d'évaluation rationnelle, et pour les données de marché générées au hasard, il faut faire attention:
Cela dit, comment pouvons-nous fabriquer des données de manière à ce qu'elles soient faciles, rapides et faciles à utiliser?
Ce texte a été conçu pour fournir des calculs de génération aléatoire relativement simples, mais il existe une grande variété d'algorithmes d'analyses, de modèles de données et d'autres techniques qui peuvent être appliquées, car la discussion est limitée.
En combinant les fonctionnalités de source de données personnalisées du système de retouche de plate-forme, nous avons écrit un programme en Python.
Pour certains paramètres de génération de données en ligne K, comme le stockage de fichiers, les paramètres suivants peuvent être définis:
Modèle de génération aléatoire de données Pour les types de fluctuations des données de la ligne K, il suffit d'utiliser des nombres aléatoires pour une conception simple différente de la probabilité positive négative, ce qui peut ne pas refléter le modèle de marché souhaité lorsque les données générées sont peu nombreuses. Si une meilleure méthode existe, cette partie du code peut être remplacée. Sur la base de cette conception simple, la modification de la gamme de génération de nombres aléatoires dans le code et de certains coefficients peut affecter l'effet des données générées.
Vérification des données Il est également nécessaire de vérifier la rationalité des données générées par la ligne K, de vérifier si les prix élevés et bas sont contraires à la définition, de vérifier la continuité des données de la ligne K, etc.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据data.detail:", data["detail"], "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("当前路径:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("文件写入成功,以下是文件内容的一部分:")
Log("".join(lines[:5]))
else:
Log("文件写入失败,文件为空!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
Créer des instances de politiques, configurer des paramètres et les exécuter. Le disque réel (exemple de stratégie) doit être exécuté sur un hôte déployé sur le serveur, car il faut une adresse IP publique pour que le système de retouche puisse accéder aux données. 3° Cliquez sur le bouton d'interaction et la stratégie commence automatiquement à générer des données de marché aléatoires.
4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件
Nous pouvons alors utiliser ces données générées au hasard pour faire des tests aléatoires en utilisant une stratégie.
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
Les informations ci-dessus sont utilisées pour configurer et ajuster les données.http://xxx.xxx.xxx.xxx:9090
C'est l'adresse IP du serveur et le port d'ouverture du disque de stratégie généré au hasard.
Il s'agit d'une source de données personnalisée, qui peut être consultée dans la section sur les sources de données personnalisées de la documentation de l'API de la plateforme.
6°, le système de retouche est configuré pour tester les données aléatoires avec les bonnes sources de données
À ce stade, le système de retouche a été testé avec des données analogiques de nos puces. Selon les données du graphique de marché au moment du retouche, les données sont les mêmes que celles du graphique de marché généré au hasard, heure: 17h00 le 16 octobre 2024.
7, oui, j'ai presque oublié de le dire! Le but de la création d'un disque dur est de faciliter la démonstration, l'opération et l'affichage des données générées par les lignes K. Dans les applications réelles, il est possible d'écrire un script python indépendant sans avoir à exécuter le disque dur.
Le code source de la stratégie:Générateur de transactions aléatoires du système de retouche
Merci de votre soutien et de votre lecture.