Les ressources ont été chargées... Je charge...

Une méthode de test stratégique basée sur un générateur de marché aléatoire

Auteur:L'inventeur de la quantification - un petit rêve, Créé: 2024-11-29 16:35:44, Mis à jour: 2024-12-02 09:12:43

[TOC] Je vous en prie.

img

Préambule

Le système de retouche de la plate-forme de négociation quantitative de l'inventeur est un système de retouche qui est constamment mis à jour et mis à niveau, augmentant progressivement les fonctionnalités et optimisant les performances de la fonction de retouche de base initiale. Au fur et à mesure que la plate-forme évolue, le système de retouche est constamment optimisé.

Les besoins

Dans le domaine de la quantification des transactions, le développement et l'optimisation des stratégies sont indissociables de la vérification des données du marché réel. Cependant, dans les applications pratiques, il peut y avoir une insuffisance de la retrospection basée sur les données historiques, par exemple une absence de couverture des marchés extrêmes ou des scénarios particuliers, en raison de la complexité et de la variabilité de l'environnement du marché.

Lorsque nous avons besoin d'une stratégie qui renvoie à des données historiques sur un échange ou une devise, nous pouvons utiliser la source officielle de la plate-forme FMZ pour la revoir. Parfois, nous voulons voir comment la stratégie se comporte dans un marché totalement inconnu.

L'utilisation de données de marché aléatoires signifie:

    1. Évaluer la robustesse de la stratégie Les générateurs de marché aléatoires peuvent créer une variété de scénarios de marché possibles, y compris des marchés extrêmes, bas, tendance et volatiles. Tester une stratégie dans ces environnements analogiques peut aider à évaluer si elle se débrouille de manière stable dans différentes conditions de marché. Par exemple:

    Les stratégies sont-elles adaptées aux tendances et aux changements? La stratégie pourrait-elle entraîner de lourdes pertes dans les marchés extrêmes?

    1. Identifier les faiblesses potentielles de la stratégie Il est possible de détecter les faiblesses potentielles de la stratégie et de l'améliorer en simulant des situations de marché anormales (par exemple, un événement supposé de cygne noir).

    La stratégie dépend-elle trop d'une certaine structure de marché? Y a-t-il un risque que les paramètres soient trop adaptés?

    1. Optimiser les paramètres stratégiques Les données générées au hasard offrent un environnement de test plus diversifié pour l'optimisation des paramètres de stratégie, sans avoir à dépendre exclusivement des données historiques. Cela permet de trouver une gamme de paramètres de stratégie plus complète et d'éviter de se limiter à des modèles de marché spécifiques dans les données historiques.
    1. Les données historiques sont insuffisantes Dans certains marchés (par exemple, les marchés émergents ou les petits marchés de négociation de devises), les données historiques peuvent ne pas être suffisantes pour couvrir toutes les conditions possibles du marché. Un générateur de marché aléatoire peut fournir un grand nombre de données complémentaires pour aider à effectuer des tests plus complets.
    1. Développement à l'itération rapide Les tests rapides avec des données aléatoires permettent d'accélérer les itérations de développement des stratégies, sans avoir à compter sur des marchés en temps réel ou des données de nettoyage et de tri fastidieuses.

Il faut aussi une stratégie d'évaluation rationnelle, et pour les données de marché générées au hasard, il faut faire attention:

  • 1, bien que les générateurs de transactions aléatoires soient utiles, leur signification dépend de la qualité des données générées et de la conception des scénarios cibles:
  • 2) La logique de génération doit être proche du marché réel: si le marché généré au hasard est complètement déconnecté de la réalité, les résultats des tests peuvent manquer de valeur de référence. Par exemple, un générateur peut être conçu en combinant des caractéristiques statistiques du marché réel (par exemple, une distribution de volatilité, des proportions de tendance).
  • 3° Les tests de données réelles ne peuvent pas être entièrement remplacés: les données aléatoires ne peuvent compléter que le développement et l'optimisation des stratégies, et les stratégies finales doivent encore être vérifiées sur des données réelles du marché.

Cela dit, comment pouvons-nous fabriquer des données de manière à ce qu'elles soient faciles, rapides et faciles à utiliser?

Des idées de conception

Ce texte a été conçu pour fournir des calculs de génération aléatoire relativement simples, mais il existe une grande variété d'algorithmes d'analyses, de modèles de données et d'autres techniques qui peuvent être appliquées, car la discussion est limitée.

En combinant les fonctionnalités de source de données personnalisées du système de retouche de plate-forme, nous avons écrit un programme en Python.

  • 1, générer au hasard un ensemble de données de ligne K écrites dans un fichier CSV pour conserver les données générées.
  • 2° créer un service qui apporte une source de données au système de retouche.
  • 3, les données générées par les lignes K sont présentées dans le graphique.

Pour certains paramètres de génération de données en ligne K, comme le stockage de fichiers, les paramètres suivants peuvent être définis:

img

  • Modèle de génération aléatoire de données Pour les types de fluctuations des données de la ligne K, il suffit d'utiliser des nombres aléatoires pour une conception simple différente de la probabilité positive négative, ce qui peut ne pas refléter le modèle de marché souhaité lorsque les données générées sont peu nombreuses. Si une meilleure méthode existe, cette partie du code peut être remplacée. Sur la base de cette conception simple, la modification de la gamme de génération de nombres aléatoires dans le code et de certains coefficients peut affecter l'effet des données générées.

  • Vérification des données Il est également nécessaire de vérifier la rationalité des données générées par la ligne K, de vérifier si les prix élevés et bas sont contraires à la définition, de vérifier la continuité des données de la ligne K, etc.

Générateur de transactions aléatoires du système de retouche

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("CSV文件格式有误,请检查!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("数据data.detail:", data["detail"], "响应回测系统请求。")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle % 360)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
            else:
                change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 1
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("当前路径:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("文件写入成功,以下是文件内容的一部分:")
                Log("".join(lines[:5]))
            else:
                Log("文件写入失败,文件为空!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

Pratique dans le système de retouche

Créer des instances de politiques, configurer des paramètres et les exécuter. Le disque réel (exemple de stratégie) doit être exécuté sur un hôte déployé sur le serveur, car il faut une adresse IP publique pour que le système de retouche puisse accéder aux données. 3° Cliquez sur le bouton d'interaction et la stratégie commence automatiquement à générer des données de marché aléatoires.

img img

4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件

img

Nous pouvons alors utiliser ces données générées au hasard pour faire des tests aléatoires en utilisant une stratégie.

img

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

Les informations ci-dessus sont utilisées pour configurer et ajuster les données.http://xxx.xxx.xxx.xxx:9090C'est l'adresse IP du serveur et le port d'ouverture du disque de stratégie généré au hasard. Il s'agit d'une source de données personnalisée, qui peut être consultée dans la section sur les sources de données personnalisées de la documentation de l'API de la plateforme.

6°, le système de retouche est configuré pour tester les données aléatoires avec les bonnes sources de données

img

img

À ce stade, le système de retouche a été testé avec des données analogiques de nos puces. Selon les données du graphique de marché au moment du retouche, les données sont les mêmes que celles du graphique de marché généré au hasard, heure: 17h00 le 16 octobre 2024.

7, oui, j'ai presque oublié de le dire! Le but de la création d'un disque dur est de faciliter la démonstration, l'opération et l'affichage des données générées par les lignes K. Dans les applications réelles, il est possible d'écrire un script python indépendant sans avoir à exécuter le disque dur.

Le code source de la stratégie:Générateur de transactions aléatoires du système de retouche

Merci de votre soutien et de votre lecture.


Plus de