Saya selalu ingin menggunakan logika paralel yang benar-benar terpisah dari program utama ketika saya menulis kebijakan menggunakan JavaScript, terus-menerus mengambil tindakan, atau melakukan beberapa operasi lainnya. Bahasa python dapat menggunakan multi-threaded library secara langsung di dalam kebijakan, sedangkan JavaScript tidak dapat melakukannya. Namun, ada juga proses yang lebih sederhana untuk mendapatkan transaksi bersamaan dengan menggunakan fungsi exchange.Go.
Menggunakan protokol umum Plugin Mengimplementasikan proses simultan Berjalan terus-menerus Mengakses antarmuka transaksi Mengumpulkan data transaksi
Golang 抛
/*
GOOS=linux GOARCH=amd64 go build -ldflags '-s -w -extldflags -static' AssistantEx.go
*/
package main
import (
"bytes"
// "crypto/md5"
// "encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
"crypto/tls"
"context"
)
// 测试用函数 ---------------------------
func toFloat(s interface{}) float64 {
var ret float64
switch v := s.(type) {
case float64:
ret = v
case float32:
ret = float64(v)
case int64:
ret = float64(v)
case int:
ret = float64(v)
case int32:
ret = float64(v)
case string:
ret, _ = strconv.ParseFloat(strings.TrimSpace(v), 64)
}
return ret
}
var httpClient *http.Client
func init() {
timeout := time.Minute
httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 5,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
ResponseHeaderTimeout: timeout,
},
Timeout: timeout,
}
}
func float2str(i float64) string {
return strconv.FormatFloat(i, 'f', -1, 64)
}
func toInt64(s interface{}) int64 {
var ret int64
switch v := s.(type) {
case int:
ret = int64(v)
case float64:
ret = int64(v)
case bool:
if v {
ret = 1
} else {
ret = 0
}
case int64:
ret = v
case string:
ret, _ = strconv.ParseInt(strings.TrimSpace(v), 10, 64)
}
return ret
}
func toString(s interface{}) string {
var ret string
switch v := s.(type) {
case string:
ret = v
case int64:
ret = strconv.FormatInt(v, 10)
case float64:
ret = strconv.FormatFloat(v, 'f', -1, 64)
case bool:
ret = strconv.FormatBool(v)
default:
ret = fmt.Sprintf("%v", s)
}
return ret
}
type Json struct {
data interface{}
}
func NewJson(body []byte) (*Json, error) {
j := new(Json)
err := j.UnmarshalJSON(body)
if err != nil {
return nil, err
}
return j, nil
}
func (j *Json) UnmarshalJSON(p []byte) error {
return json.Unmarshal(p, &j.data)
}
func (j *Json) Get(key string) *Json {
m, err := j.Map()
if err == nil {
if val, ok := m[key]; ok {
return &Json{val}
}
}
return &Json{nil}
}
func (j *Json) CheckGet(key string) (*Json, bool) {
m, err := j.Map()
if err == nil {
if val, ok := m[key]; ok {
return &Json{val}, true
}
}
return nil, false
}
func (j *Json) Map() (map[string]interface{}, error) {
if m, ok := (j.data).(map[string]interface{}); ok {
return m, nil
}
return nil, errors.New("type assertion to map[string]interface{} failed")
}
func (j *Json) Array() ([]interface{}, error) {
if a, ok := (j.data).([]interface{}); ok {
return a, nil
}
return nil, errors.New("type assertion to []interface{} failed")
}
func (j *Json) Bool() (bool, error) {
if s, ok := (j.data).(bool); ok {
return s, nil
}
return false, errors.New("type assertion to bool failed")
}
func (j *Json) String() (string, error) {
if s, ok := (j.data).(string); ok {
return s, nil
}
return "", errors.New("type assertion to string failed")
}
func (j *Json) Bytes() ([]byte, error) {
if s, ok := (j.data).(string); ok {
return []byte(s), nil
}
return nil, errors.New("type assertion to []byte failed")
}
func (j *Json) Int() (int, error) {
if f, ok := (j.data).(float64); ok {
return int(f), nil
}
return -1, errors.New("type assertion to float64 failed")
}
func (j *Json) MustArray(args ...[]interface{}) []interface{} {
var def []interface{}
switch len(args) {
case 0:
case 1:
def = args[0]
default:
log.Panicf("MustArray() received too many arguments %d", len(args))
}
a, err := j.Array()
if err == nil {
return a
}
return def
}
func (j *Json) MustMap(args ...map[string]interface{}) map[string]interface{} {
var def map[string]interface{}
switch len(args) {
case 0:
case 1:
def = args[0]
default:
log.Panicf("MustMap() received too many arguments %d", len(args))
}
a, err := j.Map()
if err == nil {
return a
}
return def
}
func (j *Json) MustString(args ...string) string {
var def string
switch len(args) {
case 0:
case 1:
def = args[0]
default:
log.Panicf("MustString() received too many arguments %d", len(args))
}
s, err := j.String()
if err == nil {
return s
}
return def
}
func (j *Json) MustInt64() int64 {
var ret int64
var err error
switch v := j.data.(type) {
case int:
ret = int64(v)
case int64:
ret = v
case float64:
ret = int64(v)
case string:
if ret, err = strconv.ParseInt(v, 10, 64); err != nil {
panic(err)
}
default:
ret = 0
//panic("type assertion to int64 failed")
}
return ret
}
func (j *Json) MustFloat64() float64 {
var ret float64
var err error
switch v := j.data.(type) {
case int:
ret = float64(v)
case int64:
ret = float64(v)
case float64:
ret = v
case string:
v = strings.Replace(v, ",", "", -1)
if ret, err = strconv.ParseFloat(v, 64); err != nil {
panic(err)
}
default:
ret = 0
//panic("type assertion to float64 failed")
}
return ret
}
type iAssistantEx struct{
accessKey string
secretKey string
currency string
opCurrency string
baseCurrency string
secret string
secretExpires int64
apiBase string
step int64
newRate float64
timeout time.Duration
timeLocation *time.Location
}
type MapSorter []Item
type Item struct {
Key string
Val string
}
func NewMapSorter(m map[string]string) MapSorter {
ms := make(MapSorter, 0, len(m))
for k, v := range m {
if strings.HasPrefix(k, "!") {
k = strings.Replace(k, "!", "", -1)
}
ms = append(ms, Item{k, v})
}
return ms
}
func (ms MapSorter) Len() int {
return len(ms)
}
func (ms MapSorter) Less(i, j int) bool {
//return ms[i].Val < ms[j].Val // 按值排序
return ms[i].Key < ms[j].Key // 按键排序
}
func (ms MapSorter) Swap(i, j int) {
ms[i], ms[j] = ms[j], ms[i]
}
func encodeParams(params map[string]string, onlyPlusKeyAndValue bool) string {
ms := NewMapSorter(params)
sort.Sort(ms)
v := url.Values{}
for _, item := range ms {
v.Add(item.Key, item.Val)
}
if onlyPlusKeyAndValue {
var strKeyAndValue string
for _, kv := range ms {
// v.Add(item.Key, item.Val)
strKeyAndValue += (kv.Key + kv.Val)
}
return strKeyAndValue
}
var buf bytes.Buffer
keys := make([]string, 0, len(v))
for k := range v {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
vs := v[k]
prefix := k + "="
for _, v := range vs {
if buf.Len() > 0 {
buf.WriteByte('&')
}
buf.WriteString(prefix)
buf.WriteString(v)
}
}
return buf.String()
}
func newAssistantEx(accessKey, secretKey string) *iAssistantEx {
s := new(iAssistantEx)
s.accessKey = accessKey
s.secretKey = secretKey
s.apiBase = "https://www.coinbig.com"
s.timeout = 20 * time.Second
s.timeLocation = time.FixedZone("Asia/Shanghai", 8*60*60)
return s
}
func (p *iAssistantEx) apiCall(method string, httpType string) (*Json, error) {
// 不用签名
req, err := http.NewRequest(httpType, p.apiBase + method, nil)
// fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B) // 输出颜色
fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
fmt.Println("httpType:", httpType, "req:", req) // 测试 输出 http 请求创建
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
js, err := NewJson(b)
// 容错
return js, err
}
/* 签名的函数,如果需要 访问签名接口时使用。
func (p *iAssistantEx) tapiCall(method string, params map[string]string, httpType string) (js *Json, err error) {
if params == nil {
params = map[string]string{}
}
params["apikey"] = p.accessKey
params["time"] = strconv.FormatInt(time.Now().UnixNano() / 1e6, 10) // 获取 时间戳
h := md5.New()
h.Write([]byte(encodeParams(params, false) + "&secret_key=" + p.secretKey))
params["sign"] = strings.ToUpper(hex.EncodeToString(h.Sum(nil)))
qs := encodeParams(params, false)
req, err := http.NewRequest(httpType, fmt.Sprintf("%s%s?%s", p.apiBase, method, qs), nil)
// 测试
// fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B) // 输出颜色
fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "tapiCall create req:" + method, 0x1B)
fmt.Println("httpType:", httpType, "params:", params, "req:", req) // 测试 输出 http 请求创建
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := httpClient.Do(req) // 使用全局 的自定义的 httpClient , HTTP 对象
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
js, err = NewJson(b)
// 容错
return js, err
}
*/
type RpcRequest struct { // 结构体里的字段首字母必须大写,否则无法正常解析,结构体有导出和未导出,大写字母开头为导出。
// 在Unmarshal的时候会 根据 json 匹配查找该结构体的tag, 所以此处需要修饰符
AccessKey string `json:"access_key"`
SecretKey string `json:"secret_key"`
Nonce int64 `json:"nonce"`
Method string `json:"method"`
Params map[string]string `json:"params"`
}
func Produce(p chan<- interface{}, ctx context.Context, method string){
for {
select {
case <-ctx.Done():
fmt.Println("Produce is finished!") // 测试
return
default:
js, err := apiCall(method, "GET")
if err != nil {
p <- map[string]interface{}{
"error" : fmt.Sprintf("%v", err),
}
} else {
p <- js.(*Json).MustMap()
}
time.Sleep(time.Second * 1)
}
}
}
func Consumer(c <-chan interface{}, ctx context.Context){
for {
select {
case <-ctx.Done():
fmt.Println("Consumer is finished!") // 测试
return
default:
ticker = <- c
time.Sleep(time.Second * 1)
}
}
}
func apiCall(method string, httpType string) (interface{}, error) {
// 不用签名
req, err := http.NewRequest(httpType, method, nil)
// fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B) // 输出颜色
fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "httpType:", httpType, "req:", req) // 测试 输出 http 请求创建
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
js, err := NewJson(b)
// 容错
return js, err
}
var ticker interface{}
var _Cancel interface{}
func OnPost(w http.ResponseWriter, r *http.Request) {
var ret interface{}
defer func() {
if e := recover(); e != nil {
if ee, ok := e.(error); ok {
e = ee.Error()
}
ret = map[string]string{"error": fmt.Sprintf("%v", e)}
}
b, _ := json.Marshal(ret)
w.Write(b)
}()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}
var request RpcRequest
err = json.Unmarshal(b, &request)
if err != nil {
panic(err)
}
// request.Params 参数
// request.Method[6:] 完整URL , 调用时传入 type=create 创建 , type=get 获取 数据 , type=finished 终止 并发
var data interface{}
if strings.HasPrefix(request.Method, "__api_") {
// 处理 请求
url := request.Method[6:]
cmdType := request.Params["type"]
if cmdType == "get" {
data = ticker
fmt.Println("get ticker:", ticker)
} else if cmdType == "create" {
ch := make(chan interface{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go Produce(ch, ctx, url)
go Consumer(ch, ctx)
_Cancel = cancel
panic(errors.New("创建 Produce , Consumer"))
} else if cmdType == "finished" {
// 结束 并发
_Cancel.(context.CancelFunc)()
panic(errors.New("销毁 Produce , Consumer"))
} else {
// 其他 exchange.IO 请求
}
} else {
panic(errors.New(request.Method + " not support"))
}
if err != nil {
panic(err)
}
ret = map[string]interface{}{
"data": data,
}
return
}
func main() {
var addr = flag.String("b", "127.0.0.1:6666", "bind addr")
flag.Parse()
if *addr == "" {
flag.Usage()
return
}
basePath := "/exchange"
log.Println("Running ", fmt.Sprintf("http://%s%s", *addr, basePath), "...")
http.HandleFunc(basePath, OnPost)
http.ListenAndServe(*addr, nil)
}
function main() {
var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")
Log(ret)
var i = 0
while(1){
var beginTime = new Date().getTime()
var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
var endTime = new Date().getTime()
Log("exchange.IO 耗时(毫秒):", endTime - beginTime, " 第:", i, "次, ret1:", ret1)
if(i == 30){
var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")
Log("ret2:", ret2)
}
Sleep(5000)
i++
}
}
插件 使用 和一般的 通用协议 一样。
用插件 代码编译个可执行程序 和托管者一起运行就可以了。
测试用的是 OCX 交易所的 ticker 接口: https://openapi.ocx.com/api/v2/tickers/btcusdt
- 创建并发程序:
var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")
- 获取 并发程序 实时访问的数据:
var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
- 结束并发程序:
var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")