最佳实践指南¶
bt_api_py 开发和使用的最佳实践。
代码规范¶
使用类型提示¶
from bt_api_py import BtApi
from typing import Optional
def get_current_price(api: BtApi, exchange: str, symbol: str) -> Optional[float]:
"""获取当前价格"""
try:
ticker = api.get_tick(exchange, symbol)
ticker.init_data()
return ticker.get_last_price()
except Exception:
return None
使用常量定义¶
# 交易所常量
EXCHANGE_BINANCE_SPOT = "BINANCE___SPOT"
EXCHANGE_OKX_SPOT = "OKX___SPOT"
EXCHANGE_CTP_FUTURE = "CTP___FUTURE"
# 订单类型常量
ORDER_TYPE_LIMIT = "limit"
ORDER_TYPE_MARKET = "market"
# 订单方向常量
ORDER_SIDE_BUY = "buy"
ORDER_SIDE_SELL = "sell"
# 使用
ticker = api.get_tick(EXCHANGE_BINANCE_SPOT, "BTCUSDT")
配置管理¶
# config.py
import yaml
from dataclasses import dataclass
@dataclass
class ExchangeConfig:
api_key: str
secret: str
passphrase: str = ""
testnet: bool = True
def load_config(config_file: str) -> dict:
"""加载配置文件"""
with open(config_file) as f:
config = yaml.safe_load(f)
return {
"BINANCE___SPOT": ExchangeConfig(**config["binance"]),
"OKX___SPOT": ExchangeConfig(**config["okx"]),
}
# 使用
exchange_kwargs = load_config("config.yaml")
api = BtApi(exchange_kwargs=exchange_kwargs)
API 使用¶
初始化连接¶
# ❌ 不推荐:每次创建新实例
def get_price():
api = BtApi(exchange_kwargs={...})
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
ticker.init_data()
return ticker
# ✅ 推荐:复用实例
api = BtApi(exchange_kwargs={...})
def get_price():
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
ticker.init_data()
return ticker
连接池管理¶
class ExchangePool:
"""交易所连接池"""
def __init__(self):
self.apis = {}
def get_api(self, exchange_name):
"""获取 API 实例"""
if exchange_name not in self.apis:
self.apis[exchange_name] = self._create_api(exchange_name)
return self.apis[exchange_name]
def _create_api(self, exchange_name):
"""创建 API 实例"""
# 从配置加载
config = load_config(exchange_name)
return BtApi(exchange_kwargs={exchange_name: config})
# 使用
pool = ExchangePool()
binance_api = pool.get_api("BINANCE___SPOT")
批量操作优化¶
# ❌ 不推荐:串行请求
for symbol in ["BTCUSDT", "ETHUSDT", "BNBUSDT"]:
ticker = api.get_tick("BINANCE___SPOT", symbol)
ticker.init_data()
print(ticker.get_last_price())
# ✅ 推荐:使用批量方法
ticks = api.get_all_ticks("BTCUSDT")
for exchange, ticker in ticks.items():
ticker.init_data()
print(f"{exchange}: {ticker.get_last_price()}")
资源清理¶
from contextlib import contextmanager
@contextmanager
def api_context(exchange_kwargs):
"""API 上下文管理器"""
api = BtApi(exchange_kwargs=exchange_kwargs)
try:
yield api
finally:
# 清理资源
for exchange in api.list_exchanges():
# 取消订阅
api.cancel_all(exchange)
print("API 连接已关闭")
# 使用
with api_context({...}) as api:
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
print(ticker.get_last_price())
性能优化¶
使用异步接口¶
import asyncio
async def get_multiple_prices(api, symbols):
"""并发获取多个价格"""
loop = asyncio.get_running_loop()
tasks = []
for symbol in symbols:
task = loop.run_in_executor(
None,
lambda s=symbol: api.get_tick("BINANCE___SPOT", s)
)
tasks.append(task)
results = await asyncio.gather(*tasks)
for ticker in results:
ticker.init_data()
print(ticker.get_last_price())
# 使用
asyncio.run(get_multiple_prices(api, ["BTCUSDT", "ETHUSDT"]))
数据缓存¶
from functools import lru_cache
import time
class CachedTicker:
"""带缓存的行情获取"""
def __init__(self, api, ttl=5):
self.api = api
self.ttl = ttl # 缓存过期时间(秒)
self.cache = {}
def get_tick(self, exchange, symbol):
"""获取行情(带缓存)"""
key = f"{exchange}:{symbol}"
now = time.time()
if key in self.cache:
data, timestamp = self.cache[key]
if now - timestamp < self.ttl:
return data
# 从 API 获取
ticker = self.api.get_tick(exchange, symbol)
ticker.init_data()
self.cache[key] = (ticker, now)
return ticker
# 使用
cached_ticker = CachedTicker(api, ttl=5)
ticker = cached_ticker.get_tick("BINANCE___SPOT", "BTCUSDT")
WebSocket 代替轮询¶
# ❌ 不推荐:轮询获取价格
while True:
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
ticker.init_data()
print(ticker.get_last_price())
time.sleep(1)
# ✅ 推荐:WebSocket 订阅
def on_ticker(ticker):
ticker.init_data()
print(ticker.get_last_price())
api.event_bus.subscribe("ticker", on_ticker)
api.subscribe_ticker("BINANCE___SPOT", "BTCUSDT")
api.run()
安全实践¶
API Key 管理¶
# ❌ 不推荐:硬编码 API Key
api = BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": "abc123...",
"secret": "xyz789...",
}
})
# ✅ 推荐:使用环境变量
import os
api = BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": os.getenv("BINANCE_API_KEY"),
"secret": os.getenv("BINANCE_SECRET"),
}
})
# 或使用配置文件(不提交到 Git)
# config.yaml (在 .gitignore 中)
# binance:
# api_key: abc123...
# secret: xyz789...
使用测试网络¶
# 开发时使用测试网络
api = BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": "...",
"secret": "...",
"testnet": True, # 开发时使用测试网
}
})
# 生产环境通过环境变量控制
TESTNET = os.getenv("TESTNET", "true").lower() == "true"
订单金额限制¶
MAX_ORDER_SIZE = 0.1 # BTC
MAX_ORDER_VALUE = 10000 # USDT
def validate_order(symbol, volume, price):
"""验证订单参数"""
order_value = volume * price
if volume > MAX_ORDER_SIZE:
raise ValueError(f"订单数量过大: {volume} > {MAX_ORDER_SIZE}")
if order_value > MAX_ORDER_VALUE:
raise ValueError(f"订单金额过大: {order_value} > {MAX_ORDER_VALUE}")
return True
# 使用
if validate_order("BTCUSDT", 0.001, 50000):
api.make_order("BINANCE___SPOT", "BTCUSDT", 0.001, 50000, "limit")
测试策略¶
单元测试¶
import pytest
from bt_api_py import BtApi
@pytest.fixture
def test_api():
"""测试用 API"""
return BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": "test_key",
"secret": "test_secret",
"testnet": True,
}
})
def test_get_tick(test_api):
"""测试获取行情"""
ticker = test_api.get_tick("BINANCE___SPOT", "BTCUSDT")
assert ticker is not None
def test_api_error_handling(test_api):
"""测试错误处理"""
with pytest.raises(ExchangeNotFoundError):
test_api.get_tick("INVALID_EXCHANGE", "BTCUSDT")
Mock 测试¶
from unittest.mock import Mock, patch
def test_with_mock():
"""使用 mock 测试"""
# Mock ticker 数据
mock_ticker = Mock()
mock_ticker.get_last_price.return_value = 50000
with patch.object(api, 'get_tick', return_value=mock_ticker):
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
assert ticker.get_last_price() == 50000
部署建议¶
Docker 容器化¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install .
ENV PYTHONUNBUFFERED=1
CMD ["python", "main.py"]
健康检查¶
def health_check(api):
"""健康检查"""
try:
# 检查所有交易所连接
for exchange in api.list_exchanges():
# 尝试获取账户信息
api.get_account(exchange)
return True
except Exception as e:
print(f"健康检查失败: {e}")
return False
# 定期健康检查
import schedule
def run_health_check():
while True:
if not health_check(api):
print("检测到异常,尝试重新连接...")
schedule.run_pending()
time.sleep(60)
# 后台运行
import threading
threading.Thread(target=run_health_check, daemon=True).start()
日志轮转¶
import logging
from logging.handlers import RotatingFileHandler
# 设置日志轮转
handler = RotatingFileHandler(
'bt_api.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
logger = logging.getLogger("bt_api")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
代码组织¶
项目结构¶
project/
├── config/
│ ├── __init__.py
│ └── exchanges.py # 交易所配置
├── strategies/
│ ├── __init__.py
│ ├── base.py # 策略基类
│ └── grid.py # 网格策略
├── utils/
│ ├── __init__.py
│ ├── error_handler.py # 错误处理
│ └── logger.py # 日志工具
└── main.py # 主程序
策略基类¶
class BaseStrategy:
"""策略基类"""
def __init__(self, api, exchange, symbol):
self.api = api
self.exchange = exchange
self.symbol = symbol
self.event_bus = api.get_event_bus()
self.running = False
def start(self):
"""启动策略"""
self.running = True
self.setup()
def stop(self):
"""停止策略"""
self.running = False
self.teardown()
def setup(self):
"""策略初始化(子类实现)"""
pass
def teardown(self):
"""策略清理(子类实现)"""
pass
def on_tick(self, ticker):
"""行情回调(子类实现)"""
pass
def on_order(self, order):
"""订单回调(子类实现)"""
pass