Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

Créé le: 2024-11-29 16:35:44, Mis à jour le: 2024-12-02 09:12:43
comments   0
hits   304

[TOC]

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

Préface

Le système de backtesting de la plateforme de trading quantitative Inventor est un système de backtesting qui est en constante itération, mise à jour et mise à niveau. À partir des fonctions de backtesting de base initiales, il ajoute progressivement des fonctions et optimise les performances. Au fur et à mesure du développement de la plateforme, le système de backtesting continuera d’être optimisé et mis à niveau. Aujourd’hui, nous allons aborder un sujet basé sur le système de backtesting : « Tests de stratégie basés sur des conditions de marché aléatoires ».

besoin

Dans le domaine du trading quantitatif, le développement et l’optimisation des stratégies ne peuvent être séparés de la vérification des données réelles du marché. Cependant, dans les applications réelles, en raison de l’environnement de marché complexe et changeant, le recours aux données historiques pour les tests rétrospectifs peut présenter des lacunes, telles que le manque de couverture des conditions de marché extrêmes ou de scénarios particuliers. Par conséquent, la conception d’un générateur de marché aléatoire efficace devient un outil efficace pour les développeurs de stratégies quantitatives.

Lorsque nous devons tester la stratégie sur une certaine bourse ou devise en utilisant des données historiques, nous pouvons utiliser la source de données officielle de la plateforme FMZ pour le test rétrospectif. Parfois, nous souhaitons également voir comment une stratégie se comporte sur un marché totalement « inconnu ». À ce moment-là, nous pouvons « fabriquer » des données pour tester la stratégie.

L’importance de l’utilisation de données de marché aléatoires est la suivante :

  • 1. Évaluer la robustesse de la stratégie Le générateur de marché aléatoire peut créer une variété de scénarios de marché possibles, notamment une volatilité extrême, une faible volatilité, des marchés tendance et des marchés volatils. Tester une stratégie dans ces environnements simulés peut aider à évaluer si ses performances sont stables dans différentes conditions de marché. Par exemple:

La stratégie peut-elle s’adapter aux tendances et aux chocs ? La stratégie entraînera-t-elle des pertes substantielles dans des conditions de marché extrêmes ?

  • 2. Identifiez les faiblesses potentielles de votre stratégie En simulant certaines situations de marché anormales (telles que des événements hypothétiques de type cygne noir), les faiblesses potentielles de la stratégie peuvent être découvertes et améliorées. Par exemple:

La stratégie s’appuie-t-elle trop sur une structure de marché particulière ? Existe-t-il un risque de surajustement des paramètres ?

    1. Optimisation des paramètres de stratégie Les données générées aléatoirement fournissent un environnement de test plus diversifié pour le réglage des paramètres de stratégie sans avoir à s’appuyer entièrement sur des données historiques. Cela permet une gamme plus complète de paramètres de stratégie et évite d’être limité à des modèles de marché spécifiques dans les données historiques.
    1. Combler le manque de données historiques Sur certains marchés (tels que les marchés émergents ou les marchés négociant de petites devises), les données historiques peuvent ne pas être suffisantes pour couvrir toutes les conditions de marché possibles. Le randomiseur peut fournir une grande quantité de données supplémentaires pour faciliter des tests plus complets.
    1. Développement itératif rapide L’utilisation de données aléatoires pour des tests rapides peut accélérer l’itération du développement de la stratégie sans dépendre des conditions du marché en temps réel ou du nettoyage et de l’organisation des données qui prennent du temps.

Il est toutefois nécessaire d’évaluer la stratégie de manière rationnelle. Pour les données de marché générées de manière aléatoire, veuillez noter :

  • 1. Bien que les générateurs de marché aléatoire soient utiles, leur importance dépend de la qualité des données générées et de la conception du scénario cible :
  • 2. La logique de génération doit être proche du marché réel : si les conditions de marché générées aléatoirement sont complètement déconnectées de la réalité, les résultats des tests peuvent manquer de valeur de référence. Par exemple, le générateur peut être conçu en combinaison avec des caractéristiques statistiques réelles du marché (telles que la distribution de volatilité, le ratio de tendance).
  • 3. Elle ne peut pas remplacer complètement les tests sur données réelles : les données aléatoires ne peuvent que compléter le développement et l’optimisation des stratégies. La stratégie finale doit encore être vérifiée pour son efficacité sur des données de marché réelles.

Ceci étant dit, comment peut-on « fabriquer » des données ? Comment pouvons-nous « fabriquer » de manière pratique, rapide et facile des données à utiliser dans un système de backtesting ?

Idées de conception

Cet article est conçu pour fournir un point de départ à la discussion et propose un calcul de génération de marché aléatoire relativement simple. En fait, il existe une variété d’algorithmes de simulation, de modèles de données et d’autres technologies qui peuvent être appliqués. En raison de l’espace limité de la discussion , nous n’utiliserons pas de méthodes de simulation de données particulièrement complexes.

En combinant la fonction de source de données personnalisée du système de backtesting de la plateforme, nous avons écrit un programme en Python.

  • 1. Générez aléatoirement un ensemble de données de ligne K et écrivez-les dans un fichier CSV pour un enregistrement persistant, afin que les données générées puissent être enregistrées.
  • 2. Créez ensuite un service pour fournir un support de source de données pour le système de backtesting.
  • 3. Affichez les données K-line générées dans le graphique.

Pour certaines normes de génération et de stockage de fichiers de données de ligne K, les contrôles de paramètres suivants peuvent être définis :

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

  • Modèle de données généré aléatoirement Pour simuler le type de fluctuation des données K-line, une conception simple est simplement réalisée en utilisant les différentes probabilités de nombres aléatoires positifs et négatifs. Lorsque les données générées ne sont pas volumineuses, le modèle de marché requis peut ne pas être reflété. S’il existe un meilleur moyen, vous pouvez remplacer cette partie du code. Sur la base de cette conception simple, l’ajustement de la plage de génération de nombres aléatoires et de certains coefficients dans le code peut affecter l’effet des données générées.

  • Vérification des données Les données de la ligne K générées doivent également être vérifiées pour leur rationalité, pour vérifier si les prix d’ouverture élevés et les prix de clôture bas violent la définition, pour vérifier la continuité des données de la ligne K, etc.

Générateur de citations aléatoires pour système de backtesting

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("CSV文件格式有误,请检查!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("数据data.detail:", data["detail"], "响应回测系统请求。")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle % 360)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
            else:
                change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 1
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("当前路径:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("文件写入成功,以下是文件内容的一部分:")
                Log("".join(lines[:5]))
            else:
                Log("文件写入失败,文件为空!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

Pratique du backtesting système

  1. Créez l’instance de politique ci-dessus, configurez les paramètres et exécutez-la.
  2. Le marché réel (instance de stratégie) doit être exécuté sur un hôte déployé sur un serveur, car une adresse IP publique est requise pour que le système de backtesting puisse y accéder et obtenir des données.
  3. Cliquez sur le bouton interactif et la stratégie commencera automatiquement à générer des données de marché aléatoires.

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

  1. Les données générées seront affichées sur le graphique pour une observation facile, et les données seront enregistrées dans le fichier data.csv local

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

  1. Nous pouvons maintenant utiliser ces données générées aléatoirement et utiliser n’importe quelle stratégie pour le backtesting

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

Configurez selon les informations ci-dessus et effectuez des ajustements spécifiques.http://xxx.xxx.xxx.xxx:9090Il s’agit de l’adresse IP du serveur et du port ouvert du disque réel de la stratégie de génération de marché aléatoire. Il s’agit d’une source de données personnalisée. Vous pouvez vous référer à la section Source de données personnalisée dans la documentation API de la plateforme pour plus d’informations.

  1. Une fois que le système de backtest a configuré la source de données, vous pouvez tester les données de marché aléatoires

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

Discussion sur la méthode de test de stratégie basée sur un générateur de marché aléatoire

À ce stade, le système de backtesting est testé à l’aide de nos données simulées « fabriquées ». Selon les données du graphique du marché pendant le backtest, comparez les données du graphique en temps réel généré par des conditions de marché aléatoires. L’heure est 17h00 le 16 octobre 2024. Les données sont les mêmes.

  1. Oh oui, j’ai presque oublié de dire ça ! La raison pour laquelle ce programme Python générateur de marché aléatoire crée un véritable marché est de faciliter la démonstration, le fonctionnement et l’affichage des données K-line générées. Dans une application réelle, vous pouvez écrire un script Python indépendant, vous n’avez donc pas besoin d’exécuter le disque réel.

Code source de la stratégie :Générateur de citations aléatoires pour système de backtesting

Merci pour votre soutien et votre lecture.