Hệ thống backtesting của nền tảng FMZ Quant Trading là một hệ thống backtesting liên tục lặp lại, cập nhật và nâng cấp. Nó thêm các chức năng và tối ưu hóa hiệu suất dần dần từ chức năng backtesting cơ bản ban đầu. Với sự phát triển của nền tảng, hệ thống backtesting sẽ tiếp tục được tối ưu hóa và nâng cấp. Hôm nay chúng ta sẽ thảo luận một chủ đề dựa trên hệ thống backtesting:
Trong lĩnh vực giao dịch định lượng, việc phát triển và tối ưu hóa các chiến lược không thể tách rời khỏi việc xác minh dữ liệu thị trường thực tế. Tuy nhiên, trong các ứng dụng thực tế, do môi trường thị trường phức tạp và thay đổi, dựa vào dữ liệu lịch sử để kiểm tra lại có thể không đủ, chẳng hạn như thiếu bảo hiểm các điều kiện thị trường cực đoan hoặc kịch bản đặc biệt. Do đó, thiết kế một máy tạo thị trường ngẫu nhiên hiệu quả đã trở thành một công cụ hiệu quả cho các nhà phát triển chiến lược định lượng.
Khi chúng ta cần để cho chiến lược theo dõi lại dữ liệu lịch sử trên một sàn giao dịch hoặc tiền tệ nhất định, chúng ta có thể sử dụng nguồn dữ liệu chính thức của nền tảng FMZ để kiểm tra lại.
Ý nghĩa của việc sử dụng dữ liệu ticker ngẫu nhiên là:
Chiến lược có thể thích nghi với xu hướng và biến động thay đổi? Liệu chiến lược sẽ gây ra tổn thất lớn trong điều kiện thị trường cực đoan?
Chiến lược có quá phụ thuộc vào một cấu trúc thị trường nhất định không? Có nguy cơ quá chuẩn không?
Tuy nhiên, cũng cần phải đánh giá chiến lược một cách hợp lý Đối với dữ liệu ticker được tạo ngẫu nhiên, xin lưu ý:
Sau khi nói nhiều như vậy, làm thế nào chúng ta có thể "lập trình" một số dữ liệu? Làm thế nào chúng ta có thể "lập trình" dữ liệu cho hệ thống backtesting để sử dụng thuận tiện, nhanh chóng và dễ dàng?
Bài viết này được thiết kế để cung cấp một điểm khởi đầu để thảo luận và cung cấp một tính toán tạo ticker ngẫu nhiên tương đối đơn giản. Trên thực tế, có nhiều thuật toán mô phỏng, mô hình dữ liệu và các công nghệ khác có thể được áp dụng. Do không gian thảo luận hạn chế, chúng tôi sẽ không sử dụng các phương pháp mô phỏng dữ liệu phức tạp.
Kết hợp chức năng nguồn dữ liệu tùy chỉnh của hệ thống backtesting nền tảng, chúng tôi đã viết một chương trình bằng Python.
Đối với một số tiêu chuẩn tạo và lưu trữ tệp dữ liệu đường K, các điều khiển tham số sau đây có thể được xác định:
Chế độ tạo dữ liệu ngẫu nhiên Để mô phỏng kiểu biến động của dữ liệu đường K, một thiết kế đơn giản chỉ đơn giản là sử dụng xác suất của các số ngẫu nhiên dương và âm. Khi dữ liệu được tạo ra không nhiều, nó có thể không phản ánh mô hình thị trường cần thiết. Nếu có một phương pháp tốt hơn, phần mã này có thể được thay thế. Dựa trên thiết kế đơn giản này, điều chỉnh phạm vi tạo số ngẫu nhiên và một số hệ số trong mã có thể ảnh hưởng đến hiệu ứng dữ liệu được tạo ra.
Kiểm tra dữ liệu Dữ liệu đường K được tạo ra cũng cần được kiểm tra tính hợp lý, để kiểm tra xem giá mở cao và giá đóng thấp có vi phạm định nghĩa hay không và để kiểm tra tính liên tục của dữ liệu đường K.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("the custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("The CSV file format is incorrect, the number of columns is different, please check!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("The CSV file format is incorrect, please check!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("data.detail: ", data["detail"], "Respond to backtesting system requests.")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("Unsupported K-line period, please use 'm', 'h', or 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("Abnormal data:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("Current path:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("The file was written successfully. The following is part of the file content:")
Log("".join(lines[:5]))
else:
Log("Failed to write the file, the file is empty!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("Start the custom data source service thread, and the data is provided by the CSV file.", ", Address/Port: 0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("Failed to start custom data source service!")
Log("error message:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("Generator parameters:", "Start time:", startTime, "End time:", endTime, "K-line period:", KLinePeriod, "Initial price:", firstPrice, "Type of volatility:", arrTrendType[trendType], "Volatility coefficient:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)
/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
Theo thông tin trên, cấu hình và điều chỉnh.http://xxx.xxx.xxx.xxx:9090
là địa chỉ IP máy chủ và cổng mở của chiến lược tạo ticker ngẫu nhiên.
Đây là nguồn dữ liệu tùy chỉnh, có thể được tìm thấy trong phần nguồn dữ liệu tùy chỉnh của tài liệu API nền tảng.
Tại thời điểm này, hệ thống backtest được thử nghiệm với dữ liệu mô phỏng
Mã nguồn chiến lược:Hệ thống kiểm tra lại Random Ticker Generator
Cảm ơn vì đã ủng hộ và đọc.