Tick 回测demos
由qxiao创建,最终由qxiao 被浏览 4 用户
股票 Tick 策略一:VWAP 突破策略
沪深300成分股中日成交额最大的 30 只股票,基于 tick 价格的短期 VWAP 突破信号。买入:最新价上穿近 N 个 tick 的成交量加权均价(VWAP);卖出:价格下穿 VWAP,或触发止盈/止损/超时。风控:止盈 1.5%,止损 0.8%,最大持仓 60 分钟。
from bigquant import bigtrader, dai
from collections import deque
def initialize(context: bigtrader.IContext):
    """Configure fees, risk limits and the stock universe for the VWAP breakout demo.

    The universe is the (at most) 30 instruments returned by the query below:
    rows of ``cn_stock_prefactors`` for the trading day before
    ``context.start_date`` with ``is_hs300 = 1``, ``st_status = 0``,
    ``suspended = 0`` and ``amount > 500000000``, ordered by ``amount``
    descending.  Per-instrument rolling windows and "previous tick" slots
    used by ``handle_tick`` are allocated here.
    """
    fee_model = bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(fee_model)

    # Risk and signal parameters read by handle_tick.
    context.take_profit = 0.015  # take profit at +1.5%
    context.stop_loss = 0.008  # stop loss at -0.8%
    context.max_hold_minutes = 60  # maximum holding time, in minutes
    context.vwap_window = 20  # rolling VWAP length, in ticks

    # Rank the universe on the trading day preceding the backtest start.
    ranking_day = context.add_trading_days(context.start_date, -1)
    universe_query = """
SELECT instrument, amount
FROM cn_stock_prefactors
WHERE
date = $prev_day
AND is_hs300 = 1
AND st_status = 0
AND suspended = 0
AND amount > 500000000
ORDER BY amount DESC
LIMIT 30
"""
    ranked = dai.query(universe_query, params={"prev_day": ranking_day}).df()
    context.stock_pool = ranked["instrument"].tolist()

    # One bounded window per instrument for prices and per-tick volumes,
    # plus the last observed price / VWAP used for cross detection.
    price_windows, volume_windows = {}, {}
    last_prices, last_vwaps = {}, {}
    for code in context.stock_pool:
        price_windows[code] = deque(maxlen=context.vwap_window)
        volume_windows[code] = deque(maxlen=context.vwap_window)
        last_prices[code] = None
        last_vwaps[code] = None
    context.price_buf = price_windows
    context.vol_buf = volume_windows
    context.prev_price = last_prices
    context.prev_vwap = last_vwaps

    # instrument -> datetime at which the entry order was placed.
    context.hold_start = {}
def before_trading_start(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Subscribe to the stock pool and reset the per-session tick state.

    The rolling price/volume windows and the previous price / VWAP slots are
    cleared so that the first signals of a session are not computed against
    ticks from the prior session.  This mirrors what the other demos in this
    file do in their ``before_trading_start``.

    Args:
        context: strategy context holding the state created in ``initialize``.
        data: bar data supplied by the engine; not used here.
    """
    context.subscribe(context.stock_pool)
    for code in context.stock_pool:
        context.price_buf[code].clear()
        context.vol_buf[code].clear()
        # With these set to None, handle_tick skips cross detection until a
        # fresh price/VWAP pair has been observed.
        context.prev_price[code] = None
        context.prev_vwap[code] = None
def handle_tick(context: bigtrader.IContext, tick: bigtrader.ITickData):
    """Trade the rolling-VWAP breakout signal on one tick.

    Entry (flat): request a 0.05 target weight when the price moves from
    at/below the previous rolling VWAP to above the current one.
    Exit (long): request a 0 target weight on take profit, stop loss,
    holding-time limit, or a cross back below the rolling VWAP.

    Args:
        context: strategy context carrying the state created in ``initialize``.
        tick: tick snapshot; ``instrument``, ``datetime``, ``last_price`` and
            ``volume`` are read.  ``volume`` is treated as the cumulative
            traded volume of the day, as the convertible-bond demo in this
            file does — NOTE(review): confirm against the tick data spec.
    """
    instrument = tick.instrument
    if instrument not in context.stock_pool:
        return

    current_dt = tick.datetime

    # Per-tick volume = change in cumulative volume since the previous tick.
    # The previous cumulative value is stored separately from vol_buf, which
    # holds per-tick deltas.  It is tracked before the session filter so the
    # first in-session tick is not weighted with all volume since the open,
    # and it is keyed by date so a new day starts again from zero.
    cum_state = getattr(context, "cum_vol_state", None)
    if cum_state is None:
        cum_state = context.cum_vol_state = {}
    cum_vol = tick.volume or 0
    today = current_dt.date()
    last_day, last_cum = cum_state.get(instrument, (None, 0))
    if last_day != today:
        last_cum = 0
    cum_state[instrument] = (today, cum_vol)
    # Floor at 1 so the VWAP denominator can never be zero.
    tick_vol = max(cum_vol - last_cum, 1)

    # Only act between 09:31 and 14:55 (inclusive, minute resolution).
    time_str = current_dt.strftime("%H:%M")
    if time_str < "09:31" or time_str > "14:55":
        return

    price = tick.last_price
    if price is None or price <= 0:
        return

    price_window = context.price_buf[instrument]
    volume_window = context.vol_buf[instrument]
    price_window.append(price)
    volume_window.append(tick_vol)

    # Warm-up: remember the price but emit no signal until the window is full.
    if len(price_window) < context.vwap_window:
        context.prev_price[instrument] = price
        return

    total_vol = sum(volume_window)
    vwap = sum(p * v for p, v in zip(price_window, volume_window)) / total_vol

    prev_price = context.prev_price[instrument]
    prev_vwap = context.prev_vwap[instrument]
    context.prev_price[instrument] = price
    context.prev_vwap[instrument] = vwap
    if prev_price is None or prev_vwap is None:
        return

    position = context.get_position(instrument, create_if_none=False)
    holding = position is not None and position.current_qty > 0

    if holding:
        # ---- exit logic ----
        cost_price = position.cost_price
        pnl_ratio = (price - cost_price) / cost_price

        start_dt = context.hold_start.get(instrument)
        # total_seconds() counts whole days too; .seconds would wrap around
        # for a position that has been held overnight.
        timed_out = (
            start_dt is not None
            and (current_dt - start_dt).total_seconds() / 60 >= context.max_hold_minutes
        )
        crossed_below = prev_price >= prev_vwap and price < vwap

        if (
            pnl_ratio >= context.take_profit
            or pnl_ratio <= -context.stop_loss
            or timed_out
            or crossed_below
        ):
            context.order_target_percent(instrument, 0)
            context.hold_start.pop(instrument, None)
    elif prev_price <= prev_vwap and price > vwap:
        # ---- entry logic ----
        context.order_target_percent(instrument, 0.05)
        context.hold_start[instrument] = current_dt
# Run the VWAP breakout demo as a tick-frequency backtest (CN_STOCK market).
performance = bigtrader.run(
    # strategy callbacks
    initialize=initialize,
    before_trading_start=before_trading_start,
    handle_tick=handle_tick,
    # market and data frequency
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.TICK,
    # backtest window and starting capital
    start_date="2026-04-28",
    end_date="2026-04-30",
    capital_base=1000000,
    # price fields used for buy / sell orders
    order_price_field_buy="close",
    order_price_field_sell="close",
)
股票 Tick 策略二:短期超卖反弹策略
连续 N 个 tick 价格下跌后,出现第一个上涨 tick(超卖反弹)时买入;连续 M 个 tick 价格上涨后出现下跌(动量衰竭)时卖出。风控:止盈 1.5%,止损 0.8%,最大持仓 60 分钟。
from bigquant import bigtrader, dai
from collections import deque
def initialize(context: bigtrader.IContext):
    """Configure fees, risk limits, streak thresholds and the universe.

    The universe is whatever the query below returns (``LIMIT 1``): the row
    of ``cn_stock_prefactors`` with the largest ``amount`` on the trading day
    before ``context.start_date`` among rows with ``is_hs300 = 1``,
    ``st_status = 0``, ``suspended = 0`` and ``amount > 500000000``.
    """
    fee_model = bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(fee_model)

    context.take_profit = 0.015
    context.stop_loss = 0.008
    context.max_hold_minutes = 60
    context.down_streak_entry = 3  # consecutive falling ticks required before a buy
    context.up_streak_exit = 3  # consecutive rising ticks required before a sell
    context.price_window = 20

    ranking_day = context.add_trading_days(context.start_date, -1)
    universe_query = """
SELECT instrument, amount
FROM cn_stock_prefactors
WHERE date = $prev_day AND is_hs300 = 1 AND st_status = 0
AND suspended = 0 AND amount > 500000000
ORDER BY amount DESC LIMIT 1
"""
    ranked = dai.query(universe_query, params={"prev_day": ranking_day}).df()
    context.stock_pool = list(ranked["instrument"].unique())

    # One bounded price window per instrument.
    windows = {}
    for code in context.stock_pool:
        windows[code] = deque(maxlen=context.price_window)
    context.price_buf = windows

    # instrument -> datetime at which the entry order was placed.
    context.hold_start = {}
def before_trading_start(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Subscribe to the stock pool and empty every rolling price window.

    ``data`` is accepted for the callback signature but not used.
    """
    pool = context.stock_pool
    context.subscribe(pool)

    windows = context.price_buf
    for code in pool:
        windows[code].clear()
def handle_tick(context: bigtrader.IContext, tick: bigtrader.ITickData):
    """Trade the oversold-rebound / momentum-exhaustion signal on one tick.

    Entry (flat): after ``context.down_streak_entry`` consecutive falling
    ticks, the first rising tick requests a 0.05 target weight.
    Exit (long): take profit, stop loss, holding-time limit, or the first
    falling tick after ``context.up_streak_exit`` consecutive rising ticks.

    Args:
        context: strategy context carrying the state created in ``initialize``.
        tick: tick snapshot; ``instrument``, ``datetime`` and ``last_price``
            are read.
    """
    instrument = tick.instrument
    if instrument not in context.stock_pool:
        return

    current_dt = tick.datetime
    # Only act between 09:31 and 14:55 (inclusive, minute resolution).
    time_str = current_dt.strftime("%H:%M")
    if time_str < "09:31" or time_str > "14:55":
        return

    price = tick.last_price
    if price is None or price <= 0:
        return

    window = context.price_buf[instrument]
    window.append(price)
    if len(window) < context.price_window:
        return

    # Direction of every tick-to-tick move in the window: +1 up, -1 down, 0 flat.
    prices = list(window)
    directions = [(cur > prev) - (cur < prev) for prev, cur in zip(prices, prices[1:])]
    if not directions:
        return
    last_dir = directions[-1]

    position = context.get_position(instrument, create_if_none=False)
    holding = position is not None and position.current_qty > 0

    if holding:
        cost_price = position.cost_price
        pnl_ratio = (price - cost_price) / cost_price

        start_dt = context.hold_start.get(instrument)
        # total_seconds() counts whole days too; .seconds would wrap around
        # for a position that has been held overnight.
        timed_out = (
            start_dt is not None
            and (current_dt - start_dt).total_seconds() / 60 >= context.max_hold_minutes
        )

        # The up_streak_exit moves preceding the latest one must all be up;
        # this is the same slicing rule the entry signal uses for its streak.
        up_run = directions[-(context.up_streak_exit + 1):-1]
        exhausted = (
            len(up_run) == context.up_streak_exit
            and all(d == 1 for d in up_run)
            and last_dir == -1
        )

        if (
            pnl_ratio >= context.take_profit
            or pnl_ratio <= -context.stop_loss
            or timed_out
            or exhausted
        ):
            context.order_target_percent(instrument, 0)
            context.hold_start.pop(instrument, None)
    else:
        down_run = directions[-(context.down_streak_entry + 1):-1]
        rebound = (
            len(down_run) == context.down_streak_entry
            and all(d == -1 for d in down_run)
            and last_dir == 1
        )
        if rebound:
            context.order_target_percent(instrument, 0.05)
            context.hold_start[instrument] = current_dt
# Run the oversold-rebound demo as a tick-frequency backtest (CN_STOCK market).
performance = bigtrader.run(
    # strategy callbacks
    initialize=initialize,
    before_trading_start=before_trading_start,
    handle_tick=handle_tick,
    # market and data frequency
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.TICK,
    # backtest window and starting capital
    start_date="2026-04-22",
    end_date="2026-04-30",
    capital_base=10000000,
    # price fields used for buy / sell orders
    order_price_field_buy="close",
    order_price_field_sell="close",
)
ETF Tick 策略一:双 ETF 价差均值回归
沪深300ETF(510300.SH)+ 黄金ETF(518880.SH),基于两只 ETF 归一化价格之差的均值回归。价差低于均值 - N 倍标准差时买入沪深300ETF,高于均值 + N 倍标准差时买入黄金ETF,价差回归均值附近时平仓。
from bigquant import bigtrader, dai
from collections import deque
# Traded pair; handle_tick treats index 0 as the CSI 300 ETF leg and
# index 1 as the gold ETF leg.
ETF_POOL = ["510300.SH", "518880.SH"]
# Number of ticks kept per ETF; no signal is computed until both windows are full.
SPREAD_WINDOW = 60
# A position is opened when the spread z-score reaches +/- this value.
ENTRY_ZSCORE = 2.0
# Positions are closed once |z-score| is at or below this value.
EXIT_ZSCORE = 0.5
def initialize(context: bigtrader.IContext):
    """Configure fees, risk limits and per-ETF price windows for the spread demo.

    The universe is the module-level ``ETF_POOL``; each ETF gets a price
    window bounded to ``SPREAD_WINDOW`` ticks.
    """
    fee_model = bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(fee_model)

    context.take_profit = 0.015
    context.stop_loss = 0.008
    context.max_hold_minutes = 60

    context.stock_pool = ETF_POOL
    windows = {}
    for code in context.stock_pool:
        windows[code] = deque(maxlen=SPREAD_WINDOW)
    context.price_buf = windows

    # instrument -> datetime at which the entry order was placed.
    context.hold_start = {}
def before_trading_start(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Subscribe to the ETF pool and empty every rolling price window.

    ``data`` is accepted for the callback signature but not used.
    """
    pool = context.stock_pool
    context.subscribe(pool)

    windows = context.price_buf
    for code in pool:
        windows[code].clear()
def _calc_zscore(spread_series):
if len(spread_series) < 2:
return 0.0
mean = sum(spread_series) / len(spread_series)
variance = sum((x - mean) ** 2 for x in spread_series) / len(spread_series)
std = variance ** 0.5
if std < 1e-10:
return 0.0
return (spread_series[-1] - mean) / std
def handle_tick(context: bigtrader.IContext, tick: bigtrader.ITickData):
    """Trade mean reversion of the normalised price spread between two ETFs.

    Each ETF's window is normalised by its own oldest price and the spread
    is ``ETF_POOL[0]`` minus ``ETF_POOL[1]``, paired position by position.
    A z-score at or below ``-ENTRY_ZSCORE`` buys the first ETF; at or above
    ``+ENTRY_ZSCORE`` buys the second.  A held ETF is closed on take profit,
    stop loss, or ``|z| <= EXIT_ZSCORE``.

    Note: ``context.hold_start`` is written here but never read in this
    function, so no holding-time limit is applied.
    """
    instrument = tick.instrument
    if instrument not in context.stock_pool:
        return

    current_dt = tick.datetime
    # Only act between 09:31 and 14:55 (inclusive, minute resolution).
    time_str = current_dt.strftime("%H:%M")
    if not ("09:31" <= time_str <= "14:55"):
        return

    price = tick.last_price
    if price is None or price <= 0:
        return
    context.price_buf[instrument].append(price)

    etf_300 = ETF_POOL[0]
    etf_gold = ETF_POOL[1]
    window_300 = context.price_buf[etf_300]
    window_gold = context.price_buf[etf_gold]
    # Both legs need a full window before the spread is meaningful.
    if min(len(window_300), len(window_gold)) < SPREAD_WINDOW:
        return

    prices_300 = list(window_300)
    prices_gold = list(window_gold)
    base_300 = prices_300[0] if prices_300[0] != 0 else 1.0
    base_gold = prices_gold[0] if prices_gold[0] != 0 else 1.0
    spread_series = [
        p300 / base_300 - pgold / base_gold
        for p300, pgold in zip(prices_300, prices_gold)
    ]
    zscore = _calc_zscore(spread_series)

    pos_300 = context.get_position(etf_300, create_if_none=False)
    pos_gold = context.get_position(etf_gold, create_if_none=False)
    holding_300 = pos_300 is not None and pos_300.current_qty > 0
    holding_gold = pos_gold is not None and pos_gold.current_qty > 0

    # Exit logic (take profit / stop loss / spread reverted), first leg first.
    legs = (
        (etf_300, pos_300, holding_300, prices_300[-1]),
        (etf_gold, pos_gold, holding_gold, prices_gold[-1]),
    )
    for code, pos, is_long, last_price in legs:
        if not is_long:
            continue
        pnl = (last_price - pos.cost_price) / pos.cost_price
        if pnl >= context.take_profit or pnl <= -context.stop_loss or abs(zscore) <= EXIT_ZSCORE:
            context.order_target_percent(code, 0)
            context.hold_start.pop(code, None)

    # Entry logic; uses the holding flags computed before any exit above.
    if zscore <= -ENTRY_ZSCORE and not holding_300:
        context.order_target_percent(etf_300, 0.45)
        context.hold_start[etf_300] = current_dt
    elif zscore >= ENTRY_ZSCORE and not holding_gold:
        context.order_target_percent(etf_gold, 0.45)
        context.hold_start[etf_gold] = current_dt
# Run the two-ETF spread demo as a tick-frequency backtest (CN_FUND market).
performance = bigtrader.run(
    # strategy callbacks
    initialize=initialize,
    before_trading_start=before_trading_start,
    handle_tick=handle_tick,
    # market and data frequency
    market=bigtrader.Market.CN_FUND,
    frequency=bigtrader.Frequency.TICK,
    # backtest window and starting capital
    start_date="2026-04-28",
    end_date="2026-04-30",
    capital_base=1000000,
    # price fields used for buy / sell orders
    order_price_field_buy="close",
    order_price_field_sell="close",
)
ETF Tick 策略二:三 ETF 动量轮动
沪深300ETF / 纳指ETF / 黄金ETF,基于 tick 级别的短期动量。每个 tick 计算各 ETF 近 30 个 tick 的均价相对当日首个有效 tick 价格(09:31 之后,作为开盘价的近似)的涨幅(日内动量),买入当前动量最强且动量为正的 1 只,以 95% 仓位持有;当持仓 ETF 不再是动量最强、且动量差超过 0.05% 时,切换到更强的品种。风控:止损 1%,最大持仓 120 分钟,14:50 起尾盘强制平仓。
from bigquant import bigtrader, dai
from collections import deque
# Rotation universe (three ETFs).
ETF_POOL = ["510300.SH", "513100.SH", "518880.SH"]
# Number of recent ticks averaged per ETF when computing momentum.
MOMENTUM_WINDOW = 30
# Minimum momentum advantage required before switching holdings.
MIN_SWITCH_GAP = 0.0005
def initialize(context: bigtrader.IContext):
    """Configure fees, risk limits and per-ETF state for the rotation demo.

    The universe is the module-level ``ETF_POOL``; each ETF gets a price
    window bounded to ``MOMENTUM_WINDOW`` ticks and an empty reference-price
    slot.
    """
    fee_model = bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(fee_model)

    context.stop_loss = 0.01
    context.max_hold_minutes = 120

    context.stock_pool = ETF_POOL
    windows, reference_prices = {}, {}
    for code in context.stock_pool:
        windows[code] = deque(maxlen=MOMENTUM_WINDOW)
        reference_prices[code] = None
    context.price_buf = windows
    context.open_price = reference_prices

    # instrument -> datetime at which the entry order was placed.
    context.hold_start = {}
def before_trading_start(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Subscribe to the ETF pool and reset windows and reference prices.

    ``data`` is accepted for the callback signature but not used.
    """
    pool = context.stock_pool
    context.subscribe(pool)

    for code in pool:
        context.price_buf[code].clear()
        # handle_tick refills this with the first price it accepts.
        context.open_price[code] = None
def handle_tick(context: bigtrader.IContext, tick: bigtrader.ITickData):
    """Rotate into the ETF with the strongest intraday momentum.

    Momentum of an ETF is the mean of its last ``MOMENTUM_WINDOW`` prices
    divided by its reference price (the first price accepted by this function
    in the session) minus one.  When flat, the best ETF is bought at a 0.95
    target weight if its momentum is positive.  When long, the position is
    closed on stop loss or holding-time limit, and switched when another ETF
    leads by more than ``MIN_SWITCH_GAP``.  From 14:50 every position is
    flattened.

    Args:
        context: strategy context carrying the state created in ``initialize``.
        tick: tick snapshot; ``instrument``, ``datetime`` and ``last_price``
            are read.
    """
    instrument = tick.instrument
    if instrument not in context.stock_pool:
        return

    current_dt = tick.datetime
    # Only act between 09:31 and 14:55 (inclusive, minute resolution).
    time_str = current_dt.strftime("%H:%M")
    if time_str < "09:31" or time_str > "14:55":
        return

    price = tick.last_price
    if price is None or price <= 0:
        return

    if context.open_price[instrument] is None:
        context.open_price[instrument] = price
    context.price_buf[instrument].append(price)

    # Forced flatten from 14:50 onwards.
    if time_str >= "14:50":
        for etf in context.stock_pool:
            pos = context.get_position(etf, create_if_none=False)
            if pos is not None and pos.current_qty > 0:
                context.order_target_percent(etf, 0)
                context.hold_start.pop(etf, None)
        return

    # Every ETF needs a full window and a reference price before ranking.
    if any(len(context.price_buf[s]) < MOMENTUM_WINDOW for s in context.stock_pool):
        return
    if any(context.open_price[s] is None for s in context.stock_pool):
        return

    momentum = {}
    for s in context.stock_pool:
        window = context.price_buf[s]
        open_ref = context.open_price[s]
        avg_price = sum(window) / len(window)
        momentum[s] = (avg_price / open_ref - 1) if open_ref > 0 else 0.0

    best_etf = max(momentum, key=momentum.get)
    best_mom = momentum[best_etf]

    # First ETF in pool order with an open position, keeping its position
    # object so it does not have to be fetched a second time.
    current_holding = None
    held_pos = None
    for etf in context.stock_pool:
        pos = context.get_position(etf, create_if_none=False)
        if pos is not None and pos.current_qty > 0:
            current_holding, held_pos = etf, pos
            break

    # Stop loss / holding-time limit.
    if current_holding is not None:
        cur_price = context.price_buf[current_holding][-1]
        pnl = (cur_price - held_pos.cost_price) / held_pos.cost_price

        start_dt = context.hold_start.get(current_holding)
        # total_seconds() counts whole days too; .seconds would wrap around
        # if a position were ever carried into the next session.
        timed_out = (
            start_dt is not None
            and (current_dt - start_dt).total_seconds() / 60 >= context.max_hold_minutes
        )
        if pnl <= -context.stop_loss or timed_out:
            context.order_target_percent(current_holding, 0)
            context.hold_start.pop(current_holding, None)
            return

    # Rotation.
    if current_holding is None:
        if best_mom > 0:
            context.order_target_percent(best_etf, 0.95)
            context.hold_start[best_etf] = current_dt
    elif current_holding != best_etf:
        cur_mom = momentum[current_holding]
        if best_mom - cur_mom > MIN_SWITCH_GAP:
            context.order_target_percent(current_holding, 0)
            context.order_target_percent(best_etf, 0.95)
            context.hold_start.pop(current_holding, None)
            context.hold_start[best_etf] = current_dt
# Run the three-ETF rotation demo as a tick-frequency backtest (CN_FUND market).
performance = bigtrader.run(
    # strategy callbacks
    initialize=initialize,
    before_trading_start=before_trading_start,
    handle_tick=handle_tick,
    # market and data frequency
    market=bigtrader.Market.CN_FUND,
    frequency=bigtrader.Frequency.TICK,
    # backtest window and starting capital
    start_date="2026-04-28",
    end_date="2026-04-30",
    capital_base=1000000,
    # price fields used for buy / sell orders
    order_price_field_buy="close",
    order_price_field_sell="close",
)
可转债 Tick 策略:低溢价转债日内均价回归
低转股溢价率、高流动性可转债(筛选 5 只),基于 tick 价格偏离近 40 个 tick 滚动 VWAP 的均值回归。买入:价格低于 VWAP 0.2% 及以上(超卖);卖出:价格回升至 VWAP 上方 0.1% 及以上,或触发止盈(1%)/止损(0.5%)/超时(90 分钟)。可转债支持 T+0 交易。
from bigquant import bigtrader, dai
from collections import deque
# Number of ticks in the rolling VWAP window.
VWAP_WINDOW = 40
# Buy when the price is at or below VWAP * (1 - ENTRY_THRESHOLD).
ENTRY_THRESHOLD = 0.002 # 0.2%
# Sell when the price is at or above VWAP * (1 + EXIT_THRESHOLD).
EXIT_THRESHOLD = 0.001 # 0.1%
def initialize(context: bigtrader.IContext):
    """Configure fees, risk limits and the convertible-bond universe.

    The universe is the (at most) 5 instruments returned by the query below,
    which joins ``cn_cbond_analyze_metric`` with ``cn_cbond_bar1d`` for the
    trading day before ``context.start_date`` and orders by
    ``conversion_premium_rate`` ascending, then ``amount`` descending.
    """
    fee_model = bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(fee_model)

    context.take_profit = 0.01
    context.stop_loss = 0.005
    context.max_hold_minutes = 90

    ranking_day = context.add_trading_days(context.start_date, -1)
    universe_query = """
SELECT m.instrument, m.conversion_premium_rate, m.bond_balance, b.amount
FROM cn_cbond_analyze_metric AS m
INNER JOIN cn_cbond_bar1d AS b USING (date, instrument)
WHERE
m.date = $prev_day
AND m.bond_balance > 2
AND m.remaining_days > 180
AND m.conversion_premium_rate >= 0
AND m.conversion_premium_rate < 20
AND b.amount > 5000000
ORDER BY m.conversion_premium_rate ASC, b.amount DESC
LIMIT 5
"""
    ranked = dai.query(universe_query, params={"prev_day": ranking_day}).df()
    context.stock_pool = ranked["instrument"].tolist()

    # Rolling price / per-tick-volume windows and the last cumulative volume
    # seen, one entry per instrument.
    price_windows, volume_windows, cumulative = {}, {}, {}
    for code in context.stock_pool:
        price_windows[code] = deque(maxlen=VWAP_WINDOW)
        volume_windows[code] = deque(maxlen=VWAP_WINDOW)
        cumulative[code] = 0
    context.price_buf = price_windows
    context.vol_buf = volume_windows
    context.last_cum_vol = cumulative

    # instrument -> datetime at which the entry order was placed.
    context.hold_start = {}
def before_trading_start(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Subscribe to the bond pool and reset windows and cumulative volume.

    ``data`` is accepted for the callback signature but not used.
    """
    pool = context.stock_pool
    context.subscribe(pool)

    for code in pool:
        for windows in (context.price_buf, context.vol_buf):
            windows[code].clear()
        context.last_cum_vol[code] = 0
def handle_tick(context: bigtrader.IContext, tick: bigtrader.ITickData):
    """Trade mean reversion of the price around a rolling tick VWAP.

    Entry (flat): request a 0.20 target weight when the price is at or below
    ``vwap * (1 - ENTRY_THRESHOLD)``.
    Exit (long): request a 0 target weight on take profit, stop loss,
    holding-time limit, or when the price is at or above
    ``vwap * (1 + EXIT_THRESHOLD)``.

    Args:
        context: strategy context carrying the state created in ``initialize``.
        tick: tick snapshot; ``instrument``, ``datetime``, ``last_price`` and
            ``volume`` are read.  ``volume`` is treated as the cumulative
            traded volume of the day — NOTE(review): confirm against the
            tick data spec.
    """
    instrument = tick.instrument
    if instrument not in context.stock_pool:
        return

    # Per-tick volume = change in cumulative volume since the previous tick.
    # It is tracked before the session and price filters so the first
    # accepted tick is not weighted with all volume traded since the open.
    # A missing volume counts as 0; the floor of 1 keeps the VWAP
    # denominator positive.
    cum_vol = tick.volume or 0
    tick_vol = max(cum_vol - context.last_cum_vol[instrument], 1)
    context.last_cum_vol[instrument] = cum_vol

    current_dt = tick.datetime
    # Only act between 09:31 and 14:55 (inclusive, minute resolution).
    time_str = current_dt.strftime("%H:%M")
    if time_str < "09:31" or time_str > "14:55":
        return

    price = tick.last_price
    if price is None or price <= 0:
        return

    price_window = context.price_buf[instrument]
    volume_window = context.vol_buf[instrument]
    price_window.append(price)
    volume_window.append(tick_vol)
    if len(price_window) < VWAP_WINDOW:
        return

    total_vol = sum(volume_window)
    vwap = sum(p * v for p, v in zip(price_window, volume_window)) / total_vol

    position = context.get_position(instrument, create_if_none=False)
    holding = position is not None and position.current_qty > 0

    if holding:
        cost_price = position.cost_price
        pnl_ratio = (price - cost_price) / cost_price

        start_dt = context.hold_start.get(instrument)
        # total_seconds() counts whole days too; .seconds would wrap around
        # for a position that has been held overnight.
        timed_out = (
            start_dt is not None
            and (current_dt - start_dt).total_seconds() / 60 >= context.max_hold_minutes
        )
        reverted = price >= vwap * (1 + EXIT_THRESHOLD)

        if (
            pnl_ratio >= context.take_profit
            or pnl_ratio <= -context.stop_loss
            or timed_out
            or reverted
        ):
            context.order_target_percent(instrument, 0)
            context.hold_start.pop(instrument, None)
    elif price <= vwap * (1 - ENTRY_THRESHOLD):
        context.order_target_percent(instrument, 0.20)
        context.hold_start[instrument] = current_dt
# Run the convertible-bond demo as a tick-frequency backtest (CN_CBOND market).
performance = bigtrader.run(
    # strategy callbacks
    initialize=initialize,
    before_trading_start=before_trading_start,
    handle_tick=handle_tick,
    # market and data frequency
    market=bigtrader.Market.CN_CBOND,
    frequency=bigtrader.Frequency.TICK,
    # backtest window and starting capital
    start_date="2026-04-28",
    end_date="2026-04-30",
    capital_base=1000000,
    # price fields used for buy / sell orders
    order_price_field_buy="close",
    order_price_field_sell="close",
)
\