WebSocket Streaming Guide¶
Real-time data streaming with bt_api_py WebSocket connections.
Overview¶
bt_api_py provides WebSocket streaming for: - Real-time market data (tickers, order books, trades) - User data streams (orders, balances, positions) - Custom data subscriptions
WebSocket Connection Management¶
Basic Connection¶
import asyncio
from bt_api_py import BtApi
async def basic_streaming():
api = BtApi()
# Stream ticker data
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
print(f"BTC Price: ${ticker.get_last_price():.2f}")
asyncio.run(basic_streaming())
Multiple Streams¶
async def multi_streaming():
api = BtApi()
async def stream_ticker():
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
print(f"Ticker: ${ticker.get_last_price():.2f}")
async def stream_depth():
async for depth in api.stream_depth("BINANCE___SPOT", "BTCUSDT"):
depth.init_data()
if depth.get_bids() and depth.get_asks():
bid = depth.get_bids()[0][0]
ask = depth.get_asks()[0][0]
print(f"Spread: ${ask - bid:.2f}")
# Run multiple streams concurrently
await asyncio.gather(
stream_ticker(),
stream_depth()
)
asyncio.run(multi_streaming())
Market Data Streams¶
Ticker Stream¶
async def ticker_stream():
api = BtApi()
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
# Access ticker data
symbol = ticker.get_symbol()
price = ticker.get_last_price()
volume = ticker.get_volume()
change = ticker.get_price_change()
print(f"{symbol}: ${price:.2f} ({change:+.2f}%) Vol: {volume:,.0f}")
Order Book Stream¶
async def depth_stream():
api = BtApi()
async for depth in api.stream_depth("BINANCE___SPOT", "BTCUSDT"):
depth.init_data()
# Get best bid/ask
if depth.get_bids() and depth.get_asks():
best_bid = depth.get_bids()[0] # [price, quantity]
best_ask = depth.get_asks()[0]
print(f"Bid: ${best_bid[0]:.2f} ({best_bid[1]:.4f})")
print(f"Ask: ${best_ask[0]:.2f} ({best_ask[1]:.4f})")
# Calculate spread
spread = best_ask[0] - best_bid[0]
spread_pct = (spread / best_bid[0]) * 100
print(f"Spread: ${spread:.2f} ({spread_pct:.3f}%)")
Trade Stream¶
async def trade_stream():
api = BtApi()
async for trade in api.stream_trades("BINANCE___SPOT", "BTCUSDT"):
trade.init_data()
# Access trade data
price = trade.get_price()
quantity = trade.get_quantity()
timestamp = trade.get_timestamp()
side = trade.get_side() # buy/sell
print(f"Trade: {side} {quantity:.4f} @ ${price:.2f} - {timestamp}")
Kline Stream¶
async def kline_stream():
api = BtApi()
async for kline in api.stream_klines("BINANCE___SPOT", "BTCUSDT", "1m"):
kline.init_data()
# Access kline data
timestamp = kline.get_timestamp()
open_price = kline.get_open()
high_price = kline.get_high()
low_price = kline.get_low()
close_price = kline.get_close()
volume = kline.get_volume()
print(f"Candle {timestamp}: O={open_price} H={high_price} L={low_price} C={close_price} V={volume}")
User Data Streams¶
Order Updates¶
async def order_stream():
api = BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": "your_key",
"secret": "your_secret"
}
})
async for order in api.stream_orders("BINANCE___SPOT"):
order.init_data()
status = order.get_status()
symbol = order.get_symbol()
side = order.get_side()
if status == "filled":
filled_qty = order.get_filled_quantity()
avg_price = order.get_avg_price()
print(f"✅ Filled: {side} {filled_qty} {symbol} @ ${avg_price}")
elif status == "canceled":
print(f"❌ Canceled: {symbol} {order.get_order_id()}")
elif status == "partial":
print(f"⏳ Partial: {symbol} {order.get_filled_quantity()}/{order.get_quantity()}")
asyncio.run(order_stream())
Balance Updates¶
async def balance_stream():
api = BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": "your_key",
"secret": "your_secret"
}
})
async for balance in api.stream_balances("BINANCE___SPOT"):
balance.init_data()
asset = balance.get_asset()
free = balance.get_free_balance()
locked = balance.get_locked_balance()
total = balance.get_total_balance()
print(f"Balance {asset}: Free={free:.6f} Locked={locked:.6f} Total={total:.6f}")
Advanced WebSocket Usage¶
Custom Data Processing¶
class DataProcessor:
def __init__(self):
self.price_history = []
self.sma_window = 20
async def process_stream(self):
api = BtApi()
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
price = ticker.get_last_price()
# Add to history
self.price_history.append(price)
# Keep only recent data
if len(self.price_history) > self.sma_window:
self.price_history.pop(0)
# Calculate indicators
if len(self.price_history) >= self.sma_window:
sma = sum(self.price_history) / self.sma_window
# Generate signals
if price > sma * 1.02: # 2% above SMA
print(f"🔥 HOT: BTC ${price:.2f} (SMA: ${sma:.2f})")
elif price < sma * 0.98: # 2% below SMA
print(f"❄️ COLD: BTC ${price:.2f} (SMA: ${sma:.2f})")
processor = DataProcessor()
asyncio.run(processor.process_stream())
Multi-Exchange Streaming¶
async def multi_exchange_stream():
api = BtApi()
exchanges = ["BINANCE___SPOT", "OKX___SPOT", "HTX___SPOT"]
symbol = "BTCUSDT"
async def stream_exchange(exchange_name):
try:
async for ticker in api.stream_ticker(exchange_name, symbol):
ticker.init_data()
price = ticker.get_last_price()
print(f"{exchange_name}: ${price:.2f}")
except Exception as e:
print(f"{exchange_name} error: {e}")
# Stream from all exchanges
tasks = [stream_exchange(ex) for ex in exchanges]
await asyncio.gather(*tasks, return_exceptions=True)
asyncio.run(multi_exchange_stream())
Error Handling¶
Connection Resilience¶
import asyncio
from bt_api_py.exceptions import WebSocketError
async def resilient_stream():
api = BtApi()
max_retries = 5
retry_delay = 5
for attempt in range(max_retries):
try:
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
print(f"Price: ${ticker.get_last_price():.2f}")
except WebSocketError as e:
print(f"WebSocket error (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
print(f"Reconnecting in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
print("Max retries reached")
break
except Exception as e:
print(f"Unexpected error: {e}")
break
asyncio.run(resilient_stream())
Performance Optimization¶
Stream Filtering¶
async def filtered_stream():
api = BtApi()
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
price = ticker.get_last_price()
volume = ticker.get_volume()
# Only process significant price changes
if volume > 1000: # High volume threshold
print(f"High volume trade: ${price:.2f} Vol: {volume:,.0f}")
asyncio.run(filtered_stream())
Batch Processing¶
async def batch_processing():
api = BtApi()
batch_size = 10
ticker_batch = []
async for ticker in api.stream_ticker("BINANCE___SPOT", "BTCUSDT"):
ticker.init_data()
ticker_batch.append(ticker.get_last_price())
# Process in batches
if len(ticker_batch) >= batch_size:
# Calculate batch statistics
avg_price = sum(ticker_batch) / len(ticker_batch)
min_price = min(ticker_batch)
max_price = max(ticker_batch)
print(f"Batch - Avg: ${avg_price:.2f}, Min: ${min_price:.2f}, Max: ${max_price:.2f}")
# Reset batch
ticker_batch = []
asyncio.run(batch_processing())
Configuration Options¶
WebSocket Settings¶
# Configure WebSocket connection
api = BtApi(exchange_kwargs={
"BINANCE___SPOT": {
"api_key": "key",
"secret": "secret",
"websocket": {
"ping_interval": 20, # Ping interval in seconds
"ping_timeout": 10, # Ping timeout
"max_reconnect": 5, # Max reconnection attempts
"reconnect_delay": 1, # Delay between reconnections
"compression": True, # Enable compression
}
}
})
Best Practices¶
- Handle Disconnections - Implement reconnection logic
- Rate Limit Updates - Don't overwhelm your application
- Filter Data - Process only relevant updates
- Monitor Health - Track connection status
- Use Async - Leverage asyncio for performance
- Buffer Data - Handle temporary network issues
Need more help? Check our API patterns guide.