性能优化指南¶
bt_api_py 的性能优化建议和最佳实践。
网络优化¶
使用 WebSocket 代替轮询¶
-
问题: 轮询会增加网络延迟和服务器负载
-
解决: 使用 WebSocket 订阅实时数据
# ❌ 轮询方式(延迟高,资源消耗大)
while True:
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
ticker.init_data()
print(ticker.get_last_price())
time.sleep(1)
# ✅ WebSocket 方式(实时,资源消耗低)
def on_ticker(ticker):
ticker.init_data()
print(ticker.get_last_price())
api.event_bus.subscribe("ticker", on_ticker)
api.subscribe_ticker("BINANCE___SPOT", "BTCUSDT")
api.run()
性能对比: | 方式 | 延迟 | CPU | 网络 | |------|------|-----|------| | 轮询 (1 秒) | 0-1000ms | 高 | 高 | | WebSocket | 0-50ms | 低 | 低 |
连接复用¶
# ❌ 每次请求创建新连接
def get_price():
api = BtApi(exchange_kwargs={...}) # 每次创建新连接
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
return ticker
# ✅ 复用连接
api = BtApi(exchange_kwargs={...}) # 创建一次
def get_price():
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
return ticker
批量请求合并¶
# ❌ 多次单独请求
for symbol in symbols:
ticker = api.get_tick("BINANCE___SPOT", symbol)
# ✅ 使用批量方法
ticks = api.get_all_ticks("BTCUSDT") # 一次获取所有交易所价格
数据缓存¶
内存缓存¶
from functools import lru_cache
import time
class CachedApi:
"""带缓存的 API"""
def __init__(self, api, cache_ttl=5):
self.api = api
self.cache_ttl = cache_ttl
self._cache = {}
def get_tick(self, exchange, symbol):
"""获取行情(带缓存)"""
key = f"{exchange}:{symbol}"
now = time.time()
# 检查缓存
if key in self._cache:
data, timestamp = self._cache[key]
if now - timestamp < self.cache_ttl:
return data
# 从 API 获取
ticker = self.api.get_tick(exchange, symbol)
ticker.init_data()
self._cache[key] = (ticker, now)
return ticker
# 性能提升:避免频繁 API 调用,减少 90% 重复请求
Redis 缓存¶
import redis
import pickle
class RedisCachedApi:
"""Redis 缓存的 API(多进程共享)"""
def __init__(self, api, redis_url="redis://localhost:6379"):
self.api = api
self.redis = redis.from_url(redis_url)
self.cache_ttl = 5 # 秒
def get_tick(self, exchange, symbol):
"""获取行情(Redis 缓存)"""
key = f"ticker:{exchange}:{symbol}"
# 尝试从缓存获取
cached = self.redis.get(key)
if cached:
return pickle.loads(cached)
# 从 API 获取
ticker = self.api.get_tick(exchange, symbol)
ticker.init_data()
# 存入缓存
self.redis.setex(
key,
self.cache_ttl,
pickle.dumps(ticker)
)
return ticker
并发处理¶
多线程并发¶
import concurrent.futures
import time
def get_price(exchange, symbol):
"""获取单个价格"""
ticker = api.get_tick(exchange, symbol)
ticker.init_data()
return exchange, ticker.get_last_price()
def get_all_prices_concurrent(exchanges, symbol):
"""并发获取所有交易所价格"""
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(get_price, exchange, symbol)
for exchange in exchanges
]
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return dict(results)
# 性能提升:10 个交易所 5 秒获取完,串行需要 50 秒
异步处理¶
import asyncio
import time
async def async_get_price(api, exchange, symbol):
"""异步获取价格"""
loop = asyncio.get_running_loop()
ticker = await loop.run_in_executor(
None,
lambda: api.get_tick(exchange, symbol)
)
ticker.init_data()
return exchange, ticker.get_last_price()
async def get_all_prices_async(exchanges, symbol):
"""异步获取所有价格"""
tasks = [
async_get_price(api, exchange, symbol)
for exchange in exchanges
]
results = await asyncio.gather(*tasks)
return dict(results)
# 使用
asyncio.run(get_all_prices_async(["BINANCE___SPOT", "OKX___SPOT"], "BTCUSDT"))
批量数据处理¶
def process_klines_batch(klines):
"""批量处理 K 线数据"""
# 预分配列表大小
closes = [0] *len(klines)
volumes = [0]* len(klines)
for i, bar in enumerate(klines):
bar.init_data()
closes[i] = bar.get_close_price()
volumes[i] = bar.get_volume()
# 使用 numpy 进行高性能计算
import numpy as np
np_closes = np.array(closes)
ma = np.convolve(np_closes, np.ones(20)/20, mode='valid')
return ma
# 性能提升:批量处理比逐条处理快 3-5 倍
内存管理¶
及时清理数据¶
# ❌ 不推荐:累积大量数据
all_klines = []
while True:
klines = api.get_kline("BINANCE___SPOT", "BTCUSDT", "1m", count=100)
all_klines.extend(klines)
# 内存持续增长
# ✅ 推荐:只保留需要的数据
class CircularBuffer:
"""环形缓冲区"""
def __init__(self, size):
self.size = size
self.buffer = []
def add(self, items):
self.buffer.extend(items)
if len(self.buffer) > self.size:
self.buffer = self.buffer[-self.size:]
def get(self):
return self.buffer
klines_buffer = CircularBuffer(1000)
使用生成器¶
# ❌ 不推荐:返回大列表
def get_all_klines():
klines = []
for i in range(10000):
kline = api.get_kline(...)
klines.append(kline)
return klines # 占用大量内存
# ✅ 推荐:使用生成器
def get_klines_generator(count):
"""K 线生成器"""
for i in range(count):
klines = api.get_kline("BINANCE___SPOT", "BTCUSDT", "1m", count=100)
for bar in klines:
bar.init_data()
yield bar
# 逐条处理,不占用大量内存
for bar in get_klines_generator(100):
process(bar)
数据分片处理¶
def process_large_dataset(data, chunk_size=1000):
"""分片处理大数据集"""
for i in range(0, len(data), chunk_size):
chunk = data[i:i+chunk_size]
# 处理这一批数据
results = process_chunk(chunk)
# 释放内存
del chunk
yield results
# 使用
for result in process_large_dataset(large_dataset):
save_to_db(result)
数据库优化¶
批量插入¶
# ❌ 不推荐:逐条插入
for bar in bars:
cursor.execute("INSERT INTO klines VALUES (?,?,?,?)", (...))
# ✅ 推荐:批量插入
def batch_insert(cursor, bars, batch_size=1000):
"""批量插入"""
for i in range(0, len(bars), batch_size):
chunk = bars[i:i+batch_size]
cursor.executemany(
"INSERT INTO klines VALUES (?,?,?,?)",
[
(bar.get_open_time(), bar.get_close_price(), ...)
for bar in chunk
]
)
conn.commit()
# 性能提升:批量插入比逐条插入快 10-100 倍
索引优化¶
- - 为常用查询字段添加索引
CREATE INDEX idx_symbol_time ON klines(symbol, open_time);
CREATE INDEX idx_price ON ticks(price);
- - 定期清理过期数据
DELETE FROM klines WHERE open_time < NOW() - INTERVAL '30 DAY';
连接池¶
from sqlalchemy.pool import QueuePool
# 创建连接池
pool = QueuePool(
create_function,
max_overflow_size=10,
pool_size=5
)
# 使用连接池
with pool.connect() as conn:
process_data(conn)
监控工具¶
性能分析器¶
import cProfile
import pstats
from io import StringIO
def profile_api_call(func):
"""API 调用性能分析"""
pr = cProfile.Profile()
pr.enable()
result = func()
pr.disable()
# 输出分析结果
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10) # 打印前 10 个最耗时的函数
print(s.getvalue())
return result
# 使用
@profile_api_call
def test_function():
ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
return ticker
性能监控装饰器¶
import time
from functools import wraps
def monitor_performance(func):
"""性能监控装饰器"""
@wraps(func)
def wrapper(**args, **kwargs):
start_time = time.time()
start_memory = get_memory_usage()
try:
result = func(**args, **kwargs)
return result
finally:
elapsed = time.time() - start_time
end_memory = get_memory_usage()
memory_used = end_memory - start_memory
print(f"{func.__name__}:")
print(f" 耗时: {elapsed:.3f}秒")
print(f" 内存: {memory_used:.2f}MB")
return wrapper
def get_memory_usage():
"""获取当前内存使用量"""
import psutil
return psutil.Process().memory_info().rss / 1024 / 1024
实时性能监控¶
class PerformanceMonitor:
"""实时性能监控"""
def __init__(self, api):
self.api = api
self.metrics = {
"api_calls": 0,
"errors": 0,
"latencies": [],
}
def track_call(self, func, **args, **kwargs):
"""跟踪 API 调用"""
start_time = time.time()
try:
result = func(**args, **kwargs)
self.metrics["api_calls"] += 1
return result
except Exception:
self.metrics["errors"] += 1
raise
finally:
latency = time.time() - start_time
self.metrics["latencies"].append(latency)
def get_stats(self):
"""获取统计信息"""
latencies = self.metrics["latencies"]
return {
"total_calls": self.metrics["api_calls"],
"errors": self.metrics["errors"],
"avg_latency": sum(latencies) / len(latencies) if latencies else 0,
"max_latency": max(latencies) if latencies else 0,
"p99_latency": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
# 使用
monitor = PerformanceMonitor(api)
ticker = monitor.track_call(api.get_tick, "BINANCE___SPOT", "BTCUSDT")
print(monitor.get_stats())
性能优化清单¶
网络层优化¶
- 使用 WebSocket 代替轮询
- 启用连接复用
- 批量请求合并
- 配置合理的超时时间
应用层优化¶
- 使用缓存减少 API 调用
- 并发处理独立请求
- 异步处理耗时操作
- 使用数据结构优化算法
内存优化¶
- 及时清理不需要的数据
- 使用生成器处理大数据集
- 避免内存泄漏
- 监控内存使用情况
数据库优化¶
- 批量插入代替逐条插入
- 为常用字段添加索引
- 定期清理过期数据
- 使用连接池
监控优化¶
- 添加性能监控
- 记录关键指标
- 设置告警阈值
- 定期分析性能瓶颈
性能基准¶
操作性能参考¶
| 操作 | 预期耗时 | 优化后 |
|---|---|---|
| 获取单个 ticker | 50-200ms | 30-100ms |
| 获取深度 (20 档) | 100-300ms | 50-150ms |
| 获取 K 线 (100 根) | 200-500ms | 100-300ms |
| 下单 | 100-400ms | 50-200ms |
| 撤单 | 50-200ms | 30-100ms |
| 查询订单 | 50-200ms | 30-100ms |
并发性能参考¶
| 并发数 | 吞吐量 (QPS) | 延迟 (P99) |
|---|---|---|
| 1 | 50 | 100ms |
| 5 | 200 | 150ms |
| 10 | 350 | 200ms |
| 20 | 500 | 300ms |