I've always wanted to use a concurrent logic that is completely separate from the main program when writing policies using JavaScript, and keep getting the behavior, or doing some other operation. Python can use a multi-threaded library directly within the policy, while JavaScript cannot. However, there is also a simpler way to access concurrent transactions using the exchange.Go function.
Use the common protocol Plug-in Implement a concurrent process Perform continuously Access the transaction interface Collect transaction data
Golang: The golang is a symbol of love.
/*
GOOS=linux GOARCH=amd64 go build -ldflags '-s -w -extldflags -static' AssistantEx.go
*/
package main
import (
"bytes"
// "crypto/md5"
// "encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
"crypto/tls"
"context"
)
// 测试用函数 ---------------------------
func toFloat(s interface{}) float64 {
var ret float64
switch v := s.(type) {
case float64:
ret = v
case float32:
ret = float64(v)
case int64:
ret = float64(v)
case int:
ret = float64(v)
case int32:
ret = float64(v)
case string:
ret, _ = strconv.ParseFloat(strings.TrimSpace(v), 64)
}
return ret
}
var httpClient *http.Client
func init() {
timeout := time.Minute
httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 5,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
ResponseHeaderTimeout: timeout,
},
Timeout: timeout,
}
}
func float2str(i float64) string {
return strconv.FormatFloat(i, 'f', -1, 64)
}
func toInt64(s interface{}) int64 {
var ret int64
switch v := s.(type) {
case int:
ret = int64(v)
case float64:
ret = int64(v)
case bool:
if v {
ret = 1
} else {
ret = 0
}
case int64:
ret = v
case string:
ret, _ = strconv.ParseInt(strings.TrimSpace(v), 10, 64)
}
return ret
}
func toString(s interface{}) string {
var ret string
switch v := s.(type) {
case string:
ret = v
case int64:
ret = strconv.FormatInt(v, 10)
case float64:
ret = strconv.FormatFloat(v, 'f', -1, 64)
case bool:
ret = strconv.FormatBool(v)
default:
ret = fmt.Sprintf("%v", s)
}
return ret
}
type Json struct {
data interface{}
}
func NewJson(body []byte) (*Json, error) {
j := new(Json)
err := j.UnmarshalJSON(body)
if err != nil {
return nil, err
}
return j, nil
}
func (j *Json) UnmarshalJSON(p []byte) error {
return json.Unmarshal(p, &j.data)
}
func (j *Json) Get(key string) *Json {
m, err := j.Map()
if err == nil {
if val, ok := m[key]; ok {
return &Json{val}
}
}
return &Json{nil}
}
func (j *Json) CheckGet(key string) (*Json, bool) {
m, err := j.Map()
if err == nil {
if val, ok := m[key]; ok {
return &Json{val}, true
}
}
return nil, false
}
func (j *Json) Map() (map[string]interface{}, error) {
if m, ok := (j.data).(map[string]interface{}); ok {
return m, nil
}
return nil, errors.New("type assertion to map[string]interface{} failed")
}
func (j *Json) Array() ([]interface{}, error) {
if a, ok := (j.data).([]interface{}); ok {
return a, nil
}
return nil, errors.New("type assertion to []interface{} failed")
}
func (j *Json) Bool() (bool, error) {
if s, ok := (j.data).(bool); ok {
return s, nil
}
return false, errors.New("type assertion to bool failed")
}
func (j *Json) String() (string, error) {
if s, ok := (j.data).(string); ok {
return s, nil
}
return "", errors.New("type assertion to string failed")
}
func (j *Json) Bytes() ([]byte, error) {
if s, ok := (j.data).(string); ok {
return []byte(s), nil
}
return nil, errors.New("type assertion to []byte failed")
}
func (j *Json) Int() (int, error) {
if f, ok := (j.data).(float64); ok {
return int(f), nil
}
return -1, errors.New("type assertion to float64 failed")
}
func (j *Json) MustArray(args ...[]interface{}) []interface{} {
var def []interface{}
switch len(args) {
case 0:
case 1:
def = args[0]
default:
log.Panicf("MustArray() received too many arguments %d", len(args))
}
a, err := j.Array()
if err == nil {
return a
}
return def
}
func (j *Json) MustMap(args ...map[string]interface{}) map[string]interface{} {
var def map[string]interface{}
switch len(args) {
case 0:
case 1:
def = args[0]
default:
log.Panicf("MustMap() received too many arguments %d", len(args))
}
a, err := j.Map()
if err == nil {
return a
}
return def
}
func (j *Json) MustString(args ...string) string {
var def string
switch len(args) {
case 0:
case 1:
def = args[0]
default:
log.Panicf("MustString() received too many arguments %d", len(args))
}
s, err := j.String()
if err == nil {
return s
}
return def
}
func (j *Json) MustInt64() int64 {
var ret int64
var err error
switch v := j.data.(type) {
case int:
ret = int64(v)
case int64:
ret = v
case float64:
ret = int64(v)
case string:
if ret, err = strconv.ParseInt(v, 10, 64); err != nil {
panic(err)
}
default:
ret = 0
//panic("type assertion to int64 failed")
}
return ret
}
func (j *Json) MustFloat64() float64 {
var ret float64
var err error
switch v := j.data.(type) {
case int:
ret = float64(v)
case int64:
ret = float64(v)
case float64:
ret = v
case string:
v = strings.Replace(v, ",", "", -1)
if ret, err = strconv.ParseFloat(v, 64); err != nil {
panic(err)
}
default:
ret = 0
//panic("type assertion to float64 failed")
}
return ret
}
type iAssistantEx struct{
accessKey string
secretKey string
currency string
opCurrency string
baseCurrency string
secret string
secretExpires int64
apiBase string
step int64
newRate float64
timeout time.Duration
timeLocation *time.Location
}
type MapSorter []Item
type Item struct {
Key string
Val string
}
func NewMapSorter(m map[string]string) MapSorter {
ms := make(MapSorter, 0, len(m))
for k, v := range m {
if strings.HasPrefix(k, "!") {
k = strings.Replace(k, "!", "", -1)
}
ms = append(ms, Item{k, v})
}
return ms
}
func (ms MapSorter) Len() int {
return len(ms)
}
func (ms MapSorter) Less(i, j int) bool {
//return ms[i].Val < ms[j].Val // 按值排序
return ms[i].Key < ms[j].Key // 按键排序
}
func (ms MapSorter) Swap(i, j int) {
ms[i], ms[j] = ms[j], ms[i]
}
func encodeParams(params map[string]string, onlyPlusKeyAndValue bool) string {
ms := NewMapSorter(params)
sort.Sort(ms)
v := url.Values{}
for _, item := range ms {
v.Add(item.Key, item.Val)
}
if onlyPlusKeyAndValue {
var strKeyAndValue string
for _, kv := range ms {
// v.Add(item.Key, item.Val)
strKeyAndValue += (kv.Key + kv.Val)
}
return strKeyAndValue
}
var buf bytes.Buffer
keys := make([]string, 0, len(v))
for k := range v {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
vs := v[k]
prefix := k + "="
for _, v := range vs {
if buf.Len() > 0 {
buf.WriteByte('&')
}
buf.WriteString(prefix)
buf.WriteString(v)
}
}
return buf.String()
}
func newAssistantEx(accessKey, secretKey string) *iAssistantEx {
s := new(iAssistantEx)
s.accessKey = accessKey
s.secretKey = secretKey
s.apiBase = "https://www.coinbig.com"
s.timeout = 20 * time.Second
s.timeLocation = time.FixedZone("Asia/Shanghai", 8*60*60)
return s
}
func (p *iAssistantEx) apiCall(method string, httpType string) (*Json, error) {
// 不用签名
req, err := http.NewRequest(httpType, p.apiBase + method, nil)
// fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B) // 输出颜色
fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
fmt.Println("httpType:", httpType, "req:", req) // 测试 输出 http 请求创建
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
js, err := NewJson(b)
// 容错
return js, err
}
/* 签名的函数,如果需要 访问签名接口时使用。
func (p *iAssistantEx) tapiCall(method string, params map[string]string, httpType string) (js *Json, err error) {
if params == nil {
params = map[string]string{}
}
params["apikey"] = p.accessKey
params["time"] = strconv.FormatInt(time.Now().UnixNano() / 1e6, 10) // 获取 时间戳
h := md5.New()
h.Write([]byte(encodeParams(params, false) + "&secret_key=" + p.secretKey))
params["sign"] = strings.ToUpper(hex.EncodeToString(h.Sum(nil)))
qs := encodeParams(params, false)
req, err := http.NewRequest(httpType, fmt.Sprintf("%s%s?%s", p.apiBase, method, qs), nil)
// 测试
// fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B) // 输出颜色
fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "tapiCall create req:" + method, 0x1B)
fmt.Println("httpType:", httpType, "params:", params, "req:", req) // 测试 输出 http 请求创建
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := httpClient.Do(req) // 使用全局 的自定义的 httpClient , HTTP 对象
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
js, err = NewJson(b)
// 容错
return js, err
}
*/
type RpcRequest struct { // 结构体里的字段首字母必须大写,否则无法正常解析,结构体有导出和未导出,大写字母开头为导出。
// 在Unmarshal的时候会 根据 json 匹配查找该结构体的tag, 所以此处需要修饰符
AccessKey string `json:"access_key"`
SecretKey string `json:"secret_key"`
Nonce int64 `json:"nonce"`
Method string `json:"method"`
Params map[string]string `json:"params"`
}
func Produce(p chan<- interface{}, ctx context.Context, method string){
for {
select {
case <-ctx.Done():
fmt.Println("Produce is finished!") // 测试
return
default:
js, err := apiCall(method, "GET")
if err != nil {
p <- map[string]interface{}{
"error" : fmt.Sprintf("%v", err),
}
} else {
p <- js.(*Json).MustMap()
}
time.Sleep(time.Second * 1)
}
}
}
func Consumer(c <-chan interface{}, ctx context.Context){
for {
select {
case <-ctx.Done():
fmt.Println("Consumer is finished!") // 测试
return
default:
ticker = <- c
time.Sleep(time.Second * 1)
}
}
}
func apiCall(method string, httpType string) (interface{}, error) {
// 不用签名
req, err := http.NewRequest(httpType, method, nil)
// fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B) // 输出颜色
fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "httpType:", httpType, "req:", req) // 测试 输出 http 请求创建
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
js, err := NewJson(b)
// 容错
return js, err
}
var ticker interface{}
var _Cancel interface{}
func OnPost(w http.ResponseWriter, r *http.Request) {
var ret interface{}
defer func() {
if e := recover(); e != nil {
if ee, ok := e.(error); ok {
e = ee.Error()
}
ret = map[string]string{"error": fmt.Sprintf("%v", e)}
}
b, _ := json.Marshal(ret)
w.Write(b)
}()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}
var request RpcRequest
err = json.Unmarshal(b, &request)
if err != nil {
panic(err)
}
// request.Params 参数
// request.Method[6:] 完整URL , 调用时传入 type=create 创建 , type=get 获取 数据 , type=finished 终止 并发
var data interface{}
if strings.HasPrefix(request.Method, "__api_") {
// 处理 请求
url := request.Method[6:]
cmdType := request.Params["type"]
if cmdType == "get" {
data = ticker
fmt.Println("get ticker:", ticker)
} else if cmdType == "create" {
ch := make(chan interface{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go Produce(ch, ctx, url)
go Consumer(ch, ctx)
_Cancel = cancel
panic(errors.New("创建 Produce , Consumer"))
} else if cmdType == "finished" {
// 结束 并发
_Cancel.(context.CancelFunc)()
panic(errors.New("销毁 Produce , Consumer"))
} else {
// 其他 exchange.IO 请求
}
} else {
panic(errors.New(request.Method + " not support"))
}
if err != nil {
panic(err)
}
ret = map[string]interface{}{
"data": data,
}
return
}
func main() {
var addr = flag.String("b", "127.0.0.1:6666", "bind addr")
flag.Parse()
if *addr == "" {
flag.Usage()
return
}
basePath := "/exchange"
log.Println("Running ", fmt.Sprintf("http://%s%s", *addr, basePath), "...")
http.HandleFunc(basePath, OnPost)
http.ListenAndServe(*addr, nil)
}
function main() {
var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")
Log(ret)
var i = 0
while(1){
var beginTime = new Date().getTime()
var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
var endTime = new Date().getTime()
Log("exchange.IO 耗时(毫秒):", endTime - beginTime, " 第:", i, "次, ret1:", ret1)
if(i == 30){
var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")
Log("ret2:", ret2)
}
Sleep(5000)
i++
}
}
插件 使用 和一般的 通用协议 一样。
用插件 代码编译个可执行程序 和托管者一起运行就可以了。
测试用的是 OCX 交易所的 ticker 接口: https://openapi.ocx.com/api/v2/tickers/btcusdt
- 创建并发程序:
var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")
- 获取 并发程序 实时访问的数据:
var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
- 结束并发程序:
var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")