O recurso está a ser carregado... Carregamento...

Discussão sobre o método de teste de estratégia baseado no gerador de tickers aleatórios

Autora:FMZ~Lydia, Criado: 2024-12-02 11:26:13, Atualizado: 2024-12-02 21:39:39

img

Prefácio

O sistema de backtesting da FMZ Quant Trading Platform é um sistema de backtesting que está constantemente iterando, atualizando e atualizando. Ele adiciona funções e otimiza o desempenho gradualmente a partir da função básica inicial de backtesting. Com o desenvolvimento da plataforma, o sistema de backtesting continuará a ser otimizado e atualizado.

Demandas

No campo da negociação quantitativa, o desenvolvimento e otimização de estratégias não podem ser separados da verificação de dados reais de mercado. No entanto, em aplicações reais, devido ao ambiente de mercado complexo e em mudança, confiar em dados históricos para backtesting pode ser insuficiente, como falta de cobertura de condições extremas de mercado ou cenários especiais. Portanto, projetar um gerador de mercado aleatório eficiente tornou-se uma ferramenta eficaz para desenvolvedores de estratégias quantitativas.

Quando precisamos deixar a estratégia rastrear dados históricos em uma determinada bolsa ou moeda, podemos usar a fonte de dados oficial da plataforma FMZ para backtesting.

A importância da utilização de dados aleatórios é:

    1. Avaliação da robustez das estratégias O gerador de ticker aleatório pode criar uma variedade de cenários de mercado possíveis, incluindo volatilidade extrema, baixa volatilidade, mercados em tendência e mercados voláteis.

A estratégia pode adaptar-se à mudança de tendência e de volatilidade? A estratégia incorrerá numa grande perda em condições de mercado extremas?

    1. Identificar potenciais pontos fracos da estratégia Ao simular algumas situações anormais de mercado (como eventos hipotéticos de cisne negro), potenciais fraquezas na estratégia podem ser descobertas e melhoradas.

A estratégia baseia-se demasiado numa determinada estrutura de mercado? Existe risco de sobreajuste dos parâmetros?

    1. Optimização dos parâmetros da estratégia Os dados gerados aleatoriamente fornecem um ambiente de teste mais diversificado para a otimização de parâmetros de estratégia, sem ter que depender inteiramente de dados históricos.
    1. Preenchimento da lacuna dos dados históricos Em alguns mercados (como mercados emergentes ou pequenos mercados de negociação de moeda), os dados históricos podem não ser suficientes para cobrir todas as condições possíveis do mercado.
    1. Desenvolvimento iterativo rápido O uso de dados aleatórios para testes rápidos pode acelerar a iteração do desenvolvimento de estratégias sem depender de condições de mercado em tempo real ou de limpeza e organização de dados demorados.

No entanto, é também necessário avaliar a estratégia racionalmente.

    1. Embora os geradores aleatórios de mercado sejam úteis, a sua importância depende da qualidade dos dados gerados e da concepção do cenário-alvo:
    1. A lógica de geração precisa estar próxima do mercado real: se o mercado gerado aleatoriamente estiver completamente fora de contato com a realidade, os resultados do teste podem não ter valor de referência.
    1. O teste de dados reais não pode substituir completamente o teste de dados reais: os dados aleatórios só podem complementar o desenvolvimento e a otimização das estratégias.

Como podemos "fabricar" dados para o sistema de backtesting usar convenientemente, rapidamente e facilmente?

Ideias de Design

Este artigo é projetado para fornecer um ponto de partida para a discussão e fornece um cálculo de geração de ticker aleatório relativamente simples. Na verdade, há uma variedade de algoritmos de simulação, modelos de dados e outras tecnologias que podem ser aplicados. Devido ao espaço limitado da discussão, não usaremos métodos complexos de simulação de dados.

Combinando a função de fonte de dados personalizada do sistema de backtesting da plataforma, escrevemos um programa em Python.

    1. Gerar um conjunto de dados de linha K aleatoriamente e escrevê-los em um arquivo CSV para gravação persistente, para que os dados gerados possam ser salvos.
    1. Em seguida, criar um serviço para fornecer suporte de fonte de dados para o sistema de backtesting.
    1. Exibir os dados de linha K gerados no gráfico.

Para algumas normas de geração e armazenamento de ficheiros de dados de linha K, podem ser definidos os seguintes controles de parâmetros:

img

  • Modo de geração aleatória de dados Para a simulação do tipo de flutuação de dados de linha K, um projeto simples é feito usando a probabilidade de números aleatórios positivos e negativos. Com base neste design simples, ajustar o intervalo de geração de números aleatórios e alguns coeficientes no código pode afetar o efeito de dados gerados.

  • Verificação dos dados Os dados da linha K gerados também devem ser testados quanto à racionalidade, para verificar se os preços de abertura e de fechamento elevados violam a definição e verificar a continuidade dos dados da linha K.

Gerador de tiques aleatórios do sistema de backtesting

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("the custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("The CSV file format is incorrect, the number of columns is different, please check!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("The CSV file format is incorrect, please check!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("data.detail: ", data["detail"], "Respond to backtesting system requests.")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("Unsupported K-line period, please use 'm', 'h', or 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle % 360)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
            else:
                change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("Abnormal data:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 1
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("Current path:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("The file was written successfully. The following is part of the file content:")
                Log("".join(lines[:5]))
            else:
                Log("Failed to write the file, the file is empty!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("Start the custom data source service thread, and the data is provided by the CSV file.", ", Address/Port: 0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("Failed to start custom data source service!")
        Log("error message:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("Generator parameters:", "Start time:", startTime, "End time:", endTime, "K-line period:", KLinePeriod, "Initial price:", firstPrice, "Type of volatility:", arrTrendType[trendType], "Volatility coefficient:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

Prática no sistema de backtesting

  1. Crie a instância de estratégia acima, configure os parâmetros e execute-a.
  2. A negociação ao vivo (instância de estratégia) precisa ser executada no docker implantado no servidor, precisa de um IP de rede pública, para que o sistema de backtesting possa acessá-lo e obter dados.
  3. Clique no botão de interação e a estratégia começará a gerar dados aleatórios automaticamente.

img

  1. Os dados gerados serão exibidos no gráfico para uma fácil observação e os dados serão registados no ficheiro local data.csv.

img

  1. Agora podemos usar estes dados gerados aleatoriamente e usar qualquer estratégia para backtesting:

img

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

De acordo com as informações acima, configurar e ajustar.http://xxx.xxx.xxx.xxx:9090é o endereço IP do servidor e a porta aberta da estratégia de geração de ticker aleatório. Esta é a fonte de dados personalizada, que pode ser encontrada na seção Fonte de dados personalizada do documento API da plataforma.

  1. Depois do sistema de backtest configurar a fonte de dados, podemos testar os dados de mercado aleatórios:

img

img

Neste momento, o sistema de backtest é testado com nossos dados simulados. De acordo com os dados no gráfico do ticker durante o backtest, os dados no gráfico de negociação ao vivo gerados pelo mercado aleatório são comparados.

  1. A razão pela qual este programa Python de gerador de ticker aleatório cria uma negociação ao vivo é para facilitar a demonstração, operação e exibição de dados de linha K gerados.

Código fonte da estratégia:Gerador de tiques aleatórios do sistema de backtesting

Obrigado pelo apoio e pela leitura.


Mais.