跳转至

API 使用示例

本文档提供 bt_api_py 的实际使用示例。


基础示例

初始化 API

from bt_api_py import BtApi

# Single exchange
# The exchange_kwargs keys used throughout these examples look like
# "<EXCHANGE>___<ASSET_TYPE>" (three underscores) -- presumably the naming
# convention expected by BtApi; confirm against the library reference.

api = BtApi(exchange_kwargs={
    "BINANCE___SPOT": {
        "api_key": "your_api_key",
        "secret": "your_secret",
        "testnet": True,
    }
})

# Multiple exchanges: one entry per exchange, each with its own credentials

api = BtApi(exchange_kwargs={
    "BINANCE___SPOT": {
        "api_key": "binance_key",
        "secret": "binance_secret",
    },
    "OKX___SPOT": {
        "api_key": "okx_key",
        "secret": "okx_secret",
        "passphrase": "okx_passphrase",
    },
})

# Load the exchange settings from a YAML configuration file

import yaml

# Pin the encoding: without it open() falls back to the platform default,
# which breaks on non-UTF-8 locales if the config contains non-ASCII text.
with open("config.yaml", encoding="utf-8") as f:
    config = yaml.safe_load(f)
# The "exchanges" mapping of the config is passed straight through as exchange_kwargs.
api = BtApi(exchange_kwargs=config["exchanges"])

基础查询

# Fetch the latest ticker; init_data() is called before any getter is used

ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
ticker.init_data()
print(f"最新价: {ticker.get_last_price()}")

# Fetch the order book (count=10 requested) and print the first bid entry

orderbook = api.get_depth("BINANCE___SPOT", "BTCUSDT", count=10)
orderbook.init_data()
print(f"买一: {orderbook.get_bids()[0]}")

# Fetch the last 10 one-minute klines and print open time / close price of each

klines = api.get_kline("BINANCE___SPOT", "BTCUSDT", "1m", count=10)
for bar in klines:
    bar.init_data()
    print(f"时间: {bar.get_open_time()}, 收盘: {bar.get_close_price()}")

行情数据

获取多个交易对的行情

# Symbols to query, one get_tick() request per symbol
symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]

for symbol in symbols:
    ticker = api.get_tick("BINANCE___SPOT", symbol)
    ticker.init_data()
    print(f"{symbol}: {ticker.get_last_price()}")

监控价格变化

import time

def monitor_price(symbol, interval=5):
    """Poll the BINANCE___SPOT last price of ``symbol`` forever and print it.

    The first reading is printed as the initial price; every later reading is
    printed together with its absolute and percentage change versus the
    previous reading. The function sleeps ``interval`` seconds between polls
    and never returns. It uses the module-level ``api`` object.
    """
    previous = None

    while True:
        tick = api.get_tick("BINANCE___SPOT", symbol)
        tick.init_data()
        price = tick.get_last_price()

        if previous is None:
            print(f"初始价格: {price}")
        else:
            change = price - previous
            change_pct = (change / previous) * 100
            print(f"价格: {price}, 变化: {change:+.2f} ({change_pct:+.2f}%)")

        previous = price
        time.sleep(interval)

# Usage: this call never returns (monitor_price loops with `while True`),
# so interrupt the process to stop it.

monitor_price("BTCUSDT")

批量获取历史 K 线

def download_bars(symbol, period, days=7):
    """Request historical klines for ``symbol`` and collect what is queued.

    Asks the module-level ``api`` to download BINANCE___SPOT bars for the
    window ``[now - days, now]``, then drains the exchange's data queue until
    it reports empty, calling ``init_data()`` on every item.

    Returns:
        list: the drained bar objects, in queue order.
    """
    from datetime import datetime, timedelta

    window_end = datetime.now()
    window_start = window_end - timedelta(days=days)

    api.download_history_bars(
        "BINANCE___SPOT",
        symbol,
        period,
        start_time=window_start,
        end_time=window_end,
    )

    # Drain whatever is currently sitting in the data queue
    queue = api.get_data_queue("BINANCE___SPOT")
    collected = []
    while not queue.empty():
        item = queue.get()
        item.init_data()
        collected.append(item)

    return collected

# Usage: request 30 days of "1H" bars and report how many were collected
bars = download_bars("BTCUSDT", "1H", days=30)
print(f"下载了 {len(bars)} 根 K 线")

计算技术指标

def calculate_ma(bars, period=20):
    """Return the simple moving average of the bars' closing prices.

    The result has one entry per bar: ``None`` until ``period`` closes are
    available, then the mean of the most recent ``period`` closes.
    """
    closes = [bar.get_close_price() for bar in bars]
    # `end` is the exclusive end of the window, i.e. the number of closes seen so far.
    return [
        sum(closes[end - period:end]) / period if end >= period else None
        for end in range(1, len(closes) + 1)
    ]

def calculate_rsi(bars, period=14):
    """Compute RSI values from the bars' closing prices.

    Gains and losses are the positive / negative close-to-close changes. The
    averages are seeded with the plain mean of the first ``period`` changes
    and then updated with the recursive smoothing
    ``avg = (avg * (period - 1) + value) / period``.

    Args:
        bars: Sequence of objects exposing ``get_close_price()``.
        period: Smoothing window length; must be positive.

    Returns:
        list[float]: One RSI value per price change from index ``period``
        onward (the seed average itself is not emitted). Empty when there are
        not more than ``period`` price changes.

    Raises:
        ValueError: If ``period`` is not positive.
    """
    if period <= 0:
        raise ValueError("period must be a positive integer")

    prices = [bar.get_close_price() for bar in bars]
    gains = []
    losses = []
    for previous, current in zip(prices, prices[1:]):
        change = current - previous
        gains.append(change if change > 0 else 0)
        losses.append(-change if change < 0 else 0)

    # Not enough changes to get past the seed window: nothing to report.
    if len(gains) <= period:
        return []

    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    rsi = []

    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
        if avg_loss == 0:
            # No losses in the smoothed window: RS is unbounded, so RSI is 100.
            # (Treating RS as 0 here would report 0, i.e. maximally oversold.)
            rsi.append(100.0)
        else:
            rs = avg_gain / avg_loss
            rsi.append(100 - (100 / (1 + rs)))

    return rsi

# Usage

klines = api.get_kline("BINANCE___SPOT", "BTCUSDT", "1H", count=100)
# init_data() is called on every bar before the indicator helpers read prices
for bar in klines:
    bar.init_data()

ma_20 = calculate_ma(klines, 20)
rsi_14 = calculate_rsi(klines, 14)

# Both prints read the last element: ma_20[-1] is None (and the format fails)
# when fewer than 20 bars came back, and rsi_14[-1] raises IndexError when
# rsi_14 is empty.
print(f"最新 MA20: {ma_20[-1]:.2f}")
print(f"最新 RSI14: {rsi_14[-1]:.2f}")

交易操作

下限价单

# Create a limit order

order = api.make_order(
    exchange_name="BINANCE___SPOT",
    symbol="BTCUSDT",
    volume=0.001,
    price=45000,
    order_type="limit"
)
# init_data() is called on the returned order before its getters are used
order.init_data()

print(f"订单 ID: {order.get_order_id()}")
print(f"订单状态: {order.get_order_status()}")

下市价单

# Market order (pass price=0)

order = api.make_order(
    exchange_name="BINANCE___SPOT",
    symbol="BTCUSDT",
    volume=0.001,
    price=0,
    order_type="market"
)

下 Post-only 单(只做 maker)

# Post-only order (maker only, does not fill immediately)

order = api.make_order(
    exchange_name="BINANCE___SPOT",
    symbol="BTCUSDT",
    volume=0.001,
    price=45000,
    order_type="limit",
    post_only=True  # maker only

)

CTP 下单(指定开平)

# CTP orders must specify the open/close offset

order = api.make_order(
    exchange_name="CTP___FUTURE",
    symbol="IF2506",
    volume=1,
    price=3500.0,
    order_type="limit",
    offset="open"  # open a position

)

查询订单状态

def wait_order_filled(api, exchange_name, symbol, order_id, timeout=60, poll_interval=2):
    """Poll an order until it reaches a terminal status or the timeout elapses.

    Args:
        api: Object exposing ``query_order(exchange_name, symbol, order_id)``.
        exchange_name: Exchange key, e.g. ``"BINANCE___SPOT"``.
        symbol: Trading symbol of the order.
        order_id: Identifier of the order to poll.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to wait between two queries.

    Returns:
        The order object from the last query. Its status is "filled",
        "canceled" or "rejected" when a terminal state was reached; otherwise
        it is whatever the final poll returned. ``None`` is returned when
        ``timeout`` is not positive and no query was made.
    """
    import time

    terminal_statuses = ("filled", "canceled", "rejected")
    # A monotonic clock keeps the timeout correct if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    # Bound up front so the final return is valid even if the loop never runs.
    order = None

    while time.monotonic() < deadline:
        order = api.query_order(exchange_name, symbol, order_id)
        order.init_data()
        status = order.get_order_status()

        print(f"订单状态: {status}")

        if status in terminal_statuses:
            return order

        # Do not sleep past the deadline.
        remaining = deadline - time.monotonic()
        time.sleep(max(0.0, min(poll_interval, remaining)))

    return order

# Usage: place a limit order, then poll it by id

order = api.make_order("BINANCE___SPOT", "BTCUSDT", 0.001, 45000, "limit")
# init_data() is called before the order id is read
order.init_data()

result = wait_order_filled(
    api,
    "BINANCE___SPOT",
    "BTCUSDT",
    order.get_order_id()
)

撤销所有订单

# Cancel the orders of one specific symbol

api.cancel_all("BINANCE___SPOT", symbol="BTCUSDT")

# Cancel all orders on the exchange

api.cancel_all("BINANCE___SPOT")

# Cancel orders on every configured exchange

for exchange in api.list_exchanges():
    api.cancel_all(exchange)

账户管理

查询账户余额

def print_balance(api, exchange_name):
    """Print free, locked and total amounts for every asset of an exchange.

    Fetches the balance via ``api.get_balance(exchange_name)``, calls
    ``init_data()`` on it and prints one four-line section per asset it
    yields when iterated.
    """
    account = api.get_balance(exchange_name)
    account.init_data()

    print(f"\n=== {exchange_name} 余额 ===")
    for entry in account:
        # One print per asset; sep="\n" yields the same four output lines.
        print(
            f"{entry.get_currency()}:",
            f"  可用: {entry.get_free()}",
            f"  冻结: {entry.get_locked()}",
            f"  总计: {entry.get_total()}",
            sep="\n",
        )

# Usage: print the spot account balance
print_balance(api, "BINANCE___SPOT")

查询持仓

def print_positions(api, exchange_name):
    """Print every position of an exchange plus the summed unrealized PnL.

    Each position returned by ``api.get_position(exchange_name)`` is
    initialised with ``init_data()`` and printed as a four-line section; the
    total of all ``get_unrealized_profit()`` values is printed last.
    """
    holdings = api.get_position(exchange_name)

    print(f"\n=== {exchange_name} 持仓 ===")
    running_pnl = 0

    for holding in holdings:
        holding.init_data()
        unrealized = holding.get_unrealized_profit()
        running_pnl = running_pnl + unrealized

        # One print per position; sep="\n" yields the same four output lines.
        print(
            f"{holding.get_symbol_name()}:",
            f"  持仓: {holding.get_position()}",
            f"  开仓价: {holding.get_open_price()}",
            f"  未实现盈亏: {unrealized:.2f}",
            sep="\n",
        )

    print(f"总未实现盈亏: {running_pnl:.2f}")

# Usage: print the positions held on the swap account
print_positions(api, "BINANCE___SWAP")

更新账户净值

# Refresh the account net value of all exchanges

api.update_total_balance()

# Read total net value and cash

total_value = api.get_total_value()
total_cash = api.get_total_cash()

# As used below, both results are indexed first by exchange, then by currency.
print("账户总净值:")
for exchange, value in total_value.items():
    cash = total_cash[exchange]
    for currency, val in value.items():
        print(f"  {exchange} {currency}: 净值={val:.2f}, 现金={cash[currency]:.2f}")

WebSocket 订阅

K 线订阅

from bt_api_py import BtApi

# `{...}` is a placeholder -- substitute your real exchange settings
api = BtApi(exchange_kwargs={...})
data_queue = api.get_data_queue("BINANCE___SPOT")

# Subscribe to klines

api.subscribe("BINANCE___SPOT___BTCUSDT", [
    {"topic": "kline", "symbol": "BTCUSDT", "period": "1m"},
])

# Handle pushed data; this loop never exits

while True:
    bar = data_queue.get()
    bar.init_data()
    print(f"K 线更新: {bar.get_close_price()} - {bar.get_volume()}")

使用回调模式

def on_kline(bar):
    """Kline callback: initialise the bar and print its open time and close."""
    bar.init_data()
    opened_at = bar.get_open_time()
    close = bar.get_close_price()
    print(f"K 线: {opened_at} - {close}")

def on_order(order):
    """Order callback: initialise the order and print its id and status."""
    order.init_data()
    identifier = order.get_order_id()
    status = order.get_order_status()
    print(f"订单: {identifier} - {status}")

# Register the callbacks on the event bus, keyed by topic name

event_bus = api.get_event_bus()
event_bus.subscribe("kline", on_kline)
event_bus.subscribe("order", on_order)

# Subscribe to the kline stream and start running

api.subscribe_kline("BINANCE___SPOT", "BTCUSDT", "1m")
api.run()

简单量化策略

class SimpleStrategy:
    """Toy price-threshold strategy driven by ticker callbacks.

    Buys when the last price is above 50000 while flat and sells when it is
    below 45000 while holding. Position is tracked locally as 0 or 1.
    """

    def __init__(self, api):
        self.api = api
        self.event_bus = api.get_event_bus()
        self.position = 0
        # Assigned here but never read or updated elsewhere in this class.
        self.entry_price = None

    def on_tick(self, ticker):
        """Ticker callback: apply the threshold rules and print the state."""
        ticker.init_data()
        price = ticker.get_last_price()

        # Fixed price thresholds (no moving average is computed here)
        if price > 50000 and self.position == 0:
            self.buy()
        elif price < 45000 and self.position > 0:
            self.sell()

        print(f"价格: {price}, 持仓: {self.position}")

    def buy(self):
        """Place a market order and mark the strategy as holding."""
        self.api.make_order("BINANCE___SPOT", "BTCUSDT", 0.001, 0, "market")
        self.position = 1

    def sell(self):
        """Place a market order and mark the strategy as flat."""
        # NOTE(review): this call is identical to the one in buy(); how the
        # order side is conveyed to make_order is not visible here -- confirm.
        self.api.make_order("BINANCE___SPOT", "BTCUSDT", 0.001, 0, "market")
        self.position = 0

# Usage (`{...}` is a placeholder -- substitute your real exchange settings)

api = BtApi(exchange_kwargs={...})
strategy = SimpleStrategy(api)

# Route "ticker" events to the strategy, subscribe to the stream, then run
strategy.event_bus.subscribe("ticker", strategy.on_tick)
api.subscribe_ticker("BINANCE___SPOT", "BTCUSDT")
api.run()

多交易所操作

套利交易

def find_arbitrage(api, symbol):
    """Compare last prices on Binance and OKX spot and report an arbitrage.

    Prints both prices and the OKX-minus-Binance spread (absolute and as a
    percentage of the Binance price).

    Returns:
        dict | None: ``{"buy": <cheaper exchange>, "sell": <dearer exchange>}``
        when the absolute spread exceeds 0.5%, otherwise ``None``.
    """
    venues = ["BINANCE___SPOT", "OKX___SPOT"]

    quotes = {}
    for venue in venues:
        tick = api.get_tick(venue, symbol)
        tick.init_data()
        quotes[venue] = tick.get_last_price()

    # Spread of OKX over Binance
    binance_price = quotes["BINANCE___SPOT"]
    okx_price = quotes["OKX___SPOT"]
    spread = okx_price - binance_price
    spread_pct = (spread / binance_price) * 100

    print(f"Binance: {binance_price}")
    print(f"OKX: {okx_price}")
    print(f"价差: {spread} ({spread_pct:.2f}%)")

    # Only spreads above 0.5% count as an opportunity
    if not abs(spread_pct) > 0.5:
        return None

    if spread > 0:
        print("OKX 更贵,可低买高卖")
        return {"buy": "BINANCE___SPOT", "sell": "OKX___SPOT"}

    print("Binance 更贵,可低买高卖")
    return {"buy": "OKX___SPOT", "sell": "BINANCE___SPOT"}

# Usage: find_arbitrage returns None when the spread is too small

arbitrage = find_arbitrage(api, "BTCUSDT")
if arbitrage:
    print(f"套利机会: 买入 {arbitrage['buy']}, 卖出 {arbitrage['sell']}")

获取最优价格

def get_best_price(api, symbol, side="buy", exchanges=None):
    """Find the exchange offering the best price for the given side.

    For ``side="buy"`` the best price is the lowest ask; for any other
    ``side`` value (sell) it is the highest bid.

    Args:
        api: Object exposing ``get_tick(exchange, symbol)``.
        symbol: Trading symbol to quote.
        side: ``"buy"`` to compare asks; anything else compares bids.
        exchanges: Exchange keys to compare. Defaults to
            ``["BINANCE___SPOT", "OKX___SPOT"]``.

    Returns:
        tuple: ``(best_exchange, best_price)``; both are ``None`` when
        ``exchanges`` is empty.
    """
    if exchanges is None:
        exchanges = ["BINANCE___SPOT", "OKX___SPOT"]

    buying = side == "buy"
    best_price = None
    best_exchange = None

    for exchange in exchanges:
        ticker = api.get_tick(exchange, symbol)
        ticker.init_data()

        if buying:
            price = ticker.get_ask_price()  # best ask: the price a buyer pays
        else:
            price = ticker.get_bid_price()  # best bid: the price a seller receives

        # A buyer wants the lowest ask, a seller the highest bid.
        if best_price is None:
            improved = True
        elif buying:
            improved = price < best_price
        else:
            improved = price > best_price

        if improved:
            best_price = price
            best_exchange = exchange

    return best_exchange, best_price

# Usage: find where buying is cheapest

exchange, price = get_best_price(api, "BTCUSDT", side="buy")
print(f"最优买入: {exchange} @ {price}")

跨交易所价差监控

import time

def monitor_spread(api, symbol, interval=5):
    """Print the cross-exchange price spread of ``symbol`` in an endless loop.

    On every round the last price is fetched from Binance and OKX spot. A
    failed fetch is reported and that exchange is left out for the round. The
    spread report is printed only when every exchange answered. The function
    sleeps ``interval`` seconds between rounds and never returns.
    """
    venues = ["BINANCE___SPOT", "OKX___SPOT"]

    while True:
        quotes = {}
        for venue in venues:
            try:
                tick = api.get_tick(venue, symbol)
                tick.init_data()
                quotes[venue] = tick.get_last_price()
            except Exception as err:
                print(f"{venue} 获取失败: {err}")

        # Report only when every exchange delivered a price this round
        if len(quotes) == len(venues):
            observed = list(quotes.values())
            highest = max(observed)
            lowest = min(observed)
            spread = highest - lowest
            spread_pct = (spread / lowest) * 100

            print(f"\n{symbol} 价差监控:")
            for venue, quote in quotes.items():
                print(f"  {venue}: {quote}")

            print(f"  最大价差: {spread} ({spread_pct:.4f}%)")

        time.sleep(interval)

# Usage: this call never returns (monitor_spread loops with `while True`)
monitor_spread(api, "BTCUSDT")

策略开发

网格交易策略

class GridStrategy:
    """Sketch of a grid-trading strategy around a base price.

    Builds ``2 * grid_count + 1`` price levels spaced ``grid_spacing``
    (a fraction of ``base_price``) apart and places limit orders from
    ``run()`` whenever a level is considered triggered.
    """

    def __init__(self, api, symbol, base_price, grid_count=10, grid_spacing=0.01):
        self.api = api
        self.symbol = symbol
        self.base_price = base_price
        self.grid_count = grid_count
        self.grid_spacing = grid_spacing
        self.grids = []
        self.init_grids()

    def init_grids(self):
        """Populate ``self.grids`` with levels below, at and above the base price."""
        for i in range(-self.grid_count, self.grid_count + 1):
            price = self.base_price*(1 + i*self.grid_spacing)
            self.grids.append({
                "price": price,
                "filled": False,
                "order_id": None
            })

    def run(self):
        """Run the strategy loop; never returns."""
        # NOTE(review): the loop has no sleep, so it polls get_tick back to back.
        while True:
            ticker = self.api.get_tick("BINANCE___SPOT", self.symbol)
            ticker.init_data()
            current_price = ticker.get_last_price()

            # Check whether any grid level is triggered
            # NOTE(review): nothing in this class sets grid["filled"] to True or
            # stores an order id, so every level is re-evaluated (and an order
            # placed) on every iteration -- confirm intended order management.
            for grid in self.grids:
                if not grid["filled"]:

                    # Buy branch
                    if current_price <= grid["price"]:
                        self.place_order("buy", grid["price"])

                    # Sell branch
                    elif current_price >= grid["price"]:
                        self.place_order("sell", grid["price"])

    def place_order(self, side, price):
        """Place a 0.001-volume limit order at ``price`` and print it."""

        # More elaborate order management belongs here
        # NOTE(review): `side` is only used in the printed message, it is not
        # passed to make_order; the returned `order` is not used either.
        order = self.api.make_order(
            "BINANCE___SPOT",
            self.symbol,
            0.001,
            price,
            "limit"
        )
        print(f"网格订单: {side} @ {price}")

动量策略

class MomentumStrategy:
    """Toy momentum strategy based on the deviation from a rolling mean.

    Each ``update()`` call samples the last price, keeps the most recent
    ``period`` samples and, once the window is full, trades when the price
    deviates from the window mean by more than 2%.
    """

    def __init__(self, api, symbol, period=20):
        self.api = api
        self.symbol = symbol
        self.period = period
        self.prices = []
        # 0 = flat, 1 = holding. update() reads this, so it must exist up front.
        self.position = 0

    def update(self):
        """Sample the latest price and act on the momentum signal."""
        ticker = self.api.get_tick("BINANCE___SPOT", self.symbol)
        ticker.init_data()
        price = ticker.get_last_price()

        # Keep only the most recent `period` samples
        self.prices.append(price)
        if len(self.prices) > self.period:
            self.prices.pop(0)

        # No signal until the window is full
        if len(self.prices) < self.period:
            return

        ma = sum(self.prices) / len(self.prices)
        momentum = (price - ma) / ma * 100

        print(f"价格: {price}, 均线: {ma:.2f}, 动量: {momentum:.2f}%")

        # Trade when momentum crosses the +/-2% thresholds
        if momentum > 2 and self.position == 0:
            self.buy()
        elif momentum < -2 and self.position > 0:
            self.sell()

    def buy(self):
        """Place a market order and mark the strategy as holding."""
        self.api.make_order("BINANCE___SPOT", self.symbol, 0.001, 0, "market")
        self.position = 1

    def sell(self):
        """Place a market order and mark the strategy as flat."""
        # NOTE(review): mirrors SimpleStrategy.sell in this document; how the
        # order side is conveyed to make_order is not shown there -- confirm.
        self.api.make_order("BINANCE___SPOT", self.symbol, 0.001, 0, "market")
        self.position = 0

数据处理

保存数据到文件

import json
import csv
from datetime import datetime

def save_klines_to_csv(api, symbol, period, count, filename):
    """Fetch BINANCE___SPOT klines and write them to a UTF-8 CSV file.

    Args:
        api: Object exposing ``get_kline(exchange, symbol, period, count=...)``.
        symbol: Trading symbol, e.g. ``"BTCUSDT"``.
        period: Kline period string, e.g. ``"1H"``.
        count: Number of klines to request.
        filename: Destination path; an existing file is overwritten.

    The file gets a header row followed by one row per bar: open time, open,
    high, low, close, volume. A summary line is printed when done.
    """
    # `count` is passed by keyword, matching the other get_kline calls in this document.
    klines = api.get_kline("BINANCE___SPOT", symbol, period, count=count)

    saved = 0
    # The header row is non-ASCII, so pin the encoding instead of relying on
    # the platform default.
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['时间', '开盘', '最高', '最低', '收盘', '成交量'])

        for bar in klines:
            bar.init_data()
            # Divides by 1000, i.e. assumes get_open_time() is epoch
            # milliseconds -- TODO confirm. The result is naive local time.
            timestamp = datetime.fromtimestamp(bar.get_open_time() / 1000)
            writer.writerow([
                timestamp,
                bar.get_open_price(),
                bar.get_high_price(),
                bar.get_low_price(),
                bar.get_close_price(),
                bar.get_volume(),
            ])
            saved += 1

    # Count written rows rather than len(klines) so any iterable works.
    print(f"已保存 {saved} 条数据到 {filename}")

# Usage: save 1000 "1H" bars to btc_klines.csv

save_klines_to_csv(api, "BTCUSDT", "1H", 1000, "btc_klines.csv")

数据库存储

import sqlite3

def init_db():
    """Open ``trading.db`` and make sure the ``ticks`` table exists.

    Returns:
        sqlite3.Connection: the open connection; closing it is up to the caller.
    """
    schema = '''
        CREATE TABLE IF NOT EXISTS ticks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            exchange TEXT,
            symbol TEXT,
            price REAL,
            volume REAL,
            timestamp INTEGER
        )
    '''

    connection = sqlite3.connect('trading.db')
    connection.cursor().execute(schema)
    connection.commit()
    return connection

def save_tick(conn, exchange, ticker):
    """Insert one ticker snapshot into the ``ticks`` table and commit.

    ``ticker.init_data()`` is called first; symbol, last price, last volume
    and server time are then read from the ticker and stored with ``exchange``.
    """
    ticker.init_data()
    db_cursor = conn.cursor()

    row = (
        exchange,
        ticker.get_symbol_name(),
        ticker.get_last_price(),
        ticker.get_last_volume(),
        ticker.get_server_time(),
    )
    # Values are bound through placeholders, never formatted into the SQL text.
    insert_sql = '''
        INSERT INTO ticks (exchange, symbol, price, volume, timestamp)
        VALUES (?, ?, ?, ?, ?)
    '''
    db_cursor.execute(insert_sql, row)

    conn.commit()

# Usage: create/open the database, fetch one ticker and store it
# (save_tick calls ticker.init_data() itself)

conn = init_db()
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
save_tick(conn, "BINANCE___SPOT", ticker)

错误处理

from bt_api_py.exceptions import ExchangeNotFoundError

def safe_get_tick(api, exchange_name, symbol):
    """Fetch and initialise a ticker, returning ``None`` on any failure.

    An ``ExchangeNotFoundError`` and any other ``Exception`` are each
    reported with their own printed message instead of propagating.
    """
    try:
        tick = api.get_tick(exchange_name, symbol)
        tick.init_data()
    except ExchangeNotFoundError:
        print(f"交易所 {exchange_name} 不存在")
    except Exception as err:
        print(f"获取行情失败: {err}")
    else:
        return tick
    return None

# Usage: safe_get_tick returns None on failure, so check before using the ticker

ticker = safe_get_tick(api, "BINANCE___SPOT", "BTCUSDT")
if ticker:
    print(ticker.get_last_price())

更多示例