【代码报错】策略运行报错崩溃
由bqn737zt创建,最终由hxgre 被浏览 7 用户
运行我编写的行业轮动模型,系统一直崩溃。我的回测系统是D2,请老师帮我检查一下原因?
from bigquant import bigtrader
from bigquant import dai
import pandas as pd
def initialize(context: bigtrader.IContext):
context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
context.rebalance_period = bigtrader.MonthlyRebalance(roll_forward=True, context=context) # 月度调仓
context.logger.info("开始计算行业轮动选股数据...")
# 1. 查询股票和行业数据
df = dai.query("""
SELECT date, instrument, sw2021_level2, sw2021_level2_name, close, pe_ttm
FROM cn_stock_prefactors
""", filters={"date": [context.add_trading_days(context.start_date, -365), context.end_date]}).df()
df_inds = dai.query("""
SELECT
date, industry_code, industry_name, pe_ttm
FROM
cn_stock_industry_valuation
WHERE industry = 'sw2021'
AND industry_level = 2
""", filters={"date": [context.add_trading_days(context.start_date, -365), context.end_date]}).df()
# 2. 数据合并
df2 = df.merge(df_inds[['date','industry_code','pe_ttm']],
left_on=['date', 'sw2021_level2'],
right_on=['date', 'industry_code'],
how='left', suffixes=('_stock', '_industry'))
df2.drop(columns=['industry_code'], inplace=True)
# 3. 筛选行业股票数量大于10的行业
stocks_count = df2.groupby(['date','sw2021_level2']).count().reset_index()
big_industry_dates = stocks_count[stocks_count['instrument']>10]['date'].unique()
df3 = df2[df2['date'].isin(big_industry_dates)]
big_industry = stocks_count[stocks_count['instrument']>10]['sw2021_level2'].unique()
df4 = df3[df3.sw2021_level2.isin(big_industry)]
inds = df_inds.sort_values(['industry_name','date']).reset_index(drop=True)
inds1 = inds[inds.industry_code.isin(big_industry)]
inds2 = inds1[(inds1['pe_ttm'] != 0)]
# 4. 计算60日行业PE的最大和最小值
tmp_pe = inds2.groupby('industry_code').rolling(60, min_periods=60).pe_ttm
pe_max = tmp_pe.max().rename('pe_max')
pe_min = tmp_pe.min().rename('pe_min')
tmp_df = pd.concat([pe_max, pe_min], axis=1)
tmp_df = tmp_df.reset_index(level=0) # 将 industry_code 变回列
pe_df = pd.merge(inds2, tmp_df, on=['industry_code'], how='left').dropna() # 使用 merge 基于 'industry_code' 合并
# 5. 计算 PE 在60日区间所处的位置
print("Columns in pe_df before calculating pe_position:", pe_df.columns) # 添加这行代码
pe_df['pe_position'] = round((pe_df.pe_ttm - pe_df.pe_min)/(pe_df.pe_max - pe_min),2)
# 6. 获取月末交易日
trading_days_df = dai.query("""
SELECT DISTINCT date
FROM cn_stock_industry_valuation
WHERE date BETWEEN $start_date AND $end_date
ORDER BY date ASC
""", params={'start_date': context.start_date, 'end_date': context.end_date}).df()
trading_days_df['date'] = pd.to_datetime(trading_days_df['date'])
last_trading_days = trading_days_df.groupby(trading_days_df['date'].dt.to_period('M'))['date'].max().tolist()
last_trading_days_dt = pd.to_datetime(last_trading_days)
month_pe = pe_df[pe_df['date'].isin(last_trading_days_dt)].copy()
# 7. 选出每月pe_position最小的6个行业
sort_func = lambda df_group, n : df_group.sort_values('pe_position').head(n)
top_six = month_pe.groupby('date').apply(sort_func, n=6)
# 8. 定义选股函数
def select_stocks(industry_codes_series, industry_stocks_df, stocks_pe_df, date):
month_portfolio = []
for industry_code in industry_codes_series:
if pd.isna(industry_code):
continue
stocks = industry_stocks_df[industry_stocks_df.sw2021_level2 == str(industry_code)]['instrument'].unique()
date_str = pd.to_datetime(date).strftime('%Y-%m-%d')
stocks_pe_df_index_str_list = stocks_pe_df.index.strftime('%Y-%m-%d').tolist()
if date_str not in stocks_pe_df_index_str_list:
continue
available_stocks_pe = stocks_pe_df.loc[date_str, stocks_pe_df.columns.isin(stocks)]
sorted_pe_series = available_stocks_pe.sort_values(ascending=True)
selected_stocks = sorted_pe_series.index[:min(5, len(sorted_pe_series))] # 每行业最多5只
month_portfolio.extend(selected_stocks.tolist())
# 确保每次选出的总股票数量为30(如果不足30,则买入实有数量)
if len(month_portfolio) > 30:
month_portfolio = month_portfolio[:30]
return month_portfolio
# 9. 应用选股函数,获取每月持仓股票
portfolio_dict = {}
for date in top_six.index:
industry_codes_val = top_six.loc[date, 'industry_code']
industry_codes_series = pd.Series(industry_codes_val) if not isinstance(industry_codes_val, pd.Series) else industry_codes_val
portfolio_dict[date] = select_stocks(
industry_codes_series,
industry_stocks_df=df4,
stocks_pe_df=pe_df,
date=date
)
portfolio_series = pd.Series(portfolio_dict)
# 10. 整理context.data
context_data_list = []
for date, instruments in portfolio_series.items():
if not instruments:
context.logger.warning(f"Date {date} has no stocks selected, skipping.")
continue
weight = 1.0 / len(instruments) if instruments else 0.0
for instrument in instruments:
context_data_list.append({'date': date, 'instrument': instrument, 'weight': weight})
context.data = pd.DataFrame(context_data_list)
context.logger.info(f"行业轮动选股数据计算完成,共选择了 {len(context.data)} 条股票数据")
def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
bigtrader.HandleDataLib.handle_data_weight_based(context, data)
performance = bigtrader.run(
market=bigtrader.Market.CN_STOCK,
frequency=bigtrader.Frequency.DAILY,
instruments=[],
start_date="2020-01-01",
end_date="2025-04-07",
capital_base=100000,
initialize=initialize,
handle_data=handle_data,
benchmark='510300.SH', # 沪深300ETF
)
performance.render()
\