资源加载中... loading...

手把手教你给行情收集器升级回测自定义数据源功能

Author: 发明者量化-小小梦, Created: 2020-05-07 17:43:54, Updated: 2023-10-09 22:47:43

img

手把手教你给行情收集器升级回测自定义数据源功能

上一篇文章手把手教你实现一个行情收集器我们一起实现了一个收集行情的机器人程序,收集了行情数据接下来怎么使用呢?当然是用于回测系统了,这里依托于发明者量化交易平台回测系统的自定义数据源功能,我们就可以直接把收集到的数据作为回测系统的数据源,这样我们就可以让回测系统应用于任何我们想回测历史数据的市场了。

因此,我们可以给「行情收集器」来一个升级!让行情收集器同时可以作为自定义数据源给回测系统提供数据。

有了需求,动手!

准备

和上次文章中的准备工作有所不同,上一次是在我的本机MAC电脑上运行的托管者程序,安装mongodb数据库启动数据库服务。这次我们把运行环境换到VPS上,使用阿里云linux服务器,来运行我们这一套程序。

  • mongodb数据库

    和上篇文章一样,需要在行情收集器程序运行的设备上安装mongodb数据库,并且开启服务。和在MAC电脑上安装mongodb基本一样,网上有不少教程,可以搜索看下,很简单。

  • 安装python3 程序使用python3语言,注意用到了一些库,没有的话需要安装。

    • pymongo
    • http
    • urllib
  • 托管者 运行一个发明者量化交易平台的托管者即可。

改造「行情收集器」

行情收集器即RecordsCollecter (教学)这个策略。 我们来对它做一些改造: 在程序进入收集数据的while循环之前,使用多线程库,并发执行启动一个服务,用来监听发明者量化交易平台回测系统的数据请求。 (其它的一些细节修改可以忽略)

RecordsCollecter (升级提供自定义数据源功能)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

测试

配置机器人 img

运行机器人,运行行情收集器。 img

打开一个测试策略,进行回测,例如这样的回测策略,测试一下。

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

配置回测选项,设置交易所为币安是因为暂时自定义数据源还不能自己制定一个交易所名称,只能借用列表中的某个交易所配置上,回测时显示的是币安,实际是wexApp模拟盘的数据。

img

对比回测系统根据行情收集器作为自定义数据源回测生成的图表和wexApp交易所页面上的1小时K线图表是否相同。

img

img

这样就可以让VPS上的机器人自己收集K线数据,而我们可以随时获取这些收集的数据直接在回测系统回测了。 抛砖引玉,各位大神还可以继续扩展,例如支持实盘级别回测自定义数据源,支持多品种、多市场数据收集等等功能。

欢迎留言。


Related

More

wuzhentao 而且一直没有”自定义数据源服务接收到请求“这个日志的打印 这个是什么问题呢

wuzhentao 回测时候 选择自定义数据 下面开始用勾选的交易对数据 这种情况怎么处理 我已经用自己的VPS把数据搞定了

lcgs006 如果收集到了上面没有提供的币对的数据,要回测一些小品种货币对,如DOT_USDT,回测时币种不能自定义,那么,要如何实现?

zltim

NoworNever 回测选择自定义数据源的话,是不是只支持一个交易对?

发明者量化-小小梦 需要在服务器上运行这个帖子里说的「行情收集器」。作为给FMZ回测系统自定义数据源功能,提供数据。按照帖子上做就可以,这个测试过的。

wuzhentao 都有的,现在自定义数据源地址填写后,下面回测数据没有显示出新的数据。是 vps需要起什么服务?

发明者量化-小小梦 使用自定义数据源功能之后,你还需要在右侧控件填写自定义数据源的服务地址。

发明者量化-小小梦 这个实盘必须运行在服务器上具备外网IP,让回测系统页面能访问到。

发明者量化-小小梦 你还没明白我说的意思, 我是说你的自定义数据源提供的数据 比如实际是 EOS_USDT 的,但是FMZ上只能选择BTC_USDT这样的交易对,你就拿这个EOS_USDT的实际数据当做BTC_USDT的提供给FMZ回测系统。明白了么。回测的时候虽然显示的是BTC_USDT, 但是没什么关系,数据价格都是EOS的。

lcgs006 那需要在哪里替代呢,或者有没有相关的教程?

发明者量化-小小梦 不用交易对名字 一样, 用个代替就行了数据价格是你收集的数据就可以了。

发明者量化-小小梦 可以给数据源提供的这个服务程序多写几个不同的交易对数据提供,回测系统会自己调用需要的。