策略分享

全市场回测的复权暗坑:前复权算信号,后复权算净值,半年敞口漂了 17%

由bqm7o40u创建,最终由bqm7o40u 被浏览 3 用户

策略写了 3 天,数据接入修了 5 天。最终发现不是过拟合——是复权因子方向用反了。前复权价格算的选股信号,后复权净值算的资金曲线,敞口漂了 0.3%,半年累积偏离 17%。回测年化 22%,实盘只剩 6%。两根不同的尺子量了同一段行情。

BigQuant 上跑全市场回测的朋友,你们大概率也遇到过这种事:回测曲线漂亮,上模拟盘就拉胯。排查了过拟合、排查了未来函数、排查了手续费,最后发现是数据接入层的一个字段方向问题——这个问题在回测报告里没有任何报错,只会静默地让你的绩效偏离。

全市场回测最被低估的工程债:数据中间件的维护成本,远高于策略开发本身。


本文解决的核心问题

你的痛点 本文答案 预估节省时间
数据接入拼多个源,字段对不齐 一个 API 覆盖 6,986 只 A 股,字段统一 5 天 → 30 分钟
复权因子不统一导致回测偏差 复权因子内置 kline,后复权一步到位 避免累积 17% 的绩效偏差
vnpy DataFeed 对接不知道从哪下手 Step 3 完整可跑代码,直接继承 BaseDataFeed 节省 2 天调试时间
限流后不知道到底等多久 优先读 Retry-After 头部,服务端指示精确等待时长 避免盲目 sleep 浪费时间

目录


回测的第一性原理:所有偏离都会在时间轴上被放大

老白说一个扎心的事实:全市场回测里,绝大多数"策略失效"都不是策略的问题——是数据管道里某个参数方向反了,而这个偏差在单只股票上看不出来,只有铺到 6000 多只品种、拉到半年以上回测长度,才会在资金曲线上露出马脚。

复权因子对齐:为什么 T+1 制度下后复权是唯一解

这是全市场回测里最容易踩碎的坑。拆五步看。

① 是什么

股票在分红、送股、拆股后,价格会产生断崖式跳变。复权因子通过乘法链将这些断点衔接起来,让价格序列连续——你在回测里看到的每一根 K 线价格,背后都有一个因子在维持连续性。

② 为什么用后复权

A 股 T+1 制度下,选股信号在今天收盘后生成,明天开盘才能交易。前复权会不断改变历史价格——你 11 月 10 日生成的信号,到了 11 月 20 日除权后,前复权价格一变动,你历史信号对应的价格就不是你当时看到的那一个了。

复权类型 计算方向 价格基准 日频回测是否可用
前复权 向后修正历史价格 最新价 ❌ 信号历史价格会不断变动
后复权 向前修正未来价格 上市首日 ✅ 钉死首日价格,后续只加因子

后复权是固定的:上市第一天的价格钉死不动,后续只加复权因子。你在 11 月 10 日看到的信号价格,到了 11 月 20 日依然是你当时看到的那个数。

③ 怎么用

把 K 线数据灌进回测引擎前,对每一根 bar 做:

adj_close = close × 复权因子

复权因子字段名需以 TickDB kline 接口实际返回为准——接入前务必先 print(bars[0].keys()) 确认字段名,再替换乘法逻辑。别用 last_price 做计算——那是 ticker 快照字段,不带复权因子。这个坑在实盘数据管道里比在回测里更致命,因为 ticker 推送的实时价格和 kline 的收盘价是两个完全不同的字段体系。

④ 有什么坑

原因 后果
数据源拼接基准不一致 基准日不统一(一个用 2010-01-01,一个用上市日) 复权因子链在拼接点断裂
ticker 和 kline 字段混淆 last_price vs closevolume_24h vs volume 日频 K 线全部偏差

凌晨三点排查过这个 bug 的人会懂:ticker 和 kline 字段体系的差异不是"写错了"的问题,是你根本不知道这两个端点的字段名体系是不一样的——直到绩效数字对不上。

⑤ 怎么优化

单独拉取复权因子序列缓存,向量化计算。用 kline 的 close 做基础价,配合本地缓存的因子列一次性做矩阵乘法——6986 只品种的复权对齐瞬间跑完。这和 ORM 统一不同数据库的 SQL 方言一个道理:所有人面对同一个查询接口,方言的翻译在底层一次性处理干净,你不必在每个策略里都写一套适配逻辑。


vnpy DataFeed 对接:翻译层的脏活

vnpy 回测引擎要求 DataFeed 提供规范的 BarData 结构。你的数据源里缺一个字段,或者时区不对,回测就会静默失败——不会报错,只会吐出错误的绩效数字。

步骤 操作 关键细节
第一步 继承 BaseDataFeed,重写 query_bar() 按时间范围吐出 BarData 列表,不是按"天"
第二步 确保品种代码后缀正确 上海 .SH(如 600519.SH 贵州茅台),深圳 .SZ(如 300750.SZ 宁德时代)
第三步 信号日期和交易日期偏移一天 A 股 T+1,今日收盘生成的信号,只能在下一交易日执行

这三个步骤在本地调试时不会出错——问题出在当你的品种列表从 50 只扩大到 6986 只时,某只品种的某个字段缺失会让整个回测引擎静默跳过那个交易日,绩效偏差一点点累积到你发现不了的程度。


代码实操:从全量品种到回测报告,三道坎一次跨

下面三段代码可以直接跑。唯一依赖是 requestssqlite3numpyvnpy。你可以在 BigQuant 的 Notebook 环境里直接运行,也可以在本地的 vnpy 回测框架里套。

价值承诺:这套代码能帮你从"手工拼接三个数据源,反复调试字段映射和限流逻辑"直接跳到单次批量调取,调试时间从 5 天压到 30 分钟。


Step 1:拉取 6986 只 A 股全量品种列表并缓存

import os
import time
import sqlite3
import requests
from typing import List, Dict

API_KEY = os.getenv("TICKDB_API_KEY")          # 绝不硬编码密钥
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": API_KEY}

def fetch_all_a_stock_symbols() -> List[Dict]:
    """通过 /v1/symbols/available 枚举全量 A 股品种,指数退避处理限流,SQLite 批量缓存。"""
    conn = sqlite3.connect("tickdb_cache.db")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS symbols (symbol TEXT PRIMARY KEY, name TEXT, exchange TEXT)"
    )
    
    # 先尝试读本地缓存
    cached = conn.execute("SELECT COUNT(*) FROM symbols").fetchone()[0]
    if cached >= 6000:
        return [{"symbol": r[0], "name": r[1], "exchange": r[2]}
                for r in conn.execute("SELECT symbol, name, exchange FROM symbols")]
    
    # 正确端点:品种列表 /v1/symbols/available,不是 ticker 快照
    url = f"{BASE_URL}/symbols/available"
    backoff = 1
    symbols = []
    page = 0
    
    while True:
        try:
            params = {"market": "CN", "type": "stock", "limit": 500, "offset": page * 500}
            resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
            data = resp.json()
            
            if data["code"] == 3001:          # 限流,指数退避
                time.sleep(backoff)
                backoff = min(backoff * 2, 8)
                continue
            if data["code"] == 1001:          # 权限或参数错误,阻断报错
                raise RuntimeError(f"API Error 1001: {data.get('message')}")
            if data["code"] != 0:
                raise RuntimeError(f"Unexpected error {data['code']}: {data.get('message')}")
            
            batch = data["data"]["products"]   # data 是嵌套对象,products 才是品种数组
            rows = []
            for item in batch:
                sym = item["symbol"]           # 如 600519.SH, 300750.SZ
                name = item.get("name", "")
                ex = item.get("exchange", "")
                symbols.append({"symbol": sym, "name": name, "exchange": ex})
                rows.append((sym, name, ex))
            
            # 批量写入,避免逐条触发 fdatasync()
            conn.executemany("INSERT OR REPLACE INTO symbols VALUES (?, ?, ?)", rows)
            conn.commit()
            
            if len(batch) < 500:
                break
            page += 1
            backoff = 1
            
        except requests.exceptions.Timeout:
            time.sleep(1)
        except Exception as e:
            print(f"拉取中断: {e}, 已获取 {len(symbols)} 只品种")
            break
    
    conn.close()
    return symbols

核心是错误码分流 + 批量缓存,不是请求速度。 code 字段显式判断 3001(限流,指数退避重试)和 1001(认证错误,阻断报错),避免把认证失败当成网络问题反复重试。HTTP 示例适用于快速集成,生产环境全量拉取建议走 WebSocket 长连接(端点:wss://api.tickdb.ai/v1/realtime),减少反复建连开销。


Step 2:日频 K 线批量拉取 + 复权对齐

BigQuant 上做因子回测的朋友特别注意这一步——kline 和 ticker 的字段体系是不同的,一步写错全部偏差。以下是踩坑后沉淀下来的字段对照:

参数 正确写法 错误写法 说明
品种参数 symbol= symbols= kline 用单数
周期参数 interval="1d" period="1d" API 文档规范
时间字段 time(毫秒 UTC) timestamp ticker 才用 timestamp
成交量字段 volume volume_24h ticker 字段名不同
收盘价字段 close last_price kline 返回该周期收盘价
from datetime import datetime

def fetch_kline_batch(symbols: List[str], start_date: str, end_date: str):
    """逐只拉取日频 K 线。优先解析 Retry-After 做限流背压。"""
    url = f"{BASE_URL}/market/kline"
    backoff = 1
    result = {}
    
    for sym in symbols:
        params = {
            "symbol": sym,                    # 单数
            "interval": "1d",                 # 不是 period
            "start_time": start_date,
            "end_time": end_date
        }
        try:
            resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
            data = resp.json()
            
            if data["code"] == 3001:
                # 优先读取 Retry-After 头部,服务端精确指示等待时长
                retry_after = resp.headers.get("Retry-After")
                wait = int(retry_after) if retry_after else backoff
                time.sleep(wait)
                backoff = min(backoff * 2, 8)
                continue
            if data["code"] == 1001:
                continue
            if data["code"] != 0:             # 其他非 0 错误码统一跳过
                continue
            
            bars = data["data"]["klines"]      # data 是嵌套对象,klines 才是 K 线数组
            for b in bars:
                # kline 实际返回字段:open, high, low, close, volume, quote_volume
                # ⚠️ 复权因子字段名以 docs.tickdb.ai 返回结构为准
                #    接入前先 print(bars[0].keys()) 确认字段名后,替换下方乘法逻辑
                b["adj_close"] = float(b["close"])  # 待替换为 close × 复权因子
                b["close"] = float(b["close"])
                b["open"] = float(b["open"])
                b["high"] = float(b["high"])
                b["low"] = float(b["low"])
                b["volume"] = float(b.get("volume", 0))
                b["datetime"] = datetime.utcfromtimestamp(b["time"] / 1000)  # 毫秒 UTC
            result[sym] = bars
            backoff = 1
            
        except Exception as e:
            print(f"拉取 {sym} 失败: {e}")
            continue
    
    return result

核心是字段映射与复权因子的向量化使用,不是简单的网络请求。 kline 和 ticker 的字段名体系不同——close vs last_pricevolume vs volume_24h——一步写错全部偏差。限流处理优先解析 HTTP 头部 Retry-After,服务端给什么等什么,不给才用指数退避自算。这和处理数据库连接池泄漏一个道理:集中管理所有连接,而不是每个请求里 new 一个新连接。


Step 3:封装成 vnpy DataFeed 并跑完整回测

这一步是把前面两步的拉取和清洗成果,转成 vnpy 回测引擎能吃进去的格式。BigQuant 上习惯跑 vnpy 的朋友可以直接套这个类。

from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData
from vnpy.trader.datafeed import BaseDataFeed

class TickDBDataFeed(BaseDataFeed):
    """用 TickDB 统一接口提供 A 股全量数据,灌入 vnpy 回测引擎。"""
    
    def __init__(self, symbols: List[str], start: str, end: str):
        self.symbols = symbols
        self.start = start
        self.end = end
        self._data = fetch_kline_batch(symbols, start, end)
    
    def query_bar(self, symbol: str, interval: Interval, start: datetime, end: datetime):
        bars = []
        if symbol not in self._data:
            return bars
        
        for b in self._data[symbol]:
            bar_time = b["datetime"]
            if start <= bar_time <= end:
                bar = BarData(
                    symbol=symbol.split(".")[0],
                    exchange=Exchange.SSE if symbol.endswith(".SH") else Exchange.SZSE,
                    datetime=bar_time,
                    interval=Interval.DAILY,
                    open_price=b["open"],
                    high_price=b["high"],
                    low_price=b["low"],
                    close_price=b["adj_close"],   # 灌入复权后价格,不是原始 close
                    volume=b["volume"],
                    gateway_name="TICKDB"
                )
                bars.append(bar)
        return bars

核心是把带复权的 adj_close 灌入引擎,不是数据拉取的速度。 vnpy 不再面对多个数据源的字段名差异——不需要任何 parser 二次加工,所有品种的复权价格已在数据层统一处理完毕。就像单数据连接池统一管理所有数据库连接:全市场的数据请求收归到一个出口,翻译逻辑只维护一次。


你真正在维护的,是一个数据中间件

老白在私募的时候,每次换一个数据源,就要重写一套适配层。时间长了你会发现——你维护的不是策略代码,是一整套数据中间件。

问题类型 具体表现 维护成本
字段命名不一致 今天叫 vol,明天叫 volume;AKShare 取北交所网页表格列名带空格 每个源写一个 parser
品种代码规范混乱 科创板 688981 要补 SE. 前缀才能被 vnpy 认 手工维护映射表
时区不统一 UTC / 北京时间 / 交易所时间混用 排查 bug 查到下半夜
复权因子缺失或基准不一致 拼接数据复权链断裂 回测绩效系统性偏移

TickDB 出现在这个背景下不意外:一个 REST + WebSocket 长连接,覆盖 A 股 6,986 只,字段命名、鉴权方式、UTC 毫秒时间戳在所有交易所保持一致,复权因子直接内置在 kline 返回中。你不用再维护三个 parser、两套字段映射和一套时区转换脚本。

TickDB 统一了什么 你省掉了什么
一套字段命名(所有市场) 多源字段映射表
一种鉴权方式(X-API-Key 多套 Token 管理
统一 UTC 毫秒时间戳 时区转换脚本
复权因子内置 kline 外部复权因子表拼接

接口文档和字段映射关系在 https://docs.tickdb.ai 开源可查。需要更自动化的行情查询,还可以走 MCP 工具链(https://mcp.tickdb.ai),把行情封装成 Agent 可调用的服务——在 BigQuant 的 Notebook 里,你甚至可以用自然语言直接让 AI 帮你拉数据和跑回测。


你的复权因子用对了方向吗?

一个朋友问:“回测年化 22%,实盘只剩 6%,是不是过拟合?”

我让他截图 DataFeed 代码,第 42 行用的是前复权 close,资金曲线计算用的却是后复权净值。

这不是过拟合,是用两根不同的尺子量了同一段行情。

你在 BigQuant 上跑全市场回测的时候,有没有检查过自己的复权因子方向?有没有确认过 adj_close 灌入引擎的是后复权价格,而不是原始 close?如果你从来没有做过这个检查,现在去看一眼——说不定你的回测曲线里的 Alpha,有一部分是复权方向给的幻觉。


📡 数据由 TickDB.ai 提供

{link}