最近看到了布欧的量化日记提到可以用利用负相关币种来进行选币,根据价差突破来开仓获利。数字货币基本上都是正相关的,负相关的是少数币种,往往有着特殊的行情,比如前一段时间的MEME币的独立行情,完全不跟大盘走势,筛选出来这些币种,突破后做多,这种方式在特定行情下能够获利。但量化交易领域最常见的还是利用正相关性做配对交易,本文将简要介绍这个策略。
数字货币配对交易是一种基于统计套利的交易策略,通过同时买入和卖出两种相关性较强的数字货币永续合约,以获取价格偏离带来的利润。本文将详细介绍该策略的原理、盈利机制、筛选币种的方法、潜在风险及其改进方式,并提供一些实用的Python代码示例。
配对交易策略依赖于两种数字货币价格之间的历史相关性。当两种币种价格呈现强相关性时,它们的价格走势大体同步。如果某一时刻两者的价格比值出现显著偏离,则可以认为这是暂时性的异常,价格会趋于恢复正常水平。数字货币市场具有高度的联动性,当某一种主要数字货币(如比特币)出现大幅波动时,通常会引发其他数字货币的联动反应。而有些币种可能由于是相同投资机构、相同做市商、相同赛道的原因,出现非常明显的正相关性,并且能够持续。有些币种则出现负相关,但负相关的币种较少,并且由于都受大盘走势的影响,很多时候也会出现一致的行情走势。
假设币种A和币种B具有较高的价格相关性。在某一时刻,A/B价格比值的平均值为1。如果某一时刻,A/B价格比值上涨偏离超过0.001,即超过1.001,此时可以通过以下方式进行交易: 开仓做多B,开仓做空A。反之,当A/B价格比值低于0.999时: 开仓做多A,开仓做空B。
盈利的关键在于价格偏离回归正常时的差价收益。由于价格偏离通常是短暂的,交易者可以在价格回归到均值时平仓,从中获利,赚取差价。
这些代码可以直接使用,最好下载个Anancoda,在jupyer notebook中调试。直接包含了常用数据分析的包。
import requests
from datetime import date,datetime
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests, zipfile, io
%matplotlib inline
Info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo')
b_symbols = [s['symbol'] for s in Info.json()['symbols'] if s['contractType'] == 'PERPETUAL' and s['status'] == 'TRADING' and s['quoteAsset'] == 'USDT']
b_symbols = list(filter(lambda x: x[-4:] == 'USDT', [s.split('_')[0] for s in b_symbols]))
b_symbols = [x[:-4] for x in b_symbols]
print(b_symbols) # 获取所有的正在交易的交易对
GetKlines函数的主要功能是从币安交易所获取指定交易对永续合约的历史K线数据,并将这些数据存储在一个Pandas DataFrame中。K线数据包括开盘价、最高价、最低价、收盘价、成交量等信息。这次我们主要用到收盘价的数据。
def GetKlines(symbol='BTCUSDT',start='2020-8-10',end='2024-7-01',period='1h',base='fapi',v = 'v1'):
Klines = []
start_time = int(time.mktime(datetime.strptime(start, "%Y-%m-%d").timetuple()))*1000 + 8*60*60*1000
end_time = min(int(time.mktime(datetime.strptime(end, "%Y-%m-%d").timetuple()))*1000 + 8*60*60*1000,time.time()*1000)
intervel_map = {'m':60*1000,'h':60*60*1000,'d':24*60*60*1000}
while start_time < end_time:
time.sleep(0.3)
mid_time = start_time+1000*int(period[:-1])*intervel_map[period[-1]]
url = 'https://'+base+'.binance.com/'+base+'/'+v+'/klines?symbol=%s&interval=%s&startTime=%s&endTime=%s&limit=1000'%(symbol,period,start_time,mid_time)
res = requests.get(url)
res_list = res.json()
if type(res_list) == list and len(res_list) > 0:
start_time = res_list[-1][0]+int(period[:-1])*intervel_map[period[-1]]
Klines += res_list
if type(res_list) == list and len(res_list) == 0:
start_time = start_time+1000*int(period[:-1])*intervel_map[period[-1]]
if mid_time >= end_time:
break
df = pd.DataFrame(Klines,columns=['time','open','high','low','close','amount','end_time','volume','count','buy_amount','buy_volume','null']).astype('float')
df.index = pd.to_datetime(df.time,unit='ms')
return df
数据量比较大,为了更快的下载,只获取了最近3个月的小时K线数据。df_close包含所有的币种的收盘价数据
start_date = '2024-04-01'
end_date = '2024-07-05'
period = '1h'
df_dict = {}
for symbol in b_symbols:
print(symbol)
if symbol in df_dict.keys():
continue
df_s = GetKlines(symbol=symbol+'USDT',start=start_date,end=end_date,period=period)
if not df_s.empty:
df_dict[symbol] = df_s
df_close = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq=period),columns=df_dict.keys())
for symbol in symbols:
df_close[symbol] = df_dict[symbol].close
df_close = df_close.dropna(how='all')
定义了一个Exchange对象,用于接下来的回测
class Exchange:
def __init__(self, trade_symbols, fee=0.0002, initial_balance=10000):
self.initial_balance = initial_balance #初始的资产
self.fee = fee
self.trade_symbols = trade_symbols
self.account = {'USDT':{'realised_profit':0, 'unrealised_profit':0, 'total':initial_balance,
'fee':0, 'leverage':0, 'hold':0, 'long':0, 'short':0}}
for symbol in trade_symbols:
self.account[symbol] = {'amount':0, 'hold_price':0, 'value':0, 'price':0, 'realised_profit':0,'unrealised_profit':0,'fee':0}
def Trade(self, symbol, direction, price, amount):
cover_amount = 0 if direction*self.account[symbol]['amount'] >=0 else min(abs(self.account[symbol]['amount']), amount)
open_amount = amount - cover_amount
self.account['USDT']['realised_profit'] -= price*amount*self.fee #扣除手续费
self.account['USDT']['fee'] += price*amount*self.fee
self.account[symbol]['fee'] += price*amount*self.fee
if cover_amount > 0: #先平仓
self.account['USDT']['realised_profit'] += -direction*(price - self.account[symbol]['hold_price'])*cover_amount #利润
self.account[symbol]['realised_profit'] += -direction*(price - self.account[symbol]['hold_price'])*cover_amount
self.account[symbol]['amount'] -= -direction*cover_amount
self.account[symbol]['hold_price'] = 0 if self.account[symbol]['amount'] == 0 else self.account[symbol]['hold_price']
if open_amount > 0:
total_cost = self.account[symbol]['hold_price']*direction*self.account[symbol]['amount'] + price*open_amount
total_amount = direction*self.account[symbol]['amount']+open_amount
self.account[symbol]['hold_price'] = total_cost/total_amount
self.account[symbol]['amount'] += direction*open_amount
def Buy(self, symbol, price, amount):
self.Trade(symbol, 1, price, amount)
def Sell(self, symbol, price, amount):
self.Trade(symbol, -1, price, amount)
def Update(self, close_price): #对资产进行更新
self.account['USDT']['unrealised_profit'] = 0
self.account['USDT']['hold'] = 0
self.account['USDT']['long'] = 0
self.account['USDT']['short'] = 0
for symbol in self.trade_symbols:
if not np.isnan(close_price[symbol]):
self.account[symbol]['unrealised_profit'] = (close_price[symbol] - self.account[symbol]['hold_price'])*self.account[symbol]['amount']
self.account[symbol]['price'] = close_price[symbol]
self.account[symbol]['value'] = self.account[symbol]['amount']*close_price[symbol]
if self.account[symbol]['amount'] > 0:
self.account['USDT']['long'] += self.account[symbol]['value']
if self.account[symbol]['amount'] < 0:
self.account['USDT']['short'] += self.account[symbol]['value']
self.account['USDT']['hold'] += abs(self.account[symbol]['value'])
self.account['USDT']['unrealised_profit'] += self.account[symbol]['unrealised_profit']
self.account['USDT']['total'] = round(self.account['USDT']['realised_profit'] + self.initial_balance + self.account['USDT']['unrealised_profit'],6)
self.account['USDT']['leverage'] = round(self.account['USDT']['hold']/self.account['USDT']['total'],3)
相关性计算是统计学中的一种方法,用于衡量两个变量之间的线性关系。最常用的相关性计算方法是皮尔森相关系数。下面是相关性计算的原理、公式和实现方法。皮尔森相关系数用于衡量两个变量之间的线性关系,取值范围在-1到1之间:
皮尔森相关系数通过计算两个变量的协方差和标准差来确定其相关性。公式如下:
[ \rho_{X,Y} = \frac{\text{cov}(X,Y)}{\sigma_X \sigma_Y} ]
其中:
当然,不用太关心是如何计算的,使用Python 1行代码就能计算出所有币种的相关性。如图展示的是相关性热力图,红色的代表正相关,蓝色代表负相关,颜色越深相关性越强。可以看到大片的都是深红色,所以说数字货币的正相关性很强。
import seaborn as sns
corr = df_close.corr()
plt.figure(figsize=(20, 20))
sns.heatmap(corr, annot=False, cmap='coolwarm', vmin=-1, vmax=1)
plt.title('Correlation Heatmap of Cryptocurrency Closing Prices', fontsize=20);
根据相关性,筛选出最相关的前20个币种对。结果如下。它们的相关性都非常之强,都在0.99以上。
MANA SAND 0.996562
ICX ZIL 0.996000
STORJ FLOW 0.994193
FLOW SXP 0.993861
STORJ SXP 0.993822
IOTA ZIL 0.993204
SAND 0.993095
KAVA SAND 0.992303
ZIL SXP 0.992285
SAND 0.992103
DYDX ZIL 0.992053
DENT REEF 0.991789
RDNT MANTA 0.991690
STMX STORJ 0.991222
BIGTIME ACE 0.990987
RDNT HOOK 0.990718
IOST GAS 0.990643
ZIL HOOK 0.990576
MATIC FLOW 0.990564
MANTA HOOK 0.990563
相应的代码如下:
corr_pairs = corr.unstack()
# 移除自身相关性(即对角线上的值)
corr_pairs = corr_pairs[corr_pairs != 1]
sorted_corr_pairs = corr_pairs.sort_values(kind="quicksort")
# 提取最相关和最不相关的前20个币种对
most_correlated = sorted_corr_pairs.tail(40)[::-2]
print("最相关的前20个币种对:")
print(most_correlated)
具体回测代码如下。演示策略的主要观察两个加密货币(IOTA 和 ZIL)的价格比率,并根据这个比率的变化来进行交易。具体步骤如下:
初始化:
e
,初始余额为10000美元,交易费用为0.02%。avg
。value = 1000
。迭代处理价格数据:
df_close
。diff
。aim_value
,每偏离0.01,交易一个value。并基于当前账户持仓和价格情况决定买卖操作。pair_a
和买入 pair_b
操作。pair_a
和卖出 pair_b
操作。调整平均值:
avg
,以便反映最新的价格比率。更新账户和记录:
res_list
。结果输出:
res_list
转换为dataframe res
,以便进一步分析和展示。pair_a = 'IOTA'
pair_b = "ZIL"
e = Exchange([pair_a,pair_b], fee=0.0002, initial_balance=10000) #Exchange定义放在评论区
res_list = []
index_list = []
avg = df_close[pair_a][0] / df_close[pair_b][0]
value = 1000
for idx, row in df_close.iterrows():
diff = (row[pair_a] / row[pair_b] - avg)/avg
aim_value = -value * diff / 0.01
if -aim_value + e.account[pair_a]['amount']*row[pair_a] > 0.5*value:
e.Sell(pair_a,row[pair_a],(-aim_value + e.account[pair_a]['amount']*row[pair_a])/row[pair_a])
e.Buy(pair_b,row[pair_b],(-aim_value - e.account[pair_b]['amount']*row[pair_b])/row[pair_b])
if -aim_value + e.account[pair_a]['amount']*row[pair_a] < -0.5*value:
e.Buy(pair_a, row[pair_a],(aim_value - e.account[pair_a]['amount']*row[pair_a])/row[pair_a])
e.Sell(pair_b, row[pair_b],(aim_value + e.account[pair_b]['amount']*row[pair_b])/row[pair_b])
avg = 0.99*avg + 0.01*row[pair_a] / row[pair_b]
index_list.append(idx)
e.Update(row)
res_list.append([e.account['USDT']['total'],e.account['USDT']['hold'],
e.account['USDT']['fee'],e.account['USDT']['long'],e.account['USDT']['short']])
res = pd.DataFrame(data=res_list, columns=['total','hold', 'fee', 'long', 'short'],index = index_list)
res['total'].plot(grid=True);
共回测了4组币种,结果比较理想。目前相关性的计算用到了未来数据,所以不是很精确。本文也把数据分为了两部分,根据前面的计算相关性,后面的回测交易。结果差了一些但也不错。留给用户自己练习验证。
尽管配对交易策略在理论上可以盈利,但在实际操作中仍存在一些风险:币种之间的相关性可能随时间发生变化,导致策略失效;极端市场条件下,价格偏离可能加剧,导致较大的损失;某些币种的流动性较低,可能导致交易难以执行或成本增加;频繁交易产生的手续费可能侵蚀利润。
为降低风险并提高策略的稳定性,可以考虑以下改进措施:定期重新计算币种间的相关性,及时调整交易对;设置止损和止盈点,控制单笔交易的最大损失;同时交易多个币种对,分散风险。
数字货币配对交易策略通过利用币种价格的相关性,在价格偏离时进行套利操作,从而实现盈利。该策略具有较高的理论可行性。随后将放出一个基于该策略的简单的实盘策略源码。如果有更多问题或需要进一步讨论,欢迎随时交流。
77924998 这个值得研究,码源呢?
豆豆888 张总加班 - -! 哈哈哈