资源加载中... loading...

FMZ平台外部信号接收的探讨:策略内置Http服务接收信号的完整方案

Author: 发明者量化-小小梦, Created: 2024-12-17 11:44:07, Updated: 2024-12-17 16:08:11

[TOC]

img

前言

在上一篇文章「FMZ平台外部信号接收的探讨:扩展API vs 策略内置HTTP服务」的讨论中,我们对比了两种不同的接收外部信号进行程序化交易的方式,分析了其中的细节。使用FMZ平台扩展API接收外部信号的方案在平台策略库中已经有完整的策略,本篇我们来一起实现一个完整的使用策略内置Http服务接收信号的方案。

策略实现

仿照之前的使用FMZ扩展API接入Trading View信号的策略,我们沿用之前的消息格式、消息处理方式等,对策略进行简单修改。

因为策略内建的服务可以用Http或者HTTPS,为了简单演示,我们使用Http协议,增加IP白名单验证、增加口令验证。如果有进一步增加安全性的需求,可以把策略内置服务设计为Https服务。

//信号结构
var Template = {
    Flag: "45M103Buy",     // 标识,可随意指定
    Exchange: 1,           // 指定交易所交易对
    Currency: "BTC_USDT",  // 交易对
    ContractType: "spot",  // 合约类型,swap,quarter,next_quarter,现货填写spot
    Price: "{{close}}",    // 开仓或者平仓价格,-1为市价
    Action: "buy",         // 交易类型[ buy:现货买入 , sell:现货卖出 , long:期货做多 , short:期货做空 , closesell:期货买入平空 , closebuy:期货卖出平多]
    Amount: "1",           // 交易量
}

var Success = "#5cb85c"    // 成功颜色
var Danger = "#ff0000"     // 危险颜色
var Warning = "#f0ad4e"    // 警告颜色
var buffSignal = []

// Http服务
function serverFunc(ctx, ipWhiteList, passPhrase) {
    var path = ctx.path()
    if (path == "/CommandRobot") {
        // 校验IP地址
        var fromIP = ctx.remoteAddr().split(":")[0]        
        if (ipWhiteList && ipWhiteList.length > 0) {
            var ipList = ipWhiteList.split(",")
            if (!ipList.includes(fromIP)) {
                ctx.setStatus(500)
                ctx.write("IP address not in white list")
                Log("500 Error: IP address not in white list", "#FF0000")
                return 
            }
        }

        // 校验口令
        var pass = ctx.rawQuery().length > 0 ? ctx.query("passPhrase") : ""
        if (passPhrase && passPhrase.length > 0) {
            if (pass != passPhrase) {
                ctx.setStatus(500)
                ctx.write("Authentication failed")
                Log("500 Error: Authentication failed", "#FF0000")
                return 
            }
        }

        var body = JSON.parse(ctx.body())
        threading.mainThread().postMessage(JSON.stringify(body))
        ctx.write("OK")
        // 200
    } else {
        ctx.setStatus(404)
    }
}

// 校验信号消息格式
function DiffObject(object1, object2) {
    const keys1 = Object.keys(object1)
    const keys2 = Object.keys(object2)
    if (keys1.length !== keys2.length) {
        return false
    }
    for (let i = 0; i < keys1.length; i++) {
        if (keys1[i] !== keys2[i]) {
            return false
        }
    }
    return true
}

function CheckSignal(Signal) {
    Signal.Price = parseFloat(Signal.Price)
    Signal.Amount = parseFloat(Signal.Amount)
    if (Signal.Exchange <= 0 || !Number.isInteger(Signal.Exchange)) {
        Log("交易所最小编号为1,并且为整数", Danger)
        return
    }
    if (Signal.Amount <= 0 || typeof(Signal.Amount) != "number") {
        Log("交易量不能小于0,并且为数值类型", typeof(Signal.Amount), Danger)
        return
    }
    if (typeof(Signal.Price) != "number") {
        Log("价格必须是数值", Danger)
        return
    }
    if (Signal.ContractType == "spot" && Signal.Action != "buy" && Signal.Action != "sell") {
        Log("指令为操作现货,Action错误,Action:", Signal.Action, Danger)
        return 
    }
    if (Signal.ContractType != "spot" && Signal.Action != "long" && Signal.Action != "short" && Signal.Action != "closesell" && Signal.Action != "closebuy") {
        Log("指令为操作期货,Action错误,Action:", Signal.Action, Danger)
        return 
    }
    return true
}

// 信号处理对象
function createManager() {
    var self = {}
    self.tasks = []
    
    self.process = function() {
        var processed = 0
        if (self.tasks.length > 0) {
            _.each(self.tasks, function(task) {
                if (!task.finished) {
                    processed++
                    self.pollTask(task)
                }
            })
            if (processed == 0) {
                self.tasks = []
            }
        }
    }
    
    self.newTask = function(signal) {
        // {"Flag":"45M103Buy","Exchange":1,"Currency":"BTC_USDT","ContractType":"swap","Price":"10000","Action":"buy","Amount":"0"}
        var task = {}
        task.Flag = signal["Flag"]
        task.Exchange = signal["Exchange"]
        task.Currency = signal["Currency"]
        task.ContractType = signal["ContractType"]
        task.Price = signal["Price"]
        task.Action = signal["Action"]
        task.Amount = signal["Amount"]
        task.exchangeIdx = signal["Exchange"] - 1
        task.pricePrecision = null
        task.amountPrecision = null 
        task.error = null 
        task.exchangeLabel = exchanges[task.exchangeIdx].GetLabel()
        task.finished = false 
        
        Log("创建任务:", task)
        self.tasks.push(task)
    }
    
    self.getPrecision = function(n) {
        var precision = null 
        var arr = n.toString().split(".")
        if (arr.length == 1) {
            precision = 0
        } else if (arr.length == 2) {
            precision = arr[1].length
        } 
        return precision
    }
    
    self.pollTask = function(task) {
        var e = exchanges[task.exchangeIdx]
        var name = e.GetName()
        var isFutures = true
        e.SetCurrency(task.Currency)
        if (task.ContractType != "spot" && name.indexOf("Futures_") != -1) {
            // 非现货,则设置合约
            e.SetContractType(task.ContractType)
        } else if (task.ContractType == "spot" && name.indexOf("Futures_") == -1) {
            isFutures = false 
        } else {
            task.error = "指令中的ContractType与配置的交易所对象类型不匹配"
            return 
        }
        
        var depth = e.GetDepth()
        if (!depth || !depth.Bids || !depth.Asks) {
            task.error = "订单薄数据异常"
            return 
        }
        
        if (depth.Bids.length == 0 && depth.Asks.length == 0) {
            task.error = "盘口无订单"
            return 
        }
        
        _.each([depth.Bids, depth.Asks], function(arr) {
            _.each(arr, function(order) {
                var pricePrecision = self.getPrecision(order.Price)
                var amountPrecision = self.getPrecision(order.Amount)
                if (Number.isInteger(pricePrecision) && !Number.isInteger(self.pricePrecision)) {
                    self.pricePrecision = pricePrecision
                } else if (Number.isInteger(self.pricePrecision) && Number.isInteger(pricePrecision) && pricePrecision > self.pricePrecision) {
                    self.pricePrecision = pricePrecision
                }
                if (Number.isInteger(amountPrecision) && !Number.isInteger(self.amountPrecision)) {
                    self.amountPrecision = amountPrecision
                } else if (Number.isInteger(self.amountPrecision) && Number.isInteger(amountPrecision) && amountPrecision > self.amountPrecision) {
                    self.amountPrecision = amountPrecision
                }
            })
        })

        if (!Number.isInteger(self.pricePrecision) || !Number.isInteger(self.amountPrecision)) {
            task.err = "获取精度失败"
            return 
        }
        
        e.SetPrecision(self.pricePrecision, self.amountPrecision)
        
        // buy:现货买入 , sell:现货卖出 , long:期货做多 , short:期货做空 , closesell:期货买入平空 , closebuy:期货卖出平多
        var direction = null 
        var tradeFunc = null 
        if (isFutures) {
            switch (task.Action) {
                case "long": 
                    direction = "buy"
                    tradeFunc = e.Buy 
                    break
                case "short": 
                    direction = "sell"
                    tradeFunc = e.Sell
                    break
                case "closesell": 
                    direction = "closesell"
                    tradeFunc = e.Buy 
                    break
                case "closebuy": 
                    direction = "closebuy"
                    tradeFunc = e.Sell
                    break
            }
            if (!direction || !tradeFunc) {
                task.error = "交易方向错误:" + task.Action
                return 
            }
            e.SetDirection(direction)
        } else {
            if (task.Action == "buy") {
                tradeFunc = e.Buy 
            } else if (task.Action == "sell") {
                tradeFunc = e.Sell 
            } else {
                task.error = "交易方向错误:" + task.Action
                return 
            }
        }
        var id = tradeFunc(task.Price, task.Amount)
        if (!id) {
            task.error = "下单失败"
        }
        
        task.finished = true
    }
    
    return self
}

function main() {
    // 重置日志信息
    if (isResetLog) {
        LogReset(1)
    }

    Log("交易类型[ buy:现货买入 , sell:现货卖出 , long:期货做多 , short:期货做空 , closesell:期货买入平空 , closebuy:期货卖出平多]", Danger)
    Log("指令模板:", JSON.stringify(Template), Danger)    
    if (!passPhrase || passPhrase.length == 0) {
        Log("webhook url:", `http://${serverIP}:${port}/CommandRobot`)
    } else {
        Log("webhook url:", `http://${serverIP}:${port}/CommandRobot?passPhrase=${passPhrase}`)
    }

    // 创建Http内置服务
    __Serve("http://0.0.0.0:" + port, serverFunc, ipWhiteList, passPhrase)

    // 初始化执行的代码
    if (initCode && initCode.length > 0) {
        try {
            Log("执行初始化代码:", initCode)
            eval(initCode)
        } catch(error) {
            Log("e.name:", error.name, "e.stack:", error.stack, "e.message:", error.message)
        }
    }    

    // 创建信号管理对象
    var manager = createManager()
    
    while (true) {
        try {
            // 检测交互控件,用于测试
            var cmd = GetCommand()
            if (cmd) {
                // 发送Http请求,模拟测试
                var arrCmd = cmd.split(":", 2)
                if (arrCmd[0] == "TestSignal") {
                    // {"Flag":"TestSignal","Exchange":1,"Currency":"BTC_USDT","ContractType":"swap","Price":"10000","Action":"long","Amount":"1"}
                    var signal = cmd.replace("TestSignal:", "")
                    if (!passPhrase || passPhrase.length == 0) {
                        var ret = HttpQuery(`http://${serverIP}:${port}/CommandRobot`, {"method": "POST", "body": signal})
                        Log("测试请求的应答:", ret)
                    } else {
                        var ret = HttpQuery(`http://${serverIP}:${port}/CommandRobot?passPhrase=${passPhrase}`, {"method": "POST", "body": signal})
                        Log("测试请求的应答:", ret)
                    }                    
                }
            }

            // 检测内置Http服务收到请求之后通知主线程的消息,写入manager对象的任务队列
            var msg = threading.mainThread().peekMessage(-1)
            if (msg) {
                Log("收到消息 msg:", msg)
                var objSignal = JSON.parse(msg)
                if (DiffObject(Template, objSignal)) {
                    Log("接收到交易信号指令:", objSignal)
                    buffSignal.push(objSignal)
                    
                    // 检查交易量、交易所编号
                    if (!CheckSignal(objSignal)) {
                        continue
                    }
                    
                    // 创建任务
                    if (objSignal["Flag"] == "TestSignal") {
                        Log("收到测试消息:", JSON.stringify(objSignal))
                    } else {
                        manager.newTask(objSignal)
                    }                    
                } else {
                    Log("指令无法识别", signal)
                }
            } else {
                Sleep(1000 * SleepInterval)
            }

            // 处理任务
            manager.process()
            
            // 状态栏显示信号
            if (buffSignal.length > maxBuffSignalRowDisplay) {
                buffSignal.shift()
            }
            var buffSignalTbl = {
                "type" : "table",
                "title" : "信号记录",
                "cols" : ["Flag", "Exchange", "Currency", "ContractType", "Price", "Action", "Amount"],
                "rows" : []
            }
            for (var i = buffSignal.length - 1 ; i >= 0 ; i--) {
                buffSignalTbl.rows.push([buffSignal[i].Flag, buffSignal[i].Exchange, buffSignal[i].Currency, buffSignal[i].ContractType, buffSignal[i].Price, buffSignal[i].Action, buffSignal[i].Amount])
            }

            LogStatus(_D(), "\n", "`" + JSON.stringify(buffSignalTbl) + "`")            
        } catch (error) {
            Log("e.name:", error.name, "e.stack:", error.stack, "e.message:", error.message)
        }        
    }
}

img

  • port 参数:如果使用Http协议,在Trading View上仅能设置80端口。
  • serverIP 参数:填写服务器的公网IP地址。
  • initCode 参数:可以用来切换基地址,用于交易所测试环境测试。

相对于使用扩展API方式接入外部信号的策略,策略改动不算大,仅仅是增加了一个serverFuncHttp服务处理函数,使用了FMZ平台新增的多线程消息传递方式:postMessage/peekMessage,其它代码几乎没有改动。

IP白名单

由于Trading View的Webhook发出的请求只从以下IP地址发出:

52.89.214.238
34.212.75.30
54.218.53.128
52.32.178.7

所以我们给策略增加一个参数ipWhiteList,用来设置IP白名单,不在这个IP地址白名单内的请求都忽略。

        // 校验IP地址
        var fromIP = ctx.remoteAddr().split(":")[0]        
        if (ipWhiteList && ipWhiteList.length > 0) {
            var ipList = ipWhiteList.split(",")
            if (!ipList.includes(fromIP)) {
                ctx.setStatus(500)
                ctx.write("IP address not in white list")
                Log("500 Error: IP address not in white list", "#FF0000")
                return 
            }
        }

验证口令

给策略增加一个参数passPhrase,用来设置验证口令,这个口令配置在Trading View上Webhook url设置中,验证口令不匹配的请求予以忽略。

例如我们设置的:test123456

        // 校验口令
        var pass = ctx.rawQuery().length > 0 ? ctx.query("passPhrase") : ""
        if (passPhrase && passPhrase.length > 0) {
            if (pass != passPhrase) {
                ctx.setStatus(500)
                ctx.write("Authentication failed")
                Log("500 Error: Authentication failed", "#FF0000")
                return 
            }
        }

外部信号

使用 Trading View 平台的PINE脚本作为外部信号触发来源,在Trading View官方公开的PINE脚本中随便选择了一个:

//@version=6
strategy("MovingAvg Cross", overlay=true)
length = input(9)
confirmBars = input(1)
price = close
ma = ta.sma(price, length)
bcond = price > ma
bcount = 0
bcount := bcond ? nz(bcount[1]) + 1 : 0
if (bcount == confirmBars)
	strategy.entry("MACrossLE", strategy.long, comment="long")
scond = price < ma
scount = 0
scount := scond ? nz(scount[1]) + 1 : 0
if (scount == confirmBars)
	strategy.entry("MACrossSE", strategy.short, comment="short")

当然在FMZ平台也可以直接运行PINE脚本执行实盘交易,但是如果希望让Trading View平台运行PINE脚本发出信号,就只能使用我们讨论的这些方案了。

我们需要关注这个脚本的下单函数,为了让这个PINE脚本适配我们的webhook请求中的消息,我们需要对交易函数中的comment进行修改,文章后续我们会提到。

WebhookUrl和请求body设置

对于WebhookUrl和请求body的设置,与之前的扩展API方式接入外部信号基本一致,相同的地方本篇就不再赘述,可以查阅之前的文章。

Webhook Url

img

当我们把这个PINE脚本添加到Trading View上某个市场(我们测试选择币安的ETH_USDT永续合约市场)的图表上之后,可以看到脚本已经开始运作。然后我们按照截图上所示,给脚本增加报警。

img

Webhook URL设置: 策略代码中已经设计好了自动生成webhook url,我们只用在策略运行开始的日志中复制过来就可以。

img

http://xxx.xxx.xxx.xxx:80/CommandRobot?passPhrase=test123456

Trading View的规定是Webhook url对于Http请求只能使用80端口,所以策略上我们也把端口参数设置为80,所以你看到策略生成的Webhook url的链接端口也是80。

body消息

img

然后在截图中「设置」标签里设置请求的body消息。

{
    "Flag":"{{strategy.order.id}}",
    "Exchange":1,
    "Currency":"ETH_USDT",
    "ContractType":"swap",
    "Price":"-1",
    "Action":"{{strategy.order.comment}}",
    "Amount":"{{strategy.order.contracts}}"
}

还记得刚才讲的PINE脚本中的下单代码吗?我们以代码中的开多仓为例:

strategy.entry("MACrossLE", strategy.long, comment="long")

其中"MACrossLE"就是将来触发报警时,对于"{{strategy.order.id}}"的填充内容。

其中"long"就是将来触发报警时,对于"{{strategy.order.comment}}"的填充内容。策略中识别的信号为(以下截图):

img

所以必须设置一致。这里我们给下单函数设置了"long"和"short",表示开多,开空的信号。

PINE脚本中没有指定每次的下单量是多少,所以Trading View发送报警消息时,使用的是默认下单量填充"{{strategy.order.contracts}}"部分。

实盘测试

img

img

当在Trading View 上运行的PINE脚本,执行交易函数时,因为我们设置了Webhook url报警,此时Trading View 平台就会向我们的策略内建的Http服务发送一个POST请求,这个请求query中包含一个用于验证的口令参数passPhrase。实际收到的请求body类似这样:

img

然后我们的策略根据这个body中的消息,执行对应的交易操作。

可以看到,策略在OKX仿真环境,根据Trading View上的PINE脚本进行了同步的信号交易。

策略地址

https://www.fmz.com/strategy/475235

感谢关注FMZ量化,感谢阅读。


More