리소스 로딩... 로딩...

현상 수집기 재 업그레이드 - CSV 형식 파일 수입을 지원하여 사용자 지정 데이터 소스를 제공합니다

저자:발명가들의 수량화 - 작은 꿈, 2020-05-23 15:44:47, 업데이트: 2024-12-10 20:19:56

img

사물 수집기 재 업그레이드 CSV 형식 파일 수입을 지원하여 사용자 지정 데이터 소스를 제공합니다

최근 사용자는 자신의 CSV 형식 파일을 데이터 소스로 사용해야 하며, 발명자가 거래 플랫폼의 재검토 시스템을 사용하도록 해야 한다. 발명자가 거래 플랫폼의 재검토 시스템을 사용해야 한다.

디자인 아이디어

디자인 아이디어는 매우 간단합니다. 우리는 기존의 시장 수집기 기반을 조금만 바꾸면 시장 수집기에 하나의 매개 변수를 추가할 수 있습니다.isOnlySupportCSVCSV 파일만 데이터 소스로 검색 시스템에 제공되는지 제어하기 위해 한 가지 매개 변수를 추가합니다.filePathForCSV, 시장 수집자 로봇이 실행하는 서버에 CSV 데이터 파일을 배치하는 경로를 설정하는 데 사용됩니다. 마지막으로,isOnlySupportCSV파러미터가 이렇게 설정되는지True이 변경 사항은 주로 데이터 소스 (자신 수집한 1, 2, CSV 파일의 데이터) 를 사용하기로 결정하는 데 사용됩니다.Provider클래스do_GET이 함수들은

CSV 파일이란 무엇인가요?

코마 분리 값 (Comma-Separated Values, CSV, 때로는 코마 분리 값이라고도 불린다. 왜냐하면 코마 분리 문자도 코마가 될 수 없기 때문에) 는 문서에서 순수 텍스트 형태로 테이블 데이터를 저장하는 것입니다. 순수 텍스트는 파일이 문자열이며, 이진 숫자처럼 해독되어야 할 데이터가 포함되지 않는다는 것을 의미합니다. CSV 파일은 임의의 목적 기록으로 구성되어 있으며, 기록 간격이 어떤 종류의 변선 표시로 구분됩니다. 각 기록은 필드로 구성되며, 섹션의 분리 값은 다른 문자 또는 문자열로 구성됩니다. 가장 흔한 것은 코마 또는 표기 문자입니다. 일반적으로 모든 기록은 완전히 동일한 문장 순서를 가지고 있습니다. 일반적으로 순수 텍스트 문서입니다.

CSV 파일 형식에 대한 일반적인 표준은 존재하지 않지만, 일반적으로 기록 한 줄, 첫 번째 행동 헤더와 각 줄의 데이터가 코마마 간격으로 구분되는 규칙이 있습니다.

예를 들어, 우리가 테스트하는 CSV 파일이 메모리로 열리면 다음과 같습니다.img

CSV 파일의 첫 줄은 표지표 제목입니다.

,open,high,low,close,vol

우리는 이러한 데이터 분석을 정리하고, 다시 검색 시스템 사용자 정의 데이터 소스 요구 사항을 구성하는 형식을 구성합니다.

수정된 코드

import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global isOnlySupportCSV, filePathForCSV
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)

            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            if isOnlySupportCSV:
                # 处理CSV读取,filePathForCSV路径
                listDataSequence = []
                with open(filePathForCSV, "r") as f:
                    reader = csv.reader(f)
                    # 获取表头
                    header = next(reader)
                    headerIsNoneCount = 0
                    if len(header) != len(data["schema"]):
                        Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                        return 
                    for ele in header:
                        for i in range(len(data["schema"])):
                            if data["schema"][i] == ele or ele == "":
                                if ele == "":
                                    headerIsNoneCount += 1
                                if headerIsNoneCount > 1:
                                    Log("CSV文件格式有误,请检查!", "#FF0000")
                                    return 
                                listDataSequence.append(i)
                                break
                    
                    # 读取内容
                    while True:
                        record = next(reader, -1)
                        if record == -1:
                            break
                        index = 0
                        arr = [0, 0, 0, 0, 0, 0]
                        for ele in record:
                            arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                            index += 1
                        data["data"].append(arr)
                
                Log("数据:", data, "响应回测系统请求。")
                self.wfile.write(json.dumps(data).encode())
                return 
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    if (isOnlySupportCSV):
        try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))         # 本机测试
            _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
            Log("开启自定义数据源服务线程,数据由CSV文件提供。", "#FF0000")
        except BaseException as e:
            Log("启动自定义数据源服务失败!")
            Log("错误信息:", e)
            raise Exception("stop")
        while True:
            LogStatus(_D(), "只启动自定义数据源服务,不收集数据!")
            Sleep(2000)
    
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

실행 테스트

먼저 우리는 시장 수집 로봇을 실행하고, 로봇에 거래소를 추가하여 로봇을 작동시킵니다. 파라미터 설정:img

img

그리고 우리는 테스트 전략을 만들었습니다.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
}

이 방법은 매우 간단합니다.

검색 페이지, 검색 시스템으로 설정된 데이터 소스는 사용자 정의 데이터 소스이며, 주소는 시장 수집자 로봇이 실행하는 서버 주소를 채우며. 우리 CSV 파일의 데이터는 1분 K 라인이기 때문에 검색을 할 때, 우리는 K 라인 주기를 1분으로 설정한다.

img

이 로봇은 을 누르면서 다시 검색을 시작했고, 시장 수집 로봇은 데이터 요청을 받았습니다.img

재검토 시스템 실행 정책이 완료되면, 데이터 소스에서 K선 데이터에 따라 K선 그래프를 생성한다.img

이 문서의 데이터를 비교해보세요:img

img

RecordsCollector (자격화된 데이터 소스 기능을 업그레이드, CSV 데이터 파일의 데이터 소스를 지원)


관련

더 많은

조카님관리자 서버에 Python을 설치해야 하나요?

스파다 게임이제 이 사용자 지정 데이터 소스가 브라우저에서 다시 검색되고 데이터 정확성이 문제가 있습니다.

AIKPM-/upload/asset/19cfcf5244f5e2cd73173.png /upload/asset/19c100ceb1eb25a38a970.png 로봇을 꽂고, URL을 어떻게 입력해야 하는지, 제가 입력한 서버 주소 포트 번호 9090 수집기는 응답하지 않습니다.

위크스호스팅 서버에 사용자 지정 CSV 데이터 소스를 설정하여 페이지 요청에 데이터를 반환하고 다시 검색에 데이터를 반환하지 않는 경우 왜인지 물어보십시오. 데이터를 두 개의 데이터로 설정하면 https 서버 단말기가 요청에 /upload/asset/1691b2d9549fcec81c12a.png /upload/asset/168f8050b3db4a84d7e2f.png /upload/asset/16a67eaa598e95b11edb9.png /upload/asset/169c6c6c4c3d286587b3e.ng /upload/asset/169e8dcdbf9c0c544pbac8.png

위크스호스팅 서버에 사용자 정의 CSV 데이터 소스를 설정하여 페이지 요청으로 데이터를 반환하고 다시 검색에 데이터를 반환하지 않고 HTTP 서버에서 /upload/asset/1691b2d9549fcec81c12a.png /upload/asset/168f8050b3db4a84d7e2f.png /upload/asset/16a67eaa598e95b11edb9.png /upload/asset/169c6c4c3d28d658795b3e.png /upload/asset/169e8ddbf9c0c544png

qq89520자, 그럼, 어떻게 설정되는지 알려주세요.

설교이 지표는 이 지표에 해당하는 모든 지표에 해당하는 지표입니다.

다시 666

발명가들의 수량화 - 작은 꿈이 문서는 매우 흥미롭습니다.

스파다 게임시스템 오류입니다. 수정되었습니다.

발명가들의 수량화 - 작은 꿈API 문서에 정밀성에 대한 설명이 있습니다.

발명가들의 수량화 - 작은 꿈문장, 코드를 이해해야 합니다. 여기 CSV 파일을 데이터 소스로 사용하여 검색 시스템에 데이터를 제공하는 것에 대해 이야기하고 있습니다.

발명가들의 수량화 - 작은 꿈API 문서를 참조하십시오.

위크스사용자 지정 데이터의 교환.GetData ()) 방식을 사용하여 K줄을 사용자 지정 데이터로 변환할 수 있는지 재검토할 수 있습니까?

발명가들의 수량화 - 작은 꿈사용자 정의 데이터 소스를 제공하는 서비스는 서버에 위치해야 하며, 공개 IP가어야 합니다.

위크스어떻게 로컬로 HTTP 서버에 로컬로 검색 데이터를 설정할 수 있습니까? 로컬로 검색은 사용자 정의 데이터 소스를 검색하는 것을 지원하지 않습니까? 로컬로 검색에 exchanges: [{"eid":"Huobi","currency":"ETH_USDT","feeder":"http://127.0.0.1:9090"} 를 추가했습니다.

발명가들의 수량화 - 작은 꿈너무 많은 데이터, 웹 페이지가 로드되지 않습니다, 또한, DEMO 당신이 조사, 문제가 될 수 없습니다, 당신은 잘못 설정 된 것으로 추정됩니다.

위크스나는 csv 데이터 1분 K 라인 다른 통화 데이터이고, 다시 검색할 때 거래 쌍을 임의로 선택할 수 없기 때문에, 로봇과 다시 검색 선택한 거래소는 모두 huobi로 설정되어 있으며, 거래 쌍은 BTC-USDT입니다. 이 요청 데이터는 나는 때때로 로봇이 요청을 받을 수 있지만, 다시 검색하는 쪽에서는 데이터를 얻을 수 없습니다. 그리고 나는 csv의 시간표를 초에서 밀리 초로 변경하여 데이터를 얻을 수 없습니다.

발명가들의 수량화 - 작은 꿈BTC_USDT에 거래하는 경우, 어떤 것을 구체적으로 언급합니까? 이 정의의 데이터가 요구되는가? 예를 들어 시간 부분 밀리 초 및 초 모두 볼 수 있습니까?

발명가들의 수량화 - 작은 꿈많은 데이터도 가능하며, 테스트를 할 때 테스트했습니다.

위크스작은 양의 데이터를 얻을 수 있지만 CSV 파일을 1년 이상 데이터로 지정했을 때 얻을 수 없다는 것을 알게 되는데, 너무 많은 데이터가 영향을 미치나요? 그러면 이것은 로컬링으로 사용자 정의 데이터 소스를 열고 로컬링으로 다시 측정할 수 있습니까?

위크스제가 현재 로봇에 구성하고 있는 것은 HUOBI 거래소이고, 거래 쌍은 또한 BTC-USDT로 설정되어 있으며, 재검토 시에도 이렇게 구성되어 있으며, 재검토 코드는 또한 exchange.GetRecords ()) 함수를 사용합니다. 이 정의된 데이터가 요구되는가? 예를 들어 시간 부분 밀리 초와 초 모두 볼 수 있습니까?

발명가들의 수량화 - 작은 꿈당신은 브라우저 쪽에서 당신이 입력한 질의 파라미터 때문에, 응답 시스템은 촉발 할 수 없습니다. 로봇은 응답, 로봇이 요청을 받아들이지 않았다는 것을 알려줍니다.

발명가들의 수량화 - 작은 꿈이 문서를 통해 설정할 수 있습니다. 자신의 CSV 파일을 읽으려면 이 파일의 경로를 설정할 수 있습니다.