BigQuant SDK 使用文档
由small_q创建,最终由游客 被浏览 79 用户
介绍
BigQuant SDK 提供 Python API 接口,帮助开发量化投资策略和使用 BigQuant:
- 📊 数据查询:海量金融数据的 SQL 查询接口
- 📈 模拟交易:策略回测与模拟交易管理
- 💰 账户管理:交易账户的持仓、订单、资金查询
- 🔬 回测引擎:高性能本地策略回测
- ☁️ 云端计算:BigQuant AIStudio 云计算环境
- 🚀 分布式计算:基于BigQuant FAI 的大规模并行计算
安装
pip install bigquant
支持系统
- 操作系统:Windows / Linux / macOS
- Python 3.11:目前仅支持 3.11,将支持更多 Python 版本
登录
- 登录 BigQuant 平台获取 API Key:BigQuant > 我的 > API Keys > 创建
bq --save-auth --aksk apikey
- 配置成功后,终端会显示:
“✓ Config saved to XXX”
数据
- “写” 数据
- 您可以通过将本地数据写入 bdb 数据源,实现云端复用数据
- 示例代码:
import bigquant
# 1、准备要写入的数据
my_data = pd.DataFrame({
"date": pd.to_datetime(["2024-01-02"] * 5),
"instrument": ["000001.SZ", "000002.SZ", "600000.SH", "600001.SH", "600002.SH"],
"close": [10.5, 20.3, 15.8, 30.2, 25.6],
"volume": [1000000, 2000000, 1500000, 3000000, 2500000],
})
# 2、写入数据源
ds = bigquant.dai.DataSource.write_bdb(
data=my_data,
id="my_stock_data_v1",
unique_together=["date", "instrument"],
on_duplicates="last",
overwrite=True,
)
print(f"✓ 数据源已创建: {ds.id}")
- “读” 数据
- 您可以通过读取 bdb 数据源,使用 BigQuant 数据
- 示例代码: ”读取您刚才写入的 my_stock_data_v1 数据”
read_data = bigquant.dai.DataSource("my_stock_data_v1").read_bdb()
print(f"✓ 读取成功: {len(read_data)} 行")
print(read_data)
- “查” 数据
- 同时,您也可以通过使用 sql 语句读取数据
- 示例代码:“查看 A股日线行情数据(cn_stock_bar1d)中 2025-01-02 当天的数据”
data = bigquant.dai.query("select date, instrument, close from cn_stock_bar1d", filters={'date': ["2025-01-02", "2025-01-02"]})
print(data)
核心优势:不需要登陆网页,SDK 即可为您提供畅享 BigQuant 海量数据的能力!
策略
- 开始您的第一个策略
- 示例:复制以下策略代码(“债券小市值策略-去除退市”),命名为
demo_strategy.py
- 示例:复制以下策略代码(“债券小市值策略-去除退市”),命名为
from bigquant import bigtrader, dai
def initialize(context: bigtrader.IContext):
context.set_commission(bigtrader.PerOrder(buy_cost=0.00005, sell_cost=0.00005, min_cost=5))
# 选股数量
stock_num = 25
context.logger.info("开始计算选股数据...")
sql = """
select date,
instrument,
conversion_premium_rate + close as score1,
c_rank(score1)+c_rank(bond_balance) as score,
1.0/$stock_num AS weight
FROM cn_cbond_analyze_metric
inner join cn_cbond_bar1d USING (date, instrument)
where
remaining_days > 60
and bond_balance > 0.5
ORDER BY date, score ASC
"""
from bigquant import dai
import pandas as pd
# C++ 引擎兼容:手动计算 10 个交易日前的日期
start_date_with_buffer = (pd.to_datetime(context.start_date) - pd.Timedelta(days=15)).strftime('%Y-%m-%d')
df = dai.query(
sql,
filters={"date": [start_date_with_buffer, context.end_date]},
params={"stock_num": stock_num}
).df()
context.logger.info(f"数据计算完成: {len(df)} 条记录")
sql = "select instrument, delist_date from cn_cbond_basic_info "
basic_info = dai.query(sql).df()
import pandas as pd
merge_df = pd.merge(df, basic_info, how='left',on=['instrument'])
merge_df['days'] = (merge_df['delist_date']-merge_df['date']).apply(lambda x:x.days)
df =merge_df[merge_df['days'] >= 15] # 必须20天前就平掉
# 每天选取流通市值最小的前N只股票
df = df.groupby('date').head(stock_num)
# 设置每日调仓
df = bigtrader.TradingDaysRebalance(5, context=context).select_rebalance_data(df) # 2-3天调仓效果更好
context.data = df
performance = bigtrader.run(
market=bigtrader.Market.CN_CBOND,
frequency=bigtrader.Frequency.DAILY,
start_date="2018-01-01",
end_date="2025-12-03",
capital_base=200000,
initialize=initialize,
handle_data=bigtrader.HandleDataLib.handle_data_weight_based,
order_price_field_buy='open',
order_price_field_sell='open')
print(performance.get_stats())
- 本地回测
- 在 SDK 环境下运行策略代码:
python demo_strategy.py,获得策略的 波动率(volatility)、夏普比率(sharpe_ratio)等指标。
- 在 SDK 环境下运行策略代码:
- 云端回测
- 运行指令:
bq aistudio run demo_strategy.py
- 运行指令:
核心优势:使用 SDK 快速本地回测、云端复杂回测,为您提供策略评估和绩效评估
部署运行
- 查看账户策略
- 示例代码:
import bigquant
strategy_list = bigquant.papertrading.list()
print(startegy_list)
- 查看策略绩效
- 示例代码:Strategy_ID(通过上述策略列表中的 策略ID 获取)
import bigquant
strategy = bigquant.papertrading.get("Strategy_ID")
print(strategy.get_performances())
大规模算力
- 创建集群
- 代码:您可以通过代码声明式创建集群,实现算力单元的精细化编排。
- 示例代码:“创建名为 ‘my_cluster’、pod 数量为 2、cpu 数量为 2、内存资源为 4G 的集群“
- 代码:您可以通过代码声明式创建集群,实现算力单元的精细化编排。
from bigquant import fai
# 创建名为 ‘mycluster’,pod 数量为 2、cpu 数量为 2、内存资源为 8G 的集群
cluster = fai.create_cluster(
cluster_name="mycluster",
num_workers=2,
worker_cpus=3,
worker_memory="8G",
)
cluster.wait_cluster("Running")
print(f"集群已创建,ID: {cluster.cluster_info['id']}")
- 运行代码
- 示例代码:“分布式数据统计计算”
- CLUSTER_ID 为您之前创建集群获得的 ID
# 替换为你的集群 ID
CLUSTER_ID = "11817776-4530-4eda-9373-1519582e47c2"
# 获取集群
cluster = fai.get_cluster(CLUSTER_ID)
# 初始化连接
cluster.init()
# 定义远程函数:计算数据统计信息
@fai.remote
def compute_statistics(data_range):
import time, math
data = list(range(data_range[0], data_range[1]))
n = len(data)
mean = sum(data) / n
std_dev = math.sqrt(sum((x - mean) ** 2 for x in data) / n)
time.sleep(0.5)
return {"range": data_range, "count": n, "mean": mean, "std_dev": std_dev, "sum": sum(data)}
# 提交并行任务
tasks = [(i * 1000, (i + 1) * 1000) for i in range(5)]
result_refs = [compute_statistics.remote(task) for task in tasks]
# 获取结果并输出
results = fai.get(result_refs)
for r in results:
print(f"范围 {r['range']}: 均值={r['mean']:.2f}, 标准差={r['std_dev']:.2f}, 总和={r['sum']}")
- 删除集群
- 代码:
- CLUSTER_ID 为您之前创建集群获得的 ID
- 代码:
# 替换为你的集群 ID
CLUSTER_ID = "11817776-4530-4eda-9373-1519582e47c2"
# 获取集群对象
cluster = fai.get_cluster(CLUSTER_ID)
# 删除集群
result = cluster.delete_cluster()
print(f"集群已删除: {result}")
核心优势:SDK 为您的大规模并行计算提供强大算力支持!
更多资料
\