BigQuant 2026 Private Sharing Session

Fangzheng (方正) "Undercurrent" (暗流涌动) Factor Construction: Basics

Created by bq355jhd

Research report: https://mp.weixin.qq.com/s/h9S6UCvV0Ffa9taHI8qjrA

Factor analysis: computed from 2021 to the recent past. The long-side predictive power is not outstanding; a heavily modified version is still in progress…

-- Factor analysis code (below the figure)

-- Factor computation code (at the very bottom)

Switching to 5-day returns, the IC = 0.0328.
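For reference, a minimal sketch of how an IC like this can be computed against forward 5-day returns (assumptions: a factor frame with columns instrument/date/undercurrent, a daily price frame with instrument/date/close, and a Spearman rank IC; the actual analysis notebook is linked below and may differ):

import pandas as pd

def ic_5d(factor, prices):
    """Daily cross-sectional rank IC of the factor vs. forward 5-day returns."""
    px = prices.sort_values(['instrument', 'date']).copy()
    # forward 5-day return per instrument: close[t+5] / close[t] - 1
    px['fwd_ret_5d'] = px.groupby('instrument')['close'].shift(-5) / px['close'] - 1
    merged = factor.merge(px[['instrument', 'date', 'fwd_ret_5d']],
                          on=['instrument', 'date'], how='inner')
    # one Spearman correlation per trading day; the mean is the reported IC
    return merged.groupby('date').apply(
        lambda g: g['undercurrent'].corr(g['fwd_ret_5d'], method='spearman')
    )

# ic_series = ic_5d(df_factor, df_prices); print(ic_series.mean())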

Original factor analysis:

https://bigquant.com/codesharev3/215ccfb3-3bfb-49ce-af79-27c328013d05

Factor construction, already smoothed over 20 days (it seems Python files can't be shared, so the code is pasted below).

""" 暗流涌动因子 - 严格原文版 完全按照方正金工原文逻辑实现

原文核心逻辑: 1. 成交量熵值因子:

  • 240分钟划分为48个5分钟区间
  • 计算相对成交量(个股/全市场)
  • 计算每个5分钟区间的相对成交量占比
  • 香农熵测量分布集中度
  • 均值距离化处理
  • 20日低频化(均值+标准差)

2. 流动性弹性因子:

  • 激增定义:超过过去5分钟均量1倍
  • 价格波动:(最高-最低)/开盘
  • 敏感系数 = 激增时刻波动/普通时刻波动
  • 弹性 = 1 - 敏感系数
  • 均值距离化处理
  • 20日低频化(均值+标准差)

3. 合成:等权合成 """
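# Worked example of the two raw formulas (illustrative numbers only, not from
# the report):
#
#   Shannon entropy of interval shares p = [0.5, 0.3, 0.2]:
#       H = -(0.5*ln(0.5) + 0.3*ln(0.3) + 0.2*ln(0.2)) ≈ 1.030
#   A perfectly uniform day over 48 intervals gives the maximum H = ln(48) ≈ 3.871;
#   lower entropy means volume is concentrated in fewer intervals.
#
#   Elasticity with surge volatility 0.004 and normal volatility 0.002:
#       sensitivity = 0.004 / 0.002 = 2.0,  elasticity = 1 - 2.0 = -1.0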

import numpy as np
import pandas as pd
import dai
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class UndercurrentFactorOriginal:
    """Undercurrent factor - strict original version."""

    def __init__(self):
        """Parameters strictly follow the original report."""
        self.lookback_days = 20  # low-frequency smoothing window

    def fetch_minute_data(self, start_date, end_date):
        """Fetch 1-minute bar data."""
        print(f"  fetching minute data {start_date} ~ {end_date}...", end='')

        sql = f"""
        WITH minute_bars AS (
            SELECT
                instrument,
                date,
                close,
                high,
                low,
                open,
                volume,
                CAST(date AS DATE) as trade_date,
                EXTRACT(HOUR FROM date) * 60 + EXTRACT(MINUTE FROM date) as minute_of_day
            FROM cn_stock_bar1m_c
            WHERE date >= '{start_date} 09:00:00'
                AND date <= '{end_date} 15:30:00'
                AND volume > 0
                AND close > 0
                AND high > 0
                AND low > 0
                AND open > 0
        ),
        valid_stocks AS (
            -- exclude ST stocks and stocks listed for fewer than 60 days
            SELECT DISTINCT
                instrument,
                CAST(date AS DATE) as trade_date
            FROM cn_stock_prefactors
            WHERE date >= '{start_date}'
                AND date <= '{end_date}'
                AND st_status = 0
                AND DATE_DIFF('day', list_date, date) >= 60
        )
        SELECT
            m.instrument,
            m.trade_date,
            m.date,
            m.close,
            m.high,
            m.low,
            m.open,
            m.volume,
            m.minute_of_day
        FROM minute_bars m
        INNER JOIN valid_stocks v
            ON m.instrument = v.instrument
            AND m.trade_date = v.trade_date
        WHERE m.minute_of_day >= 570  -- 09:30
            AND m.minute_of_day <= 900  -- 15:00 (inclusive)
        ORDER BY m.instrument, m.trade_date, m.date
        """

        df = dai.query(sql).df()
        print(f" ✓ {len(df):,} rows")
        return df
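    # Shape of the returned frame: one row per (instrument, minute bar) with
    # columns instrument / trade_date / date / OHLC / volume / minute_of_day,
    # where minute_of_day is in [570, 900] (09:30-15:00). Whether cn_stock_bar1m_c
    # stamps bars at interval start or interval end is assumed, not verified here.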

    def calculate_volume_entropy_factor(self, df_minute):
        """
        Volume entropy factor - strictly following the original report.

        Steps:
        1. Split the 240 trading minutes into 48 five-minute intervals
        2. Compute relative volume (stock / whole market)
        3. Compute Shannon entropy
        4. Mean-distance transform
        5. 20-day low-frequency smoothing
        """
        print("  computing volume entropy factor...", end='')

        # Step 1: build the 5-minute interval index (0-47)
        # 09:30-11:30 = 120 minutes = 24 intervals
        # 13:00-15:00 = 120 minutes = 24 intervals
        df = df_minute.copy()

        def get_interval_index(minute_of_day):
            """Map minute-of-day to a 5-minute interval index."""
            if minute_of_day < 690:  # morning session 09:30-11:30
                return (minute_of_day - 570) // 5
            else:  # afternoon session 13:00-15:00
                return 24 + (minute_of_day - 780) // 5
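            # Mapping examples (assuming bars stamped at interval start,
            # i.e. 09:30-11:29 and 13:00-14:59):
            #   09:30 -> 570 -> (570-570)//5 = 0
            #   11:29 -> 689 -> (689-570)//5 = 23
            #   13:00 -> 780 -> 24 + (780-780)//5 = 24
            #   14:59 -> 899 -> 24 + (899-780)//5 = 47
            # Bars stamped at interval end (09:31-11:30, 13:01-15:00), or any
            # lunch-break bars, would land in wrong or out-of-range bins; the
            # original code does not guard against this.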

        df['interval'] = df['minute_of_day'].apply(get_interval_index)

        # Step 2: compute relative volume
        # first compute total market volume for each minute
        market_volume = df.groupby(['trade_date', 'date'])['volume'].sum().reset_index()
        market_volume.columns = ['trade_date', 'date', 'market_volume']

        df = df.merge(market_volume, on=['trade_date', 'date'], how='left')
        df['relative_volume'] = df['volume'] / df['market_volume']

        # Step 3: each 5-minute interval's share of relative volume
        interval_stats = df.groupby(['instrument', 'trade_date', 'interval'])['relative_volume'].sum().reset_index()
        interval_stats.columns = ['instrument', 'trade_date', 'interval', 'interval_volume']

        # total relative volume per stock per day
        daily_total = interval_stats.groupby(['instrument', 'trade_date'])['interval_volume'].sum().reset_index()
        daily_total.columns = ['instrument', 'trade_date', 'total_volume']

        interval_stats = interval_stats.merge(daily_total, on=['instrument', 'trade_date'], how='left')
        interval_stats['prob'] = interval_stats['interval_volume'] / interval_stats['total_volume']

        # Shannon entropy: H = -sum(p * ln(p))
        def calc_entropy(group):
            probs = group['prob'].values
            probs = probs[probs > 0]  # keep non-zero probabilities only
            if len(probs) == 0:
                return 0
            entropy = -np.sum(probs * np.log(probs))
            return entropy

        entropy_daily = interval_stats.groupby(['instrument', 'trade_date']).apply(calc_entropy).reset_index()
        entropy_daily.columns = ['instrument', 'date', 'entropy']

        # Step 4: mean-distance transform (absolute distance from the cross-sectional mean)
        entropy_daily['entropy_mean'] = entropy_daily.groupby('date')['entropy'].transform('mean')
        entropy_daily['entropy_distance'] = abs(entropy_daily['entropy'] - entropy_daily['entropy_mean'])

        # Step 5: 20-day low-frequency smoothing (rolling mean + rolling std)
        entropy_daily = entropy_daily.sort_values(['instrument', 'date'])
        entropy_daily['entropy_20d_mean'] = entropy_daily.groupby('instrument')['entropy_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).mean()
        )
        entropy_daily['entropy_20d_std'] = entropy_daily.groupby('instrument')['entropy_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).std()
        )

        # combine: mean + std
        entropy_daily['volume_entropy_factor'] = entropy_daily['entropy_20d_mean'] + entropy_daily['entropy_20d_std'].fillna(0)

        print(" ✓")
        return entropy_daily[['instrument', 'date', 'volume_entropy_factor']]

    def calculate_liquidity_elasticity_factor(self, df_minute):
        """
        Liquidity elasticity factor - strictly following the original report.

        Steps:
        1. Define surge minutes: volume exceeds the past 5-minute average by 1x
        2. Compute price volatility: (high - low) / open
        3. Average volatility for surge minutes vs. normal minutes
        4. Sensitivity = surge volatility / normal volatility
        5. Elasticity = 1 - sensitivity
        6. Mean-distance transform
        7. 20-day low-frequency smoothing
        """
        print("  computing liquidity elasticity factor...", end='')

        df = df_minute.copy()
        df = df.sort_values(['instrument', 'trade_date', 'date']).reset_index(drop=True)

        # Step 1: rolling 5-minute average volume
        # (note: this window includes the current bar and rolls across day
        # boundaries, since the groupby is on instrument only)
        df['volume_5min_avg'] = df.groupby('instrument')['volume'].transform(
            lambda x: x.rolling(5, min_periods=1).mean()
        )

        # surge minute: exceeds the 5-minute average volume by 1x (i.e. 2x the average)
        df['is_surge'] = df['volume'] > (df['volume_5min_avg'] * 2)

        # Step 2: price volatility
        df['price_volatility'] = (df['high'] - df['low']) / df['open']

        # Step 3: average volatility in surge vs. normal minutes
        def calc_elasticity(group):
            surge_data = group[group['is_surge']]
            normal_data = group[~group['is_surge']]

            if len(surge_data) < 3 or len(normal_data) < 3:
                return pd.Series({
                    'surge_volatility': np.nan,
                    'normal_volatility': np.nan,
                    'elasticity_raw': np.nan
                })

            surge_vol = surge_data['price_volatility'].mean()
            normal_vol = normal_data['price_volatility'].mean()

            if normal_vol == 0 or np.isnan(normal_vol):
                return pd.Series({
                    'surge_volatility': surge_vol,
                    'normal_volatility': normal_vol,
                    'elasticity_raw': np.nan
                })

            # Step 4: sensitivity = surge volatility / normal volatility
            sensitivity = surge_vol / normal_vol

            # Step 5: elasticity = 1 - sensitivity
            elasticity = 1 - sensitivity

            return pd.Series({
                'surge_volatility': surge_vol,
                'normal_volatility': normal_vol,
                'elasticity_raw': elasticity
            })

        elasticity_daily = df.groupby(['instrument', 'trade_date']).apply(calc_elasticity).reset_index()
        elasticity_daily.columns = ['instrument', 'date', 'surge_volatility', 'normal_volatility', 'elasticity_raw']

        # Step 6: mean-distance transform
        elasticity_daily['elasticity_mean'] = elasticity_daily.groupby('date')['elasticity_raw'].transform('mean')
        elasticity_daily['elasticity_distance'] = abs(elasticity_daily['elasticity_raw'] - elasticity_daily['elasticity_mean'])

        # Step 7: 20-day low-frequency smoothing
        elasticity_daily = elasticity_daily.sort_values(['instrument', 'date'])
        elasticity_daily['elasticity_20d_mean'] = elasticity_daily.groupby('instrument')['elasticity_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).mean()
        )
        elasticity_daily['elasticity_20d_std'] = elasticity_daily.groupby('instrument')['elasticity_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).std()
        )

        # combine: mean + std
        elasticity_daily['liquidity_elasticity_factor'] = elasticity_daily['elasticity_20d_mean'] + elasticity_daily['elasticity_20d_std'].fillna(0)

        print(" ✓")
        return elasticity_daily[['instrument', 'date', 'liquidity_elasticity_factor']]

    def combine_factors(self, df_entropy, df_elasticity):
        """Equal-weight combination of the two sub-factors."""
        print("  combining factors...", end='')

        df = pd.merge(df_entropy, df_elasticity, on=['instrument', 'date'], how='inner')

        # equal-weight combination
        df['undercurrent'] = df['volume_entropy_factor'] + df['liquidity_elasticity_factor']

        print(" ✓")
        return df[['instrument', 'date', 'volume_entropy_factor', 'liquidity_elasticity_factor', 'undercurrent']]
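    # Note on the combination above: the "equal-weight" sum is a raw sum, which
    # assumes the two sub-factors live on comparable scales. A cross-sectional
    # z-score of each sub-factor before summing would be a common alternative;
    # the pasted code keeps the raw sum.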

    def calculate_batch(self, start_date, end_date):
        """Compute a single batch."""
        # fetch minute data
        df_minute = self.fetch_minute_data(start_date, end_date)

        if len(df_minute) == 0:
            return pd.DataFrame()

        # compute the two sub-factors
        df_entropy = self.calculate_volume_entropy_factor(df_minute)
        df_elasticity = self.calculate_liquidity_elasticity_factor(df_minute)

        # combine
        df_factor = self.combine_factors(df_entropy, df_elasticity)

        return df_factor
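    # Caveat: each batch is computed independently, so the 20-day rolling
    # windows above only see data inside one batch (batch_days defaults to 5,
    # with min_periods=1). A faithful 20-day window would require fetching an
    # extra lookback_days of history per batch; the pasted code does not do this.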

    def save_to_bdb(self, df_factor, table_name='factor_undercurrent_original'):
        """Save the factor to the database."""
        if len(df_factor) == 0:
            return

        df_save = df_factor.copy()
        df_save['date'] = pd.to_datetime(df_save['date'])
        df_save.columns = ['instrument', 'date', 'entropy', 'elasticity', 'undercurrent']

        try:
            dai.DataSource.write_bdb(
                data=df_save,
                id=table_name,
                unique_together=['instrument', 'date'],
                indexes=['date']
            )
            print(f"  saved ✓ {len(df_save)} rows")
        except Exception as e:
            print(f"  save failed ✗ {str(e)[:100]}")

    def calculate_range(self, start_date, end_date, batch_days=5,
                        save_to_db=True, table_name='factor_undercurrent_original'):
        """Batch computation over a date range."""
        print("=" * 80)
        print(" " * 25 + "Undercurrent factor - strict original version")
        print("=" * 80)
        print("Core logic:")
        print("  1. Volume entropy: 48 five-minute intervals + relative volume + mean-distance transform + 20-day smoothing")
        print("  2. Liquidity elasticity: surge definition (5-min avg volume * 2) + sensitivity + elasticity = 1 - sensitivity + 20-day smoothing")
        print("  3. Equal-weight combination")
        print("=" * 80)

        start = datetime.strptime(start_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')

        batches = []
        current = start
        while current <= end:
            batch_end = min(current + timedelta(days=batch_days - 1), end)
            batches.append((current.strftime('%Y-%m-%d'), batch_end.strftime('%Y-%m-%d')))
            current = batch_end + timedelta(days=1)

        print(f"\n{len(batches)} batches in total\n")

        all_factors = []
        success_count = 0

        for i, (batch_start, batch_end) in enumerate(batches, 1):
            print(f"[batch {i}/{len(batches)}] {batch_start} ~ {batch_end}")

            try:
                df_batch = self.calculate_batch(batch_start, batch_end)

                if len(df_batch) > 0:
                    all_factors.append(df_batch)
                    success_count += 1

                    if save_to_db:
                        self.save_to_bdb(df_batch, table_name)

            except Exception as e:
                print(f"  error: {str(e)[:100]}")
                import traceback
                traceback.print_exc()
                continue

        print("\n" + "=" * 80)
        print(f"Done! {success_count}/{len(batches)} batches succeeded")
        print("=" * 80)

        if all_factors:
            df_all = pd.concat(all_factors, ignore_index=True)
            return df_all
        else:
            return pd.DataFrame()

def main():
    """Entry point."""
    calculator = UndercurrentFactorOriginal()

    # compute the 2022-2026 sample
    df = calculator.calculate_range(
        start_date='2022-01-01',
        end_date='2026-01-28',
        batch_days=5,
        save_to_db=True,
        table_name='factor_undercurrent_original'
    )

    if len(df) > 0:
        print("\nFactor preview:")
        print(df.head(20))

        print("\nFactor statistics:")
        # calculate_range returns the pre-rename column names
        print(df[['volume_entropy_factor', 'liquidity_elasticity_factor', 'undercurrent']].describe())

if __name__ == "__main__":
    main()
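To sanity-check the saved factor, it can be read back from the bdb table and inspected. A minimal sketch, assuming the table name used above and that dai.query can read a written bdb table by its id (adjust to your environment):

import dai

df = dai.query(
    "SELECT instrument, date, undercurrent "
    "FROM factor_undercurrent_original "
    "WHERE date >= '2024-01-01'"
).df()
print(df.sort_values('date').tail())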
