BigQuant使用文档

BigQuant SDK 使用文档

由small_q创建,最终由游客 被浏览 79 用户

介绍

BigQuant SDK 提供 Python API 接口,帮助开发量化投资策略和使用 BigQuant:

  • 📊 数据查询:海量金融数据的 SQL 查询接口
  • 📈 模拟交易:策略回测与模拟交易管理
  • 💰 账户管理:交易账户的持仓、订单、资金查询
  • 🔬 回测引擎:高性能本地策略回测
  • ☁️ 云端计算:BigQuant AIStudio 云计算环境
  • 🚀 分布式计算:基于BigQuant FAI 的大规模并行计算

安装

pip install bigquant

支持系统

  • 操作系统:Windows / Linux / macOS
  • Python 3.11:目前仅支持 3.11,将支持更多 Python 版本

登录

bq --save-auth --aksk apikey
  • 配置成功后,终端会显示:“✓ Config saved to XXX”

数据

  • ” 数据
    • 您可以通过将本地数据写入 bdb 数据源,实现云端复用数据
    • 示例代码:
import bigquant


# 1、准备要写入的数据
my_data = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-02"] * 5),
    "instrument": ["000001.SZ", "000002.SZ", "600000.SH", "600001.SH", "600002.SH"],
    "close": [10.5, 20.3, 15.8, 30.2, 25.6],
    "volume": [1000000, 2000000, 1500000, 3000000, 2500000],
})

# 2、写入数据源
ds = bigquant.dai.DataSource.write_bdb(
    data=my_data,
    id="my_stock_data_v1",
    unique_together=["date", "instrument"],
    on_duplicates="last",
    overwrite=True,
)

print(f"✓ 数据源已创建: {ds.id}")
  • ” 数据
    • 您可以通过读取 bdb 数据源,使用 BigQuant 数据
    • 示例代码: ”读取您刚才写入的 my_stock_data_v1 数据
read_data = bigquant.dai.DataSource("my_stock_data_v1").read_bdb()

print(f"✓ 读取成功: {len(read_data)} 行")
print(read_data)

  • ” 数据
    • 同时,您也可以通过使用 sql 语句读取数据
    • 示例代码:“查看 A股日线行情数据(cn_stock_bar1d)中 2025-01-02 当天的数据
data = bigquant.dai.query("select date, instrument, close from cn_stock_bar1d", filters={'date': ["2025-01-02", "2025-01-02"]})

print(data)

核心优势:不需要登陆网页,SDK 即可为您提供畅享 BigQuant 海量数据的能力!

策略

  • 开始您的第一个策略
    • 示例:复制以下策略代码(“债券小市值策略-去除退市”),命名为 demo_strategy.py
from bigquant import bigtrader, dai

def initialize(context: bigtrader.IContext):
    context.set_commission(bigtrader.PerOrder(buy_cost=0.00005, sell_cost=0.00005, min_cost=5))

    # 选股数量
    stock_num = 25

    context.logger.info("开始计算选股数据...")
    
    sql = """
        select date, 
        instrument,  
        conversion_premium_rate + close as score1,
        c_rank(score1)+c_rank(bond_balance) as score,
        1.0/$stock_num AS weight
        FROM cn_cbond_analyze_metric 

        inner join cn_cbond_bar1d  USING (date, instrument)

        where 
            remaining_days > 60
            and bond_balance > 0.5
        ORDER BY date, score ASC
        """

    from bigquant import dai
    import pandas as pd

    # C++ 引擎兼容:手动计算 10 个交易日前的日期
    start_date_with_buffer = (pd.to_datetime(context.start_date) - pd.Timedelta(days=15)).strftime('%Y-%m-%d')

    df = dai.query(
        sql,
        filters={"date": [start_date_with_buffer, context.end_date]},
        params={"stock_num": stock_num}
    ).df()
    context.logger.info(f"数据计算完成: {len(df)} 条记录")

    sql = "select instrument, delist_date from cn_cbond_basic_info "
    basic_info = dai.query(sql).df() 

    import pandas as pd 
    merge_df = pd.merge(df, basic_info, how='left',on=['instrument'])

    merge_df['days'] = (merge_df['delist_date']-merge_df['date']).apply(lambda x:x.days)
    df =merge_df[merge_df['days'] >= 15] # 必须20天前就平掉

    # 每天选取流通市值最小的前N只股票
    df = df.groupby('date').head(stock_num)

    # 设置每日调仓
    df = bigtrader.TradingDaysRebalance(5, context=context).select_rebalance_data(df) # 2-3天调仓效果更好

    context.data = df

performance = bigtrader.run(
    market=bigtrader.Market.CN_CBOND,
    frequency=bigtrader.Frequency.DAILY,
    start_date="2018-01-01",
    end_date="2025-12-03",
    capital_base=200000,
    initialize=initialize,
    handle_data=bigtrader.HandleDataLib.handle_data_weight_based,
    order_price_field_buy='open',
    order_price_field_sell='open')

print(performance.get_stats())
  • 本地回测
    • 在 SDK 环境下运行策略代码:python demo_strategy.py,获得策略的 波动率(volatility)、夏普比率(sharpe_ratio)等指标。
  • 云端回测
    • 运行指令:bq aistudio run demo_strategy.py

核心优势:使用 SDK 快速本地回测、云端复杂回测,为您提供策略评估和绩效评估

部署运行

  • 查看账户策略
    • 示例代码:
import bigquant

strategy_list = bigquant.papertrading.list()
print(startegy_list)
  • 查看策略绩效
    • 示例代码:Strategy_ID(通过上述策略列表中的 策略ID 获取)
import bigquant

strategy = bigquant.papertrading.get("Strategy_ID")
print(strategy.get_performances())

大规模算力

  • 创建集群
    • 代码:您可以通过代码声明式创建集群,实现算力单元的精细化编排。
      • 示例代码:“创建名为 ‘my_cluster’、pod 数量为 2、cpu 数量为 2、内存资源为 4G 的集群“
from bigquant import fai

# 创建名为 ‘mycluster’,pod 数量为 2、cpu 数量为 2、内存资源为 8G 的集群
cluster = fai.create_cluster(
    cluster_name="mycluster",
    num_workers=2,
    worker_cpus=3,
    worker_memory="8G",
)
cluster.wait_cluster("Running")

print(f"集群已创建,ID: {cluster.cluster_info['id']}")

  • 运行代码
    • 示例代码:“分布式数据统计计算”
    • CLUSTER_ID 为您之前创建集群获得的 ID
# 替换为你的集群 ID
CLUSTER_ID = "11817776-4530-4eda-9373-1519582e47c2"

# 获取集群
cluster = fai.get_cluster(CLUSTER_ID)

# 初始化连接
cluster.init()

# 定义远程函数:计算数据统计信息
@fai.remote
def compute_statistics(data_range):
    import time, math
    data = list(range(data_range[0], data_range[1]))
    n = len(data)
    mean = sum(data) / n
    std_dev = math.sqrt(sum((x - mean) ** 2 for x in data) / n)
    time.sleep(0.5)

    return {"range": data_range, "count": n, "mean": mean, "std_dev": std_dev, "sum": sum(data)}

# 提交并行任务
tasks = [(i * 1000, (i + 1) * 1000) for i in range(5)]
result_refs = [compute_statistics.remote(task) for task in tasks]

# 获取结果并输出
results = fai.get(result_refs)
for r in results:
    print(f"范围 {r['range']}: 均值={r['mean']:.2f}, 标准差={r['std_dev']:.2f}, 总和={r['sum']}")

  • 删除集群
    • 代码:
      • CLUSTER_ID 为您之前创建集群获得的 ID
# 替换为你的集群 ID
CLUSTER_ID = "11817776-4530-4eda-9373-1519582e47c2"

# 获取集群对象
cluster = fai.get_cluster(CLUSTER_ID)

# 删除集群
result = cluster.delete_cluster()
print(f"集群已删除: {result}")

核心优势:SDK 为您的大规模并行计算提供强大算力支持!

更多资料

\

标签

模拟交易
评论
  • undefined
  • bigquant模块还没有更新吧, 今天试了里面还没有bigtrader功能
{link}