手把手教你实现一个行情收集器

Author: 小小梦, Created: 2020-04-16 12:44:03, Updated: 2023-10-11 19:57:00

img

在程序化交易、量化交易中研究策略、设计策略、回测分析时离不开行情数据的支持。市面上的所有数据都收集也不现实,毕竟数据量太大。对于数字货币市场来说,发明者量化交易平台上支持有限的交易所、交易对的回测数据。如果想回测一些暂时不支持数据的交易所、交易对。可以使用自定义数据源来进行回测,但是这个前提是要自己有数据才行。所以就迫切需要一个行情收集程序,并且能持久化保存,最好还能实时获取。

这样可以解决几个需求,例如:

  • 可以给多个机器人提供数据源,可以缓解每个机器人访问交易所接口的频率。
  • 可以让机器人启动时,获取一个K线BAR数量足够多的K线数据,再也不用担心机器人起始的时候K线BAR数量不足了。
  • 可以收集小币种行情数据,用来给发明者量化交易平台回测系统提供自定义数据源,从而使用回测系统回测策略。
  • 等等…

计划使用python实现,为什么?因为很方便 :) 有了需求,动手!

准备

  • python的pymongo库

    因为要用到数据库,做持久化保存。数据选择使用MongoDB,使用Python语言写收集程序,所以需要这个数据库的驱动库。 在Python上安装pymongo即可。

  • 在托管者所在设备安装MongoDB

    例如:MAC安装MongoDB,当然WIN系统安装MongoDB也差不多,网上有很多教程,以在苹果MAC系统安装为例:

  • 下载 下载链接:https://www.mongodb.com/download-center?jmp=nav#community

  • 解压缩 下载后,解压缩到目录:/usr/local

  • 配置环境变量 终端输入:open -e .bash_profile,打开文件后,写入:export PATH=${PATH}:/usr/local/MongoDB/bin 保存后,终端使用source .bash_profile使修改生效。

  • 手动配置数据库文件目录和日志目录 创建目录/usr/local/data/db中对应的文件夹。 创建目录/usr/local/data/logs中对应的文件夹。

  • 编辑配置文件mongo.conf

    #bind_ip_all = true                        # 任何机器可以连接
    bind_ip = 127.0.0.1                        # 本机可以访问
    port = 27017                               # 实例运行在27017端口(默认)
    dbpath = /usr/local/data/db                # 数据文件夹存放地址(db要预先创建)
    logpath = /usr/local/data/logs/mongodb.log # 日志文件地址
    logappend = false                          # 启动时 添加还是重写日志文件
    fork = false                               # 是否后台运行
    auth = false                               # 开启校验用户
    
  • 运行MongoDB服务

    命令:

    ./mongod -f mongo.conf
    
  • 停止服务

    use admin;
    db.shutdownServer();
    

实现收集器程序

收集器以发明者量化交易平台上的Python机器人策略形式运行。由于本人Python水平有限,只是实现了一个简单的例子,用于展示本文的思路。

收集器策略代码:

import pymongo
import json

def main():
    Log("测试数据收集")
    
    # 连接数据库服务
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   # mongodb://127.0.0.1:27017
    # 创建数据库
    huobi_DB = myDBClient["huobi"]
    
    # 打印目前数据库表
    collist = huobi_DB.list_collection_names()
    Log("collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = huobi_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = huobi_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 创建records表
    huobi_DB_Records = huobi_DB["records"]
    
    # 请求数据
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                # 逐根写入
                bar = r[i]
                huobi_DB_Records.insert_one({"index": index, "High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            huobi_DB_Records.insert_one({"index": index, "High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        Sleep(10000)
        

完整策略地址:链接

使用数据

创建使用数据的策略机器人。 注意:需要勾选上「画线类库」,没有的话可以去复制一个到自己策略库。 img

import pymongo
import json

def main():
    Log("测试使用数据库数据")
    
    # 连接数据库服务
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   # mongodb://127.0.0.1:27017
    # 创建数据库
    huobi_DB = myDBClient["huobi"]
    
    # 打印目前数据库表
    collist = huobi_DB.list_collection_names()
    Log("collist:", collist)
    
    # 查询数据打印
    huobi_DB_Records = huobi_DB["records"]
    
    while True:
        arrRecords = []
        for x in huobi_DB_Records.find():
            bar = {
                "High": x["High"], 
                "Low": x["Low"], 
                "Close": x["Close"], 
                "Open": x["Open"], 
                "Time": x["Time"], 
                "Volume": x["Volume"]
            }
            arrRecords.append(bar)
        
        # 使用画线类库,把取到的K线数据画出来
        ext.PlotRecords(arrRecords, "K")
        LogStatus(_D(), "records length:", len(arrRecords))
        Sleep(10000)

可以看到使用数据的策略机器人代码中没有访问任何交易所接口,通过访问数据库获取数据,行情收集器程序没有记录当前BAR的数据,收集的是已经完成状态的K线BAR,如果需要当前BAR实时数据,稍加修改即可。 当前的例子代码,只是为了演示,在访问数据库中表内的数据记录时是全部获取,这样随着收集数据时间增长,收集数据越来越多,全部查询出来会一定程度上影响性能,可以设计成只查询比当前数据新的数据,添加到当前数据中。

运行

运行托管者程序 img

在托管者所在设备,运行起来MongoDB数据库服务 ./mongod -f mongo.conf img

收集器运行,收集发明者量化交易平台的模拟盘wexAppBTC_USDT交易对: 地址:wexApp img

使用数据库数据的机器人A: img

使用数据库数据的机器人B: img

wexApp页面: img

图中可以看到,不同ID的机器人,共享使用一个数据源的K线数据。

收集任意周期的K线数据

依托于发明者量化交易平台的强大功能,我们可以轻松实现收集任意周期的K线数据。 比如,我要收集3分钟K线,交易所没有3分钟K线怎么办?没关系,可以轻松实现。

我们修改收集器机器人的配置,K线周期设置为3分钟,发明者量化交易平台会自动合成3分钟K线给收集器程序。 img

我们使用参数删除表的名称,设置:["records"]删除之前收集的1分钟K线数据表。准备收集3分钟K线数据。

启动收集器程序,再启动使用数据的策略机器人

img

img

可以看到画出的K线图表,BAR之间间隔时间就是3分钟了,每根BAR就是3分钟周期的K线柱。

下期我们尝试实现自定义数据源的需求实现。 感谢阅读


Related

More

xukitty Wonderful

zltim

zltim

homily

dsaidasi

小小梦 感谢支持,我是python小白,抛砖引玉,有更好的实现或者建议的话,期待完善。