【策略构建】回测无法抓取股票
由bqercbbx创建,最终由small_q 被浏览 10 用户
老师们好,我本意是想构建一个滚动10年PE分位点的策略,即在PE分位点低时买入,高时卖出,并仅筛选市值大于500亿以上的股票,以下是我的代码,因没有编程基础,经过一顿折腾终于不报错了,但目前回测时无法显示股票成交,显示没有信号。以下是代码,麻烦老师们看看,谢谢:
import numpy as np # 👈 新增的NumPy导入
from bigmodule import M
import dai
import pandas as pd
print("""\n
=======================
开始数据验证流程 (共6步)
=======================
""")
# 步骤1:获取股票列表
try:
print("步骤1/6 正在获取股票列表...")
m2_raw = dai.query(
"SELECT instrument, date FROM cn_stock_instruments",
filters={"date": ["2005-01-01", "2024-12-31"]}
).df()
if m2_raw.empty:
print("❌ 问题:股票列表为空!")
else:
print(f"✅ 成功获取股票数据,共 {len(m2_raw)} 条记录")
print(m2_raw.head(3))
except Exception as e:
print(f"❌ 步骤1出错:{str(e)}")
# 步骤2验证:获取行业信息
try:
print("\n步骤2/6 正在获取行业数据...")
m3_industry = dai.query(
"SELECT instrument, date, sw2021_level1 AS industry FROM cn_stock_prefactors",
filters={"date": ["2005-01-01", "2024-12-31"]}
).df()
if m3_industry.empty:
print("❌ 问题:行业数据为空!请确认sw2021_level1字段是否存在")
else:
print(f"✅ 成功获取行业数据,共 {len(m3_industry)} 条记录")
print("行业分布示例:")
print(m3_industry['industry'].value_counts().head(5))
except Exception as e:
print(f"❌ 步骤2出错:{str(e)}")
# 步骤3验证:获取财务数据
try:
print("\n步骤3/6 正在获取财务指标...")
m4_features = dai.query(
"SELECT instrument, date, pe_ttm, total_market_cap, float_market_cap FROM cn_stock_prefactors",
filters={"date": ["2005-01-01", "2024-12-31"]}
).df()
if m4_features.empty:
print("❌ 问题:财务数据为空!请检查pe_ttm等字段名称")
else:
print(f"✅ 成功获取财务数据,共 {len(m4_features)} 条记录")
print("PE值分布:最小值{:.2f},中位数{:.2f},最大值{:.2f}".format(
m4_features['pe_ttm'].min(),
m4_features['pe_ttm'].median(),
m4_features['pe_ttm'].max()
))
except Exception as e:
print(f"❌ 步骤3出错:{str(e)}")
# 步骤4验证:数据合并
try:
print("\n步骤4/6 正在合并数据...")
if 'm4_features' not in locals() or 'm3_industry' not in locals():
print("❌ 合并失败:缺少前置数据")
else:
m2_combined = m4_features.merge(
m3_industry,
on=["instrument", "date"],
how="left"
)
print(f"✅ 合并完成,总记录数:{len(m2_combined)}")
print("合并后字段:", list(m2_combined.columns))
print("缺失行业数量:", m2_combined['industry'].isnull().sum())
except Exception as e:
print(f"❌ 步骤4出错:{str(e)}")
# 步骤5验证:筛选市值大于500亿
try:
print("\n步骤5/6 正在筛选市值大于500亿...")
if 'm2_combined' not in locals() or m2_combined.empty:
print("❌ 筛选失败:输入数据为空")
else:
# 动态筛选(假设市值为人民币元)
m2_combined = m2_combined[m2_combined['total_market_cap'] > 5e10] # 500亿=5e11元
# 验证结果
if m2_combined.empty:
print("❌ 问题:没有股票满足市值条件!可能原因:")
print("1. 市值单位错误(如应为万元但被误读为元)")
print("2. 筛选时间段内无大市值股票")
print("3. total_market_cap字段存在空值")
else:
print(f"✅ 筛选成功,剩余记录数:{len(m2_combined)}")
print("最新市值分布(亿元):")
print((m2_combined['total_market_cap'] / 1e8).describe().apply(lambda x: round(x,2)))
# 验证典型大盘股存在
sample_check = m2_combined[m2_combined['instrument'].isin(['600519.SH','601318.SH','300750.SZ'])]
if not sample_check.empty:
print("\n包含龙头股示例:")
print(sample_check[['instrument','date','total_market_cap']].head())
else:
print("⚠️ 警告:未包含贵州茅台/中国平安/宁德时代等典型大盘股")
except Exception as e:
print(f"❌ 步骤5出错:{str(e)}")
# 步骤6验证:PE分位计算
try:
print("\n步骤6/6 正在计算PE分位点...")
if 'm2_combined' not in locals() or m2_combined.empty:
print("❌ 无法计算:输入数据为空")
else:
def safe_process(df):
# 添加调试输出
print("正在处理 {} 条记录".format(len(df)))
print("有效PE数量:", df['pe_ttm'].notnull().sum())
df['pe_rank'] = df.groupby('instrument')['pe_ttm'].transform(
lambda x: x.rolling(2520, min_periods=1).apply(
lambda s: s.rank(pct=True).iloc[-1] if not s.isnull().all() else None
)
)
print("计算完成后PE分位点分布:")
print(df['pe_rank'].describe())
filtered = df[(df['pe_rank'] < 0.15) | (df['pe_rank'] > 0.80)]
print("筛选条件生效后剩余记录数:", len(filtered))
return filtered
m5_processed = safe_process(m2_combined.copy())
if m5_processed.empty:
print("❌ 最终数据为空!可能原因:")
print("1. PE值全为空或计算错误")
print("2. 没有股票满足分位点条件")
print("3. 滚动窗口设置过大(2520约10年数据)")
else:
print("✅ 最终数据验证通过!")
print(m5_processed.head(3))
except Exception as e:
print(f"❌ 步骤6出错:{str(e)}")
print("""\n
=======================
验证完成!请根据上述输出:
1. 查找带有 ❌ 的错误提示
2. 确认每个✅步骤的数据量是否正常
3. 特别注意步骤5和步骤6的输出
=======================
""")
# ================= 新增回测模块 =================
# 策略逻辑定义
strategy_logic = {
'initialize': '''
def initialize(context):
context.set_commission(0.0003)
context.set_slippage(0.0005)
# 将 'datetime' 列重命名为 'date' 以符合 'handle_data' 的需求
context.signal_data = context.get_data("signals").rename(columns={'datetime':'date'})
''',
'handle_data': '''
def handle_data(context, data):
current_date = context.current_dt.strftime('%%Y-%%m-%%d')
signals = context.signal_data[context.signal_data['date'] == current_date]
for _, row in signals.iterrows():
if row['signal'] == 1:
context.order_target_percent(row.instrument, 0.1)
elif row['signal'] == -1:
context.order_target_percent(row.instrument, 0)
'''
}
# 生成信号数据
try:
print("\n生成交易信号...")
signals_df = m5_processed.assign(
signal=lambda x: np.where(x.pe_rank < 0.15, 1,
np.where(x.pe_rank > 0.8, -1, 0))
)[['instrument', 'date', 'signal']].dropna()
# 以正确方式将 'date' 列转换为 datetime
signals_df['date'] = pd.to_datetime(signals_df['date'])
print(f"✅ 信号数据生成成功,共 {len(signals_df)} 条记录")
print("字段列表:", list(signals_df.columns)) # 应显示['instrument','date','signal']
except Exception as e:
print(f"❌ 信号生成失败:{str(e)}")
raise
def initialize(context):
from bigtrader.finance.commission import PerOrder
context.set_commission(
PerOrder(
buy_cost=0.0003, # 买入佣金
sell_cost=0.0003, # 卖出佣金
min_cost=5 # 最小佣金
)
)
context.set_slippage(0.0005)
# 直接将信号数据传递到 context 中
context.signal_data = context.data.rename(columns={'datetime': 'date'})
def handle_data(context, data):
current_date = context.current_dt
signals = context.signal_data[context.signal_data['date'] == current_date]
for _, row in signals.iterrows():
if row['signal'] == 1:
context.order_target_percent(row.instrument, 0.1)
elif row['signal'] == -1:
context.order_target_percent(row.instrument, 0)
strategy_logic = {
'initialize': initialize,
'handle_data': handle_data
}
# 执行回测
try:
print("\n启动回测引擎...")
m1 = M.bigtrader.v38(
start_date="2015-01-01",
end_date="2024-12-31",
capital_base=100000000,
frequency="daily",
product_type="股票",
benchmark="000300.SH",
data=signals_df, # 数据直接从这里传入
handle_data=strategy_logic['handle_data'],
initialize=strategy_logic['initialize'],
volume_limit=0.1,
order_price_field_buy="open",
order_price_field_sell="close",
plot_charts=True,
m_name="PE分位策略-最终版"
)
print("✅ 回测任务已提交")
except Exception as e:
print(f"❌ 回测启动失败:{str(e)}")
raise
\