В процессе загрузки ресурсов... загрузка...

Использование общих протоколов Плагины реализация языка JS Синхронизация задач

Автор:Изобретатели количественного измерения - мечты, Создано: 2018-07-19 19:10:48, Обновлено: 2018-07-19 19:15:42

Использование общих протоколов Плагины реализация языка JS Синхронизация задач

Всегда хотелось использовать параллельную логику, полностью отделенную от основной программы, чтобы получать действия или выполнять какие-либо другие действия при написании стратегии с помощью JavaScript. Язык python может использовать многострочную библиотеку прямо в политике, а JavaScript не может. Тем не менее, есть более простые способы обработки операций с одновременным получением, которые можно использовать с помощью функции exchange.Go.

  • Использование универсального протокола Плагины Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола Использование универсального протокола.

    Голанг бросает цинк:

/*
GOOS=linux GOARCH=amd64 go build -ldflags '-s -w -extldflags -static' AssistantEx.go
*/
package main

import (
    "bytes"
    // "crypto/md5"
    // "encoding/hex"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "sort"
    "strconv"
    "strings"
    "time"
    "crypto/tls"                           
    "context"
)

// 测试用函数 ---------------------------
func toFloat(s interface{}) float64 {
    var ret float64
    switch v := s.(type) {
    case float64:
        ret = v
    case float32:
        ret = float64(v)
    case int64:
        ret = float64(v)
    case int:
        ret = float64(v)
    case int32:
        ret = float64(v)
    case string:
        ret, _ = strconv.ParseFloat(strings.TrimSpace(v), 64)
    }
    return ret
}

var httpClient *http.Client                                                 
func init() {                                                               
    timeout := time.Minute                                                  
    httpClient = &http.Client{                                              
        Transport: &http.Transport{
            MaxIdleConnsPerHost:   5,                                       
            TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},   
            ResponseHeaderTimeout: timeout,                                 
        },
        Timeout: timeout,                                                   
    }
}

func float2str(i float64) string {
    return strconv.FormatFloat(i, 'f', -1, 64)
}

func toInt64(s interface{}) int64 {
    var ret int64
    switch v := s.(type) {
    case int:
        ret = int64(v)
    case float64:
        ret = int64(v)
    case bool:
        if v {
            ret = 1
        } else {
            ret = 0
        }
    case int64:
        ret = v
    case string:
        ret, _ = strconv.ParseInt(strings.TrimSpace(v), 10, 64)
    }
    return ret
}

func toString(s interface{}) string {
    var ret string
    switch v := s.(type) {
    case string:
        ret = v
    case int64:
        ret = strconv.FormatInt(v, 10)
    case float64:
        ret = strconv.FormatFloat(v, 'f', -1, 64)
    case bool:
        ret = strconv.FormatBool(v)
    default:
        ret = fmt.Sprintf("%v", s)
    }
    return ret
}

type Json struct {
    data interface{}
}

func NewJson(body []byte) (*Json, error) {
    j := new(Json)
    err := j.UnmarshalJSON(body)
    if err != nil {
        return nil, err
    }
    return j, nil
}

func (j *Json) UnmarshalJSON(p []byte) error {
    return json.Unmarshal(p, &j.data)
}

func (j *Json) Get(key string) *Json {
    m, err := j.Map()
    if err == nil {
        if val, ok := m[key]; ok {
            return &Json{val}
        }
    }
    return &Json{nil}
}

func (j *Json) CheckGet(key string) (*Json, bool) {
    m, err := j.Map()
    if err == nil {
        if val, ok := m[key]; ok {
            return &Json{val}, true
        }
    }
    return nil, false
}

func (j *Json) Map() (map[string]interface{}, error) {
    if m, ok := (j.data).(map[string]interface{}); ok {
        return m, nil
    }
    return nil, errors.New("type assertion to map[string]interface{} failed")
}

func (j *Json) Array() ([]interface{}, error) {
    if a, ok := (j.data).([]interface{}); ok {
        return a, nil
    }
    return nil, errors.New("type assertion to []interface{} failed")
}

func (j *Json) Bool() (bool, error) {
    if s, ok := (j.data).(bool); ok {
        return s, nil
    }
    return false, errors.New("type assertion to bool failed")
}

func (j *Json) String() (string, error) {
    if s, ok := (j.data).(string); ok {
        return s, nil
    }
    return "", errors.New("type assertion to string failed")
}

func (j *Json) Bytes() ([]byte, error) {
    if s, ok := (j.data).(string); ok {
        return []byte(s), nil
    }
    return nil, errors.New("type assertion to []byte failed")
}

func (j *Json) Int() (int, error) {
    if f, ok := (j.data).(float64); ok {
        return int(f), nil
    }

    return -1, errors.New("type assertion to float64 failed")
}

func (j *Json) MustArray(args ...[]interface{}) []interface{} {
    var def []interface{}

    switch len(args) {
    case 0:
    case 1:
        def = args[0]
    default:
        log.Panicf("MustArray() received too many arguments %d", len(args))
    }

    a, err := j.Array()
    if err == nil {
        return a
    }

    return def
}

func (j *Json) MustMap(args ...map[string]interface{}) map[string]interface{} {
    var def map[string]interface{}

    switch len(args) {
    case 0:
    case 1:
        def = args[0]
    default:
        log.Panicf("MustMap() received too many arguments %d", len(args))
    }

    a, err := j.Map()
    if err == nil {
        return a
    }

    return def
}

func (j *Json) MustString(args ...string) string {
    var def string

    switch len(args) {
    case 0:
    case 1:
        def = args[0]
    default:
        log.Panicf("MustString() received too many arguments %d", len(args))
    }

    s, err := j.String()
    if err == nil {
        return s
    }

    return def
}

func (j *Json) MustInt64() int64 {
    var ret int64
    var err error
    switch v := j.data.(type) {
    case int:
        ret = int64(v)
    case int64:
        ret = v
    case float64:
        ret = int64(v)
    case string:
        if ret, err = strconv.ParseInt(v, 10, 64); err != nil {
            panic(err)
        }
    default:
        ret = 0
        //panic("type assertion to int64 failed")
    }
    return ret
}

func (j *Json) MustFloat64() float64 {
    var ret float64
    var err error
    switch v := j.data.(type) {
    case int:
        ret = float64(v)
    case int64:
        ret = float64(v)
    case float64:
        ret = v
    case string:
        v = strings.Replace(v, ",", "", -1)
        if ret, err = strconv.ParseFloat(v, 64); err != nil {
            panic(err)
        }
    default:
        ret = 0
        //panic("type assertion to float64 failed")
    }
    return ret
}

type iAssistantEx struct{
    accessKey      string
    secretKey      string
    currency       string
    opCurrency     string
    baseCurrency   string
    secret         string
    secretExpires  int64
    apiBase        string
    step           int64
    newRate        float64
    timeout        time.Duration
    timeLocation   *time.Location
}

type MapSorter []Item

type Item struct {
    Key string
    Val string
}

func NewMapSorter(m map[string]string) MapSorter {
    ms := make(MapSorter, 0, len(m))

    for k, v := range m {
        if strings.HasPrefix(k, "!") {
            k = strings.Replace(k, "!", "", -1)
        }
        ms = append(ms, Item{k, v})
    }

    return ms
}

func (ms MapSorter) Len() int {
    return len(ms)
}

func (ms MapSorter) Less(i, j int) bool {
    //return ms[i].Val < ms[j].Val // 按值排序
    return ms[i].Key < ms[j].Key   // 按键排序
}

func (ms MapSorter) Swap(i, j int) {
    ms[i], ms[j] = ms[j], ms[i]
}

func encodeParams(params map[string]string, onlyPlusKeyAndValue bool) string {
    ms := NewMapSorter(params)
    sort.Sort(ms)

    v := url.Values{}
    for _, item := range ms {
        v.Add(item.Key, item.Val)
    }
    if onlyPlusKeyAndValue {
        var strKeyAndValue string
        for _, kv := range ms {
            // v.Add(item.Key, item.Val)
            strKeyAndValue += (kv.Key + kv.Val)
        }   
        return strKeyAndValue
    }
    var buf bytes.Buffer
    keys := make([]string, 0, len(v))
    for k := range v {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    for _, k := range keys {
        vs := v[k]
        prefix := k + "="
        for _, v := range vs {
            if buf.Len() > 0 {
                buf.WriteByte('&')
            }
            buf.WriteString(prefix)
            buf.WriteString(v)
        }
    }
    return buf.String()
}

func newAssistantEx(accessKey, secretKey string) *iAssistantEx {
    s := new(iAssistantEx)
    s.accessKey = accessKey
    s.secretKey = secretKey
    s.apiBase = "https://www.coinbig.com"                 

    s.timeout = 20 * time.Second
    s.timeLocation = time.FixedZone("Asia/Shanghai", 8*60*60)

    return s
}

func (p *iAssistantEx) apiCall(method string, httpType string) (*Json, error) {
    // 不用签名
    req, err := http.NewRequest(httpType, p.apiBase + method, nil)
    
    // fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B)    // 输出颜色
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
    fmt.Println("httpType:", httpType, "req:", req)          // 测试 输出 http 请求创建

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    
    resp, err := httpClient.Do(req)               
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    js, err := NewJson(b)

    // 容错

    return js, err
}

/* 签名的函数,如果需要 访问签名接口时使用。
func (p *iAssistantEx) tapiCall(method string, params map[string]string, httpType string) (js *Json, err error) {
    if params == nil {
        params = map[string]string{}
    }
    params["apikey"] = p.accessKey
    params["time"] = strconv.FormatInt(time.Now().UnixNano() / 1e6, 10)            // 获取 时间戳

    h := md5.New()
    h.Write([]byte(encodeParams(params, false) + "&secret_key=" + p.secretKey))
    params["sign"] = strings.ToUpper(hex.EncodeToString(h.Sum(nil)))
    qs := encodeParams(params, false)

    req, err := http.NewRequest(httpType, fmt.Sprintf("%s%s?%s", p.apiBase, method, qs), nil)
    
    // 测试
    // fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B)    // 输出颜色
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "tapiCall create req:" + method, 0x1B)
    fmt.Println("httpType:", httpType, "params:", params, "req:", req)          // 测试 输出 http 请求创建

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)                              // 使用全局 的自定义的 httpClient , HTTP 对象
    if err != nil {
        return nil, err
    }

    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    js, err = NewJson(b)

    // 容错

    return js, err
}
*/

type RpcRequest struct {        // 结构体里的字段首字母必须大写,否则无法正常解析,结构体有导出和未导出,大写字母开头为导出。
                                // 在Unmarshal的时候会  根据 json 匹配查找该结构体的tag, 所以此处需要修饰符
    AccessKey string            `json:"access_key"`
    SecretKey string            `json:"secret_key"`
    Nonce     int64             `json:"nonce"`
    Method    string            `json:"method"`
    Params    map[string]string `json:"params"`
}

func Produce(p chan<- interface{}, ctx context.Context, method string){
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Produce is finished!")   // 测试
            return
        default: 
            js, err := apiCall(method, "GET")
            if err != nil {
                p <- map[string]interface{}{
                    "error" : fmt.Sprintf("%v", err),
                }
            } else {
                p <- js.(*Json).MustMap()
            }
            time.Sleep(time.Second * 1)
        }
    }
}

func Consumer(c <-chan interface{}, ctx context.Context){
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Consumer is finished!")   // 测试
            return
        default:
            ticker = <- c
            time.Sleep(time.Second * 1)
        }
    }
}

func apiCall(method string, httpType string) (interface{}, error) {
    // 不用签名
    req, err := http.NewRequest(httpType, method, nil)
    
    // fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B)    // 输出颜色
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
    fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "httpType:", httpType, "req:", req)          // 测试 输出 http 请求创建

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    
    resp, err := httpClient.Do(req)               
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    js, err := NewJson(b)

    // 容错

    return js, err
}

var ticker interface{}
var _Cancel interface{}

func OnPost(w http.ResponseWriter, r *http.Request) {
    var ret interface{}
    defer func() {
        if e := recover(); e != nil {
            if ee, ok := e.(error); ok {
                e = ee.Error()
            }
            ret = map[string]string{"error": fmt.Sprintf("%v", e)}
        }
        b, _ := json.Marshal(ret)
        w.Write(b)
    }()

    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
        panic(err)
    }
    var request RpcRequest
    err = json.Unmarshal(b, &request)
    if err != nil {
        panic(err)
    }

    // request.Params      参数
    // request.Method[6:]  完整URL   , 调用时传入  type=create  创建 , type=get 获取 数据 , type=finished 终止 并发

    var data interface{}
    if strings.HasPrefix(request.Method, "__api_") {
        // 处理 请求
        url := request.Method[6:]
        cmdType := request.Params["type"]

        if cmdType == "get" {
            data = ticker
            fmt.Println("get ticker:", ticker)
        } else if cmdType == "create" {
            ch := make(chan interface{}, 1)
            ctx, cancel := context.WithCancel(context.Background())
            go Produce(ch, ctx, url)
            go Consumer(ch, ctx)
            _Cancel = cancel
            panic(errors.New("创建 Produce , Consumer"))
        } else if cmdType == "finished" {
            // 结束 并发
            _Cancel.(context.CancelFunc)()
            panic(errors.New("销毁 Produce , Consumer"))
        } else {
            // 其他 exchange.IO 请求
        }
    } else {
        panic(errors.New(request.Method + " not support"))
    }

    if err != nil {
        panic(err)
    }
    ret = map[string]interface{}{
        "data": data,
    }

    return
}

func main() {
    var addr = flag.String("b", "127.0.0.1:6666", "bind addr")
    flag.Parse()
    if *addr == "" {
        flag.Usage()
        return
    }
    basePath := "/exchange"
    log.Println("Running ", fmt.Sprintf("http://%s%s", *addr, basePath), "...")
    http.HandleFunc(basePath, OnPost)
    http.ListenAndServe(*addr, nil)
}
  • Стратегия тестирования
function main() {
    var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")
    
    Log(ret)
    var i = 0
    while(1){
        var beginTime = new Date().getTime()
        var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
        var endTime = new Date().getTime()
        Log("exchange.IO 耗时(毫秒):", endTime - beginTime, " 第:", i, "次, ret1:", ret1)
        if(i == 30){
            var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")
            Log("ret2:", ret2)
        }
        Sleep(5000)
        i++
    }
}

img

插件 使用 和一般的 通用协议 一样。
用插件 代码编译个可执行程序 和托管者一起运行就可以了。

测试用的是  OCX 交易所的  ticker 接口: https://openapi.ocx.com/api/v2/tickers/btcusdt

- 创建并发程序: 
  var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")

- 获取 并发程序 实时访问的数据: 
  var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
  
- 结束并发程序:
  var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")

Если кто-то из вас не знает, что делать, то он должен знать, что делать.


Больше