问答交流

【策略构建】回测无法抓取股票

由bqercbbx创建,最终由small_q 被浏览 10 用户

老师们好,我本意是想构建一个滚动10年PE分位点的策略,即在PE分位点低时买入,高时卖出,并仅筛选市值大于500亿以上的股票,以下是我的代码,因没有编程基础,经过一顿折腾终于不报错了,但目前回测时无法显示股票成交,显示没有信号。以下是代码,麻烦老师们看看,谢谢:

import numpy as np  # 👈 新增的NumPy导入
from bigmodule import M
import dai
import pandas as pd

print("""\n
=======================
开始数据验证流程 (共6步)
=======================
""")

# 步骤1:获取股票列表
try:
    print("步骤1/6 正在获取股票列表...")
    m2_raw = dai.query(
        "SELECT instrument, date FROM cn_stock_instruments",
        filters={"date": ["2005-01-01", "2024-12-31"]}
    ).df()
    
    if m2_raw.empty:
        print("❌ 问题:股票列表为空!")
    else:
        print(f"✅ 成功获取股票数据,共 {len(m2_raw)} 条记录")
        print(m2_raw.head(3))
except Exception as e:
    print(f"❌ 步骤1出错:{str(e)}")

# 步骤2验证:获取行业信息
try:
    print("\n步骤2/6 正在获取行业数据...")
    m3_industry = dai.query(
        "SELECT instrument, date, sw2021_level1 AS industry FROM cn_stock_prefactors",
        filters={"date": ["2005-01-01", "2024-12-31"]}
    ).df()
    
    if m3_industry.empty:
        print("❌ 问题:行业数据为空!请确认sw2021_level1字段是否存在")
    else:
        print(f"✅ 成功获取行业数据,共 {len(m3_industry)} 条记录")
        print("行业分布示例:")
        print(m3_industry['industry'].value_counts().head(5))
except Exception as e:
    print(f"❌ 步骤2出错:{str(e)}")

# 步骤3验证:获取财务数据
try:
    print("\n步骤3/6 正在获取财务指标...")
    m4_features = dai.query(
        "SELECT instrument, date, pe_ttm, total_market_cap, float_market_cap FROM cn_stock_prefactors",
        filters={"date": ["2005-01-01", "2024-12-31"]}
    ).df()
    
    if m4_features.empty:
        print("❌ 问题:财务数据为空!请检查pe_ttm等字段名称")
    else:
        print(f"✅ 成功获取财务数据,共 {len(m4_features)} 条记录")
        print("PE值分布:最小值{:.2f},中位数{:.2f},最大值{:.2f}".format(
            m4_features['pe_ttm'].min(),
            m4_features['pe_ttm'].median(),
            m4_features['pe_ttm'].max()
        ))
except Exception as e:
    print(f"❌ 步骤3出错:{str(e)}")

# 步骤4验证:数据合并
try:
    print("\n步骤4/6 正在合并数据...")
    if 'm4_features' not in locals() or 'm3_industry' not in locals():
        print("❌ 合并失败:缺少前置数据")
    else:
        m2_combined = m4_features.merge(
            m3_industry, 
            on=["instrument", "date"], 
            how="left"
        )
        print(f"✅ 合并完成,总记录数:{len(m2_combined)}")
        print("合并后字段:", list(m2_combined.columns))
        print("缺失行业数量:", m2_combined['industry'].isnull().sum())
except Exception as e:
    print(f"❌ 步骤4出错:{str(e)}")

# 步骤5验证:筛选市值大于500亿
try:
    print("\n步骤5/6 正在筛选市值大于500亿...")
    if 'm2_combined' not in locals() or m2_combined.empty:
        print("❌ 筛选失败:输入数据为空")
    else:
        # 动态筛选(假设市值为人民币元)
        m2_combined = m2_combined[m2_combined['total_market_cap'] > 5e10]  # 500亿=5e11元
        
        # 验证结果
        if m2_combined.empty:
            print("❌ 问题:没有股票满足市值条件!可能原因:")
            print("1. 市值单位错误(如应为万元但被误读为元)")
            print("2. 筛选时间段内无大市值股票")
            print("3. total_market_cap字段存在空值")
        else:
            print(f"✅ 筛选成功,剩余记录数:{len(m2_combined)}")
            print("最新市值分布(亿元):")
            print((m2_combined['total_market_cap'] / 1e8).describe().apply(lambda x: round(x,2)))
            
            # 验证典型大盘股存在
            sample_check = m2_combined[m2_combined['instrument'].isin(['600519.SH','601318.SH','300750.SZ'])]
            if not sample_check.empty:
                print("\n包含龙头股示例:")
                print(sample_check[['instrument','date','total_market_cap']].head())
            else:
                print("⚠️ 警告:未包含贵州茅台/中国平安/宁德时代等典型大盘股")

except Exception as e:
    print(f"❌ 步骤5出错:{str(e)}")

# 步骤6验证:PE分位计算
try:
    print("\n步骤6/6 正在计算PE分位点...")
    if 'm2_combined' not in locals() or m2_combined.empty:
        print("❌ 无法计算:输入数据为空")
    else:
        def safe_process(df):
            # 添加调试输出
            print("正在处理 {} 条记录".format(len(df)))
            print("有效PE数量:", df['pe_ttm'].notnull().sum())
            
            df['pe_rank'] = df.groupby('instrument')['pe_ttm'].transform(
                lambda x: x.rolling(2520, min_periods=1).apply(
                    lambda s: s.rank(pct=True).iloc[-1] if not s.isnull().all() else None
                )
            )
            print("计算完成后PE分位点分布:")
            print(df['pe_rank'].describe())
            
            filtered = df[(df['pe_rank'] < 0.15) | (df['pe_rank'] > 0.80)]
            print("筛选条件生效后剩余记录数:", len(filtered))
            return filtered
        
        m5_processed = safe_process(m2_combined.copy())
        
        if m5_processed.empty:
            print("❌ 最终数据为空!可能原因:")
            print("1. PE值全为空或计算错误")
            print("2. 没有股票满足分位点条件")
            print("3. 滚动窗口设置过大(2520约10年数据)")
        else:
            print("✅ 最终数据验证通过!")
            print(m5_processed.head(3))
except Exception as e:
    print(f"❌ 步骤6出错:{str(e)}")

print("""\n
=======================
验证完成!请根据上述输出:
1. 查找带有 ❌ 的错误提示
2. 确认每个✅步骤的数据量是否正常
3. 特别注意步骤5和步骤6的输出
=======================
""")

# ================= 新增回测模块 =================
# 策略逻辑定义
strategy_logic = {
    'initialize': '''
def initialize(context):
    context.set_commission(0.0003)
    context.set_slippage(0.0005)
    # 将 'datetime' 列重命名为 'date' 以符合 'handle_data' 的需求
    context.signal_data = context.get_data("signals").rename(columns={'datetime':'date'})
''',
    
    'handle_data': '''
def handle_data(context, data):
    current_date = context.current_dt.strftime('%%Y-%%m-%%d')
    signals = context.signal_data[context.signal_data['date'] == current_date]
    
    for _, row in signals.iterrows():
        if row['signal'] == 1:
            context.order_target_percent(row.instrument, 0.1)
        elif row['signal'] == -1:
            context.order_target_percent(row.instrument, 0)
'''
}

# 生成信号数据
try:
    print("\n生成交易信号...")
    signals_df = m5_processed.assign(
        signal=lambda x: np.where(x.pe_rank < 0.15, 1,
                                  np.where(x.pe_rank > 0.8, -1, 0))
    )[['instrument', 'date', 'signal']].dropna()

    # 以正确方式将 'date' 列转换为 datetime
    signals_df['date'] = pd.to_datetime(signals_df['date'])
    
    print(f"✅ 信号数据生成成功,共 {len(signals_df)} 条记录")
    print("字段列表:", list(signals_df.columns))  # 应显示['instrument','date','signal']
except Exception as e:
    print(f"❌ 信号生成失败:{str(e)}")
    raise

def initialize(context):
    from bigtrader.finance.commission import PerOrder
    context.set_commission(
        PerOrder(
            buy_cost=0.0003,    # 买入佣金
            sell_cost=0.0003,   # 卖出佣金
            min_cost=5          # 最小佣金
        )
    )
    context.set_slippage(0.0005)
    
    # 直接将信号数据传递到 context 中
    context.signal_data = context.data.rename(columns={'datetime': 'date'})

def handle_data(context, data):
    current_date = context.current_dt
    signals = context.signal_data[context.signal_data['date'] == current_date]
    
    for _, row in signals.iterrows():
        if row['signal'] == 1:
            context.order_target_percent(row.instrument, 0.1)
        elif row['signal'] == -1:
            context.order_target_percent(row.instrument, 0)

strategy_logic = {
    'initialize': initialize,
    'handle_data': handle_data
}

# 执行回测
try:
    print("\n启动回测引擎...")
    m1 = M.bigtrader.v38(
        start_date="2015-01-01",
        end_date="2024-12-31",
        capital_base=100000000,
        frequency="daily",
        product_type="股票",
        benchmark="000300.SH",
        data=signals_df,  # 数据直接从这里传入
        handle_data=strategy_logic['handle_data'],
        initialize=strategy_logic['initialize'],
        volume_limit=0.1,
        order_price_field_buy="open",
        order_price_field_sell="close",
        plot_charts=True,
        m_name="PE分位策略-最终版"
    )
    print("✅ 回测任务已提交")
except Exception as e:
    print(f"❌ 回测启动失败:{str(e)}")
    raise

\

标签

股票筛选
{link}