[TOC]
발명자 양자 거래 플랫폼의 리테스트 시스템은 초기 기본 리테스트 기능에서 기능을 점차 증가시키고 성능을 최적화하기 위해 끊임없이 반복적으로 업그레이드되는 리테스트 시스템입니다. 플랫폼의 발전과 함께 리테스트 시스템이 지속적으로 업그레이드됩니다. 오늘 우리는 리테스트 시스템에 기반한 주제에 대해 논의합니다.
양적 거래 분야에서 전략의 개발과 최적화는 실제 시장 데이터의 검증과 분리되어 있지 않다. 그러나 실제 응용에서는 시장 환경이 복잡하고 변동적이기 때문에 역사적 데이터에 의존하는 재검토가 부족할 수 있다. 예를 들어 극단적인 시장이나 특별한 시나리오에 대한 보도가 부족하다. 따라서 효율적인 무작위 거래 생성기를 설계하는 것은 양적 전략 개발자의 효과적인 도구가 된다.
어떤 거래소, 어떤 통화에서 전략이 역사적인 데이터로 거슬러 올라갈 필요가 있을 때, FMZ 플랫폼의 공식 데이터 소스를 사용하여 다시 테스트 할 수 있습니다. 때로는 전략이 완전히 낯선 시장에서 어떻게 작동하는지보고 싶어 할 때, 우리는 전략 테스트를 위해 데이터를 조작 할 수 있습니다.
이 자료를 이용하면,
전략은 트렌드와 격동 전환에 적응할 수 있습니까? 이 전략은 극단적인 시장에서 큰 손실을 줄 수 있습니까?
이 전략은 어떤 시장 구조에 지나치게 의존하는가? 이 경우, 이 자리는 다른 자리로 이동할 수 있습니다.
그러나 합리적인 평가 전략도 필요합니다. 무작위로 생성된 시장 데이터에 대해 주의해야 합니다.
이렇게 많은 것을 말하면서, 우리는 어떻게 데이터를 조작할 수 있을까요? 어떻게 쉽고 빠르게, 쉽게 데이터를 조작할 수 있을까요?
이 논문은 비교적 간단한 무작위 시장 생성 계산을 제공하기 위해 기획되어 있으며, 실제로는 다양한 모의 알고리즘, 데이터 모델 등의 기술이 적용될 수 있으며, 논의의 범위가 제한되어 있기 때문에 특히 복잡한 데이터 모의 방법을 사용하지 않습니다.
플랫폼 검색 시스템의 사용자 정의 데이터 소스 기능과 함께, 우리는 파이썬 언어로 프로그램을 작성했습니다.
일부 K선 데이터 생성 기준, 파일 저장 등에 대해서는 다음과 같은 매개 변수 컨트롤을 정의할 수 있다.
데이터의 무작위 생성 패턴 K선 데이터를 모방하는 변동형의 경우, 단순히 무작위 수를 양적 음의 확률로 다르게 사용하는 간단한 설계에 불과하며, 생성된 데이터가 많지 않을 때 필요한 동작 패턴을 반영하지 않을 수 있다. 더 나은 방법이 있다면, 이 부분 코드를 대체할 수 있다. 이 간단한 설계에 기초하여, 코드 내의 무작위 숫자 생성 범위와 일부 계수들을 조정하면 생성된 데이터 효과에 영향을 줄 수 있다.
데이터 검사 생성된 K-선 데이터에 대해서도 합리성 검사가 필요하며, 높은 가격과 낮은 가격의 정의를 위반하는지 확인하고, K-선 데이터의 연속성을 검사합니다.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据data.detail:", data["detail"], "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility
else:
change = random.uniform(-0.5, 0.5) * volatility
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 5
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("当前路径:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("文件写入成功,以下是文件内容的一部分:")
Log("".join(lines[:5]))
else:
Log("文件写入失败,文件为空!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
1, 위의 정책의 예를 만들고, 파라미터를 구성하고, 실행하십시오. 2, 디스크 ((정책 예제) 는 서버에 배포된 호스트에서 실행되어야하며, 공개 IP가 필요하기 때문에 검색 시스템이 액세스 할 수 있기 때문에 데이터를 얻을 수 있습니다. 세 번째, 인터랙션 버튼을 클릭하면 전략이 자동으로 무작위 시장 데이터를 생성합니다.
4、生成好的数据会显示在图表上,方便观察,同时数据会记录在本地的data.csv文件
5. 이제 우리는 무작위로 생성된 데이터를 사용할 수 있습니다.
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
위 정보에 따라 구성, 개조.http://xxx.xxx.xxx.xxx:9090
임의로 생성된 정책 디스크의 서버 IP 주소와 열린 포트이다.
이것은 사용자 지정 데이터 소스입니다. 플랫폼 API 문서의 사용자 지정 데이터 소스 섹션을 참조하십시오.
6. 리코딩 시스템을 설정하면 무작위 시장 데이터를 테스트 할 수 있습니다.
이 때 재검토 시스템은 우리의
7, 오, 맞습니다, 거의 잊어 버렸어요! 이 무작위 행렬 생성기의 파이썬 프로그램이 실제 디스크를 만드는 이유는 생성된 K 라인 데이터를 설명, 조작, 표시하는 데 편리하기 위해서입니다. 실제 응용 프로그램에서는 독립적인 파이썬 스크립트를 완전히 쓸 수 있습니다.
이 전략의 소스 코드는:재검토 시스템 무작위 거래 생성기
이 글은 많은 사람들에게 알려졌습니다.