넓은 글에 따라 모집된 여론 중앙 지도 더 많은 중앙의 지도를 추가 유권자들에 대한 여론을 알아보는 것이 흥미롭습니다. 이 글은 한 가지 더 중요한 부분입니다.https://www.fmz.cn/market-offer/199
''' start: 2020-10-1 00:00:00 end: 2020-10-15 16:00:00 period: 1h basePeriod: 1h exchanges: [{"eid":"OKEX","currency":"BTC_USDT","stocks":1}] ''' import pandas as pd from fmz import * # 导入所有FMZ函数 task = VCtx(__doc__) # 初始化 #!pip install --user mplfinance #import sys #sys.path.append('/home/quant/.local/lib/python3.6/site-packages') #import mplfinance
# 第三方函数库 import numpy as np import pandas as pd import matplotlib as mpl import matplotlib.pyplot as plt #import mplfinance as mpf import matplotlib.patches as patches import talib import datetime import warnings warnings.filterwarnings("ignore") def get_k_series(): # # 获取k线序列,默认为30分钟级别 # 输入:n是级别,单位是分钟 # 输出:pandas, k线序列 n = 1 one_min_data = pd.DataFrame(exchange.GetRecords()) one_min_data = one_min_data.rename(columns={'Time':'date','Open':'open','Close':'close','High':'high','Low':'low'}) one_min_data['date'] = one_min_data['date'].apply(lambda x:_D(x/1000)) one_min_data['Date'] = one_min_data['date'].apply(lambda x:pd.to_datetime(x)) one_min_data.set_index('Date',inplace=True) #print(one_min_data) n_min_data = pd.DataFrame() for i in range(n, len(one_min_data) + 1, n): interval = one_min_data.iloc[i - n:i] interval_open = interval.open.iloc[0] interval_high = max(interval.high) interval_low = min(interval.low) interval_date = interval.date interval_k = pd.DataFrame(interval[-1:]) # 新建DataFrame,否则会报SettingWithCopyWarning interval_k.open = interval_open interval_k.high = interval_high interval_k.low = interval_low interval_k.date = interval_date #print(interval_k) n_min_data = pd.concat([n_min_data, interval_k], axis=0) n_min_data = n_min_data.reset_index() #del n_min_data['instrument'] #del n_min_data['index'] #print(n_min_data) return n_min_data def get_binary_positions(k_data): # # 计算k线序列的二分位值 # 输入:k线序列 # 输出:list, k线序列对应的二分位值 binary_positions = [] for i in range(len(k_data)): temp_y = (k_data.high[i] + k_data.low[i]) / 2.0 binary_positions.append(temp_y) return binary_positions def adjust_by_cintainment(k_data): # # 判断k线的包含关系,便于寻找顶分型和底分型 # 输入:k线序列 # 输出:adjusted_k_data, 处理后的k线序列 trend = [0] adjusted_k_data = pd.DataFrame() temp_data = k_data[:1] #print(temp_data) #return for i in range(len(k_data)): #print("处理:",i) is_equal = temp_data.high.iloc[-1] == k_data.high.iloc[i] and temp_data.low.iloc[-1] == k_data.low.iloc[i] # 第1根等于第2根 # 向右包含 if temp_data.high.iloc[-1] >= k_data.high.iloc[i] and temp_data.low.iloc[-1] <= k_data.low.iloc[i] and not is_equal: if trend[-1] == -1: temp_data.high.iloc[-1] = k_data.high.iloc[i] else: temp_data.low.iloc[-1] = k_data.low.iloc[i] # 向左包含 elif temp_data.high.iloc[-1] <= k_data.high.iloc[i] and temp_data.low.iloc[-1] >= k_data.low.iloc[i] and not is_equal: if trend[-1] == -1: temp_data.low.iloc[-1] = k_data.low.iloc[i] else: temp_data.high.iloc[-1] = k_data.high.iloc[i] elif is_equal: trend.append(0) elif temp_data.high.iloc[-1] > k_data.high.iloc[i] and temp_data.low.iloc[-1] > k_data.low.iloc[i]: trend.append(-1) temp_data = k_data[i:i + 1] elif temp_data.high.iloc[-1] < k_data.high.iloc[i] and temp_data.low.iloc[-1] < k_data.low.iloc[i]: trend.append(1) temp_data = k_data[i:i + 1] #print("处理判断完毕:",i) #print("调整收盘价和开盘价:",i) # 调整收盘价和开盘价 if temp_data.open.iloc[-1] > temp_data.close.iloc[-1]: if temp_data.open.iloc[-1] > temp_data.high.iloc[-1]: temp_data.open.iloc[-1] = temp_data.high.iloc[-1] if temp_data.close.iloc[-1] < temp_data.low.iloc[-1]: temp_data.close.iloc[-1] = temp_data.low.iloc[-1] else: if temp_data.open.iloc[-1] < temp_data.low.iloc[-1]: temp_data.open.iloc[-1] = temp_data.low.iloc[-1] if temp_data.close.iloc[-1] > temp_data.high.iloc[-1]: temp_data.close.iloc[-1] = temp_data.high.iloc[-1] adjusted_data = k_data[i:i + 1] adjusted_data.open.iloc[-1] = temp_data.open.iloc[-1] adjusted_data.close.iloc[-1] = temp_data.close.iloc[-1] adjusted_data.high.iloc[-1] = temp_data.high.iloc[-1] adjusted_data.low.iloc[-1] = temp_data.low.iloc[-1] #print("调整收盘价和开盘价完毕:",i) adjusted_k_data = pd.concat([adjusted_k_data, adjusted_data], axis=0) return adjusted_k_data def get_fx(adjusted_k_data): # # 寻找顶分型和底分型 # 1)连续分型选择最极端值 # 2)分型之间保证3根k线 # 输入:调整后的k线序列 # 输出:顶分型和底分型的位置 temp_num = 0 # 上一个顶或底的位置 temp_high = 0 # 上一个顶的high值 temp_low = 0 # 上一个底的low值 temp_type = 0 # 上一个记录位置的类型 fx_type = [] # 记录分型点的类型,1为顶分型,-1为底分型 fx_time = [] # 记录分型点的时间 fx_plot = [] # 记录点的数值,为顶分型取high值,为底分型取low值 fx_data = pd.DataFrame() # 记录分型 fx_offset = [] # 加上线段起点 fx_type.append(0) fx_offset.append(0) #fx_time.append(adjusted_k_data.index[0].strftime("%Y-%m-%d %H:%M:%S")) fx_time.append(adjusted_k_data.date[0]) fx_data = pd.concat([fx_data, adjusted_k_data[:1]], axis=0) fx_plot.append((adjusted_k_data.low[0] + adjusted_k_data.high[0]) / 2) i = 1 while (i < len(adjusted_k_data) - 1): top = adjusted_k_data.high[i - 1] <= adjusted_k_data.high[i] \ and adjusted_k_data.high[i] > adjusted_k_data.high[i + 1] # 顶分型 bottom = adjusted_k_data.low[i - 1] >= adjusted_k_data.low[i] \ and adjusted_k_data.low[i] < adjusted_k_data.low[i + 1] # 底分型 if top: if temp_type == 1: # 如果上一个分型为顶分型,则进行比较,选取高点更高的分型 if adjusted_k_data.high[i] <= temp_high: i += 1 else: temp_high = adjusted_k_data.high[i] temp_low = adjusted_k_data.low[i] temp_num = i temp_type = 1 i += 2 # 两个分型之间至少有3根k线 elif temp_type == -1: # 如果上一个分型为底分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型 if temp_low >= adjusted_k_data.high[i]: # 如果上一个底分型的底比当前顶分型的顶高,则跳过当前顶分型。 i += 1 else: fx_type.append(-1) #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S")) fx_time.append(adjusted_k_data.date.iloc[temp_num]) fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0) fx_plot.append(temp_low) fx_offset.append(temp_num) temp_high = adjusted_k_data.high[i] temp_low = adjusted_k_data.low[i] temp_num = i temp_type = 1 i += 2 # 两个分型之间至少有3根k线 else: temp_high = adjusted_k_data.high[i] temp_low = adjusted_k_data.low[i] temp_num = i temp_type = 1 i += 2 elif bottom: if temp_type == -1: # 如果上一个分型为底分型,则进行比较,选取低点更低的分型 if adjusted_k_data.low[i] >= temp_low: i += 1 else: temp_low = adjusted_k_data.low[i] temp_high = adjusted_k_data.high[i] temp_num = i temp_type = -1 i += 2 elif temp_type == 1: # 如果上一个分型为顶分型,则记录上一个分型,用当前分型与后面的分型比较,选取同向更极端的分型 if temp_high <= adjusted_k_data.low[i]: # 如果上一个顶分型的底比当前底分型的底低,则跳过当前底分型。 i += 1 else: fx_type.append(1) #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S")) fx_time.append(adjusted_k_data.date.iloc[temp_num]) fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0) fx_plot.append(temp_high) fx_offset.append(temp_num) temp_low = adjusted_k_data.low[i] temp_high = adjusted_k_data.high[i] temp_num = i temp_type = -1 i += 2 else: temp_low = adjusted_k_data.low[i] temp_high = adjusted_k_data.high[i] temp_num = i temp_type = -1 i += 2 else: i += 1 # 加上最后一个分型(上面的循环中最后的一个分型并未处理) if temp_type == -1: fx_type.append(-1) #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S")) fx_time.append(adjusted_k_data.date.iloc[temp_num]) fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0) fx_plot.append(temp_low) fx_offset.append(temp_num) elif temp_type == 1: fx_type.append(1) #fx_time.append(adjusted_k_data.index[temp_num].strftime("%Y-%m-%d %H:%M:%S")) fx_time.append(adjusted_k_data.date.iloc[temp_num]) fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0) fx_plot.append(temp_high) fx_offset.append(temp_num) # 加上线段终点 fx_type.append(0) fx_offset.append(len(adjusted_k_data) - 1) #fx_time.append(adjusted_k_data.index[-1].strftime("%Y-%m-%d %H:%M:%S")) fx_time.append(adjusted_k_data.date.iloc[-1]) fx_data = pd.concat([fx_data, adjusted_k_data[-1:]], axis=0) fx_plot.append((adjusted_k_data.low.iloc[-1] + adjusted_k_data.high.iloc[-1]) / 2) return fx_type, fx_time, fx_data, fx_plot, fx_offset def get_pivot(fx_plot, fx_offset, fx_observe): # # 计算最近的中枢 # 注意:一个中枢至少有三笔 # fx_plot 笔的节点股价 # fx_offset 笔的节点时间点(偏移) # fx_observe 所观测的分型点 if fx_observe < 1: # 处理边界 right_bound = 0 left_bount = 0 min_high = 0 max_low = 0 pivot_x_interval = [left_bount, right_bound] pivot_price_interval = [max_low, min_high] return pivot_x_interval, pivot_price_interval right_bound = (fx_offset[fx_observe] + fx_offset[fx_observe - 1]) / 2 # 右边界是所观察分型的上一笔中位 left_bount = 0 min_high = 0 max_low = 0 if fx_plot[fx_observe] >= fx_plot[fx_observe - 1]: # 所观察分型的上一笔是往上的一笔 min_high = fx_plot[fx_observe] max_low = fx_plot[fx_observe - 1] else: # 所观察分型的上一笔是往下的一笔 max_low = fx_plot[fx_observe] min_high = fx_plot[fx_observe - 1] i = fx_observe - 1 cover = 0 # 记录走势的重叠区,至少为3才能画中枢 while (i >= 1): if fx_plot[i] >= fx_plot[i - 1]: # 往上的一笔 if fx_plot[i] < max_low or fx_plot[i - 1] > min_high: # 已经没有重叠区域了 left_bount = (fx_offset[i] + fx_offset[i + 1]) / 2 break else: # 有重叠区域 # 计算更窄的中枢价格区间 cover += 1 min_high = min(fx_plot[i], min_high) max_low = max(fx_plot[i - 1], max_low) elif fx_plot[i] < fx_plot[i - 1]: # 往下的一笔 if fx_plot[i] > min_high or fx_plot[i - 1] < max_low: # 已经没有重叠区域了 left_bount = (fx_offset[i] + fx_offset[i + 1]) / 2 break else: # 有重叠区域 # 计算更窄的中枢价格区间 cover += 3 min_high = min(fx_plot[i - 1], min_high) max_low = max(fx_plot[i], max_low) i -= 1 if cover < 3: # 不满足中枢定义 right_bound = -1 left_bount = -1 min_high = -1 max_low = -1 pivot_x_interval = [left_bount, right_bound] pivot_price_interval = [max_low, min_high] return pivot_x_interval, pivot_price_interval,i def plot_k_series(ax,k_data): # 画k线 num_of_ticks = len(k_data) # fig, ax = plt.subplots(figsize=(num_of_ticks, 20)) # fig.subplots_adjust(bottom=0.2) dates = k_data.date # print dates ax.set_xticks(np.linspace(1, num_of_ticks, num_of_ticks)) ax.set_xticklabels(list(dates)) """ xticks = list(range(0, len(dates), 10)) # 这里设置的是x轴点的位置(40设置的就是间隔了) xlabels = [dates[x] for x in xticks ] # 这里设置X轴上的点对应在数据集中的值(这里用的数据为totalSeed) xticks.append(len(dates)) xlabels.append(dates[-1]) ax.set_xticks(xticks) ax.set_xticklabels(xlabels, rotation=40) """ #T.plot(k_data,candlestick=True) print("绘制K线") plt.plot(k_data.close) #print(1) # mpf.candlestick2_ochl( # ax, # list(k_data.open), list(k_data.close), list(k_data.high), list(k_data.low), # width=0.6, colorup='r', colordown='b', alpha=0.75 # ) #mpf.plot(k_data,type='candle') plt.grid(True) plt.setp(plt.gca().get_xticklabels(), rotation=30) return dates def plot_lines(ax, fx_plot, fx_offset): # 绘制笔和线段 # ax 绘图区域 # fx_plot plt.plot(fx_offset, fx_plot, 'k', lw=1) plt.plot(fx_offset, fx_plot, 'o') def plot_pivot(ax, pivot_date_interval, pivot_price_interval): # # 绘制中枢 start_point = (pivot_date_interval[0], pivot_price_interval[0]) width = pivot_date_interval[1] - pivot_date_interval[0] height = pivot_price_interval[1] - pivot_price_interval[0] print( "中枢:", start_point, # (x,y) width, # width height, # height ) plt.gca().add_patch( patches.Rectangle( start_point, # (x,y) width, # width height, # height linewidth=8, edgecolor='g', facecolor='none' ) ) return def plot_all(select_deta=10,price_percent=0.01):#select_deta 中枢最小间隔 price_percent幅度小于某个值 k_series = get_k_series() kk=k_series if(len(kk)<10): print('k线数量不足') return fig = plt.figure(figsize=(50, 20)) plt.rcParams.update({'figure.max_open_warning': 0}) ax2= fig.add_subplot(212) print("处理K线...") adjusted_k_data = adjust_by_cintainment(k_series) plot_k_series(ax2,adjusted_k_data) # 调整后的k线图 fx_type, fx_time, fx_data, fx_plot, fx_offset = get_fx(adjusted_k_data) print (fx_type, fx_time) plot_lines(ax2, fx_plot, fx_offset) pivot_x_interval, pivot_price_interval,now_index = None,None,len(fx_offset)-2 #last_pivot_x_interval, last_pivot_price_interval,last_index = get_pivot(fx_plot, fx_offset, now_index) while now_index >= 1: pivot_x_interval, pivot_price_interval,now_index = get_pivot(fx_plot, fx_offset, now_index) if pivot_x_interval[0] == -1: break else: if pivot_x_interval[1] - pivot_x_interval[0] < select_deta: print("pivot_x_interval[1] - pivot_x_interval[0] < select_deta") continue hhv = max(k_series.high[int(pivot_x_interval[0]):int(pivot_x_interval[1])]) llv = min(k_series.low[int(pivot_x_interval[0]):int(pivot_x_interval[1])]) hhv_deta = abs((hhv - pivot_price_interval[1]) / pivot_price_interval[1]) llv_deta = abs((llv - pivot_price_interval[0]) / pivot_price_interval[0]) if (hhv_deta > price_percent or llv_deta > price_percent): print(" (hhv_deta > 1.0 / price_percent or llv_deta > 1.0 / price_percent)") continue plot_pivot(ax2, pivot_x_interval, pivot_price_interval) plot_all( select_deta = 2,#中枢最小间隔 price_percent = 0.5#幅度小于某个值 )
处理K线... 绘制K线 [0, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 0] ['2020-09-22 16:00:00', '2020-09-22 17:00:00', '2020-09-23 01:00:00', '2020-09-23 07:00:00', '2020-09-23 13:00:00', '2020-09-23 15:00:00', '2020-09-23 17:00:00', '2020-09-23 21:00:00', '2020-09-24 03:00:00', '2020-09-24 05:00:00', '2020-09-24 17:00:00', '2020-09-24 19:00:00', '2020-09-24 23:00:00', '2020-09-25 10:00:00', '2020-09-25 20:00:00', '2020-09-26 01:00:00', '2020-09-26 06:00:00', '2020-09-26 10:00:00', '2020-09-26 18:00:00', '2020-09-26 22:00:00', '2020-09-27 01:00:00', '2020-09-27 12:00:00', '2020-09-27 16:00:00', '2020-09-27 18:00:00', '2020-09-28 02:00:00', '2020-09-28 09:00:00', '2020-09-28 13:00:00', '2020-09-28 15:00:00', '2020-09-28 17:00:00', '2020-09-28 19:00:00', '2020-09-28 21:00:00', '2020-09-29 02:00:00', '2020-09-29 04:00:00', '2020-09-29 06:00:00', '2020-09-29 11:00:00', '2020-09-29 17:00:00', '2020-09-30 00:00:00', '2020-09-30 09:00:00', '2020-09-30 17:00:00', '2020-09-30 20:00:00', '2020-10-01 00:00:00'] 中枢: (151.5, 10681.6) 43.0 30.299999999999272 中枢: (126.0, 10855.3) 22.0 31.80000000000109 中枢: (43.0, 10702.8) 78.0 31.80000000000109 中枢: (27.0, 10270.8) 9.0 70.60000000000036 中枢: (0, 10450.0) 24.0 19.549999999999272 <Figure size 3600x1440 with 1 Axes>
mb9177그리고,
설교괜찮은 느낌이지만 코드가 좀 엉망이고, 연산 펜과 중앙에 대한 방법을 이해하지 못해서, 더 자세히 배우려고 합니다. 또한, 미래에 함수가 있을 가능성이 있는지 물어보십시오.
초목흠, 이미지에 기반한 설명을 코드로 변환하는 것은 여전히 어렵습니다.
abc_quant역사적인 부문의 계산의 중심에 따라 미래 함수는 존재하지 않습니다.