FMZ 양자 거래 플랫폼의 백테스팅 시스템은 지속적으로 반복, 업데이트 및 업그레이드되는 백테스팅 시스템입니다. 초기 기본 백테스팅 기능에서 기능을 추가하고 성능을 점진적으로 최적화합니다. 플랫폼의 개발에 따라 백테스팅 시스템은 계속해서 최적화 및 업그레이드 될 것입니다. 오늘은 백테스팅 시스템에 기반한 주제에 대해 논의 할 것입니다.
양적 거래 분야에서 전략의 개발과 최적화는 실제 시장 데이터의 검증과 분리될 수 없다. 그러나 실제 응용 프로그램에서는 복잡하고 변화하는 시장 환경으로 인해 극한 시장 조건이나 특수 시나리오의 커버리지 부족과 같은 역 테스트를 위해 역사적 데이터에 의존하는 것이 충분하지 않을 수 있다. 따라서 효율적인 무작위 시장 생성자를 설계하는 것은 양적 전략 개발자에게 효과적인 도구가 되었다.
우리가 전략이 특정 거래소 또는 통화에 대한 역사적 데이터를 추적 할 필요가있을 때, 우리는 역 테스트를 위해 FMZ 플랫폼의 공식 데이터 소스를 사용할 수 있습니다. 때로는 전략이 완전히 낯선 시장에서 어떻게 수행하는지보고 싶어합니다. 그래서 우리는 전략을 테스트하기 위해 일부 데이터를 만들 수 있습니다.
무작위 틱어 데이터 사용의 중요성은 다음과 같습니다.
전략은 트렌드와 변동성 변화에 적응할 수 있나요? 이 전략은 극단적인 시장 조건에서 큰 손실을 입을까요?
전략은 특정 시장 구조에 너무 의존합니까? 패러미터가 너무 잘 맞을 위험이 있나요?
그러나 전략을 합리적으로 평가하는 것도 필요합니다. 무작위로 생성 된 틱어 데이터에 대해서는 다음을 참고하십시오.
이렇게 많은 것을 말하면서, 우리는 어떻게 데이터를 "제조"할 수 있을까요? 백테스팅 시스템이 편리하고 빠르고 쉽게 사용할 수 있도록 데이터를 "제조"할 수 있을까요?
이 문서는 토론의 출발점을 제공하기 위해 고안되었으며 비교적 간단한 무작위 틱어 생성 계산을 제공합니다. 실제로 다양한 시뮬레이션 알고리즘, 데이터 모델 및 기타 기술이 적용 될 수 있습니다. 논의의 제한된 공간으로 인해 복잡한 데이터 시뮬레이션 방법을 사용하지 않을 것입니다.
플랫폼 백테스팅 시스템의 사용자 정의 데이터 소스 기능을 결합하여 파이썬으로 프로그램을 작성했습니다.
일부 생성 표준 및 K-라인 데이터의 파일 저장에 대해서는 다음과 같은 매개 변수 컨트롤을 정의할 수 있습니다.
무작위 데이터 생성 모드 K-라인 데이터의 변동 유형을 시뮬레이션하기 위해, 양수와 음수의 확률을 사용하여 간단한 디자인이 만들어집니다. 생성된 데이터가 많지 않을 경우 필요한 시장 패턴을 반영하지 않을 수 있습니다. 더 나은 방법이 있다면, 코드의이 부분은 대체 될 수 있습니다. 이 간단한 디자인을 바탕으로, 무작위 숫자 생성 범위와 코드 내의 일부 계수들을 조정하면 생성된 데이터 효과에 영향을 줄 수 있습니다.
데이터 확인 생성된 K-라인 데이터의 합리성을 테스트하고, 높은 오픈 가격과 낮은 종료 가격이 정의를 위반하는지 확인하고, K-라인 데이터의 연속성을 확인해야 합니다.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("the custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("The CSV file format is incorrect, the number of columns is different, please check!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("The CSV file format is incorrect, please check!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("data.detail: ", data["detail"], "Respond to backtesting system requests.")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("Unsupported K-line period, please use 'm', 'h', or 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("Abnormal data:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("Current path:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("The file was written successfully. The following is part of the file content:")
Log("".join(lines[:5]))
else:
Log("Failed to write the file, the file is empty!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("Start the custom data source service thread, and the data is provided by the CSV file.", ", Address/Port: 0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("Failed to start custom data source service!")
Log("error message:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("Generator parameters:", "Start time:", startTime, "End time:", endTime, "K-line period:", KLinePeriod, "Initial price:", firstPrice, "Type of volatility:", arrTrendType[trendType], "Volatility coefficient:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
위의 정보에 따라 설정하고 조정합니다.http://xxx.xxx.xxx.xxx:9090
임의 틱어 생성 전략의 서버 IP 주소와 오픈 포트입니다.
이것은 플랫폼 API 문서의 사용자 지정 데이터 소스 섹션에서 찾을 수 있는 사용자 지정 데이터 소스입니다.
이 때, 백테스트 시스템은 우리의
전략 소스 코드:백테스팅 시스템 무작위 틱어 생성기
여러분의 지원과 독서 감사합니다.