संसाधन लोड हो रहा है... लोड करना...

यादृच्छिक बाजार जनरेटर पर आधारित रणनीति परीक्षण के तरीकों का पता लगाना

लेखक:आविष्कारक मात्रा - छोटे सपने, बनाया गयाः 2024-11-29 16:35:44, अद्यतनः 2024-11-29 17:47:24

[TOC]

img

प्रस्तावना

आविष्कारक क्वांटिफाइड ट्रेडिंग प्लेटफॉर्म के लिए रीट्रेस सिस्टम एक रीट्रेस सिस्टम है जो निरंतर पुनरावृत्ति और उन्नयन के साथ रिट्रेस सिस्टम है, जो मूल रीट्रेस फ़ंक्शन से शुरू होता है, धीरे-धीरे कार्यक्षमता में वृद्धि करता है, प्रदर्शन को अनुकूलित करता है। जैसे-जैसे प्लेटफ़ॉर्म का विकास होता है, रीट्रेस सिस्टम लगातार उन्नयन करता है, आज हम रीट्रेस सिस्टम के आधार पर एक विषय पर चर्चा करेंगेः "रैंडम बाजार पर आधारित रणनीतिक परीक्षण"।

जरूरतें

क्वांटिफाइड ट्रेडिंग के क्षेत्र में, रणनीतियों के विकास और अनुकूलन को वास्तविक बाजार डेटा से अलग नहीं किया जा सकता है। हालांकि, वास्तविक अनुप्रयोगों में, बाजार के परिवेश की जटिलता के कारण, ऐतिहासिक डेटा पर भरोसा करने के लिए अपर्याप्त हो सकता है, जैसे कि चरम बाजारों या विशेष परिदृश्यों की कमी। इसलिए, एक कुशल रैंडम ट्रेड जनरेटर का डिजाइन करना क्वांटिफाइड रणनीति डेवलपर्स के लिए एक प्रभावी उपकरण है।

जब हमें किसी एक्सचेंज, किसी मुद्रा पर एक रणनीति के लिए ऐतिहासिक डेटा को वापस लाने की आवश्यकता होती है, तो हम एफएमजेड प्लेटफॉर्म के आधिकारिक डेटा स्रोत का उपयोग करके पुनः परीक्षण कर सकते हैं। कभी-कभी हम यह देखना चाहते हैं कि रणनीति पूरी तरह से अपरिचित बाजार में कैसे प्रदर्शन करती है।

रैंडम बाजार डेटा का उपयोग करने का अर्थ हैः

    1. रणनीतियों की प्रभावशीलता का आकलन एक यादृच्छिक बाजार जनरेटर विभिन्न संभावित बाजार परिदृश्यों का निर्माण कर सकता है, जिनमें चरम उतार-चढ़ाव, कम उतार-चढ़ाव, ट्रेंडिंग बाजार और अस्थिर बाजार शामिल हैं। इन अनुकरणीय वातावरणों में रणनीति का परीक्षण करने से यह आकलन करने में मदद मिलती है कि क्या यह विभिन्न बाजार स्थितियों में स्थिर है। उदाहरण के लिएः

    क्या रणनीति रुझानों और उतार-चढ़ाव के लिए अनुकूल है? क्या रणनीति चरम बाजारों में भारी नुकसान का कारण बनती है?

    1. रणनीतियों की संभावित कमजोरियों की पहचान करना कुछ असामान्य बाजार स्थितियों का अनुकरण करके (उदाहरण के लिए, एक काल्पनिक ब्लैक स्वान घटना) रणनीतियों में संभावित कमजोरियों का पता लगाया जा सकता है और सुधार किया जा सकता है; उदाहरण के लिएः

    क्या रणनीति किसी बाजार संरचना पर बहुत अधिक निर्भर करती है? क्या कोई जोखिम है कि पैरामीटर बहुत अधिक फिट हो रहे हैं?

    1. रणनीतिक पैरामीटर अनुकूलित करें यादृच्छिक रूप से उत्पन्न डेटा रणनीतिक पैरामीटर समायोजन के लिए अधिक विविधतापूर्ण परीक्षण वातावरण प्रदान करता है, जो पूरी तरह से ऐतिहासिक डेटा पर निर्भर नहीं होना चाहिए। यह रणनीतियों के पैरामीटर के दायरे को अधिक व्यापक रूप से खोजने के लिए किया जाता है, जो ऐतिहासिक डेटा में विशिष्ट बाजार पैटर्न तक सीमित नहीं होता है।
    1. ऐतिहासिक आंकड़ों को भरने की कमी कुछ बाजारों (जैसे उभरते बाजार या छोटे मुद्रा व्यापार बाजार) में, ऐतिहासिक डेटा सभी संभावित बाजार स्थितियों को कवर करने के लिए पर्याप्त नहीं हो सकता है। यादृच्छिक बाजार जनरेटर बहुत सारे पूरक डेटा प्रदान कर सकते हैं जो अधिक व्यापक परीक्षण करने में मदद करते हैं।
    1. तेजी से पुनरावर्ती विकास रैंडम डेटा का उपयोग करके तेजी से परीक्षण करने से रणनीति विकास के पुनरावृत्ति की गति तेज हो सकती है, बिना वास्तविक समय के बाजार बाजार या समय लेने वाले डेटा की सफाई और संगठित करने के लिए।

लेकिन एक तर्कसंगत मूल्यांकन रणनीति की भी आवश्यकता है, और यादृच्छिक रूप से उत्पन्न बाजार डेटा के लिए सावधान रहेंः

  • 1, हालांकि एक यादृच्छिक मुद्रा जनरेटर उपयोगी है, इसका अर्थ डेटा उत्पन्न करने की गुणवत्ता और लक्ष्य परिदृश्य के डिजाइन पर निर्भर करता हैः
  • 2. जनरेटिंग लॉजिक को वास्तविक बाजार के करीब होना चाहिएः यदि एक यादृच्छिक रूप से उत्पन्न बाजार वास्तविकता से पूरी तरह से अलग है, तो परीक्षण परिणामों में संदर्भ मूल्य की कमी हो सकती है। उदाहरण के लिए, जनरेटर को वास्तविक बाजार के सांख्यिकीय विशेषताओं (जैसे उतार-चढ़ाव वितरण, प्रवृत्ति अनुपात) के साथ डिज़ाइन किया जा सकता है।
  • 3. वास्तविक डेटा परीक्षण को पूरी तरह से प्रतिस्थापित नहीं किया जा सकता हैः यादृच्छिक डेटा केवल रणनीतियों के विकास और अनुकूलन के पूरक हैं, और अंतिम रणनीति को अभी भी वास्तविक बाजार डेटा में इसकी प्रभावशीलता का सत्यापन करना होगा।

इतना कहकर, हम कैसे डेटा को आसानी से, जल्दी और आसानी से बना सकते हैं, ताकि हम डेटा का उपयोग कर सकें?

डिजाइन विचार

इस आलेख का उद्देश्य सरल रैंडम मार्केट जनरेटिंग का उपयोग करना है, लेकिन विभिन्न प्रकार के एनालिटिक्स, डेटा मॉडल और अन्य तकनीकें हैं जो विशेष रूप से जटिल डेटा एनालिटिक्स का उपयोग नहीं करती हैं क्योंकि चर्चा सीमित है।

हम पायथन भाषा का उपयोग करके एक प्रोग्राम लिखते हैं।

  • 1, CSV फ़ाइलों में स्थायी रिकॉर्ड के लिए K-लाइन डेटा के एक सेट को यादृच्छिक रूप से उत्पन्न करें, ताकि उत्पन्न डेटा को रिकॉर्ड किया जा सके।
  • 2. फिर एक सेवा बनाएं जो डेटा स्रोतों का समर्थन करता है।
  • 3. उत्पन्न K-रेखा डेटा को चार्ट में दिखाएं।

के-लाइन डेटा के लिए कुछ जनरेटिंग मानकों, फ़ाइल भंडारण आदि के लिए, निम्नलिखित पैरामीटर नियंत्रण को परिभाषित किया जा सकता हैः

img

  • डेटा के यादृच्छिक जनरेटिंग पैटर्न के-लाइन डेटा के लिए, केवल एक सरल डिजाइन के लिए, जो कि सकारात्मक नकारात्मक संभावनाओं के लिए एक अलग रैंडम संख्या का उपयोग करता है, यह आवश्यक व्यवहार पैटर्न को प्रतिबिंबित नहीं कर सकता है जब उत्पन्न डेटा कम है। यदि कोई बेहतर तरीका है, तो इस कोड के हिस्से को प्रतिस्थापित किया जा सकता है। इस सरल डिजाइन के आधार पर, कोड में यादृच्छिक संख्याओं के जनरेटिंग रेंज और कुछ गुणांक को समायोजित करने से उत्पन्न डेटा प्रभाव को प्रभावित किया जा सकता है।

  • डेटा जांच उत्पन्न K-लाइन डेटा के लिए तर्कसंगतता परीक्षण की आवश्यकता होती है, यह जांचने के लिए कि क्या उच्च या निम्न शुल्क की कीमत परिभाषा के विपरीत है, K-लाइन डेटा निरंतरता की जांच करना, आदि।

पुनः परीक्षण प्रणाली के लिए यादृच्छिक विनिमय जनरेटर

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("CSV文件格式有误,请检查!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("数据data.detail:", data["detail"], "响应回测系统请求。")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility
            else:
                change = random.uniform(-0.5, 0.5) * volatility            

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 5
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("当前路径:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("文件写入成功,以下是文件内容的一部分:")
                Log("".join(lines[:5]))
            else:
                Log("文件写入失败,文件为空!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

पुनर्मूल्यांकन प्रणाली में अभ्यास

1, नीति के उदाहरण बनाने, पैरामीटर विन्यास, और चलाने के लिए । 2, वास्तविक डिस्क (नीति उदाहरण) को सर्वर पर तैनात होस्ट पर चलाने की आवश्यकता होती है, क्योंकि डेटा प्राप्त करने के लिए सार्वजनिक आईपी की आवश्यकता होती है, जिसे पुनः परीक्षण प्रणाली द्वारा एक्सेस किया जा सकता है। 3. इंटरैक्टिव बटन पर क्लिक करें और रणनीति स्वचालित रूप से यादृच्छिक बाजार डेटा उत्पन्न करना शुरू कर देती है।

img img

4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件

img

5. अब हम इस यादृच्छिक रूप से उत्पन्न डेटा का उपयोग कर सकते हैं और एक रणनीति का उपयोग करके फिर से परीक्षण कर सकते हैं।

img

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

इस जानकारी के आधार पर कॉन्फ़िगरेशन को संशोधित किया गया है।http://xxx.xxx.xxx.xxx:9090यह सर्वर का आईपी पता और पोर्ट है जो रैंडम तरीके से नीति डिस्क उत्पन्न करता है। यह एक कस्टम डेटा स्रोत है, जिसे आप प्लेटफ़ॉर्म एपीआई दस्तावेज़ में कस्टम डेटा स्रोत अनुभाग में देख सकते हैं।

6. यदि आप एक अच्छा डेटा स्रोत सेट करते हैं, तो आप यादृच्छिक बाजार डेटा का परीक्षण कर सकते हैं।

img

img

इस समय रीट्रेसिंग सिस्टम का परीक्षण हमारे रीट्रेसिंग के लिए रीट्रेसिंग के लिए एनालॉग डेटा के साथ किया जाता है। रीट्रेसिंग के समय बाजार चार्ट में डेटा के आधार पर, रैंडम बाजार उत्पन्न वास्तविक समय चार्ट में डेटा के साथ तुलना की जाती है, समयः 16 अक्टूबर, 2024 को 17 बजे, डेटा समान है।

7. ओह हाँ, लगभग भूल गया! इस यादृच्छिक घटना जनरेटर के लिए एक पायथन प्रोग्राम एक वास्तविक डिस्क बनाने के लिए बनाया गया है ताकि यह आसान प्रदर्शन, संचालन, उत्पन्न K लाइन डेटा प्रदर्शित करने के लिए हो। वास्तविक अनुप्रयोगों में एक स्वतंत्र पायथन स्क्रिप्ट पूरी तरह से लिखी जा सकती है, इसलिए वास्तविक डिस्क को चलाने की आवश्यकता नहीं है।

रणनीति का स्रोत कोडःपुनः परीक्षण प्रणाली के लिए यादृच्छिक विनिमय जनरेटर

धन्यवाद समर्थन और पढ़ने के लिए।


अधिक