汎用プロトコルプラグインで JS 言語の並行タスクを実現する

作者: リン・ハーン発明者 量化 - 微かな夢, 作成日: 2018-07-19 19:10:48, 更新日: 2018-07-19 19:15:42

汎用プロトコルプラグインで JS 言語の並行タスクを実現する

JavaScript でストラテジーを書くとき,メインプログラムから完全に切り離された並行ロジックで,相場データを継続的に取得したり他の操作を行ったりしたいと以前から考えていました. Python ではストラテジーの中でマルチスレッドライブラリを直接使用できますが,JavaScript ではできません. ただし,相場の同時取得のような比較的単純な処理であれば,exchange.Go 関数を使用できます.

  • 汎用プロトコルプラグインを使用して並行処理を実現し,相場インターフェースに継続的にアクセスして相場データを収集する

Go 言語 (Golang) のコード:

/*
GOOS=linux GOARCH=amd64 go build -ldflags '-s -w -extldflags -static' AssistantEx.go
*/
package main

import (
    "bytes"
    // "crypto/md5"
    // "encoding/hex"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "sort"
    "strconv"
    "strings"
    "time"
    "crypto/tls"                           
    "context"
)

// 测试用函数 ---------------------------
// toFloat converts a loosely typed value to float64.
//
// Supported dynamic types are float64, float32, int, int32, int64 and string.
// Strings are trimmed of surrounding whitespace and parsed with
// strconv.ParseFloat; the parse error is deliberately discarded, so the
// result is whatever ParseFloat returned alongside it (0 for malformed text).
// Any other type, including nil, yields 0.
func toFloat(s interface{}) float64 {
    switch n := s.(type) {
    case string:
        // Best-effort parse: the error is intentionally ignored.
        parsed, _ := strconv.ParseFloat(strings.TrimSpace(n), 64)
        return parsed
    case float64:
        return n
    case float32:
        return float64(n)
    case int:
        return float64(n)
    case int32:
        return float64(n)
    case int64:
        return float64(n)
    }
    return 0
}

// httpClient is the single HTTP client shared by every outgoing request in
// this program. It is built once in init.
var httpClient *http.Client

// init constructs httpClient with a one-minute overall timeout, the same
// limit for receiving response headers, and up to five idle connections kept
// per host.
//
// NOTE(review): InsecureSkipVerify disables TLS certificate verification for
// every request made through this client. It is kept as-is here; confirm it
// is really required before using the plugin against a production endpoint.
func init() {
    const timeout = time.Minute

    transport := &http.Transport{
        MaxIdleConnsPerHost:   5,
        TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
        ResponseHeaderTimeout: timeout,
    }
    httpClient = &http.Client{
        Transport: transport,
        Timeout:   timeout,
    }
}

// float2str renders i in plain decimal notation (no exponent) using the
// fewest digits that still round-trip to the same float64.
func float2str(i float64) string {
    digits := strconv.AppendFloat(nil, i, 'f', -1, 64)
    return string(digits)
}

// toInt64 converts a loosely typed value to int64.
//
// int and int64 are converted directly, float64 is truncated toward zero,
// bool maps to 1 (true) or 0 (false), and strings are trimmed and parsed as
// base-10 integers. The string parse error is deliberately discarded, so the
// result is whatever strconv.ParseInt returned alongside it (0 for malformed
// text). Every other type, including nil, yields 0.
func toInt64(s interface{}) int64 {
    switch n := s.(type) {
    case int64:
        return n
    case int:
        return int64(n)
    case float64:
        return int64(n)
    case bool:
        if n {
            return 1
        }
        return 0
    case string:
        // Best-effort parse: the error is intentionally ignored.
        parsed, _ := strconv.ParseInt(strings.TrimSpace(n), 10, 64)
        return parsed
    }
    return 0
}

// toString converts a loosely typed value to its string form.
//
// Strings are returned unchanged, int64 is formatted in base 10, float64 is
// formatted in plain decimal notation with the minimum number of digits, and
// bool becomes "true" or "false". Any other value falls back to fmt's %v
// formatting.
func toString(s interface{}) string {
    switch val := s.(type) {
    case string:
        return val
    case bool:
        return strconv.FormatBool(val)
    case int64:
        return strconv.FormatInt(val, 10)
    case float64:
        return strconv.FormatFloat(val, 'f', -1, 64)
    }
    return fmt.Sprintf("%v", s)
}

// Json wraps a value decoded by encoding/json into interface{} and offers
// typed accessors over it.
type Json struct {
    data interface{}
}

// NewJson decodes body and returns the wrapped result. On a decoding error
// it returns nil together with that error.
func NewJson(body []byte) (*Json, error) {
    parsed := &Json{}
    if err := parsed.UnmarshalJSON(body); err != nil {
        return nil, err
    }
    return parsed, nil
}

// UnmarshalJSON decodes p into the wrapped value.
func (j *Json) UnmarshalJSON(p []byte) error {
    return json.Unmarshal(p, &j.data)
}

// Get returns the member stored under key when the wrapped value is a JSON
// object containing it. Otherwise it returns a Json wrapping nil, so calls
// can be chained without nil checks.
func (j *Json) Get(key string) *Json {
    // A failed assertion leaves fields nil; indexing a nil map is safe.
    fields, _ := j.data.(map[string]interface{})
    if member, found := fields[key]; found {
        return &Json{data: member}
    }
    return &Json{data: nil}
}

// CheckGet is like Get but reports whether key was present. When the wrapped
// value is not an object, or the key is missing, it returns (nil, false).
func (j *Json) CheckGet(key string) (*Json, bool) {
    fields, _ := j.data.(map[string]interface{})
    member, found := fields[key]
    if !found {
        return nil, false
    }
    return &Json{data: member}, true
}

// Map returns the wrapped value as a JSON object, or an error if it is not
// one.
func (j *Json) Map() (map[string]interface{}, error) {
    m, ok := j.data.(map[string]interface{})
    if !ok {
        return nil, errors.New("type assertion to map[string]interface{} failed")
    }
    return m, nil
}

// Array returns the wrapped value as a JSON array, or an error if it is not
// one.
func (j *Json) Array() ([]interface{}, error) {
    a, ok := j.data.([]interface{})
    if !ok {
        return nil, errors.New("type assertion to []interface{} failed")
    }
    return a, nil
}

// Bool returns the wrapped value as a bool, or false and an error if it is
// not one.
func (j *Json) Bool() (bool, error) {
    b, ok := j.data.(bool)
    if !ok {
        return false, errors.New("type assertion to bool failed")
    }
    return b, nil
}

// String returns the wrapped value as a string, or "" and an error if it is
// not one.
func (j *Json) String() (string, error) {
    s, ok := j.data.(string)
    if !ok {
        return "", errors.New("type assertion to string failed")
    }
    return s, nil
}

// Bytes returns the bytes of the wrapped string, or nil and an error if the
// wrapped value is not a string.
func (j *Json) Bytes() ([]byte, error) {
    s, ok := j.data.(string)
    if !ok {
        return nil, errors.New("type assertion to []byte failed")
    }
    return []byte(s), nil
}

// Int returns the wrapped float64 truncated to int, or -1 and an error if
// the wrapped value is not a float64.
func (j *Json) Int() (int, error) {
    f, ok := j.data.(float64)
    if !ok {
        return -1, errors.New("type assertion to float64 failed")
    }
    return int(f), nil
}

// MustArray returns the wrapped array. If the wrapped value is not an array
// it returns the optional default (nil when none is given). Passing more
// than one default panics.
func (j *Json) MustArray(args ...[]interface{}) []interface{} {
    if len(args) > 1 {
        log.Panicf("MustArray() received too many arguments %d", len(args))
    }

    if a, err := j.Array(); err == nil {
        return a
    }
    if len(args) == 1 {
        return args[0]
    }
    return nil
}

// MustMap returns the wrapped object. If the wrapped value is not an object
// it returns the optional default (nil when none is given). Passing more
// than one default panics.
func (j *Json) MustMap(args ...map[string]interface{}) map[string]interface{} {
    if len(args) > 1 {
        log.Panicf("MustMap() received too many arguments %d", len(args))
    }

    if m, err := j.Map(); err == nil {
        return m
    }
    if len(args) == 1 {
        return args[0]
    }
    return nil
}

// MustString returns the wrapped string. If the wrapped value is not a
// string it returns the optional default ("" when none is given). Passing
// more than one default panics.
func (j *Json) MustString(args ...string) string {
    if len(args) > 1 {
        log.Panicf("MustString() received too many arguments %d", len(args))
    }

    if s, err := j.String(); err == nil {
        return s
    }
    if len(args) == 1 {
        return args[0]
    }
    return ""
}

// MustInt64 converts the wrapped value to int64. Numeric types are converted
// directly (float64 is truncated), strings are parsed as base-10 integers,
// and any other type yields 0. A string that fails to parse causes a panic
// with the parse error.
func (j *Json) MustInt64() int64 {
    switch v := j.data.(type) {
    case int:
        return int64(v)
    case int64:
        return v
    case float64:
        return int64(v)
    case string:
        n, err := strconv.ParseInt(v, 10, 64)
        if err != nil {
            panic(err)
        }
        return n
    }
    // Unsupported types are treated as zero rather than as an error.
    return 0
}

// MustFloat64 converts the wrapped value to float64. Numeric types are
// converted directly, strings have every "," removed before being parsed,
// and any other type yields 0. A string that fails to parse causes a panic
// with the parse error.
func (j *Json) MustFloat64() float64 {
    switch v := j.data.(type) {
    case int:
        return float64(v)
    case int64:
        return float64(v)
    case float64:
        return v
    case string:
        // Drop thousands separators such as "1,234.5" before parsing.
        f, err := strconv.ParseFloat(strings.Replace(v, ",", "", -1), 64)
        if err != nil {
            panic(err)
        }
        return f
    }
    // Unsupported types are treated as zero rather than as an error.
    return 0
}

// iAssistantEx carries the per-exchange state of the plugin.
//
// newAssistantEx fills in accessKey, secretKey, apiBase, timeout and
// timeLocation; the apiCall method reads apiBase. The remaining fields are
// not referenced by any code visible in this file.
type iAssistantEx struct {
    accessKey     string         // API key passed to newAssistantEx
    secretKey     string         // API secret passed to newAssistantEx
    currency      string         // not used in the visible code
    opCurrency    string         // not used in the visible code
    baseCurrency  string         // not used in the visible code
    secret        string         // not used in the visible code
    secretExpires int64          // not used in the visible code
    apiBase       string         // URL prefix prepended to the path in apiCall
    step          int64          // not used in the visible code
    newRate       float64        // not used in the visible code
    timeout       time.Duration  // set to 20s by newAssistantEx
    timeLocation  *time.Location // fixed UTC+8 zone set by newAssistantEx
}

// Item is one key/value pair taken from a parameter map.
type Item struct {
    Key string
    Val string
}

// MapSorter is a list of Items that implements sort.Interface, ordering the
// entries by Key.
type MapSorter []Item

// NewMapSorter copies m into an unsorted MapSorter. Keys that start with "!"
// have every "!" character removed (not only the leading one); all other
// keys are copied unchanged.
func NewMapSorter(m map[string]string) MapSorter {
    items := make(MapSorter, 0, len(m))

    for key, val := range m {
        name := key
        if strings.HasPrefix(name, "!") {
            name = strings.Replace(name, "!", "", -1)
        }
        items = append(items, Item{Key: name, Val: val})
    }

    return items
}

// Len reports the number of entries.
func (ms MapSorter) Len() int {
    return len(ms)
}

// Less orders entries by key (not by value).
func (ms MapSorter) Less(i, j int) bool {
    return ms[j].Key > ms[i].Key
}

// Swap exchanges the entries at positions i and j.
func (ms MapSorter) Swap(i, j int) {
    held := ms[i]
    ms[i] = ms[j]
    ms[j] = held
}

// encodeParams serialises params in ascending key order.
//
// Keys are first normalised by NewMapSorter. When onlyPlusKeyAndValue is
// true the result is the plain concatenation key1val1key2val2... with no
// separators. Otherwise the result is a query-string-like form
// "key1=val1&key2=val2"; note that neither keys nor values are URL-escaped.
func encodeParams(params map[string]string, onlyPlusKeyAndValue bool) string {
    sorted := NewMapSorter(params)
    sort.Sort(sorted)

    if onlyPlusKeyAndValue {
        var joined bytes.Buffer
        for _, kv := range sorted {
            joined.WriteString(kv.Key)
            joined.WriteString(kv.Val)
        }
        return joined.String()
    }

    // Group values by key so that entries whose keys collide after
    // normalisation are emitted next to each other.
    values := url.Values{}
    for _, kv := range sorted {
        values.Add(kv.Key, kv.Val)
    }

    names := make([]string, 0, len(values))
    for name := range values {
        names = append(names, name)
    }
    sort.Strings(names)

    var out bytes.Buffer
    for _, name := range names {
        for _, val := range values[name] {
            if out.Len() > 0 {
                out.WriteByte('&')
            }
            out.WriteString(name + "=")
            out.WriteString(val)
        }
    }
    return out.String()
}

// newAssistantEx builds an iAssistantEx for the given credentials, pointing
// at the hard-coded API base URL, with a 20-second timeout and a fixed UTC+8
// time zone labelled "Asia/Shanghai".
func newAssistantEx(accessKey, secretKey string) *iAssistantEx {
    return &iAssistantEx{
        accessKey:    accessKey,
        secretKey:    secretKey,
        apiBase:      "https://www.coinbig.com",
        timeout:      20 * time.Second,
        timeLocation: time.FixedZone("Asia/Shanghai", 8*60*60),
    }
}

// apiCall performs an unsigned HTTP request to p.apiBase + method using the
// verb in httpType and returns the response body decoded as JSON.
//
// The request is sent through the shared httpClient. Any failure while
// building the request, sending it, reading the body or decoding the JSON is
// returned as the error, with a nil *Json. The HTTP status code is not
// inspected.
func (p *iAssistantEx) apiCall(method string, httpType string) (*Json, error) {
    // No signature is required for this endpoint.
    endpoint := p.apiBase + method
    req, err := http.NewRequest(httpType, endpoint, nil)

    // Debug trace (coloured via ANSI escapes); printed even when building
    // the request failed, in which case req prints as <nil>.
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:"+method, 0x1B)
    fmt.Println("httpType:", httpType, "req:", req)

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    payload, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    return NewJson(payload)
}

/* 签名的函数,如果需要 访问签名接口时使用。
func (p *iAssistantEx) tapiCall(method string, params map[string]string, httpType string) (js *Json, err error) {
    if params == nil {
        params = map[string]string{}
    }
    params["apikey"] = p.accessKey
    params["time"] = strconv.FormatInt(time.Now().UnixNano() / 1e6, 10)            // 获取 时间戳

    h := md5.New()
    h.Write([]byte(encodeParams(params, false) + "&secret_key=" + p.secretKey))
    params["sign"] = strings.ToUpper(hex.EncodeToString(h.Sum(nil)))
    qs := encodeParams(params, false)

    req, err := http.NewRequest(httpType, fmt.Sprintf("%s%s?%s", p.apiBase, method, qs), nil)
    
    // 测试
    // fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B)    // 输出颜色
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "tapiCall create req:" + method, 0x1B)
    fmt.Println("httpType:", httpType, "params:", params, "req:", req)          // 测试 输出 http 请求创建

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)                              // 使用全局 的自定义的 httpClient , HTTP 对象
    if err != nil {
        return nil, err
    }

    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    js, err = NewJson(b)

    // 容错

    return js, err
}
*/

// RpcRequest is the JSON body that OnPost decodes from each incoming request.
//
// The fields must be exported (capitalised) for encoding/json to fill them,
// and the struct tags tell Unmarshal which JSON key maps to which field.
type RpcRequest struct {
    AccessKey string            `json:"access_key"`
    SecretKey string            `json:"secret_key"`
    Nonce     int64             `json:"nonce"`
    Method    string            `json:"method"` // OnPost expects the form "__api_<url>"
    Params    map[string]string `json:"params"` // OnPost reads the "type" entry
}

// Produce polls the URL in method with GET roughly once per second and sends
// each result on p until ctx is cancelled.
//
// A successful response is sent as the decoded JSON object (a
// map[string]interface{}, nil if the response was not an object). A failed
// request is sent as a map with a single "error" entry holding the error
// text.
//
// Both the channel send and the pause between polls give up as soon as ctx
// is cancelled. Previously the send was unconditional, so Produce could stay
// blocked forever if the receiver had already stopped reading.
//
// The parameter order (channel before ctx) is kept for existing callers.
func Produce(p chan<- interface{}, ctx context.Context, method string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Produce is finished!") // debug trace
            return
        default:
        }

        var msg interface{}
        js, err := apiCall(method, "GET")
        if err != nil {
            msg = map[string]interface{}{
                "error": fmt.Sprintf("%v", err),
            }
        } else {
            msg = js.(*Json).MustMap()
        }

        select {
        case p <- msg:
        case <-ctx.Done():
            // The check at the top of the loop logs and returns.
            continue
        }

        // Pause between polls, but wake up immediately on cancellation.
        select {
        case <-time.After(time.Second * 1):
        case <-ctx.Done():
        }
    }
}

// Consumer receives values from c and stores the latest one in the
// package-level ticker variable, at most about once per second, until ctx is
// cancelled.
//
// The receive waits on ctx as well as on c. Previously it was a bare receive
// inside the default branch, so once the producer had stopped sending, a
// cancelled Consumer stayed blocked on the channel forever.
//
// NOTE(review): ticker is also read by OnPost, and no synchronisation between
// the two is visible in this file; confirm whether that race matters.
//
// The parameter order (channel before ctx) is kept for existing callers.
func Consumer(c <-chan interface{}, ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Consumer is finished!") // debug trace
            return
        case latest := <-c:
            ticker = latest
        }

        // Same one-second pacing as before, but interruptible by ctx.
        select {
        case <-time.After(time.Second * 1):
        case <-ctx.Done():
        }
    }
}

// apiCall performs an unsigned HTTP request to the full URL in method using
// the verb in httpType and returns the response body decoded as JSON.
//
// On success the returned interface{} holds a *Json. On any failure
// (building the request, sending it, reading the body, decoding the JSON)
// the result is an untyped nil together with the error. The HTTP status code
// is not inspected.
func apiCall(method string, httpType string) (interface{}, error) {
    // No signature is required for this endpoint.
    req, err := http.NewRequest(httpType, method, nil)

    // Debug trace (coloured via ANSI escapes); printed even when building
    // the request failed, in which case req prints as <nil>.
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:"+method, 0x1B)
    fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "httpType:", httpType, "req:", req)

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    js, err := NewJson(b)
    if err != nil {
        // Return a literal nil: passing the nil *Json through interface{}
        // would give callers a non-nil interface wrapping a nil pointer.
        return nil, err
    }
    return js, nil
}

// State shared between the HTTP handler and the background goroutines.
var (
    // ticker holds the most recent value Consumer received from its channel;
    // OnPost returns it for "type=get" requests.
    ticker interface{}

    // _Cancel holds the context.CancelFunc that OnPost stores on
    // "type=create" and invokes on "type=finished". It is nil until the
    // first create.
    _Cancel interface{}
)

// OnPost handles one plugin request. The body is a JSON RpcRequest whose
// Method must have the form "__api_<url>"; Params["type"] selects the action:
//
//   - "create":   start a Produce/Consumer goroutine pair polling <url>
//   - "get":      return the latest value stored in ticker
//   - "finished": cancel the running goroutine pair
//   - otherwise:  no action; the response carries null data
//
// The response is always a JSON object: {"data": ...} on the normal path, or
// {"error": "<message>"} whenever the handler panics. The create and finished
// actions deliberately panic with a status message, so callers receive their
// confirmation in the "error" field; that protocol is unchanged.
//
// Changes over the previous version: a repeated create now cancels the
// goroutine pair it replaces instead of leaking it, finished without a prior
// create reports a clear message instead of a failed type assertion, and
// failures to encode or write the response are no longer silently dropped.
//
// NOTE(review): ticker and _Cancel are package-level variables with no
// synchronisation visible in this file; confirm whether concurrent requests
// need a mutex here.
func OnPost(w http.ResponseWriter, r *http.Request) {
    var ret interface{}
    defer func() {
        if e := recover(); e != nil {
            if ee, ok := e.(error); ok {
                e = ee.Error()
            }
            ret = map[string]string{"error": fmt.Sprintf("%v", e)}
        }
        b, err := json.Marshal(ret)
        if err != nil {
            // Fall back to reporting the encoding failure itself; a map of
            // strings always marshals.
            b, _ = json.Marshal(map[string]string{"error": err.Error()})
        }
        if _, err := w.Write(b); err != nil {
            log.Println("OnPost: writing response:", err)
        }
    }()

    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
        panic(err)
    }
    var request RpcRequest
    if err = json.Unmarshal(b, &request); err != nil {
        panic(err)
    }

    if !strings.HasPrefix(request.Method, "__api_") {
        panic(errors.New(request.Method + " not support"))
    }
    // Everything after the "__api_" prefix is the full URL to poll.
    target := strings.TrimPrefix(request.Method, "__api_")

    var data interface{}
    switch request.Params["type"] {
    case "get":
        data = ticker
        fmt.Println("get ticker:", ticker)
    case "create":
        // Stop any pair started by an earlier create before replacing it.
        if prev, ok := _Cancel.(context.CancelFunc); ok {
            prev()
        }
        ch := make(chan interface{}, 1)
        ctx, cancel := context.WithCancel(context.Background())
        go Produce(ch, ctx, target)
        go Consumer(ch, ctx)
        _Cancel = cancel
        panic(errors.New("创建 Produce , Consumer"))
    case "finished":
        cancel, ok := _Cancel.(context.CancelFunc)
        if !ok {
            panic(errors.New("no Produce , Consumer has been created"))
        }
        cancel()
        panic(errors.New("销毁 Produce , Consumer"))
    default:
        // Other exchange.IO requests are not handled; data stays nil.
    }

    ret = map[string]interface{}{
        "data": data,
    }
}

// main parses the -b flag (bind address, default 127.0.0.1:6666), registers
// OnPost under /exchange and serves HTTP until the server fails.
//
// The error from http.ListenAndServe is now logged and the process exits
// with a non-zero status; previously it was discarded, so a failure such as
// "address already in use" ended the program without any message.
func main() {
    addr := flag.String("b", "127.0.0.1:6666", "bind addr")
    flag.Parse()
    if *addr == "" {
        flag.Usage()
        return
    }
    basePath := "/exchange"
    log.Println("Running ", fmt.Sprintf("http://%s%s", *addr, basePath), "...")
    http.HandleFunc(basePath, OnPost)
    log.Fatal(http.ListenAndServe(*addr, nil))
}
  • テスト用ストラテジー (JavaScript)
// Test strategy: asks the plugin to start polling the ticker URL, then reads
// the cached result every 5 seconds and logs how long each read took. On
// iteration 30 it asks the plugin to stop polling; the loop itself keeps
// running afterwards.
function main() {
    var tickerUrl = "https://openapi.ocx.com/api/v2/tickers/btcusdt"

    // Start the background polling inside the plugin.
    var ret = exchange.IO("api", "POST", tickerUrl, "type=create")
    Log(ret)

    for (var i = 0; ; i++) {
        var beginTime = new Date().getTime()
        var ret1 = exchange.IO("api", "POST", tickerUrl, "type=get")
        var elapsed = new Date().getTime() - beginTime
        Log("exchange.IO 耗时(毫秒):", elapsed, " 第:", i, "次, ret1:", ret1)

        if (i === 30) {
            // Stop the background polling.
            var ret2 = exchange.IO("api", "POST", tickerUrl, "type=finished")
            Log("ret2:", ret2)
        }
        Sleep(5000)
    }
}

使用 通用协议 插件 实现 JS 语言 并发 任务

插件 使用 和一般的 通用协议 一样。
用插件 代码编译个可执行程序 和托管者一起运行就可以了。

测试用的是  OCX 交易所的  ticker 接口: https://openapi.ocx.com/api/v2/tickers/btcusdt

- 创建并发程序: 
  var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")

- 获取 并发程序 实时访问的数据: 
  var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")

- 结束并发程序:
  var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")

間違いがあれば,ご指摘を歓迎します. 一緒に学んでいきましょう.


もっと見る