跳转至

交易所注册表

ExchangeRegistry 是 bt_api_py 实现"即插即用"架构的核心,基于 Registry 设计模式管理所有交易所的 Feed 类、WebSocket 流类和余额处理函数。

工作原理

新交易所注册 → ExchangeRegistry
                   ├── register_feed()        # 注册 REST Feed 类
                   ├── register_stream()      # 注册 WebSocket 流类
                   └── register_balance_handler()  # 注册余额解析函数

BtApi 使用 → ExchangeRegistry
                   ├── create_feed()          # 自动创建 Feed 实例
                   └── get_stream_class()     # 获取流类

添加新交易所

bt_api_py/exchange_registers/ 目录下创建新文件即可,无需修改核心代码:

# bt_api_py/exchange_registers/my_exchange.py
from bt_api_py.registry import ExchangeRegistry
from .my_exchange_feed import MyExchangeFeed
from .my_exchange_stream import MyExchangeStream

ExchangeRegistry.register_feed("MYEXCHANGE___SPOT", MyExchangeFeed)
ExchangeRegistry.register_stream("MYEXCHANGE___SPOT", "subscribe", MyExchangeStream)

查询已注册交易所

from bt_api_py import BtApi

# 列出所有已注册的交易所
exchanges = BtApi.list_available_exchanges()
print(exchanges)

bt_api_py.registry.ExchangeRegistry

ExchangeRegistry()

交易所注册表,管理 feed 类、流式数据类、交易所配置类的注册与创建

全局使用(向后兼容): ExchangeRegistry.register_feed("BINANCE___SPOT", BinanceSpotFeed) feed = ExchangeRegistry.create_feed("BINANCE___SPOT", queue)

测试隔离

registry = ExchangeRegistry() registry.register_feed("TEST___SPOT", MockFeed)

Initialize registry instance (singleton pattern handled by new).