BigQuant 2026年度私享会

新版保温杯LGBM带实盘信息版

由bql6ph74创建,最终由bql6ph74 被浏览 19 用户

from bigquant import bigtrader, dai
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
import lightgbm as lgb
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import lightgbm as lgb
from sklearn.datasets import load_breast_cancer   # 示例数据
from sklearn.model_selection import train_test_split
import joblib          # 用于保存模型
import os
import json

# 模型和状态文件路径
MODEL_PATH = '/home/aiuser/work/AIQuANT/lgb模型/models/lgb_model_with_state.txt'
STAGING_FILE = '/home/aiuser/work/AIQuANT/lgb模型/states/portfolio_state.json'

def get_last_portfolio_value():
    """从 JSON 文件读取上次的 portfolio_value"""
    try:
        if os.path.exists(STAGING_FILE):
            with open(STAGING_FILE, 'r') as f:
                data = json.load(f)
                return data.get('last_portfolio_value')
    except Exception as e:
        print(f"读取 portfolio_state.json 失败: {e}")
    return None

def save_portfolio_value(value, additional_stats=None):
    """保存 portfolio_value 和其他状态信息到 JSON 文件"""
    os.makedirs(os.path.dirname(STAGING_FILE), exist_ok=True)
    try:
        # 准备要保存的数据
        save_data = {
            'last_portfolio_value': value,
            'timestamp': datetime.now().isoformat(),
            'model_training_count': additional_stats.get('model_training_count', 0) if additional_stats else 0,
            'trade_count': additional_stats.get('trade_count', 0) if additional_stats else 0,
            'turnover_count': additional_stats.get('turnover_count', 0) if additional_stats else 0,
        }
        
        with open(STAGING_FILE, 'w') as f:
            json.dump(save_data, f, indent=2)
    except Exception as e:
        print(f"保存 portfolio_state.json 失败: {e}")

def initialize(context: bigtrader.IContext):
    from bigtrader.finance.commission import PerOrder
    
    # 提前创建一个简单的ZXJ_fac02表结构
    try:
        import pandas as pd
        import dai
        
        # 创建少量示例数据以建立表结构
        sample_data = pd.DataFrame({
            'date': pd.to_datetime(['2024-01-01']),
            'instrument': ['000001.SZ'],
            'zxj_factor1': [0.0],
            'zxj_factor2': [0.0]
        })
        # 明确设置数据类型以保持一致性
        sample_data = sample_data.astype({'zxj_factor1': 'float64', 'zxj_factor2': 'float64'})
        sample_data[dai.DEFAULT_PARTITION_FIELD] = sample_data["date"].dt.year
        
        # 创建ZXJ_fac02表
        dai.DataSource.write_bdb(
            data=sample_data,
            id="ZXJ_fac02",
            unique_together=["date", "instrument"],
            indexes=["date"],
        )
        print("[INFO] ZXJ_fac02表结构已创建")
    except Exception as e:
        # 如果表已存在,会报错,这属于正常情况
        print(f"[INFO] ZXJ_fac02表结构处理完成 (可能已存在): {str(e)}")

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    context.sql = """
    SELECT
    date,
    instrument,
    
    --市值风格
    cn_stock_prefactors.total_market_cap AS fac0,
    cn_stock_prefactors.float_market_cap AS fac1,
    M_nanstd(cn_stock_prefactors.amount,5) AS fac2,

    --动量反转风格
    M_avg(cn_stock_prefactors.close,5) AS fac3,
    M_avg(cn_stock_prefactors.close,20) AS fac4,
    M_avg(cn_stock_prefactors.close,60) AS fac5,

    --流动性
    M_avg(cn_stock_prefactors.turn,5) AS fac6,
    M_avg(cn_stock_prefactors.turn,20) AS fac7,
    M_avg(cn_stock_prefactors.turn,60) AS fac8,

    --财务指标
    pct_rank_by(date,dividend_yield_ratio) AS fac9,
    pct_rank_by(date,pe_ttm) AS fac10,
    COALESCE(ZXJ_fac02.zxj_factor1, 0) as fac12,
    COALESCE(ZXJ_fac02.zxj_factor2, 0) as fac13,


    list_sector as _l,
    is_risk_warning as _warning,

    cn_stock_prefactors.open AS _o,
    cn_stock_prefactors.close AS _c,
    pct_rank_by(date,M_LEAD(_c,5) / M_LEAD(_o,1)) AS label,

    FROM
        cn_stock_prefactors
    LEFT JOIN
        ZXJ_fac02 using(date,instrument)

    WHERE
        _l !=4
        AND
        list_days > 120
        AND
        is_risk_warning = 0
        
    --QUALIFY COLUMNS(*) IS NOT NULL
    ORDER by date
    """
    
    context.holding_days = 5
    context.target_hold_count = 10

    # 数据集长度
    context.train_days = 480
    #测试集长度
    context.test_days  = 15
    # 预测需要取得天数
    context.pred_days  = 100
    # 重新训练
    context.retrain = 60

    context.feature_list = [
    'fac0',
    'fac1',
    'fac2',
    'fac3',
    'fac4',
    'fac5',
    'fac6',
    'fac7',
    'fac8',
    'fac9',
    'fac10',
    'fac12',
    'fac13',

    ]
    context.label_list   = ['label']
    
    context.pred=[]
    context.sv = 1
    context.ev = 1
    # 初始化统计变量
    context.model_training_count = 0
    context.trade_count = 0
    context.turnover_count = 0


def befor_trading(context, data):
    current_portfolio_value = context.portfolio.portfolio_value

    # 从 json 文件读取上次的 portfolio_value
    last_value = get_last_portfolio_value()

    if last_value is not None:
        chg = current_portfolio_value / last_value
    else:
        # 第一次运行,无法计算 chg
        chg = 1.0

    # 保存当前的 portfolio_value 到 json
    additional_stats = {
        'model_training_count': getattr(context, 'model_training_count', 0),
        'trade_count': getattr(context, 'trade_count', 0),
        'turnover_count': getattr(context, 'turnover_count', 0),
    }
    save_portfolio_value(current_portfolio_value, additional_stats)

    current_date = data.current_dt.strftime("%Y-%m-%d")
    date_str = current_date
    print(f"日期:{current_date}, 当前持仓市值:{round(context.portfolio.portfolio_value, 3)}, Change: {round(chg, 3)}")
    print(f"模型训练次数: {getattr(context, 'model_training_count', 0)}, 交易次数: {getattr(context, 'trade_count', 0)}, 调仓次数: {getattr(context, 'turnover_count', 0)}")

    #TODO
    #=========定义训练列========
    #HINT  因子列名
    #=========================

    if context.trading_day_index % context.retrain == 0 or chg<0.98:
        try:
            date_obj = datetime.strptime(date_str, '%Y-%m-%d')
            n_days_ago = date_obj - timedelta(days=context.train_days)
            # 调整训练数据结束时间,往前推6个交易日,确保标签数据有效性
            train_end = context.add_trading_days(current_date, -6)
            df = dai.query(context.sql,filters={'date':[n_days_ago,train_end]}).df()
            print("开始预测","======训练集开始时间",n_days_ago,"======训练集结束时间",train_end)
            print(len(df))
            # 检查数据是否足够进行训练
            df.dropna(inplace=True)
            print(len(df))
            
            # 如果数据不足,跳过本次训练
            if len(df) == 0 or len(df) < 100:  # 设置最小数据量阈值
                print(f"警告: 训练数据不足,当前数据量: {len(df)},跳过本次训练")
                # 如果ZXJ_fac02表没有数据,我们可以创建一些基础数据
                if len(df) == 0:
                    # 尝试使用基础数据进行训练
                    df_fallback = dai.query(context.sql,filters={'date':[n_days_ago,train_end]}).df()
                    # 只对必要的列进行dropna,而不是所有列
                    df_fallback.dropna(subset=['date', 'instrument', 'label'] + [col for col in context.feature_list if col not in ['fac12', 'fac13'] ], inplace=True)
                    if len(df_fallback) > 0:
                        df = df_fallback
                        df.fillna(value={'fac12': 0, 'fac13': 0}, inplace=True)  # 为fac12和fac13填充值
                        print(f"使用回退数据进行训练,数据量: {len(df)}")
                    else:
                        print("回退数据也不足,跳过本次训练")
                        return  # 结束本次训练
            #TODO
            #============机器学习训练模型==========
            #HINT 对因子和label进行划分
            #训练模型

            import lightgbm as lgb

            X_train = df[context.feature_list]
            y_train = df['label']
            
            # 创建LGBM数据集
            dtrain = lgb.Dataset(X_train, label=y_train)

            params = {
                'objective': 'regression',
                'metric': 'rmse',
                'boosting_type': 'gbdt',
                'num_leaves': 15,
                'max_depth': 5,
                'learning_rate': 0.1,
                'feature_fraction': 0.8,
                'bagging_fraction': 0.8,
                'bagging_freq': 3,
                'min_data_in_leaf': 50,
                'verbosity': -1,
                'seed': 42,
                'feature_pre_filter': False
            }
            num_boost_round = 20

            # 训练模型
            context.model = lgb.train(params, dtrain, num_boost_round)

            # 增加模型训练计数
            context.model_training_count = getattr(context, 'model_training_count', 0) + 1

            # 使用模型对训练集进行预测,得到因子值
            # 注意:对于LGBM Dataset对象,我们需要直接使用特征数据进行预测
            X_train = df[context.feature_list]
            train_predictions = context.model.predict(X_train)
            df['zxj_factor1'] = train_predictions.astype('float64')  # 明确指定数据类型
            
            # 计算第二个因子(可以是其他有意义的计算)
            df['zxj_factor2'] = df.groupby('date')['zxj_factor1'].rank(pct=True).astype('float64')  # 明确指定数据类型
            
            # 构造ZXJ_fac02表所需的数据格式
            zxj_data = df[['date', 'instrument', 'zxj_factor1', 'zxj_factor2']].copy()
            
            # 使用BigQuant平台的dai.DataSource.write_bdb将数据保存到ZXJ_fac02表
            try:
                # 添加分区信息以提高数据访问效率
                zxj_data[dai.DEFAULT_PARTITION_FIELD] = zxj_data["date"].dt.year
                
                # 上传数据到ZXJ_fac02表
                dai.DataSource.write_bdb(
                    data=zxj_data,
                    id="ZXJ_fac02",
                    unique_together=["date", "instrument"],
                    indexes=["date"],
                )
                print(f"[INFO] 成功将数据写入ZXJ_fac02表,共{len(zxj_data)}条记录")
            except Exception as e:
                print(f"[ERROR] 写入ZXJ_fac02表失败: {str(e)}")
            
            print(f"[INFO] 训练完成,样本数: {len(df)}, 已准备写入ZXJ_fac02表, 训练次数: {context.model_training_count}")

            def save_lgb_model(bst, save_path):
                """
                安全保存 LightGBM 模型(原生 txt 格式)。
                若文件已存在则先删除,再写入新模型,并自动创建缺失目录。
                """
                # 1. 自动创建缺失的目录
                os.makedirs(os.path.dirname(save_path), exist_ok=True)

                # 2. 若文件已存在,则删除旧文件
                if os.path.isfile(save_path):
                    os.remove(save_path)

                # 3. 保存新模型
                bst.save_model(save_path)
                print(f"[INFO] 模型已保存 -> {save_path}")

            save_lgb_model(context.model, MODEL_PATH)

                        
        except Exception as e:
            print(f"predition error on {current_date}: {str(e)}")


    if context.trading_day_index % context.holding_days == 0 or chg<0.98:
        # 增加调仓计数
        context.turnover_count = getattr(context, 'turnover_count', 0) + 1
        
        import lightgbm as lgb
        #TODO
        try:
            date_obj = datetime.strptime(date_str, '%Y-%m-%d')
            # 预测需要获取当前日期的数据,但训练数据截止日期往前推6个交易日
            # 因此预测查询应该包含当前日期
            n_days_ago = date_obj - timedelta(days=context.pred_days)
            df = dai.query(context.sql,filters={'date':[n_days_ago,current_date]}).df()
            
            # 只在预测时使用当前日期的数据,但确保ZXJ_fac02表中有当前日期的数据
            # 如果当前日期没有数据,可以使用最近可用的数据    

            df.dropna(subset=context.feature_list,inplace=True)
            print("开始生成信号",len(df),"===当前调仓期==",current_date)
                
            # 筛选出当前日期的数据,并检查是否存在
            df_current = df[df.date==current_date]
                
            # 检查是否有足够的数据进行预测
            if df_current.empty:
                print(f"警告: 当前日期 {current_date} 没有数据")
                context.pred = []
                return  # 结束本次预测
                
            # 检查特征列是否都有数据
            missing_features = [col for col in context.feature_list if col not in df_current.columns or df_current[col].isna().all()]
            if missing_features:
                print(f"警告: 当前日期 {current_date} 缺少特征数据: {missing_features}")
                # 尝试填充缺失值
                for col in missing_features:
                    if col in ['fac12', 'fac13']:  # 这些是我们自定义的因子
                        df_current[col] = df_current[col].fillna(0)  # 用0填充自定义因子
                    else:
                        # 对于其他因子,使用前向填充或均值填充
                        df_current[col] = df_current[col].fillna(method='pad').fillna(df_current[col].mean())
                
            # 确保所有特征列都有有效值
            df_current = df_current.dropna(subset=context.feature_list)
                
            if df_current.empty or len(df_current[context.feature_list]) == 0:
                print(f"警告: 当前日期 {current_date} 没有足够的有效特征数据进行预测")
                context.pred = []
                return  # 结束本次预测
                
            X_test = df_current[context.feature_list]
            
            import lightgbm as lgb
            bst = lgb.Booster(model_file=MODEL_PATH)
            df_current['score'] = bst.predict(X_test)
                
            # 更新context.pred
            context.pred = df_current[['date', 'instrument', 'score']]

            
        except Exception as e:
            print(f"predition error on {current_date}: {str(e)}")

    #结果赋值给context.pred
def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):

    import pandas as pd 
    from bigtrader.constant  import Direction 
    from bigtrader.constant  import OrderType
    
    if len(context.pred) < 1:
        return

    df = context.pred

    buy_list = list(df.sort_values(by='score',ascending=False).head(context.target_hold_count).instrument)
    len_ = len(buy_list)
    target_hold_instruments = set(buy_list)


    # 获取当前已持有股票
    current_hold_instruments = set(context.get_account_positions().keys())

    # 卖出不在目标持有列表中的股票
    for instrument in current_hold_instruments - target_hold_instruments:
        context.order_target_percent(instrument, 0)
        context.trade_count = getattr(context, 'trade_count', 0) + 1
            
    # 买入目标持有列表中的股票
    for instrument in target_hold_instruments - current_hold_instruments:
        context.order_target_percent(instrument, 1/len_)
        context.trade_count = getattr(context, 'trade_count', 0) + 1

    context.pred = []
    

performance = bigtrader.run(
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.DAILY,
    start_date='2024-08-01',  
    end_date='2026-01-20',  
    capital_base=50000,     # 设置初始资金
    initialize=initialize,     # 传入初始化函数
    handle_data=handle_data,   # 传入数据处理函数
    before_trading_start = befor_trading,
    order_price_field_buy='open',
    order_price_field_sell='open',
)

# 渲染绩效报告,展示回测结果
performance.render()

\

标签

量化交易
{link}