Exchange Integration Patterns and Specifications¶
Overview¶
This document outlines the standardized patterns and specifications used for exchange integrations in bt_api_py. It is based on the analysis of four implemented exchanges: Binance, OKX, Interactive Brokers (IB), and CTP.
Architecture Overview¶
The exchange integration follows a modular, registry-based architecture:
bt_api_py/
├── containers/ # Data containers for exchange responses
│ ├── exchanges/ # Exchange configuration classes
│ ├── tickers/ # Ticker data containers
│ ├── orderbooks/ # Order book data containers
│ ├── orders/ # Order data containers
│ ├── balances/ # Balance data containers
│ └── ...
├── feeds/ # Feed classes for REST/WebSocket APIs
│ ├── live_<exchange>/ # Exchange-specific feed implementations
│ │ ├── spot.py # Spot trading feed
│ │ ├── swap.py # Futures/perpetual feed
│ │ ├── request_base.py # Base request class
│ │ └── ...
│ └── register_<exchange>.py # Registration module
├── configs/ # YAML configuration files
│ └── <exchange>.yaml # Exchange-specific configuration
├── registry.py # Global exchange registry
└── config_loader.py # YAML configuration loader
1. Naming Conventions¶
Exchange Name Format¶
- ***Registry Key**:
<EXCHANGE>___<ASSET_TYPE>(e.g.,BINANCE___SPOT,OKX___SWAP) - ***Directory**:
live_<exchange_lowercase>(e.g.,live_binance,live_okx) - ***Class Names**:
<Exchange><Request/Wss><DataType>(e.g.,BinanceRequestTickerData,OkxWssOrderData) - ***Config File**:
<exchange>.yaml(e.g.,binance.yaml,okx.yaml)
Asset Types¶
Common asset types: - spot - Spot trading - swap - USDT-M perpetual futures - futures - Delivery futures - coin_m - Coin-margined futures - option - Options trading - margin - Margin trading
2. Exchange Configuration Class Pattern¶
Location¶
bt_api_py/containers/exchanges/<exchange>_exchange_data.py
Structure¶
import os
from bt_api_py.containers.exchanges.exchange_data import ExchangeData
from bt_api_py.functions.log_message import SpdLogManager
logger = SpdLogManager(
file_name="<exchange>_exchange_data.log",
logger_name="<exchange>_data",
print_info=False
).create_logger()
# Module-level config cache (for lazy loading)
_<exchange>_config = None
_<exchange>_config_loaded = False
def _get_<exchange>_config():
"""Load and cache YAML configuration."""
global _<exchange>_config, _<exchange>_config_loaded
if _<exchange>_config_loaded:
return _<exchange>_config
_<exchange>_config_loaded = True
try:
from bt_api_py.config_loader import load_exchange_config
config_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"configs",
"<exchange>.yaml",
)
if os.path.exists(config_path):
_<exchange>_config = load_exchange_config(config_path)
except Exception as e:
logger.warn(f"Failed to load <exchange>.yaml config: {e}")
return _<exchange>_config
class <Exchange>ExchangeData(ExchangeData):
"""Base class for <Exchange> exchange."""
def __init__(self):
super().__init__()
self.exchange_name = "<exchange>"
self.rest_url = "https://api.example.com"
self.wss_url = "wss://api.example.com/ws"
self.acct_wss_url = "" # Account WebSocket URL
self.kline_periods = {
"1m": "1m",
"5m": "5m",
"15m": "15m",
"30m": "30m",
"1h": "1h",
"4h": "4h",
"1d": "1d",
}
self.reverse_kline_periods = {v: k for k, v in self.kline_periods.items()}
self.legal_currency = ["USDT", "USD", "BTC", "ETH"]
self.rest_paths = {}
self.wss_paths = {}
def _load_from_config(self, asset_type):
"""Load configuration from YAML.
Args:
asset_type: Asset type key (e.g., 'spot', 'swap')
Returns:
bool: Success status
"""
config = _get_<exchange>_config()
if config is None:
return False
asset_cfg = config.asset_types.get(asset_type)
if asset_cfg is None:
return False
# Load exchange_name
if asset_cfg.exchange_name:
self.exchange_name = asset_cfg.exchange_name
# Load URLs (handle both dict and string formats)
if config.base_urls:
if isinstance(config.base_urls.rest, dict):
self.rest_url = config.base_urls.rest.get(
asset_type,
config.base_urls.rest.get('default', self.rest_url)
)
else:
self.rest_url = config.base_urls.rest
if isinstance(config.base_urls.wss, dict):
self.wss_url = config.base_urls.wss.get(
asset_type,
config.base_urls.wss.get('default', self.wss_url)
)
else:
self.wss_url = config.base_urls.wss
# Load rest_paths
if asset_cfg.rest_paths:
self.rest_paths = dict(asset_cfg.rest_paths)
# Load wss_paths (convert templates to proper format)
if asset_cfg.wss_paths:
converted = {}
for key, value in asset_cfg.wss_paths.items():
if isinstance(value, str):
if value:
converted[key] = {"params": [value], "method": "SUBSCRIBE", "id": 1}
else:
converted[key] = ""
else:
converted[key] = value
self.wss_paths = converted
# Load kline_periods (asset-level priority)
kp = asset_cfg.kline_periods or (config.kline_periods if config.kline_periods else None)
if kp:
self.kline_periods = dict(kp)
self.reverse_kline_periods = {v: k for k, v in self.kline_periods.items()}
# Load legal_currency (asset-level priority)
lc = asset_cfg.legal_currency or (config.legal_currency if config.legal_currency else None)
if lc:
self.legal_currency = list(lc)
return True
def get_symbol(self, symbol):
"""Convert standard symbol format to exchange format.
Override in subclasses for exchange-specific formatting.
"""
return symbol
def get_period(self, key):
"""Get exchange-specific kline period format."""
return self.kline_periods.get(key, key)
def get_rest_path(self, key):
"""Get REST API path for given endpoint.
Args:
key: Endpoint key (e.g., 'get_tick', 'make_order')
Returns:
str: API path (e.g., 'GET /api/v1/ticker')
"""
if key not in self.rest_paths or self.rest_paths[key] == "":
self.raise_path_error(self.exchange_name, key)
return self.rest_paths[key]
def get_wss_path(self, **kwargs):
"""Get WebSocket subscription path.
Args:
**kwargs: Including 'topic', 'symbol', 'period', etc.
Returns:
str: JSON-formatted subscription message
"""
key = kwargs["topic"]
if key not in self.wss_paths or self.wss_paths[key] == "":
self.raise_path_error(self.exchange_name, key)
# Substitute template parameters
req = self.wss_paths[key].copy()
# Implementation varies by exchange...
return json.dumps(req)
def account_wss_symbol(self, symbol):
"""Convert symbol for account WebSocket."""
return symbol
class <Exchange>ExchangeDataSpot(<Exchange>ExchangeData):
"""Spot trading configuration."""
def __init__(self):
super().__init__()
self.asset_type = "spot"
self._load_from_config("spot")
# Override methods if needed for spot-specific behavior
def get_symbol(self, symbol):
return symbol.replace("-", "").upper()
class <Exchange>ExchangeDataSwap(<Exchange>ExchangeData):
"""Perpetual futures configuration."""
def __init__(self):
super().__init__()
self.asset_type = "swap"
self._load_from_config("swap")
Key Points¶
- ***Lazy Config Loading**: Use module-level cache and only load when needed
- ***Asset Type Priority**: Asset-level config overrides exchange-level config
- ***Flexible URL Handling**: Support both dict and string formats for base_urls
- ***Template Substitution**: wss_paths use
<symbol>,<period>templates - ***Override Hooks**: Subclasses override
get_symbol(),get_wss_path()etc.
3. YAML Configuration Pattern¶
Location¶
bt_api_py/configs/<exchange>.yaml
Structure¶
# <Exchange> Exchange Configuration
id: <exchange>
display_name: <Exchange Name>
venue_type: cex # cex, dex, broker
website: https://example.com
api_doc: https://docs.example.com
base_urls:
rest:
spot: https://api.example.com
swap: https://fapi.example.com
default: https://api.example.com
wss:
spot: wss://api.example.com/ws
swap: wss://fapi.example.com/ws
public: wss://api.example.com/ws/public
private: wss://api.example.com/ws/private
default: wss://api.example.com/ws
acct_wss:
spot: wss://api.example.com/ws/account
swap: wss://fapi.example.com/ws/account
default: wss://api.example.com/ws/account
connection:
type: http # http, websocket, spi, tws
timeout: 10
max_retries: 3
authentication:
type: hmac_sha256 # none, api_key, hmac_sha256, hmac_sha512
header_name: X-API-KEY
timestamp_key: timestamp
signature_key: signature
# Exchange-level kline periods (shared default)
kline_periods:
1m: "1m"
5m: "5m"
15m: "15m"
30m: "30m"
1h: "1h"
4h: "4h"
1d: "1d"
# Exchange-level legal currencies
legal_currency: [USDT, USD, BTC, ETH]
# Rate limit rules
rate_limits:
- name: request_weight
type: sliding_window # sliding_window, fixed_window, token_bucket
interval: 60
limit: 2400
scope: global # global, endpoint, ip
weight: 1
weight_map:
"/api/v1/depth": 5
# Order status mapping (optional)
status_dict:
live: new
partially_filled: partially_filled
filled: filled
canceled: canceled
# Asset Types
asset_types:
spot:
exchange_name: <EXCHANGE>_SPOT
symbol_format: "{base}{quote}"
legal_currency: [USDT, USD, BTC, ETH]
rest_paths:
ping: "GET /api/v1/ping"
get_server_time: "GET /api/v1/time"
get_tick: "GET /api/v1/ticker"
get_depth: "GET /api/v1/depth"
get_kline: "GET /api/v1/klines"
make_order: "POST /api/v1/order"
cancel_order: "DELETE /api/v1/order"
query_order: "GET /api/v1/order"
get_open_orders: "GET /api/v1/openOrders"
get_balance: "GET /api/v1/account"
get_account: "GET /api/v1/account"
wss_paths:
tick: "<symbol>@ticker"
depth: "<symbol>@depth20"
kline: "<symbol>@kline_<period>"
orders: ""
account: ""
swap:
exchange_name: <EXCHANGE>_SWAP
symbol_format: "{base}{quote}"
rest_paths:
# ... same structure as spot
wss_paths:
# ... WebSocket subscription templates
Important Fields¶
| Field | Required | Description |
|---|---|---|
id | Yes | Unique exchange identifier |
venue_type | Yes | cex, dex, or broker |
base_urls | Yes (CEX) | REST and WebSocket URLs |
connection.type | Yes | Connection method |
rate_limits[].type | Yes | Must be sliding_window, fixed_window, or token_bucket |
4. Feed Class Pattern¶
Location¶
bt_api_py/feeds/live_<exchange>/request_base.py (base class) bt_api_py/feeds/live_<exchange>/spot.py (spot implementation)
Base Request Class Structure¶
from bt_api_py.feeds.feed import Feed
from bt_api_py.feeds.capability import Capability
from bt_api_py.functions.log_message import SpdLogManager
from bt_api_py.rate_limiter import RateLimiter, RateLimitRule, RateLimitScope, RateLimitType
class <Exchange>RequestData(Feed):
"""Base REST API request handler for <Exchange>."""
@classmethod
def _capabilities(cls):
"""Declare supported capabilities."""
return {
Capability.GET_TICK,
Capability.GET_DEPTH,
Capability.GET_KLINE,
Capability.MAKE_ORDER,
Capability.CANCEL_ORDER,
Capability.QUERY_ORDER,
Capability.GET_BALANCE,
Capability.GET_ACCOUNT,
Capability.GET_EXCHANGE_INFO,
Capability.GET_SERVER_TIME,
}
def __init__(self, data_queue, **kwargs):
super().__init__(data_queue, **kwargs)
self.data_queue = data_queue
self.public_key = kwargs.get("public_key")
self.private_key = kwargs.get("private_key")
self.asset_type = kwargs.get("asset_type", "SPOT")
self.logger_name = kwargs.get("logger_name", "<exchange>_feed.log")
# Exchange configuration
self._params = kwargs.get("exchange_data", <Exchange>ExchangeDataSpot())
# Loggers
self.request_logger = SpdLogManager(
"./logs/" + self.logger_name, "request", 0, 0, False
).create_logger()
# Rate limiter
self._rate_limiter = kwargs.get("rate_limiter", self._create_default_rate_limiter())
@staticmethod
def _create_default_rate_limiter():
"""Create default rate limiter from config."""
rules = [
RateLimitRule(
name="<exchange>_request_weight",
limit=2400,
interval=60,
type=RateLimitType.SLIDING_WINDOW,
scope=RateLimitScope.GLOBAL,
),
]
return RateLimiter(rules)
# ============== Private API Methods (underscore prefix) ==============
# These methods return (path, params, extra_data) tuples
def _get_tick(self, symbol, extra_data=None, **kwargs):
"""Get ticker data endpoint.
Returns:
tuple: (path, params, extra_data)
"""
request_symbol = self._params.get_symbol(symbol)
request_type = "get_tick"
path = self._params.get_rest_path(request_type)
params = {"symbol": request_symbol}
extra_data = update_extra_data(
extra_data,
**{
"request_type": request_type,
"symbol_name": symbol,
"asset_type": self.asset_type,
"exchange_name": self.exchange_name,
"normalize_function": <Exchange>RequestData._get_tick_normalize_function,
},
)
return path, params, extra_data
@staticmethod
def _get_tick_normalize_function(input_data, extra_data):
"""Normalize ticker response.
Returns:
tuple: (data_list, status)
"""
symbol_name = extra_data["symbol_name"]
asset_type = extra_data["asset_type"]
if isinstance(input_data, list):
data = [<Exchange>RequestTickerData(i, symbol_name, asset_type, True)
for i in input_data]
elif isinstance(input_data, dict):
data = [<Exchange>RequestTickerData(input_data, symbol_name, asset_type, True)]
else:
data = []
return data, input_data is not None
def _get_depth(self, symbol, count=20, extra_data=None, **kwargs):
"""Get order book endpoint.
Returns:
tuple: (path, params, extra_data)
"""
request_symbol = self._params.get_symbol(symbol)
request_type = "get_depth"
path = self._params.get_rest_path(request_type)
params = {"symbol": request_symbol, "limit": count}
extra_data = update_extra_data(
extra_data,
**{
"request_type": request_type,
"symbol_name": symbol,
"asset_type": self.asset_type,
"exchange_name": self.exchange_name,
"normalize_function": <Exchange>RequestData._get_depth_normalize_function,
},
)
return path, params, extra_data
@staticmethod
def _get_depth_normalize_function(input_data, extra_data):
"""Normalize order book response."""
symbol_name = extra_data["symbol_name"]
asset_type = extra_data["asset_type"]
data = [<Exchange>RequestOrderBookData(input_data, symbol_name, asset_type, True)]
return data, input_data is not None
def _get_kline(self, symbol, period="1h", count=100, extra_data=None, **kwargs):
"""Get kline data endpoint.
Returns:
tuple: (path, params, extra_data)
"""
request_symbol = self._params.get_symbol(symbol)
request_period = self._params.get_period(period)
request_type = "get_kline"
path = self._params.get_rest_path(request_type)
params = {
"symbol": request_symbol,
"interval": request_period,
"limit": count,
}
extra_data = update_extra_data(
extra_data,
**{
"request_type": request_type,
"symbol_name": symbol,
"asset_type": self.asset_type,
"exchange_name": self.exchange_name,
"normalize_function": <Exchange>RequestData._get_kline_normalize_function,
},
)
return path, params, extra_data
@staticmethod
def _get_kline_normalize_function(input_data, extra_data):
"""Normalize kline response."""
# Implementation...
pass
def _make_order(self, symbol, vol, price=None, order_type="buy-limit",
offset="open", extra_data=None, **kwargs):
"""Create order endpoint.
Args:
symbol: Trading pair symbol
vol: Order volume
price: Order price (None for market orders)
order_type: Order type (buy-limit, sell-limit, buy-market, etc.)
offset: Order side (open, close)
Returns:
tuple: (path, params, extra_data)
"""
request_symbol = self._params.get_symbol(symbol)
request_type = "make_order"
path = self._params.get_rest_path(request_type)
# Parse order_type
side, order_type = order_type.split("-")
params = {
"symbol": request_symbol,
"side": side.upper(),
"type": order_type.upper(),
"quantity": vol,
}
if price and order_type == "limit":
params["price"] = price
extra_data = update_extra_data(
extra_data,
**{
"request_type": request_type,
"symbol_name": symbol,
"asset_type": self.asset_type,
"exchange_name": self.exchange_name,
"normalize_function": <Exchange>RequestData._make_order_normalize_function,
},
)
return path, params, extra_data
@staticmethod
def _make_order_normalize_function(input_data, extra_data):
"""Normalize order response."""
symbol_name = extra_data["symbol_name"]
asset_type = extra_data["asset_type"]
if isinstance(input_data, list):
data = [<Exchange>RequestOrderData(i, symbol_name, asset_type, True)
for i in input_data]
else:
data = [<Exchange>RequestOrderData(input_data, symbol_name, asset_type, True)]
return data, input_data is not None
def _cancel_order(self, symbol, order_id=None, client_order_id=None,
extra_data=None, **kwargs):
"""Cancel order endpoint.
Returns:
tuple: (path, params, extra_data)
"""
request_symbol = self._params.get_symbol(symbol)
request_type = "cancel_order"
path = self._params.get_rest_path(request_type)
params = {"symbol": request_symbol}
if order_id:
params["orderId"] = order_id
if client_order_id:
params["clientOrderId"] = client_order_id
extra_data = update_extra_data(
extra_data,
**{
"request_type": request_type,
"symbol_name": symbol,
"asset_type": self.asset_type,
"exchange_name": self.exchange_name,
"normalize_function": <Exchange>RequestData._cancel_order_normalize_function,
},
)
return path, params, extra_data
def _get_balance(self, extra_data=None, **kwargs):
"""Get account balance endpoint.
Returns:
tuple: (path, params, extra_data)
"""
request_type = "get_balance"
path = self._params.get_rest_path(request_type)
params = {}
extra_data = update_extra_data(
extra_data,
**{
"request_type": request_type,
"symbol_name": "ALL",
"asset_type": self.asset_type,
"exchange_name": self.exchange_name,
"normalize_function": <Exchange>RequestData._get_balance_normalize_function,
},
)
return path, params, extra_data
# ============== Public API Methods ==============
def get_tick(self, symbol):
"""Get ticker data.
Returns:
RequestData: Ticker data container
"""
path, params, extra_data = self._get_tick(symbol)
return self.request(path, params=params, extra_data=extra_data)
def get_depth(self, symbol, count=20):
"""Get order book.
Returns:
RequestData: Order book data container
"""
path, params, extra_data = self._get_depth(symbol, count)
return self.request(path, params=params, extra_data=extra_data)
def get_kline(self, symbol, period="1h", count=100):
"""Get kline data.
Returns:
RequestData: Kline data container
"""
path, params, extra_data = self._get_kline(symbol, period, count)
return self.request(path, params=params, extra_data=extra_data)
def make_order(self, symbol, vol, price=None, order_type="buy-limit",
offset="open", client_order_id=None):
"""Place an order.
Returns:
RequestData: Order data container
"""
path, params, extra_data = self._make_order(
symbol, vol, price, order_type, offset
)
if client_order_id:
params["clientOrderId"] = client_order_id
return self.request(path, params=params, extra_data=extra_data, is_sign=True)
def cancel_order(self, symbol, order_id=None, client_order_id=None):
"""Cancel an order.
Returns:
RequestData: Order data container
"""
path, params, extra_data = self._cancel_order(
symbol, order_id, client_order_id
)
return self.request(path, params=params, extra_data=extra_data, is_sign=True)
Key Points¶
- ***Private Methods**: Use
_prefix for internal methods that return(path, params, extra_data) - ***Public Methods**: Call private methods and invoke
self.request() - ***Normalization**: Each endpoint has a normalize function to convert response to data containers
- ***Rate Limiting**: Use
_rate_limiterdecorator or check before requests - ***Authentication**: Set
is_sign=Truefor endpoints requiring signatures
5. Data Container Pattern¶
Base Container Classes¶
Location: bt_api_py/containers/<datatype>/<datatype>.py
# Ticker base
from bt_api_py.containers.tickers.ticker import TickerData
# OrderBook base
from bt_api_py.containers.orderbooks.orderbook import OrderBookData
# Order base
from bt_api_py.containers.orders.order import OrderData
# Balance base
from bt_api_py.containers.balances.balance import BalanceData
Ticker Container Example¶
import json
import time
from bt_api_py.containers.tickers.ticker import TickerData
from bt_api_py.functions.utils import from_dict_get_float, from_dict_get_string
class <Exchange>RequestTickerData(TickerData):
"""Ticker data from REST API."""
def __init__(self, ticker_info, symbol_name, asset_type, has_been_json_encoded=False):
super().__init__(ticker_info, has_been_json_encoded)
self.exchange_name = "<EXCHANGE>"
self.local_update_time = time.time()
self.symbol_name = symbol_name
self.asset_type = asset_type
self.ticker_data = ticker_info if has_been_json_encoded else None
self.ticker_symbol_name = None
self.server_time = None
self.bid_price = None
self.ask_price = None
self.bid_volume = None
self.ask_volume = None
self.last_price = None
self.last_volume = None
self.high_24h = None
self.low_24h = None
self.volume_24h = None
self.all_data = None
self.has_been_init_data = False
def init_data(self):
"""Parse ticker data from exchange response."""
if not self.has_been_json_encoded:
self.ticker_data = json.loads(self.ticker_info)
self.has_been_json_encoded = True
if self.has_been_init_data:
return self
# Parse fields according to exchange API format
self.ticker_symbol_name = from_dict_get_string(self.ticker_data, "symbol")
self.server_time = from_dict_get_float(self.ticker_data, "time")
self.bid_price = from_dict_get_float(self.ticker_data, "bidPrice")
self.ask_price = from_dict_get_float(self.ticker_data, "askPrice")
self.last_price = from_dict_get_float(self.ticker_data, "lastPrice")
self.high_24h = from_dict_get_float(self.ticker_data, "high24h")
self.low_24h = from_dict_get_float(self.ticker_data, "low24h")
self.has_been_init_data = True
return self
def get_all_data(self):
"""Get all data as dictionary."""
if self.all_data is None:
self.init_data()
self.all_data = {
"exchange_name": self.exchange_name,
"symbol_name": self.symbol_name,
"asset_type": self.asset_type,
"local_update_time": self.local_update_time,
"ticker_symbol_name": self.ticker_symbol_name,
"server_time": self.server_time,
"bid_price": self.bid_price,
"ask_price": self.ask_price,
"last_price": self.last_price,
"high_24h": self.high_24h,
"low_24h": self.low_24h,
}
return self.all_data
def __str__(self):
self.init_data()
return json.dumps(self.get_all_data())
# Standard getter methods
def get_exchange_name(self):
return self.exchange_name
def get_symbol_name(self):
return self.symbol_name
def get_last_price(self):
self.init_data()
return self.last_price
def get_bid_price(self):
self.init_data()
return self.bid_price
def get_ask_price(self):
self.init_data()
return self.ask_price
def get_high_24h(self):
self.init_data()
return self.high_24h
def get_low_24h(self):
self.init_data()
return self.low_24h
Constructor Parameters¶
All data containers follow this signature:
def __init__(self, data, symbol_name, asset_type, has_been_json_encoded=False):
"""
Args:
data: Raw response data (dict or JSON string)
symbol_name: Standard symbol format (e.g., "BTC-USDT")
asset_type: Asset type (e.g., "SPOT", "SWAP")
has_been_json_encoded: True if data is already a dict
"""
6. Registration Pattern¶
Location¶
bt_api_py/feeds/register_<exchange>.py
Structure¶
"""
<Exchange> Registration Module
Registers <Exchange> feeds to the global ExchangeRegistry.
Import this module to complete the registration.
"""
from bt_api_py.balance_utils import simple_balance_handler as _<exchange>_balance_handler
from bt_api_py.containers.exchanges.<exchange>_exchange_data import (
<Exchange>ExchangeDataSpot,
<Exchange>ExchangeDataSwap,
)
from bt_api_py.feeds.live_<exchange>.spot import <Exchange>RequestDataSpot
from bt_api_py.feeds.live_<exchange>.swap import <Exchange>RequestDataSwap
from bt_api_py.registry import ExchangeRegistry
def _<exchange>_subscribe_handler(data_queue, exchange_params, topics, bt_api):
"""<Exchange> subscription handler for WebSocket streams.
Args:
data_queue: queue.Queue
exchange_params: dict with connection parameters
topics: list of topic dicts
bt_api: BtApi instance for shared state
"""
exchange_data = <Exchange>ExchangeDataSpot()
kwargs = dict(exchange_params.items())
kwargs["wss_name"] = "<exchange>_market_data"
kwargs["wss_url"] = exchange_data.wss_url
kwargs["exchange_data"] = exchange_data
kwargs["topics"] = topics
# Start market WebSocket
<Exchange>MarketWssDataSpot(data_queue, **kwargs).start()
# Start account WebSocket (if not already started)
if not bt_api._subscription_flags.get("<EXCHANGE>___SPOT_account", False):
account_kwargs = dict(kwargs.items())
account_kwargs["topics"] = [
{"topic": "account"},
{"topic": "order"},
]
<Exchange>AccountWssDataSpot(data_queue, **account_kwargs).start()
bt_api._subscription_flags["<EXCHANGE>___SPOT_account"] = True
def register_<exchange>():
"""Register <Exchange> to global ExchangeRegistry."""
# Spot
ExchangeRegistry.register_feed("<EXCHANGE>___SPOT", <Exchange>RequestDataSpot)
ExchangeRegistry.register_exchange_data("<EXCHANGE>___SPOT", <Exchange>ExchangeDataSpot)
ExchangeRegistry.register_balance_handler("<EXCHANGE>___SPOT", _<exchange>_balance_handler)
ExchangeRegistry.register_stream("<EXCHANGE>___SPOT", "subscribe", _<exchange>_subscribe_handler)
# Swap (if applicable)
ExchangeRegistry.register_feed("<EXCHANGE>___SWAP", <Exchange>RequestDataSwap)
ExchangeRegistry.register_exchange_data("<EXCHANGE>___SWAP", <Exchange>ExchangeDataSwap)
ExchangeRegistry.register_balance_handler("<EXCHANGE>___SWAP", _<exchange>_balance_handler)
ExchangeRegistry.register_stream("<EXCHANGE>___SWAP", "subscribe", _<exchange>_subscribe_handler)
# Auto-register on module import
register_<exchange>()
7. Testing Pattern¶
Location¶
tests/feeds/test_<exchange>.py
Structure¶
"""
Test <Exchange> exchange integration.
Run tests:
pytest tests/feeds/test_<exchange>.py -v
Run with coverage:
pytest tests/feeds/test_<exchange>.py --cov=bt_api_py.feeds.live_<exchange> --cov-report=term-missing
"""
import queue
import pytest
# Import registration to auto-register <Exchange>
import bt_api_py.feeds.register_<exchange> # noqa: F401
from bt_api_py.containers.exchanges.<exchange>_exchange_data import (
<Exchange>ExchangeDataSpot,
)
from bt_api_py.feeds.live_<exchange>.spot import <Exchange>RequestDataSpot
from bt_api_py.registry import ExchangeRegistry
def init_req_feed():
"""Initialize request feed for testing."""
data_queue = queue.Queue()
kwargs = {
"public_key": "test_key",
"private_key": "test_secret",
}
return <Exchange>RequestDataSpot(data_queue, **kwargs)
class Test<Exchange>ExchangeData:
"""Test <Exchange> exchange data configuration."""
def test_exchange_data_spot_creation(self):
"""Test creating <Exchange> spot exchange data."""
exchange_data = <Exchange>ExchangeDataSpot()
assert exchange_data.exchange_name in ["<exchange>", "<exchange>_spot"]
assert exchange_data.asset_type == "spot"
assert exchange_data.rest_url.startswith("https://")
assert len(exchange_data.rest_paths) > 0
def test_get_symbol(self):
"""Test symbol format conversion."""
exchange_data = <Exchange>ExchangeDataSpot()
assert exchange_data.get_symbol("BTC-USDT") == "BTCUSDT"
assert exchange_data.get_symbol("ETH-USDT") == "ETHUSDT"
def test_kline_periods(self):
"""Test kline periods are defined."""
exchange_data = <Exchange>ExchangeDataSpot()
assert "1m" in exchange_data.kline_periods
assert "1h" in exchange_data.kline_periods
assert "1d" in exchange_data.kline_periods
def test_legal_currencies(self):
"""Test legal currencies are defined."""
exchange_data = <Exchange>ExchangeDataSpot()
assert "USDT" in exchange_data.legal_currency
class Test<Exchange>Registration:
"""Test <Exchange> registration."""
def test_<exchange>_registered(self):
"""Test that <Exchange> is properly registered."""
assert "<EXCHANGE>___SPOT" in ExchangeRegistry._feed_classes
assert ExchangeRegistry._feed_classes["<EXCHANGE>___SPOT"] == <Exchange>RequestDataSpot
assert "<EXCHANGE>___SPOT" in ExchangeRegistry._exchange_data_classes
assert ExchangeRegistry._exchange_data_classes["<EXCHANGE>___SPOT"] == <Exchange>ExchangeDataSpot
def test_<exchange>_create_feed(self):
"""Test creating <Exchange> feed through registry."""
data_queue = queue.Queue()
feed = ExchangeRegistry.create_feed("<EXCHANGE>___SPOT", data_queue)
assert isinstance(feed, <Exchange>RequestDataSpot)
def test_<exchange>_create_exchange_data(self):
"""Test creating <Exchange> exchange data through registry."""
exchange_data = ExchangeRegistry.create_exchange_data("<EXCHANGE>___SPOT")
assert isinstance(exchange_data, <Exchange>ExchangeDataSpot)
class Test<Exchange>ServerTime:
"""Test server time endpoint."""
@pytest.mark.skip(reason="Requires network access")
def test_get_server_time(self):
"""Test getting server time."""
feed = init_req_feed()
result = feed.get_server_time()
assert result is not None
assert result.status is True
class Test<Exchange>Ticker:
"""Test ticker data retrieval."""
def test_ticker_container(self):
"""Test ticker data container."""
ticker_data = {
"symbol": "BTCUSDT",
"bidPrice": "49990",
"askPrice": "50010",
"lastPrice": "50000",
"high24h": "51000",
"low24h": "49000",
}
from bt_api_py.containers.tickers.<exchange>_ticker import <Exchange>RequestTickerData
ticker = <Exchange>RequestTickerData(
ticker_data, "BTC-USDT", "SPOT", True
)
ticker.init_data()
assert ticker.get_exchange_name() == "<EXCHANGE>"
assert ticker.get_symbol_name() == "BTC-USDT"
assert ticker.get_last_price() == 50000.0
assert ticker.get_bid_price() == 49990.0
assert ticker.get_ask_price() == 50010.0
@pytest.mark.skip(reason="Requires network access")
def test_get_ticker_live(self):
"""Test getting ticker from live API."""
feed = init_req_feed()
result = feed.get_tick("BTC-USDT")
assert result.status is True
class Test<Exchange>OrderBook:
"""Test order book data retrieval."""
def test_orderbook_container(self):
"""Test orderbook data container."""
orderbook_data = {
"lastUpdateId": 123456789,
"bids": [
["49990", "1.5"],
["49980", "2.0"]
],
"asks": [
["50010", "1.0"],
["50020", "2.5"]
],
}
from bt_api_py.containers.orderbooks.<exchange>_orderbook import <Exchange>RequestOrderBookData
orderbook = <Exchange>RequestOrderBookData(
orderbook_data, "BTC-USDT", "SPOT"
)
orderbook.init_data()
assert orderbook.symbol_name == "BTC-USDT"
assert orderbook.exchange_name == "<EXCHANGE>"
@pytest.mark.skip(reason="Requires network access")
def test_get_depth_live(self):
"""Test getting order book from live API."""
feed = init_req_feed()
result = feed.get_depth("BTC-USDT", count=20)
assert result.status is True
class Test<Exchange>Order:
"""Test order placement and management."""
def test_order_container(self):
"""Test order data container."""
order_data = {
"orderId": "123456",
"symbol": "BTCUSDT",
"side": "BUY",
"type": "LIMIT",
"price": "50000",
"origQty": "0.001",
"status": "FILLED",
}
from bt_api_py.containers.orders.<exchange>_order import <Exchange>RequestOrderData
order = <Exchange>RequestOrderData(
order_data, "BTC-USDT", "SPOT", True
)
order.init_data()
assert order.symbol_name == "BTC-USDT"
assert order.exchange_name == "<EXCHANGE>"
class Test<Exchange>Balance:
"""Test balance data retrieval."""
def test_balance_container(self):
"""Test balance data container."""
balance_data = {
"asset": "BTC",
"free": "0.5",
"locked": "0.1",
}
from bt_api_py.containers.balances.<exchange>_balance import <Exchange>RequestBalanceData
balance = <Exchange>RequestBalanceData(
balance_data, "SPOT", False
)
balance.init_data()
assert balance.get_exchange_name() == "<EXCHANGE>"
assert balance.get_currency() == "BTC"
assert balance.get_available() == 0.5
class Test<Exchange>Capabilities:
"""Test <Exchange> feed capabilities."""
def test_feed_capabilities(self):
"""Test that feed has expected capabilities."""
from bt_api_py.feeds.capability import Capability
capabilities = <Exchange>RequestDataSpot._capabilities()
assert Capability.GET_TICK in capabilities
assert Capability.GET_EXCHANGE_INFO in capabilities
class Test<Exchange>Integration:
"""Integration tests for <Exchange>."""
@pytest.mark.skip(reason="Requires network access")
def test_market_data_api(self):
"""Test market data API calls (requires network)."""
data_queue = queue.Queue()
feed = <Exchange>RequestDataSpot(data_queue)
# Test ticker
ticker = feed.get_tick("BTC-USDT")
assert ticker.status is True
# Test depth
depth = feed.get_depth("BTC-USDT", count=20)
assert depth.status is True
# Test kline
kline = feed.get_kline("BTC-USDT", "1h", count=10)
assert kline.status is True
if __name__ == "__main__":
pytest.main([__file__, "-v"])
Test Categories¶
- ***Configuration Tests**: Verify exchange data class loads correctly
- ***Registration Tests**: Verify registry registration works
- ***Container Tests**: Verify data containers parse responses correctly
- ***Capability Tests**: Verify declared capabilities
- ***Integration Tests**: Live API tests (marked with
@pytest.mark.skip)
8. Special Exchange Types¶
CTP (Futures Market) Pattern¶
CTP uses SPI callback instead of REST/WebSocket:
class CtpExchangeData(ExchangeData):
def __init__(self):
super().__init__()
self.exchange_name = "CTP"
self.rest_url = "" # No REST API
self.wss_url = "" # No WebSocket
self.md_front = "" # Market data front address
self.td_front = "" # Trading front address
def get_rest_path(self, key):
raise NotImplementedError("CTP does not use REST API")
def get_wss_path(self, **kwargs):
raise NotImplementedError("CTP does not use WebSocket")
IB (Broker) Pattern¶
IB uses different APIs for different data types:
class IbWebExchangeData(ExchangeData):
PROD_REST_URL = "https://api.interactivebrokers.com"
TEST_REST_URL = "https://api.test.interactivebrokers.com"
GATEWAY_REST_URL = "https://localhost:5000"
def __init__(self):
# ... loads entirely from YAML config
self.sec_type_map = {} # Security type mapping
self.market_data_fields = {} # Market data field definitions
self.order_type_map = {} # Order type mapping
self.tif_map = {} # Time in force mapping
DEX Pattern¶
For DEX (Uniswap, PancakeSwap, etc.):
id: uniswap
display_name: Uniswap
venue_type: dex
website: https://uniswap.org
# DEX uses RPC endpoints instead of REST
connection:
type: rpc
timeout: 30
# DEX-specific fields
chains: [ethereum, polygon, arbitrum]
router_address: "0xE592427A0AEce92De3Edee1F18E0157C05861564"
factory_address: "0x1F98431c8aD98523631AE4a59f267346ea31F984"
9. Error Handling Pattern¶
Error Translator¶
Location: bt_api_py/errors/<exchange>_error_translator.py
from bt_api_py.error_framework import ErrorTranslator, UnifiedError
class <Exchange>ErrorTranslator(ErrorTranslator):
"""Translate <Exchange> API errors to UnifiedError."""
def translate(self, response, exchange_name="<EXCHANGE>") -> UnifiedError | None:
"""Translate error response to UnifiedError.
Args:
response: Raw error response from exchange
exchange_name: Exchange name
Returns:
UnifiedError or None if no error
"""
if not isinstance(response, dict):
return None
# Check for error code
code = response.get("code")
if code is not None and int(code) < 0:
error_code = int(code)
message = response.get("msg", "Unknown error")
# Map to standard error types
if error_code == -1001:
return UnifiedError(
exchange_name=exchange_name,
error_type=UnifiedError.DISCONNECTED,
message=message,
raw_code=error_code,
)
elif error_code == -1021:
return UnifiedError(
exchange_name=exchange_name,
error_type=UnifiedError.INVALID_TIMESTAMP,
message=message,
raw_code=error_code,
)
# ... more mappings
return None
Usage in Feed Class¶
def __init__(self, data_queue, **kwargs):
# ...
self._error_translator = <Exchange>ErrorTranslator()
def translate_error(self, raw_response):
"""Translate raw response to UnifiedError."""
return self._error_translator.translate(raw_response, self.exchange_name)
10. Rate Limiting Pattern¶
Default Rate Limiter¶
from bt_api_py.rate_limiter import RateLimiter, RateLimitRule, RateLimitScope, RateLimitType
@staticmethod
def _create_default_rate_limiter():
"""Create default rate limiter from config."""
rules = [
RateLimitRule(
name="request_weight",
limit=2400,
interval=60,
type=RateLimitType.SLIDING_WINDOW,
scope=RateLimitScope.GLOBAL,
),
RateLimitRule(
name="order_rate",
limit=100,
interval=10,
type=RateLimitType.SLIDING_WINDOW,
scope=RateLimitScope.ENDPOINT,
endpoint="/api/v1/order*",
),
]
return RateLimiter(rules)
Using the Rate Limiter¶
from bt_api_py.rate_limiter import rate_limit
@rate_limit
def request(self, path, params=None, extra_data=None, is_sign=False):
"""Send HTTP request with rate limiting."""
# Implementation...
11. Common Pitfalls to Avoid¶
- ***Missing
asset_typeattribute**: Always setself.asset_typein subclass__init__ - ***Incorrect logger format**: spdlog doesn't support
%sformatting, use f-strings - ***Missing
typefield in rate_limits**: All rate_limits entries must have atypefield - ***Hardcoded URLs**: Always load from YAML config
- ***Incorrect constructor signature**: Data containers must use
(data, symbol_name, asset_type, has_been_json_encoded) - ***Forgetting registration**: Always create
register_<exchange>.pymodule - ***Mocking config in tests**: Avoid module-level mocking that affects other tests
12. Checklist for New Exchange Integration¶
Configuration¶
- Create
<exchange>.yamlinbt_api_py/configs/ - Include all required fields (id, venue_type, base_urls, connection)
- Add
typefield to all rate_limits entries - Define rest_paths for all endpoints
- Define wss_paths for WebSocket subscriptions
Exchange Data Class¶
- Create
<exchange>_exchange_data.py - Implement
_load_from_config()method - Implement
get_symbol()for symbol conversion - Implement
get_period()for kline period conversion - Create subclasses for each asset type (Spot, Swap, etc.)
- Set
self.asset_typein each subclass
Feed Classes¶
- Create
live_<exchange>directory - Create
request_base.pywith base request class - Create
spot.pywith spot-specific implementation - Implement
_methods for all endpoints - Implement normalize functions for each endpoint
- Implement public API methods
Data Containers¶
- Create ticker container in
containers/tickers/ - Create orderbook container in
containers/orderbooks/ - Create order container in
containers/orders/ - Create balance container in
containers/balances/ - Ensure
init_data()parses all fields - Implement getter methods
Registration¶
- Create
register_<exchange>.py - Import and register all asset types
- Register balance handler
- Register subscribe handler (if WebSocket)
Tests¶
- Create
test_<exchange>.py - Test exchange data creation
- Test registration
- Test data containers
- Test capabilities
- Add integration tests (skipped by default)
Error Handling¶
- Create error translator
- Map common error codes to UnifiedError types
- Use translator in feed class
Summary¶
This pattern document provides a comprehensive guide for implementing new exchanges in bt_api_py. By following these patterns:
- ***Consistency**: All exchanges follow the same structure
- ***Pluggability**: New exchanges can be added without modifying core code
- ***Configurability**: All settings are externalized to YAML
- ***Testability**: Standardized test structure for all exchanges
- ***Maintainability**: Clear separation of concerns between components
For reference implementations, study: - ***Binance**: Most complete implementation with multiple asset types - ***OKX**: Good example of mixin-based organization - ***IB**: Example of broker-style integration - ***CTP**: Example of SPI/callback-based integration