问答交流

【代码报错】策略运行报错崩溃

由bqn737zt创建,最终由hxgre 被浏览 7 用户

运行我编写的行业轮动模型,系统一直崩溃。我的回测系统是D2,请老师帮我检查一下原因?

from bigquant import bigtrader
from bigquant import dai
import pandas as pd

def initialize(context: bigtrader.IContext):
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    context.rebalance_period = bigtrader.MonthlyRebalance(roll_forward=True, context=context) # 月度调仓

    context.logger.info("开始计算行业轮动选股数据...")

    # 1. 查询股票和行业数据
    df = dai.query("""
        SELECT  date, instrument, sw2021_level2, sw2021_level2_name, close, pe_ttm
        FROM  cn_stock_prefactors
    """, filters={"date": [context.add_trading_days(context.start_date, -365), context.end_date]}).df()

    df_inds = dai.query("""
        SELECT
            date, industry_code, industry_name, pe_ttm
        FROM
            cn_stock_industry_valuation
        WHERE industry = 'sw2021'
        AND industry_level = 2
    """, filters={"date": [context.add_trading_days(context.start_date, -365), context.end_date]}).df()

    # 2. 数据合并
    df2 = df.merge(df_inds[['date','industry_code','pe_ttm']],
                         left_on=['date', 'sw2021_level2'],
                         right_on=['date', 'industry_code'],
                         how='left', suffixes=('_stock', '_industry'))
    df2.drop(columns=['industry_code'], inplace=True)

    # 3. 筛选行业股票数量大于10的行业
    stocks_count = df2.groupby(['date','sw2021_level2']).count().reset_index()
    big_industry_dates = stocks_count[stocks_count['instrument']>10]['date'].unique()
    df3 = df2[df2['date'].isin(big_industry_dates)]
    big_industry = stocks_count[stocks_count['instrument']>10]['sw2021_level2'].unique()
    df4 = df3[df3.sw2021_level2.isin(big_industry)]

    inds = df_inds.sort_values(['industry_name','date']).reset_index(drop=True)
    inds1 = inds[inds.industry_code.isin(big_industry)]
    inds2 = inds1[(inds1['pe_ttm'] != 0)]

    # 4. 计算60日行业PE的最大和最小值
    tmp_pe = inds2.groupby('industry_code').rolling(60, min_periods=60).pe_ttm
    pe_max = tmp_pe.max().rename('pe_max')
    pe_min = tmp_pe.min().rename('pe_min')
    tmp_df = pd.concat([pe_max, pe_min], axis=1)
    tmp_df = tmp_df.reset_index(level=0) # 将 industry_code 变回列
    pe_df = pd.merge(inds2, tmp_df, on=['industry_code'], how='left').dropna() # 使用 merge 基于 'industry_code' 合并


    # 5. 计算 PE 在60日区间所处的位置
    print("Columns in pe_df before calculating pe_position:", pe_df.columns) # 添加这行代码
    pe_df['pe_position'] = round((pe_df.pe_ttm - pe_df.pe_min)/(pe_df.pe_max - pe_min),2)

    # 6. 获取月末交易日
    trading_days_df = dai.query("""
        SELECT DISTINCT date
        FROM cn_stock_industry_valuation
        WHERE date BETWEEN $start_date AND $end_date
        ORDER BY date ASC
    """, params={'start_date': context.start_date, 'end_date': context.end_date}).df()
    trading_days_df['date'] = pd.to_datetime(trading_days_df['date'])
    last_trading_days = trading_days_df.groupby(trading_days_df['date'].dt.to_period('M'))['date'].max().tolist()
    last_trading_days_dt = pd.to_datetime(last_trading_days)
    month_pe = pe_df[pe_df['date'].isin(last_trading_days_dt)].copy()

    # 7. 选出每月pe_position最小的6个行业
    sort_func = lambda df_group, n : df_group.sort_values('pe_position').head(n)
    top_six = month_pe.groupby('date').apply(sort_func, n=6)

    # 8. 定义选股函数
    def select_stocks(industry_codes_series, industry_stocks_df, stocks_pe_df, date):
        month_portfolio = []
        for industry_code in industry_codes_series:
            if pd.isna(industry_code):
                continue
            stocks = industry_stocks_df[industry_stocks_df.sw2021_level2 == str(industry_code)]['instrument'].unique()

            date_str = pd.to_datetime(date).strftime('%Y-%m-%d')
            stocks_pe_df_index_str_list = stocks_pe_df.index.strftime('%Y-%m-%d').tolist()
            if date_str not in stocks_pe_df_index_str_list:
                continue

            available_stocks_pe = stocks_pe_df.loc[date_str, stocks_pe_df.columns.isin(stocks)]
            sorted_pe_series = available_stocks_pe.sort_values(ascending=True)
            selected_stocks = sorted_pe_series.index[:min(5, len(sorted_pe_series))]  # 每行业最多5只
            month_portfolio.extend(selected_stocks.tolist())

        # 确保每次选出的总股票数量为30(如果不足30,则买入实有数量)
        if len(month_portfolio) > 30:
            month_portfolio = month_portfolio[:30]
        return month_portfolio

    # 9. 应用选股函数,获取每月持仓股票
    portfolio_dict = {}
    for date in top_six.index:
        industry_codes_val = top_six.loc[date, 'industry_code']
        industry_codes_series = pd.Series(industry_codes_val) if not isinstance(industry_codes_val, pd.Series) else industry_codes_val
        portfolio_dict[date] = select_stocks(
            industry_codes_series,
            industry_stocks_df=df4,
            stocks_pe_df=pe_df,
            date=date
        )

    portfolio_series = pd.Series(portfolio_dict)

    # 10. 整理context.data
    context_data_list = []
    for date, instruments in portfolio_series.items():
        if not instruments:
            context.logger.warning(f"Date {date} has no stocks selected, skipping.")
            continue
        weight = 1.0 / len(instruments) if instruments else 0.0
        for instrument in instruments:
            context_data_list.append({'date': date, 'instrument': instrument, 'weight': weight})

    context.data = pd.DataFrame(context_data_list)
    context.logger.info(f"行业轮动选股数据计算完成,共选择了 {len(context.data)} 条股票数据")


def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
    bigtrader.HandleDataLib.handle_data_weight_based(context, data)

performance = bigtrader.run(
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.DAILY,
    instruments=[],
    start_date="2020-01-01",
    end_date="2025-04-07",
    capital_base=100000,
    initialize=initialize,
    handle_data=handle_data,
    benchmark='510300.SH',  # 沪深300ETF
)

performance.render()

\

评论
  • 可以分享一下报错的策略,我们看看具体报错,如下所示:
  • 地址如下:
  • undefined
  • 这行代码有问题,检查下
{link}