---
title: Data Acquisition Guide
description: Comprehensive guide for acquiring, cleaning, and storing market data for Backtrader
---
# Data Acquisition Guide
Reliable data is the foundation of successful backtesting. This guide covers acquiring, cleaning, storing, and validating market data for Backtrader.
## Quick Start

### Basic CSV Loading
```python
import backtrader as bt
from datetime import datetime

# Load data from CSV file
data = bt.feeds.GenericCSVData(
    dataname='data.csv',
    datetime=0,
    open=1,
    high=2,
    low=3,
    close=4,
    volume=5,
    fromdate=datetime(2023, 1, 1),
    todate=datetime(2023, 12, 31)
)

cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.run()
```
### Pandas DataFrame Loading
```python
import pandas as pd
import backtrader as bt

# Load data using pandas
df = pd.read_csv('data.csv', parse_dates=['datetime'], index_col='datetime')

# Create data feed
data = bt.feeds.PandasData(dataname=df)
cerebro.adddata(data)
```
## Exchange Data Interfaces
### Cryptocurrency Exchanges (CCXT)
Backtrader supports 100+ cryptocurrency exchanges through the CCXT library.
#### Binance (Spot)
```python
import backtrader as bt
from datetime import datetime

# Create CCXT Store for Binance
store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT',
    config={
        'enableRateLimit': True,
        'options': {'defaultType': 'spot'}
    }
)

# Historical data
data = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=15,
    fromdate=datetime(2023, 1, 1),
    todate=datetime(2023, 12, 31),
    ohlcv_limit=1000
)

# Live data with WebSocket
data_live = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=1,
    use_websocket=True,
    backfill_start=True
)
```
#### Binance Futures
```python
store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT',
    config={
        'apiKey': 'your_api_key',
        'secret': 'your_secret',
        'enableRateLimit': True,
        'options': {'defaultType': 'future'}
    }
)

data = store.getdata(
    dataname='BTCUSDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=15
)
```
#### OKX Exchange
```python
store = bt.stores.CCXTStore(
    exchange='okx',
    currency='USDT',
    config={
        'apiKey': 'your_api_key',
        'secret': 'your_secret',
        'password': 'your_passphrase',  # OKX requires a passphrase
        'enableRateLimit': True
    }
)

data = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=5
)
```
#### Bybit Exchange
```python
store = bt.stores.CCXTStore(
    exchange='bybit',
    currency='USDT',
    config={
        'apiKey': 'your_api_key',
        'secret': 'your_secret',
        'enableRateLimit': True,
        'options': {'defaultType': 'linear'}
    }
)
```
### Traditional Market Data
#### Yahoo Finance
```python
import backtrader as bt
from datetime import datetime

# Yahoo Finance data feed
data = bt.feeds.YahooFinanceData(
    dataname='AAPL',
    fromdate=datetime(2020, 1, 1),
    todate=datetime(2023, 12, 31),
    buffered=True
)
cerebro.adddata(data)
```
#### Interactive Brokers
```python
# Requires ibpy installation
data = bt.feeds.IBData(
dataname='AAPL-STK-SMART',
fromdate=datetime(2023, 1, 1),
todate=datetime(2023, 12, 31),
historical=True
)
```bash
#### OANDA
```python
# OANDA data feed
store = bt.stores.OandaStore(
token='your_token',
account='your_account_id',
practice=True # Use practice account
)
data = store.getdata(
dataname='EUR_USD',
timeframe=bt.TimeFrame.Minutes,
compression=15
)
```bash
#### Quandl
```python
# Quandl data feed
data = bt.feeds.QuandlData(
dataname='WIKI/AAPL',
fromdate=datetime(2020, 1, 1),
todate=datetime(2023, 12, 31)
)
```bash
### Database Data Sources
#### InfluxDB
```python
# InfluxDB data feed for time-series data
data = bt.feeds.InfluxDB(
dataname='market_data',
host='localhost',
port=8086,
username='user',
password='password',
database='crypto',
measurement='btc_usdt',
timeframe=bt.TimeFrame.Minutes
)
```bash
## Data Cleaning and Preprocessing
### Handling Missing Data
```python
import pandas as pd
import numpy as np

def clean_ohlcv_data(df):
    """Clean OHLCV data for backtesting."""
    # Remove duplicates
    df = df.drop_duplicates(subset=['datetime'])

    # Forward fill missing values (optional)
    df = df.ffill()

    # Handle outliers - remove bars with unrealistic values
    df = df[
        (df['high'] >= df['low']) &
        (df['high'] >= df['open']) &
        (df['high'] >= df['close']) &
        (df['low'] <= df['open']) &
        (df['low'] <= df['close']) &
        (df['volume'] >= 0)
    ]

    # Remove zero prices
    df = df[(df['close'] > 0) & (df['open'] > 0)]
    return df

# Usage
df = pd.read_csv('raw_data.csv', parse_dates=['datetime'])
df_clean = clean_ohlcv_data(df)
```
### Timezone Handling
```python
import pandas as pd

def standardize_timezone(df, timezone='UTC'):
    """Standardize the timezone of market data."""
    # Ensure the datetime index is timezone-aware
    if df.index.tz is None:
        df.index = df.index.tz_localize(timezone)
    else:
        df.index = df.index.tz_convert(timezone)
    return df

# Usage
df = pd.read_csv('data.csv', parse_dates=['datetime'], index_col='datetime')
df = standardize_timezone(df, 'UTC')
```
### Resampling Data
```python
def resample_data(df, timeframe='15T'):
    """
    Resample OHLCV data to a different timeframe.

    Timeframes:
    - '1T', '5T', '15T', '30T' for minutes
    - '1H', '4H' for hours
    - '1D' for daily
    """
    # Resample with the proper aggregation per column
    df_resampled = df.resample(timeframe).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum'
    }).dropna()
    return df_resampled

# Usage: resample tick data to 15-minute bars
df_15m = resample_data(df_tick, '15T')
```
### Outlier Detection
```python
def detect_outliers(df, window=20, threshold=3):
    """Detect price outliers using a rolling z-score."""
    df = df.copy()
    # Z-score of closing prices against a rolling mean/std
    df['z_score'] = (
        (df['close'] - df['close'].rolling(window).mean()) /
        df['close'].rolling(window).std()
    )
    # Flag outliers
    outliers = df[np.abs(df['z_score']) > threshold]
    return df, outliers

# Usage (df now carries the z_score column)
df, outliers = detect_outliers(df)
print(f"Found {len(outliers)} outliers")

# Option 1: Remove outliers
df_clean = df[np.abs(df['z_score']) <= 3]

# Option 2: Replace outliers with the rolling mean
df_capped = df.copy()
df_capped['close'] = np.where(
    np.abs(df['z_score']) > 3,
    df['close'].rolling(20).mean(),
    df['close']
)
```
## Data Storage Solutions
### CSV Format
- **Pros**: Simple, human-readable, universal compatibility
- **Cons**: Slow for large datasets, no compression
```python
import pandas as pd

# Save to CSV
df.to_csv('market_data.csv', index=True)

# Load from CSV
df = pd.read_csv('market_data.csv', parse_dates=['datetime'], index_col='datetime')
```
### Parquet Format
- **Pros**: Fast I/O, excellent compression, columnar storage
- **Cons**: Binary format (not human-readable)
```python
import pandas as pd

# Save to Parquet (recommended for large datasets)
df.to_parquet('market_data.parquet', compression='snappy')

# Load from Parquet
df = pd.read_parquet('market_data.parquet')

# Backtrader usage
data = bt.feeds.PandasData(dataname=df)
```
### HDF5 Format
- **Pros**: Fast read/write, hierarchical storage, good for time-series
- **Cons**: Requires PyTables, not as widely supported
```python
import pandas as pd

# Save to HDF5 (table format is required for appending later)
df.to_hdf('market_data.h5', key='data', mode='w', format='table')

# Load from HDF5
df = pd.read_hdf('market_data.h5', key='data')

# Appending to an existing table
df_new.to_hdf('market_data.h5', key='data', mode='a', append=True, format='table')
```
### Database Storage
#### SQLite (Local)
```python
import sqlite3
import pandas as pd

# Save to SQLite
conn = sqlite3.connect('market_data.db')
df.to_sql('ohlcv', conn, if_exists='replace', index=True)

# Load from SQLite
df = pd.read_sql('SELECT * FROM ohlcv', conn, parse_dates=['datetime'], index_col='datetime')
conn.close()
```
#### PostgreSQL (Production)
```python
import pandas as pd
from sqlalchemy import create_engine

# Save to PostgreSQL (psycopg2 must be installed as the driver)
engine = create_engine('postgresql://user:password@localhost/market_db')
df.to_sql('ohlcv', engine, if_exists='append', index=True)

# Load from PostgreSQL (SQL string literals use single quotes)
df = pd.read_sql("SELECT * FROM ohlcv WHERE symbol = 'BTC/USDT'", engine, parse_dates=['datetime'], index_col='datetime')
```
#### TimescaleDB (Time-series optimized)
```python
from sqlalchemy import create_engine, text

# TimescaleDB is PostgreSQL with time-series extensions
engine = create_engine('postgresql://user:password@localhost/timeseries_db')

# Create hypertable for optimal time-series performance
# (run once during setup)
with engine.connect() as conn:
    conn.execute(text("SELECT create_hypertable('ohlcv', 'datetime');"))
    conn.commit()

# Normal PostgreSQL operations work
df.to_sql('ohlcv', engine, if_exists='append', index=True)
```
## Real-time Data Handling
### WebSocket Streaming
```python
import backtrader as bt
from datetime import datetime

class LiveStrategy(bt.Strategy):
    def __init__(self):
        self.sma = bt.indicators.SMA(self.data, period=20)

    def next(self):
        if self.data.close[0] > self.sma[0]:
            self.buy()

# Configure live data with WebSocket
store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT',
    config={'enableRateLimit': True}
)

data = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=1,
    use_websocket=True,            # Enable WebSocket
    backfill_start=True,           # Load historical data on start
    ws_reconnect_delay=5.0,        # Reconnection delay
    ws_health_check_interval=30.0  # Health check interval
)

cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.addstrategy(LiveStrategy)
cerebro.run()
```
### Live Data with Reconnection
```python
class RobustLiveStrategy(bt.Strategy):
    def notify_data(self, data, status, *args, **kwargs):
        """Handle data status changes."""
        if status == data.LIVE:
            print(f"Live data connected: {data._name}")
        elif status == data.DISCONNECTED:
            print(f"Data disconnected: {data._name}")
        elif status == data.DELAYED:
            print(f"Data delayed: {data._name}")

    def next(self):
        # Only trade if we have live data
        if self.data.live():
            # Your strategy logic here
            pass
```
### Multi-Symbol Live Data
```python
# Subscribe to multiple symbols
symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT']

store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT'
)

cerebro = bt.Cerebro()
for symbol in symbols:
    data = store.getdata(
        dataname=symbol,
        timeframe=bt.TimeFrame.Minutes,
        compression=1,
        use_websocket=True,
        backfill_start=True
    )
    cerebro.adddata(data, name=symbol)

cerebro.run()
```
## Historical Data Backfill
### Fetching Historical Data
```python
import backtrader as bt
from datetime import datetime, timedelta

store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT'
)

def fetch_historical_data(symbol, start_date, end_date, timeframe='15m'):
    """Fetch historical data in chunks to handle API limits."""
    data = store.getdata(
        dataname=symbol,
        timeframe=bt.TimeFrame.Minutes,
        compression=15,    # 15-minute bars
        fromdate=start_date,
        todate=end_date,
        ohlcv_limit=1000,  # Bars per request
        historical=True
    )

    # Run through Cerebro to load the feed
    cerebro = bt.Cerebro()
    cerebro.adddata(data)
    cerebro.run()
    return data

# Usage
start = datetime(2023, 1, 1)
end = datetime(2023, 12, 31)
data = fetch_historical_data('BTC/USDT', start, end)
```
### Backfill with Storage
```python
def backfill_and_store(symbol, start_date, end_date, storage_path):
    """Fetch historical data and store it to file."""
    import pandas as pd

    # Fetch data
    store = bt.stores.CCXTStore(exchange='binance', currency='USDT')
    data = store.getdata(
        dataname=symbol,
        timeframe=bt.TimeFrame.Minutes,
        compression=15,
        fromdate=start_date,
        todate=end_date,
        historical=True
    )

    # Run cerebro to load data
    cerebro = bt.Cerebro()
    cerebro.adddata(data)
    cerebro.run()

    # Extract data and save
    # (This depends on your specific implementation)
    # data_df = extract_dataframe(data)
    # data_df.to_parquet(storage_path)
    print(f"Backfilled {symbol} to {storage_path}")

# Usage
backfill_and_store(
    'BTC/USDT',
    datetime(2020, 1, 1),
    datetime.now(),
    'data/btc_usdt_15m.parquet'
)
```
## Data Quality Validation
### Validation Checklist
```python
def validate_ohlcv_data(df):
    """Comprehensive OHLCV data validation."""
    issues = []

    # 1. Check for missing values
    missing = df.isnull().sum()
    if missing.any():
        issues.append(f"Missing values: {missing[missing > 0].to_dict()}")

    # 2. Check for duplicate timestamps
    duplicates = df.index.duplicated()
    if duplicates.sum() > 0:
        issues.append(f"Duplicate timestamps: {duplicates.sum()} found")

    # 3. Check OHLC relationships
    invalid_ohlc = (
        (df['high'] < df['low']) |
        (df['high'] < df['open']) |
        (df['high'] < df['close']) |
        (df['low'] > df['open']) |
        (df['low'] > df['close'])
    )
    if invalid_ohlc.sum() > 0:
        issues.append(f"Invalid OHLC relationships: {invalid_ohlc.sum()} bars")

    # 4. Check for negative values
    negative = (df[['open', 'high', 'low', 'close', 'volume']] < 0).any()
    if negative.any():
        issues.append(f"Negative values: {negative[negative].index.tolist()}")

    # 5. Check for zero prices
    zero_prices = (df['close'] == 0).sum()
    if zero_prices > 0:
        issues.append(f"Zero close prices: {zero_prices} bars")

    # 6. Check time sequence
    if not df.index.is_monotonic_increasing:
        issues.append("Timestamps not monotonically increasing")

    # 7. Check for outliers (extreme price changes)
    price_change = df['close'].pct_change().abs()
    extreme_changes = price_change > 0.5  # More than 50% change
    if extreme_changes.sum() > 0:
        issues.append(f"Extreme price changes: {extreme_changes.sum()} bars")

    return issues

# Usage
df = pd.read_parquet('market_data.parquet')
issues = validate_ohlcv_data(df)
if issues:
    print("Data validation issues found:")
    for issue in issues:
        print(f"  - {issue}")
else:
    print("Data validation passed!")
```
### Statistical Summary
```python
import json
import pandas as pd

def data_quality_report(df):
    """Generate a comprehensive data quality report."""
    report = {
        'total_bars': len(df),
        'date_range': f"{df.index.min()} to {df.index.max()}",
        'missing_values': df.isnull().sum().to_dict(),
        'duplicate_bars': df.index.duplicated().sum(),
        'price_stats': {
            'min_close': df['close'].min(),
            'max_close': df['close'].max(),
            'mean_close': df['close'].mean(),
            'std_close': df['close'].std()
        },
        'volume_stats': {
            'min_volume': df['volume'].min(),
            'max_volume': df['volume'].max(),
            'mean_volume': df['volume'].mean()
        },
        'gaps': detect_time_gaps(df)
    }
    return report

def detect_time_gaps(df, expected_freq='15T'):
    """Count time gaps larger than 1.5x the expected bar interval."""
    expected_delta = pd.Timedelta(expected_freq)
    actual_deltas = df.index.to_series().diff()
    gaps = actual_deltas[actual_deltas > expected_delta * 1.5]
    return len(gaps)

# Usage
report = data_quality_report(df)
print(json.dumps(report, indent=2, default=str))
```
## Complete Examples
### Example 1: Crypto Trading System
```python
import backtrader as bt
from datetime import datetime

class CryptoTradingSystem:
    def __init__(self, exchange='binance'):
        self.exchange = exchange
        self.cerebro = bt.Cerebro()

    def setup_data(self, symbols, timeframe='15m', use_ws=True):
        """Setup data feeds for multiple symbols."""
        store = bt.stores.CCXTStore(
            exchange=self.exchange,
            currency='USDT',
            config={'enableRateLimit': True}
        )
        for symbol in symbols:
            data = store.getdata(
                dataname=symbol,
                timeframe=bt.TimeFrame.Minutes,
                compression=int(timeframe[:-1]),
                use_websocket=use_ws,
                backfill_start=True,
                drop_newest=True
            )
            self.cerebro.adddata(data, name=symbol)
        return self

    def add_strategy(self, strategy_class, **kwargs):
        """Add trading strategy."""
        self.cerebro.addstrategy(strategy_class, **kwargs)
        return self

    def setup_broker(self, initial_cash=10000, commission=0.001):
        """Configure broker."""
        self.cerebro.broker.setcash(initial_cash)
        self.cerebro.broker.setcommission(commission=commission)
        return self

    def run(self):
        """Execute the backtest or live trading."""
        return self.cerebro.run()

# Usage
system = CryptoTradingSystem('binance')
system.setup_data(['BTC/USDT', 'ETH/USDT'], use_ws=False)
system.setup_broker(initial_cash=10000)
system.add_strategy(MyStrategy)
results = system.run()
```
### Example 2: Data Pipeline
```python
import pandas as pd
import backtrader as bt
from datetime import datetime, timedelta

class DataPipeline:
    """Complete data pipeline for Backtrader."""

    def __init__(self, storage_path='./data'):
        self.storage_path = storage_path

    def fetch(self, exchange, symbol, start, end, timeframe='15m'):
        """Fetch data from exchange."""
        store = bt.stores.CCXTStore(exchange=exchange, currency='USDT')
        compression = int(timeframe[:-1])
        data = store.getdata(
            dataname=symbol,
            timeframe=bt.TimeFrame.Minutes,
            compression=compression,
            fromdate=start,
            todate=end,
            historical=True,
            ohlcv_limit=1000
        )
        return data

    def clean(self, df):
        """Clean and validate data."""
        # Remove duplicates
        df = df[~df.index.duplicated(keep='first')]

        # Validate OHLC
        df = df[
            (df['high'] >= df['low']) &
            (df['high'] >= df['open']) &
            (df['high'] >= df['close']) &
            (df['low'] <= df['open']) &
            (df['low'] <= df['close']) &
            (df['volume'] >= 0)
        ]

        # Forward fill small gaps
        df = df.ffill(limit=3)
        return df

    def store(self, df, symbol, timeframe):
        """Store data efficiently."""
        filename = f"{self.storage_path}/{symbol.replace('/', '_')}_{timeframe}.parquet"
        df.to_parquet(filename, compression='snappy')
        print(f"Stored data to {filename}")
        return filename

    def load(self, symbol, timeframe):
        """Load stored data."""
        filename = f"{self.storage_path}/{symbol.replace('/', '_')}_{timeframe}.parquet"
        df = pd.read_parquet(filename)
        return df

    def create_feed(self, df):
        """Create Backtrader data feed."""
        return bt.feeds.PandasData(dataname=df)

# Usage
pipeline = DataPipeline()

# Fetch and store
start_date = datetime(2023, 1, 1)
end_date = datetime.now()
data = pipeline.fetch('binance', 'BTC/USDT', start_date, end_date)
# ... convert data to DataFrame ...
df = convert_to_dataframe(data)
df_clean = pipeline.clean(df)
pipeline.store(df_clean, 'BTC/USDT', '15m')

# Load and create feed
df_loaded = pipeline.load('BTC/USDT', '15m')
feed = pipeline.create_feed(df_loaded)
```
### Example 3: Multi-Source Data Aggregator
```python
class MultiSourceAggregator:
    """Aggregate data from multiple sources."""

    def __init__(self):
        self.sources = []

    def add_csv_source(self, path, symbol):
        """Add CSV data source."""
        df = pd.read_csv(path, parse_dates=['datetime'], index_col='datetime')
        self.sources.append({'symbol': symbol, 'data': df, 'type': 'csv'})
        return self

    def add_exchange_source(self, exchange, symbol, start, end):
        """Add exchange data source."""
        store = bt.stores.CCXTStore(exchange=exchange, currency='USDT')
        data = store.getdata(
            dataname=symbol,
            timeframe=bt.TimeFrame.Minutes,
            compression=15,
            fromdate=start,
            todate=end,
            historical=True
        )
        self.sources.append({'symbol': symbol, 'data': data, 'type': 'exchange'})
        return self

    def normalize(self):
        """Normalize all data sources to a common format."""
        normalized = []
        for source in self.sources:
            if source['type'] == 'csv':
                df = source['data']
            else:
                # Convert exchange data to DataFrame
                df = self._exchange_to_df(source['data'])

            # Apply standard resampling
            df = self._resample_to_common(df, '15T')
            normalized.append({'symbol': source['symbol'], 'data': df})
        return normalized

    def merge(self, normalized_data):
        """Merge multiple data sources."""
        merged = pd.DataFrame()
        for item in normalized_data:
            if merged.empty:
                merged = item['data'].copy()
            else:
                merged = merged.join(item['data'], how='outer', rsuffix=f"_{item['symbol']}")
        return merged

# Usage
aggregator = MultiSourceAggregator()
aggregator.add_csv_source('data/BTC.csv', 'BTC')
aggregator.add_exchange_source('binance', 'ETH/USDT', datetime(2023, 1, 1), datetime.now())
normalized = aggregator.normalize()
merged = aggregator.merge(normalized)
```
## Best Practices
### Data Sourcing
1. **Use multiple data sources** for critical data to verify accuracy
2. **Check data frequency** matches your strategy requirements
3. **Consider survivorship bias** when using stock data
4. **Include dividend and split adjustments** for equity data
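Point 1 can be sketched with pandas: align two feeds for the same symbol on their common timestamps and flag bars whose closes diverge beyond a relative tolerance. The `compare_sources` helper and the tolerance value are illustrative, not a Backtrader API:

```python
import pandas as pd

def compare_sources(df_a, df_b, tolerance=0.001):
    """Flag bars where two sources' closes diverge beyond a relative tolerance."""
    # Align both series on their common timestamps
    merged = pd.DataFrame({'close_a': df_a['close'],
                           'close_b': df_b['close']}).dropna()
    rel_diff = (merged['close_a'] - merged['close_b']).abs() / merged['close_b']
    return merged[rel_diff > tolerance]

# Usage: two feeds for the same symbol, one bar disagrees
idx = pd.date_range('2023-01-01', periods=4, freq='15min')
a = pd.DataFrame({'close': [100.0, 101.0, 102.0, 103.0]}, index=idx)
b = pd.DataFrame({'close': [100.0, 101.0, 110.0, 103.0]}, index=idx)
diverging = compare_sources(a, b)  # flags the third bar (~7% apart)
```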
### Data Storage
1. **Use Parquet format** for large datasets (best performance/compression ratio)
2. **Organize by symbol and timeframe** in a directory structure
3. **Keep raw data separate** from processed data
4. **Version your datasets** for reproducibility
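Points 2 and 3 can be sketched with `pathlib`; the `dataset_path` helper and the `raw`/`processed` stage names are hypothetical conventions, not part of Backtrader:

```python
from pathlib import Path

def dataset_path(root, symbol, timeframe, stage='processed'):
    """Build a consistent path: <root>/<stage>/<symbol>/<timeframe>.parquet."""
    safe_symbol = symbol.replace('/', '_')  # 'BTC/USDT' -> 'BTC_USDT'
    return Path(root) / stage / safe_symbol / f"{timeframe}.parquet"

# Usage
p = dataset_path('data', 'BTC/USDT', '15m')
# p.as_posix() -> 'data/processed/BTC_USDT/15m.parquet'
```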
### Real-time Data
1. **Always implement reconnection logic** for live feeds
2. **Use WebSocket** for lower latency when available
3. **Backfill missing bars** on reconnection
4. **Monitor data quality** in real-time
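Reconnection logic (point 1) typically uses exponential backoff so a flapping feed doesn't hammer the exchange; a minimal sketch (the function name and 10% jitter factor are illustrative choices):

```python
import random

def reconnect_delays(base=1.0, cap=60.0, attempts=6):
    """Exponential backoff with jitter for live-feed reconnection."""
    delays = []
    for n in range(attempts):
        delay = min(cap, base * (2 ** n))        # 1, 2, 4, 8, ... seconds, capped
        delay += random.uniform(0, delay * 0.1)  # up to 10% jitter
        delays.append(delay)
    return delays

# Usage: sleep for each delay between successive reconnection attempts
delays = reconnect_delays()
```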
### Data Validation
1. **Validate before backtesting** - catch issues early
2. **Log data quality metrics** for each backtest
3. **Set up alerts** for unusual data patterns
4. **Document known data issues** (e.g., exchange downtime)
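Logging quality metrics per backtest (point 2) can be as simple as appending one JSON line per dataset per run; `log_quality_metrics` and the file layout are assumptions, not a Backtrader API:

```python
import json
from datetime import datetime, timezone

def log_quality_metrics(df, symbol, logfile='data_quality.jsonl'):
    """Append one JSON line of quality metrics for a dataset."""
    record = {
        'logged_at': datetime.now(timezone.utc).isoformat(),
        'symbol': symbol,
        'bars': len(df),
        'missing': int(df.isnull().sum().sum()),
        'duplicates': int(df.index.duplicated().sum()),
    }
    with open(logfile, 'a') as f:
        f.write(json.dumps(record) + '\n')
    return record
```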
## Troubleshooting
### Common Issues
| Issue | Solution |
|-------|----------|
| Data gaps in series | Use `ffill()` or detect and mark gap periods |
| Duplicate timestamps | `df.drop_duplicates(subset=['datetime'])` |
| Wrong timezone | Standardize all data to UTC |
| Memory errors | Use `qbuffer()` or process in chunks |
| Slow loading | Convert CSV to Parquet format |
| Missing bars | Implement backfill logic |
| Invalid OHLC | Validate and filter/correct |
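The "data gaps" row above can be sketched: reindexing to the expected frequency makes missing bars explicit, so they can be flagged before forward-filling. The `was_gap` column is an illustrative convention:

```python
import pandas as pd

def mark_gaps(df, freq='15min'):
    """Reindex to the expected frequency and flag bars that were missing."""
    full_index = pd.date_range(df.index.min(), df.index.max(), freq=freq)
    out = df.reindex(full_index)
    out['was_gap'] = out['close'].isna()  # flag before filling
    return out.ffill()

# Usage: the 00:30 bar is missing below
idx = pd.to_datetime(['2023-01-01 00:00', '2023-01-01 00:15', '2023-01-01 00:45'])
df = pd.DataFrame({'close': [1.0, 2.0, 3.0]}, index=idx)
marked = mark_gaps(df)  # 4 rows; 00:30 flagged and forward-filled with 2.0
```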
## Next Steps
- [Performance Optimization](performance-optimization.md) - Speed up your backtests
- [TS Mode Guide](ts-mode.md) - Time series optimization for large datasets
- [Live Trading Guide](../CCXT_LIVE_TRADING_GUIDE.md) - Real trading with CCXT