Die Ressourcen sind geladen. Beförderung...

Die Hand zeigt Ihnen, wie Sie die Funktionen für die Anpassung von Datenquellen an den Rechner aktualisieren können.

Schriftsteller:Die Erfinder quantifizieren - Kleine Träume, Erstellt: 2020-05-07 17:43:54, Aktualisiert: 2023-10-09 22:47:43

img

Die Hand zeigt Ihnen, wie Sie die Funktionen für die Anpassung von Datenquellen an den Rechner aktualisieren können.

Vorheriger ArtikelHand in Hand lehrt man, wie man einen Datensammler realisiert.Wir haben gemeinsam ein Roboterprogramm implementiert, das Marktdaten sammelt, und was wird daraus gemacht? Natürlich für das Retro-System. Hier können wir uns auf die benutzerdefinierte Datenquelle des Quantifizierungssystems der Erfinder verlassen. Wir können die gesammelten Daten direkt als Datenquelle für das Retro-System verwenden, so dass wir das Retro-System für jeden Markt verwenden können, in dem wir historische Daten zurückverfolgen möchten.

So können wir einen Upgrade an "Markt-Sammler" machen, damit der Markt-Sammler die Daten gleichzeitig als benutzerdefinierte Datenquelle an das Retest-System liefert.

Wenn es eine Nachfrage gibt, dann tun Sie es!

Bereit sein

Die Vorbereitungsarbeit war anders als in dem vorherigen Artikel, als ich das Hostprogramm installierte, das auf meinem heimischen MAC-Computer läuft, um die Mongodb-Datenbank zu starten. Diesmal wechselten wir die Betriebsumgebung auf den VPS und nutzten Ali Cloud Linux Server, um unser Programm auszuführen.

  • Datenbank von mongodb

    Wie im vorherigen Artikel ist es notwendig, die Mongodb-Datenbank auf einem Gerät zu installieren, auf dem der Markt-Sammler-Programm ausgeführt wird, und den Dienst zu starten. Wie bei der Installation von Mongodb auf einem MAC-Computer gibt es im Grunde viele Tutorials im Internet.

  • Installieren von Python 3 Die Programme verwenden die Sprache Python3 und verwenden einige Bibliotheken, die nicht installiert werden müssen.

    • Pymongo
    • http
    • Urllib
  • Treuhänder Einer der größten Herausforderung für die Unternehmen ist, die Qualitäts-Trading-Plattformen zu betreiben.

Das ist ein sehr schwieriger Fall.

Die Handlungs-SammlerRecordsCollecter (Lehre)Die Strategie. Wir wollen es anpassen: Bevor ein Programm in den While-Loop der Datenerfassung gelangt, wird ein Service gestartet, der gleichzeitig mit einer Multi-Thread-Bücherei ausgeführt wird, um Datenanfragen des Systems zu überwachen. (Weitere Änderungen können ignoriert werden.)

RecordsCollector (Upgrade zur Bereitstellung von Anpassungsdatenquellen)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

Tests

Konfiguration der Roboterimg

Ich habe mich in meinem eigenen Land niedergelassen, wo ich in den USA lebte und wo ich in den USA lebte.img

Sie können eine Teststrategie öffnen und eine Nachprüfung durchführen, z. B. eine Nachprüfung, um zu testen.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Die Option zum Konfigurations-Rücksehen wird auf Binance gesetzt, da die temporäre Anpassung der Datenquelle noch nicht möglich ist, einen Namen für die Börse zu erstellen, sondern nur die Konfiguration einer Börse in der Liste ausleihen kann.

img

Ob ein Chart, der von einem Marktzusammensteller als eine benutzerdefinierte Datenquelle erzeugt wurde, mit einem 1-Stunden-K-Linien-Chart auf der WexApp-Börsenseite identisch ist.

img

img

Das ermöglicht es dem Roboter auf dem VPS, die K-Line-Daten selbst zu sammeln, und wir können die gesammelten Daten jederzeit direkt im Retesting-System wiederholen. Sie können auch weitere Erweiterungen durchführen, wie z.B. Unterstützung für die Rückprüfung von kundenspezifischen Datenquellen auf dem Festplattenniveau, Unterstützung für die Sammlung von Daten für verschiedene Arten, verschiedene Märkte usw.

Ich bin sehr froh, dass Sie sich bei uns melden können.


Verwandt

Mehr

WeißwäscheUnd es gibt keine Anzeige, dass der Custom Data Source Service einen Druck aus diesem Log erhalten hat, der die Anzeige verlangt.

WeißwäscheWenn ich die Daten wiederholen möchte, wähle ich die benutzerdefinierten Daten und beginne mit den ausgewählten Transaktionen für die Daten unten.

Lcgs006Wenn man Daten von Währungspaaren sammelt, die nicht oben angegeben sind, um eine kleine Vielfalt von Währungspaaren wie DOT_USDT zurückzuschätzen, die nicht anpassen können, wie wird das erreicht?

ZltimDie Spitze

Jetzt noch nieWenn Sie eine benutzerdefinierte Datenquelle auswählen, unterstützt das Retest nur ein Transaktionspaar?

Die Erfinder quantifizieren - Kleine TräumeDer in diesem Beitrag erwähnte "Geschäfts-Sammler" muss auf dem Server ausgeführt werden.

WeißwäscheWenn Sie die Daten von Ihrem eigenen Server verwenden, werden Sie die Daten von Ihrem eigenen Server verwenden, um die Daten zu verwenden.

Die Erfinder quantifizieren - Kleine TräumeNach der Nutzung der Funktion "Custom Data Sources" müssen Sie auch die Dienstadresse für die benutzerdefinierte Datenquelle in der rechten Steuerung eingeben.

Die Erfinder quantifizieren - Kleine TräumeDiese Festplatte muss auf dem Server mit einer externen IP laufen, damit die Rückrufseite zugänglich sein kann.

Die Erfinder quantifizieren - Kleine TräumeSie haben noch nicht verstanden, was ich meine, ich meine, die Daten, die Ihre benutzerdefinierten Datenquellen bereitstellen, z. B. EOS_USDT, aber auf FMZ können Sie nur BTC_USDT wählen, und Sie nehmen die tatsächlichen Daten von EOS_USDT als die Daten von BTC_USDT, die an FMZ bereitgestellt werden.

Lcgs006Was ist das für eine Alternative, oder gibt es eine entsprechende Anleitung?

Die Erfinder quantifizieren - Kleine TräumeEs ist nicht so, dass Sie mit Namen handeln müssen, sondern Sie können nur die Daten, die Sie sammeln, als Datenpreis verwenden, um sie zu ersetzen.

Die Erfinder quantifizieren - Kleine TräumeDieser Dienstprogramm kann mehrere verschiedene Transaktionen für die Daten anbieten, die das Retargeting-System selbst aufruft.