import pandas as pd from binance.client import AsyncClient from datetime import datetime, timedelta import aiohttp import json def utc_to_local(utc_dt):#转换为北京时间 local_tz = datetime.timezone(datetime.timedelta(hours=8)) # 东八区时差 local_dt = utc_dt.replace(tzinfo=datetime.timezone.utc).astimezone(local_tz) return local_dt
async def init_client(): client = await AsyncClient.create(api_key=api_key, api_secret=api_secret)
return client
async def get_klines(client, symbol, start_time, end_time, interval): klines = await client.futures_klines(symbol=symbol, interval=interval, startTime=start_time.timestamp()*1000, endTime=end_time.timestamp()*1000) df = pd.DataFrame(klines, columns=[‘timestamp’, ‘open’, ‘high’, ‘low’, ‘close’, ‘volume’, ‘close_time’, ‘quote_asset_volume’, ‘number_of_trades’, ‘taker_buy_base_asset_volume’, ‘taker_buy_quote_asset_volume’, ‘ignore’]) df[‘timestamp’] = pd.to_datetime(df[‘timestamp’], unit=‘ms’) df[‘close_time’] = pd.to_datetime(df[‘close_time’], unit=‘ms’) df.set_index(‘timestamp’, inplace=True) df.drop(columns=[‘close_time’, ‘ignore’], inplace=True) df = df.astype(‘float’) return df
async def close_client(client): await client.close_connection()
async def main(): client = await init_client() # 获取所有 USDT 永续合约的交易对 exchange_info = await client.futures_exchange_info() symbols = [symbol_info[‘symbol’] for symbol_info in exchange_info[‘symbols’] if symbol_info[‘contractType’] == ‘PERPETUAL’ and symbol_info[‘quoteAsset’] == ‘USDT’] # 创建空DataFrame df = pd.DataFrame(columns=[‘Symbol’, ‘Open’, ‘High’, ‘Low’, ‘Close’, ‘Change’, ‘Volume’]) df.set_index(‘Symbol’, inplace=True)
# 将所有交易对添加到 DataFrame 中
for symbol in symbols:
df.loc[symbol] = [None] * len(df.columns)
# 遍历所有交易对
for symbol in symbols:
# 设置起止时间
start_time = datetime.utcnow() - timedelta(minutes=16)#开始时间为16分钟前
end_time = datetime.utcnow()
try:
# 获取M15k线
current_klines = await get_klines(client, symbol, end_time - timedelta(minutes=15), end_time, '15m')
except Exception as e:
Log(f"An error occurred: {e}")
current_klines = []
# 将数据存入 DataFrame
# 更新对应的行
df.loc[symbol, 'Open'] = current_klines['open'].iloc[-1]
df.loc[symbol, 'High'] = current_klines['high'].iloc[-1]
df.loc[symbol, 'Low'] = current_klines['low'].iloc[-1]
df.loc[symbol, 'Close'] = current_klines['close'].iloc[-1]
df.loc[symbol, 'Change'] = current_change
df.loc[symbol, 'Volume'] = current_klines['volume'].iloc[-1]
# 关闭客户端
Log(df)
await close_client(client)
if name == ‘main’: import asyncio asyncio.run(main())
Log(df)一次之后 就出错了;就是说更新一次所有symbol的价格等数据后出错:Traceback (most recent call last): File “”, line 1246, in init_ctx File “”, line 147, in TypeError: Object of type coroutine is not JSON serializable sys:1: RuntimeWarning: coroutine ‘main’ was never awaited RuntimeWarning: Enable tracemalloc to get the object allocation traceback
没有如果 可以通过多线程去收集,将每个线程收集的到数据放在一个 公共的对象里;然后判断 所有线程有没有结束
lzhqlj 关于聚合函数 梦总能提点一两个关键字吗
lzhqlj 关于聚合函数 梦总能提点一两个关键字吗
小草 没用过这个库,不过最好一步一步来调试。另外直接并发上百个交易对也不太好。用聚合行情接口自己记录下更简单
小草 /fapi/v1/ticker/price 币安文档有