缠论中枢绘制 - 技术分享

Author: abc_quant, Created: 2020-10-15 17:09:00, Updated:

根据某宽的文章移植的缠论中枢绘图 增加了多个中枢的绘制 有兴趣的可以了解一下缠论 顺便接一下策略定制:https://www.fmz.cn/market-offer/199

'''
start: 2020-10-1 00:00:00
end: 2020-10-15 16:00:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"OKEX","currency":"BTC_USDT","stocks":1}]
'''
import pandas as pd
from fmz import * # 导入所有FMZ函数
task = VCtx(__doc__) # 初始化
#!pip install --user mplfinance
#import sys
#sys.path.append('/home/quant/.local/lib/python3.6/site-packages')
#import mplfinance
# 第三方函数库
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
#import mplfinance as mpf
import matplotlib.patches as patches
import talib
import datetime
import warnings
warnings.filterwarnings("ignore")

def get_k_series():
    #
    # 获取k线序列,默认为30分钟级别
    # 输入:n是级别,单位是分钟
    # 输出:pandas, k线序列
    n = 1

    one_min_data = pd.DataFrame(exchange.GetRecords())
    one_min_data = one_min_data.rename(columns={'Time':'date','Open':'open','Close':'close','High':'high','Low':'low'})
    one_min_data['date'] = one_min_data['date'].apply(lambda x:_D(x/1000))
    one_min_data['Date'] = one_min_data['date'].apply(lambda x:pd.to_datetime(x))
    one_min_data.set_index('Date',inplace=True)
    
    #print(one_min_data)
    n_min_data = pd.DataFrame()
    for i in range(n, len(one_min_data) + 1, n):
        
        interval = one_min_data.iloc[i - n:i]
        interval_open = interval.open.iloc[0]
        interval_high = max(interval.high)
        interval_low = min(interval.low)
        interval_date = interval.date
        interval_k = pd.DataFrame(interval[-1:])  # 新建DataFrame,否则会报SettingWithCopyWarning
        interval_k.open = interval_open
        interval_k.high = interval_high
        interval_k.low = interval_low
        interval_k.date = interval_date
        #print(interval_k)
        n_min_data = pd.concat([n_min_data, interval_k], axis=0)
    n_min_data = n_min_data.reset_index()
    #del n_min_data['instrument']
    #del n_min_data['index']
    #print(n_min_data)
    return n_min_data


def get_binary_positions(k_data):
    #
    # 计算k线序列的二分位值
    # 输入:k线序列
    # 输出:list, k线序列对应的二分位值
    binary_positions = []
    for i in range(len(k_data)):
        temp_y = (k_data.high[i] + k_data.low[i]) / 2.0
        binary_positions.append(temp_y)
    return binary_positions


def adjust_by_cintainment(k_data):
    #
    # 判断k线的包含关系,便于寻找顶分型和底分型
    # 输入:k线序列
    # 输出:adjusted_k_data, 处理后的k线序列
    trend = [0]
    adjusted_k_data = pd.DataFrame()
    temp_data = k_data[:1]
    #print(temp_data)
    #return
    for i in range(len(k_data)):
        #print("处理:",i)
        is_equal = temp_data.high.iloc[-1] == k_data.high.iloc[i] and temp_data.low.iloc[-1] == k_data.low.iloc[i]  # 第1根等于第2根

        # 向右包含
        if temp_data.high.iloc[-1] >= k_data.high.iloc[i] and temp_data.low.iloc[-1] <= k_data.low.iloc[i] and not is_equal:
            if trend[-1] == -1:
                temp_data.high.iloc[-1] = k_data.high.iloc[i]
            else:
                temp_data.low.iloc[-1] = k_data.low.iloc[i]

        # 向左包含
        elif temp_data.high.iloc[-1] <= k_data.high.iloc[i] and temp_data.low.iloc[-1] >= k_data.low.iloc[i] and not is_equal:
            if trend[-1] == -1:
                temp_data.low.iloc[-1] = k_data.low.iloc[i]
            else:
                temp_data.high.iloc[-1] = k_data.high.iloc[i]

        elif is_equal:
            trend.append(0)

        elif temp_data.high.iloc[-1] > k_data.high.iloc[i] and temp_data.low.iloc[-1] > k_data.low.iloc[i]:
            trend.append(-1)
            temp_data = k_data[i:i + 1]

        elif temp_data.high.iloc[-1] < k_data.high.iloc[i] and temp_data.low.iloc[-1] < k_data.low.iloc[i]:
            trend.append(1)
            temp_data = k_data[i:i + 1]
        
        #print("处理判断完毕:",i)
        
        #print("调整收盘价和开盘价:",i)
        # 调整收盘价和开盘价
        if temp_data.open.iloc[-1] > temp_data.close.iloc[-1]:
            if temp_data.open.iloc[-1] > temp_data.high.iloc[-1]:
                temp_data.open.iloc[-1] = temp_data.high.iloc[-1]
            if temp_data.close.iloc[-1] < temp_data.low.iloc[-1]:
                temp_data.close.iloc[-1] = temp_data.low.iloc[-1]
        else:
            if temp_data.open.iloc[-1] < temp_data.low.iloc[-1]:
                temp_data.open.iloc[-1] = temp_data.low.iloc[-1]
            if temp_data.close.iloc[-1] > temp_data.high.iloc[-1]:
                temp_data.close.iloc[-1] = temp_data.high.iloc[-1]

        adjusted_data = k_data[i:i + 1]
        adjusted_data.open.iloc[-1] = temp_data.open.iloc[-1]
        adjusted_data.close.iloc[-1] = temp_data.close.iloc[-1]
        adjusted_data.high.iloc[-1] = temp_data.high.iloc[-1]
        adjusted_data.low.iloc[-1] = temp_data.low.iloc[-1]
        #print("调整收盘价和开盘价完毕:",i)
        adjusted_k_data = pd.concat([adjusted_k_data, adjusted_data], axis=0)

    return adjusted_k_data


def get_fx(adjusted_k_data):
    #
    # 寻找顶分型和底分型
    # 1)连续分型选择最极端值
    # 2)分型之间保证3根k线
    # 输入:调整后的k线序列
    # 输出:顶分型和底分型的位置

    temp_num = 0  # 上一个顶或底的位置
    temp_high = 0  # 上一个顶的high值
    temp_low = 0  # 上一个底的low值
    temp_type = 0  # 上一个记录位置的类型

    fx_type = []  # 记录分型点的类型,1为顶分型,-1为底分型
    fx_time = []  # 记录分型点的时间
    fx_plot = []  # 记录点的数值,为顶分型取high值,为底分型取low值
    fx_data = pd.DataFrame()  # 记录分型
    fx_offset = []

    # 加上线段起点
    fx_type.append(0)
    fx_offset.append(0)
    #fx_time.append(adjusted_k_data.index[0].strftime("%Y-%m-%d %H:%M:%S"))
    fx_time.append(adjusted_k_data.date[0])
    fx_data = pd.concat([fx_data, adjusted_k_data[:1]], axis=0)
    fx_plot.append((adjusted_k_data.low[0] + adjusted_k_data.high[0]) / 2)

    i = 1
    while (i < len(adjusted_k_data) - 1):

        top = adjusted_k_data.high[i - 1] <= adjusted_k_data.high[i] \
              and adjusted_k_data.high[i] > adjusted_k_data.high[i + 1]  # 顶分型
        bottom = adjusted_k_data.low[i - 1] >= adjusted_k_data.low[i] \
                 and adjusted_k_data.low[i] < adjusted_k_data.low[i + 1]  # 底分型

        if top:
            if temp_type == 1:
                # 如果上一个分型为顶分型,则进行比较,选取高点更高的分型
                if adjusted_k_data.high[i] <= temp_high:
                    i += 1
                else:
                    temp_high = adjusted_k_data.high[i]
                    temp_low = adjusted_k_data.low[i]
                    temp_num = i
                    temp_type = 1
                    i += 2  # 两个分型之间至少有3根k线
            elif temp_type == -1:
                # 如果上一个分型为底分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型
                if temp_low >= adjusted_k_data.high[i]:
                    # 如果上一个底分型的底比当前顶分型的顶高,则跳过当前顶分型。
                    i += 1
                else:
                    fx_type.append(-1)
                    #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S"))
                    fx_time.append(adjusted_k_data.date.iloc[temp_num])
                    fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
                    fx_plot.append(temp_low)
                    fx_offset.append(temp_num)
                    temp_high = adjusted_k_data.high[i]
                    temp_low = adjusted_k_data.low[i]
                    temp_num = i
                    temp_type = 1
                    i += 2  # 两个分型之间至少有3根k线
            else:
                temp_high = adjusted_k_data.high[i]
                temp_low = adjusted_k_data.low[i]
                temp_num = i
                temp_type = 1
                i += 2

        elif bottom:
            if temp_type == -1:
                # 如果上一个分型为底分型,则进行比较,选取低点更低的分型
                if adjusted_k_data.low[i] >= temp_low:
                    i += 1
                else:
                    temp_low = adjusted_k_data.low[i]
                    temp_high = adjusted_k_data.high[i]
                    temp_num = i
                    temp_type = -1
                    i += 2
            elif temp_type == 1:
                # 如果上一个分型为顶分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型
                if temp_high <= adjusted_k_data.low[i]:
                    # 如果上一个顶分型的底比当前底分型的底低,则跳过当前底分型。
                    i += 1
                else:
                    fx_type.append(1)
                    #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S"))
                    fx_time.append(adjusted_k_data.date.iloc[temp_num])
                    fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
                    fx_plot.append(temp_high)
                    fx_offset.append(temp_num)
                    temp_low = adjusted_k_data.low[i]
                    temp_high = adjusted_k_data.high[i]
                    temp_num = i
                    temp_type = -1
                    i += 2
            else:
                temp_low = adjusted_k_data.low[i]
                temp_high = adjusted_k_data.high[i]
                temp_num = i
                temp_type = -1
                i += 2
        else:
            i += 1

    # 加上最后一个分型(上面的循环中最后的一个分型并未处理)
    if temp_type == -1:
        fx_type.append(-1)
        #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S"))
        fx_time.append(adjusted_k_data.date.iloc[temp_num])
        fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
        fx_plot.append(temp_low)
        fx_offset.append(temp_num)
    elif temp_type == 1:
        fx_type.append(1)
        #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S"))
        fx_time.append(adjusted_k_data.date.iloc[temp_num])
        fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
        fx_plot.append(temp_high)
        fx_offset.append(temp_num)

    # 加上线段终点
    fx_type.append(0)
    fx_offset.append(len(adjusted_k_data) - 1)
    #fx_time.append(adjusted_k_data.index[-1].strftime("%Y-%m-%d %H:%M:%S"))
    fx_time.append(adjusted_k_data.date.iloc[-1])
    fx_data = pd.concat([fx_data, adjusted_k_data[-1:]], axis=0)
    fx_plot.append((adjusted_k_data.low.iloc[-1] + adjusted_k_data.high.iloc[-1]) / 2)

    return fx_type, fx_time, fx_data, fx_plot, fx_offset


def get_pivot(fx_plot, fx_offset, fx_observe):
    #
    # 计算最近的中枢
    # 注意:一个中枢至少有三笔
    # fx_plot 笔的节点股价
    # fx_offset 笔的节点时间点(偏移)
    # fx_observe 所观测的分型点

    if fx_observe < 1:
        # 处理边界
        right_bound = 0
        left_bount = 0
        min_high = 0
        max_low = 0
        pivot_x_interval = [left_bount, right_bound]
        pivot_price_interval = [max_low, min_high]
        return pivot_x_interval, pivot_price_interval

    right_bound = (fx_offset[fx_observe] + fx_offset[fx_observe - 1]) / 2
    # 右边界是所观察分型的上一笔中位
    left_bount = 0
    min_high = 0
    max_low = 0

    if fx_plot[fx_observe] >= fx_plot[fx_observe - 1]:
        # 所观察分型的上一笔是往上的一笔
        min_high = fx_plot[fx_observe]
        max_low = fx_plot[fx_observe - 1]
    else:  # 所观察分型的上一笔是往下的一笔
        max_low = fx_plot[fx_observe]
        min_high = fx_plot[fx_observe - 1]

    i = fx_observe - 1
    cover = 0  # 记录走势的重叠区,至少为3才能画中枢
    while (i >= 1):
        if fx_plot[i] >= fx_plot[i - 1]:
            # 往上的一笔
            if fx_plot[i] < max_low or fx_plot[i - 1] > min_high:
                # 已经没有重叠区域了
                left_bount = (fx_offset[i] + fx_offset[i + 1]) / 2
                break
            else:
                # 有重叠区域
                # 计算更窄的中枢价格区间
                cover += 1
                min_high = min(fx_plot[i], min_high)
                max_low = max(fx_plot[i - 1], max_low)


        elif fx_plot[i] < fx_plot[i - 1]:
            # 往下的一笔
            if fx_plot[i] > min_high or fx_plot[i - 1] < max_low:
                # 已经没有重叠区域了
                left_bount = (fx_offset[i] + fx_offset[i + 1]) / 2
                break
            else:
                # 有重叠区域
                # 计算更窄的中枢价格区间
                cover += 3
                min_high = min(fx_plot[i - 1], min_high)
                max_low = max(fx_plot[i], max_low)

        i -= 1

    if cover < 3:
        # 不满足中枢定义
        right_bound = -1
        left_bount = -1
        min_high = -1
        max_low = -1

    pivot_x_interval = [left_bount, right_bound]
    pivot_price_interval = [max_low, min_high]
    return pivot_x_interval, pivot_price_interval,i


def plot_k_series(ax,k_data):
    # 画k线
    num_of_ticks = len(k_data)

   # fig, ax = plt.subplots(figsize=(num_of_ticks, 20))
   # fig.subplots_adjust(bottom=0.2)
    dates = k_data.date
    # print dates
    ax.set_xticks(np.linspace(1, num_of_ticks, num_of_ticks))
    ax.set_xticklabels(list(dates))
    """
    xticks = list(range(0, len(dates), 10))  # 这里设置的是x轴点的位置(40设置的就是间隔了)
    xlabels = [dates[x] for x in xticks ]  # 这里设置X轴上的点对应在数据集中的值(这里用的数据为totalSeed)
    xticks.append(len(dates))
    xlabels.append(dates[-1])
    ax.set_xticks(xticks)
    ax.set_xticklabels(xlabels, rotation=40)
    """
    #T.plot(k_data,candlestick=True)
    print("绘制K线")
    plt.plot(k_data.close)
    #print(1)
#     mpf.candlestick2_ochl(
#         ax,
#         list(k_data.open), list(k_data.close), list(k_data.high), list(k_data.low),
#         width=0.6, colorup='r', colordown='b', alpha=0.75
#     )
    #mpf.plot(k_data,type='candle')
    
    plt.grid(True)
    plt.setp(plt.gca().get_xticklabels(), rotation=30)
    return dates


def plot_lines(ax, fx_plot, fx_offset):
    # 绘制笔和线段
    # ax 绘图区域
    # fx_plot

    plt.plot(fx_offset, fx_plot, 'k', lw=1)
    plt.plot(fx_offset, fx_plot, 'o')


def plot_pivot(ax, pivot_date_interval, pivot_price_interval):
    #
    # 绘制中枢

    start_point = (pivot_date_interval[0], pivot_price_interval[0])
    width = pivot_date_interval[1] - pivot_date_interval[0]
    height = pivot_price_interval[1] - pivot_price_interval[0]
    print(
        "中枢:",
        start_point,  # (x,y)
        width,  # width
        height,  # height
    )
    plt.gca().add_patch(
        patches.Rectangle(
            start_point,  # (x,y)
            width,  # width
            height,  # height
            linewidth=8,
            edgecolor='g',
            facecolor='none'
        )
    )
    return


def plot_all(select_deta=10,price_percent=0.01):#select_deta 中枢最小间隔  price_percent幅度小于某个值
    k_series = get_k_series()
    kk=k_series
    if(len(kk)<10):
        print('k线数量不足')
        return
    fig = plt.figure(figsize=(50, 20))
    plt.rcParams.update({'figure.max_open_warning': 0})

    ax2= fig.add_subplot(212)
    
    print("处理K线...")
    adjusted_k_data = adjust_by_cintainment(k_series)
    plot_k_series(ax2,adjusted_k_data) # 调整后的k线图

    fx_type, fx_time, fx_data, fx_plot, fx_offset = get_fx(adjusted_k_data)
    print (fx_type, fx_time)
    plot_lines(ax2, fx_plot, fx_offset)
    pivot_x_interval, pivot_price_interval,now_index = None,None,len(fx_offset)-2
    #last_pivot_x_interval, last_pivot_price_interval,last_index = get_pivot(fx_plot, fx_offset, now_index)
    
    while now_index >= 1:
        pivot_x_interval, pivot_price_interval,now_index = get_pivot(fx_plot, fx_offset, now_index)
        if pivot_x_interval[0] == -1:
            break
        else:
            if pivot_x_interval[1] - pivot_x_interval[0] < select_deta:
                print("pivot_x_interval[1] - pivot_x_interval[0] < select_deta")
                continue
            hhv = max(k_series.high[int(pivot_x_interval[0]):int(pivot_x_interval[1])])
            llv = min(k_series.low[int(pivot_x_interval[0]):int(pivot_x_interval[1])])
            hhv_deta = abs((hhv - pivot_price_interval[1]) / pivot_price_interval[1])
            llv_deta = abs((llv - pivot_price_interval[0]) / pivot_price_interval[0])
            if (hhv_deta > price_percent or llv_deta > price_percent):
                print(" (hhv_deta > 1.0 / price_percent or llv_deta > 1.0 / price_percent)")
                continue
            plot_pivot(ax2, pivot_x_interval, pivot_price_interval)
    
plot_all(
    select_deta = 2,#中枢最小间隔
    price_percent = 0.5#幅度小于某个值
)
处理K线...
绘制K线
[0, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 0] ['2020-09-22 16:00:00', '2020-09-22 17:00:00', '2020-09-23 01:00:00', '2020-09-23 07:00:00', '2020-09-23 13:00:00', '2020-09-23 15:00:00', '2020-09-23 17:00:00', '2020-09-23 21:00:00', '2020-09-24 03:00:00', '2020-09-24 05:00:00', '2020-09-24 17:00:00', '2020-09-24 19:00:00', '2020-09-24 23:00:00', '2020-09-25 10:00:00', '2020-09-25 20:00:00', '2020-09-26 01:00:00', '2020-09-26 06:00:00', '2020-09-26 10:00:00', '2020-09-26 18:00:00', '2020-09-26 22:00:00', '2020-09-27 01:00:00', '2020-09-27 12:00:00', '2020-09-27 16:00:00', '2020-09-27 18:00:00', '2020-09-28 02:00:00', '2020-09-28 09:00:00', '2020-09-28 13:00:00', '2020-09-28 15:00:00', '2020-09-28 17:00:00', '2020-09-28 19:00:00', '2020-09-28 21:00:00', '2020-09-29 02:00:00', '2020-09-29 04:00:00', '2020-09-29 06:00:00', '2020-09-29 11:00:00', '2020-09-29 17:00:00', '2020-09-30 00:00:00', '2020-09-30 09:00:00', '2020-09-30 17:00:00', '2020-09-30 20:00:00', '2020-10-01 00:00:00']
中枢: (151.5, 10681.6) 43.0 30.299999999999272
中枢: (126.0, 10855.3) 22.0 31.80000000000109
中枢: (43.0, 10702.8) 78.0 31.80000000000109
中枢: (27.0, 10270.8) 9.0 70.60000000000036
中枢: (0, 10450.0) 24.0 19.549999999999272

<Figure size 3600x1440 with 1 Axes>


更多内容

mb9177 有没有要

homily 感觉不错,不过代码有点乱,没有看懂计算笔和和中枢的方法,我要仔细学习一下。另外,请问会不会有未来函数的可能。

小草 点赞,把基于图像的描述转化为代码还是很难的

abc_quant 根据历史行情计算的中枢,不存在未来函数