---
title: Data Acquisition Guide
description: Comprehensive guide for acquiring, cleaning, and storing market data for Backtrader
---


# Data Acquisition Guide

Reliable data is the foundation of successful backtesting. This guide covers everything you need to know about acquiring, cleaning, storing, and validating market data for Backtrader.

## Quick Start

### Basic CSV Loading

```python
import backtrader as bt
from datetime import datetime

# Load data from CSV file

data = bt.feeds.GenericCSVData(
    dataname='data.csv',
    datetime=0,
    open=1,
    high=2,
    low=3,
    close=4,
    volume=5,
    fromdate=datetime(2023, 1, 1),
    todate=datetime(2023, 12, 31)
)

cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.run()

```

### Pandas DataFrame Loading

```python
import pandas as pd
import backtrader as bt

# Load data using pandas

df = pd.read_csv('data.csv', parse_dates=['datetime'], index_col='datetime')

# Create data feed

data = bt.feeds.PandasData(dataname=df)

cerebro = bt.Cerebro()
cerebro.adddata(data)

```

## Exchange Data Interfaces

### Cryptocurrency Exchanges (CCXT)

Backtrader supports 100+ cryptocurrency exchanges through the CCXT library.

#### Binance (Spot)

```python
import backtrader as bt
from datetime import datetime

# Create CCXT Store for Binance

store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT',
    config={
        'enableRateLimit': True,
        'options': {'defaultType': 'spot'}
    }
)

# Historical data

data = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=15,
    fromdate=datetime(2023, 1, 1),
    todate=datetime(2023, 12, 31),
    ohlcv_limit=1000
)

# Live data with WebSocket

data_live = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=1,
    use_websocket=True,
    backfill_start=True
)

```

#### Binance Futures

```python
store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT',
    config={
        'apiKey': 'your_api_key',
        'secret': 'your_secret',
        'enableRateLimit': True,
        'options': {'defaultType': 'future'}
    }
)

data = store.getdata(
    dataname='BTCUSDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=15
)

```

#### OKX Exchange

```python
store = bt.stores.CCXTStore(
    exchange='okx',
    currency='USDT',
    config={
        'apiKey': 'your_api_key',
        'secret': 'your_secret',
        'password': 'your_passphrase',  # OKX requires passphrase
        'enableRateLimit': True
    }
)

data = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=5
)

```

#### Bybit Exchange

```python
store = bt.stores.CCXTStore(
    exchange='bybit',
    currency='USDT',
    config={
        'apiKey': 'your_api_key',
        'secret': 'your_secret',
        'enableRateLimit': True,
        'options': {'defaultType': 'linear'}
    }
)

```

### Traditional Market Data

#### Yahoo Finance

```python
import backtrader as bt
from datetime import datetime

# Yahoo Finance data feed

data = bt.feeds.YahooFinanceData(
    dataname='AAPL',
    fromdate=datetime(2020, 1, 1),
    todate=datetime(2023, 12, 31),
    buffered=True
)

cerebro = bt.Cerebro()
cerebro.adddata(data)

```

#### Interactive Brokers

```python

# Requires ibpy installation

data = bt.feeds.IBData(
    dataname='AAPL-STK-SMART',
    fromdate=datetime(2023, 1, 1),
    todate=datetime(2023, 12, 31),
    historical=True
)

```

#### OANDA

```python

# OANDA data feed

store = bt.stores.OandaStore(
    token='your_token',
    account='your_account_id',
    practice=True  # Use practice account
)

data = store.getdata(
    dataname='EUR_USD',
    timeframe=bt.TimeFrame.Minutes,
    compression=15
)

```

#### Quandl

```python

# Quandl data feed

data = bt.feeds.QuandlData(
    dataname='WIKI/AAPL',
    fromdate=datetime(2020, 1, 1),
    todate=datetime(2023, 12, 31)
)

```

### Database Data Sources

#### InfluxDB

```python

# InfluxDB data feed for time-series data

data = bt.feeds.InfluxDB(
    dataname='market_data',
    host='localhost',
    port=8086,
    username='user',
    password='password',
    database='crypto',
    measurement='btc_usdt',
    timeframe=bt.TimeFrame.Minutes
)

```

## Data Cleaning and Preprocessing

### Handling Missing Data

```python
import pandas as pd
import numpy as np

def clean_ohlcv_data(df):
    """Clean OHLCV data for backtesting."""

    # Remove duplicates
    df = df.drop_duplicates(subset=['datetime'])

    # Forward fill missing values (optional)
    df = df.ffill()

    # Handle outliers - remove bars with unrealistic values
    df = df[
        (df['high'] >= df['low']) &
        (df['high'] >= df['open']) &
        (df['high'] >= df['close']) &
        (df['low'] <= df['open']) &
        (df['low'] <= df['close']) &
        (df['volume'] >= 0)
    ]

    # Remove zero prices
    df = df[(df['close'] > 0) & (df['open'] > 0)]

    return df

# Usage

df = pd.read_csv('raw_data.csv', parse_dates=['datetime'])
df_clean = clean_ohlcv_data(df)

```

### Timezone Handling

```python
import pandas as pd

def standardize_timezone(df, timezone='UTC'):
    """Standardize timezone for market data."""

    # Ensure datetime is timezone-aware
    if df.index.tz is None:
        df.index = df.index.tz_localize(timezone)
    else:
        df.index = df.index.tz_convert(timezone)

    return df

# Usage

df = pd.read_csv('data.csv', parse_dates=['datetime'], index_col='datetime')
df = standardize_timezone(df, 'UTC')

```

### Resampling Data

```python
def resample_data(df, timeframe='15min'):
    """
    Resample OHLCV data to a different timeframe.

    Timeframes:

    - '1min', '5min', '15min', '30min' for minutes
    - '1h', '4h' for hours
    - '1D' for daily
    """

    # Resample with proper aggregation
    df_resampled = df.resample(timeframe).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum'
    }).dropna()

    return df_resampled

# Usage: Resample tick data to 15-minute bars

df_15m = resample_data(df_tick, '15min')

```

### Outlier Detection

```python
import numpy as np

def detect_outliers(df, window=20, threshold=3):
    """Detect price outliers using a rolling z-score.

    Returns the frame with a 'z_score' column plus the flagged rows.
    """
    df = df.copy()

    # Calculate z-score for closing prices
    df['z_score'] = (
        (df['close'] - df['close'].rolling(window).mean()) /
        df['close'].rolling(window).std()
    )

    # Flag outliers
    outliers = df[np.abs(df['z_score']) > threshold]

    return df, outliers

# Usage

df, outliers = detect_outliers(df)
print(f"Found {len(outliers)} outliers")

# Option 1: Remove outliers

df_clean = df[np.abs(df['z_score']) <= 3]

# Option 2: Cap outliers by replacing them with the rolling mean

df_capped = df.copy()
df_capped['close'] = np.where(
    np.abs(df['z_score']) > 3,
    df['close'].rolling(20).mean(),
    df['close']
)

```

## Data Storage Solutions

### CSV Format

- **Pros**: Simple, human-readable, universal compatibility
- **Cons**: Slow for large datasets, no compression

```python
import pandas as pd

# Save to CSV

df.to_csv('market_data.csv', index=True)

# Load from CSV

df = pd.read_csv('market_data.csv', parse_dates=['datetime'], index_col='datetime')

```

### Parquet Format

- **Pros**: Fast I/O, excellent compression, columnar storage
- **Cons**: Binary format (not human-readable)

```python
import pandas as pd

# Save to Parquet (recommended for large datasets)

df.to_parquet('market_data.parquet', compression='snappy')

# Load from Parquet

df = pd.read_parquet('market_data.parquet')

# Backtrader usage

data = bt.feeds.PandasData(dataname=df)

```

### HDF5 Format

- **Pros**: Fast read/write, hierarchical storage, good for time-series
- **Cons**: Requires PyTables, not as widely supported

```python
import pandas as pd

# Save to HDF5

df.to_hdf('market_data.h5', key='data', mode='w')

# Load from HDF5

df = pd.read_hdf('market_data.h5', key='data')

# Appending to existing file

df_new.to_hdf('market_data.h5', key='data', mode='a', append=True, format='table')

```

### Database Storage

#### SQLite (Local)

```python
import sqlite3
import pandas as pd

# Save to SQLite

conn = sqlite3.connect('market_data.db')
df.to_sql('ohlcv', conn, if_exists='replace', index=True)

# Load from SQLite

df = pd.read_sql('SELECT * FROM ohlcv', conn, parse_dates=['datetime'], index_col='datetime')
conn.close()

```

#### PostgreSQL (Production)

```python
import psycopg2
from sqlalchemy import create_engine

# Save to PostgreSQL

engine = create_engine('postgresql://user:password@localhost/market_db')
df.to_sql('ohlcv', engine, if_exists='append', index=True)

# Load from PostgreSQL

df = pd.read_sql("SELECT * FROM ohlcv WHERE symbol = 'BTC/USDT'", engine, parse_dates=['datetime'], index_col='datetime')

```

#### TimescaleDB (Time-series optimized)

```python

# TimescaleDB is PostgreSQL with time-series extensions

engine = create_engine('postgresql://user:password@localhost/timeseries_db')

# Create hypertable for optimal time-series performance

# (run once during setup)

from sqlalchemy import text

with engine.connect() as conn:
    conn.execute(text("SELECT create_hypertable('ohlcv', 'datetime');"))
    conn.commit()

# Normal PostgreSQL operations work

df.to_sql('ohlcv', engine, if_exists='append', index=True)

```

## Real-time Data Handling

### WebSocket Streaming

```python
import backtrader as bt
from datetime import datetime

class LiveStrategy(bt.Strategy):
    def __init__(self):
        self.sma = bt.indicators.SMA(self.data, period=20)

    def next(self):
        if self.data.close[0] > self.sma[0]:
            self.buy()

# Configure live data with WebSocket

store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT',
    config={'enableRateLimit': True}
)

data = store.getdata(
    dataname='BTC/USDT',
    timeframe=bt.TimeFrame.Minutes,
    compression=1,
    use_websocket=True,              # Enable WebSocket
    backfill_start=True,              # Load historical data on start
    ws_reconnect_delay=5.0,           # Reconnection delay
    ws_health_check_interval=30.0     # Health check interval
)

cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.addstrategy(LiveStrategy)
cerebro.run()

```

### Live Data with Reconnection

```python
class RobustLiveStrategy(bt.Strategy):
    def notify_data(self, data, status, *args, **kwargs):
        """Handle data status changes."""
        if status == data.LIVE:
            print(f"Live data connected: {data._name}")
        elif status == data.DISCONNECTED:
            print(f"Data disconnected: {data._name}")
        elif status == data.DELAYED:
            print(f"Data delayed: {data._name}")

    def next(self):
        # Only trade if we have live data
        if self.data.live():
            # Your strategy logic here
            pass

```

### Multi-Symbol Live Data

```python

# Subscribe to multiple symbols

symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT']

store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT'
)

cerebro = bt.Cerebro()

for symbol in symbols:
    data = store.getdata(
        dataname=symbol,
        timeframe=bt.TimeFrame.Minutes,
        compression=1,
        use_websocket=True,
        backfill_start=True
    )
    cerebro.adddata(data, name=symbol)

cerebro.run()

```

## Historical Data Backfill

### Fetching Historical Data

```python
import backtrader as bt
from datetime import datetime, timedelta

store = bt.stores.CCXTStore(
    exchange='binance',
    currency='USDT'
)

# Fetch historical data in chunks

def fetch_historical_data(symbol, start_date, end_date, compression=15):
    """Fetch historical data, letting the store page through API limits."""
    data = store.getdata(
        dataname=symbol,
        timeframe=bt.TimeFrame.Minutes,
        compression=compression,  # e.g. 15 for 15-minute bars
        fromdate=start_date,
        todate=end_date,
        ohlcv_limit=1000,  # Bars per request
        historical=True
    )

    # Load the feed by running it through Cerebro
    cerebro = bt.Cerebro()
    cerebro.adddata(data)
    cerebro.run()

    return data

# Usage

start = datetime(2023, 1, 1)
end = datetime(2023, 12, 31)

data = fetch_historical_data('BTC/USDT', start, end)

```
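
If you manage chunking yourself instead of relying on `ohlcv_limit`, splitting the date range up front keeps every request under the exchange cap. A minimal sketch (the function name and defaults are illustrative, not part of Backtrader):

```python
from datetime import datetime, timedelta

def date_chunks(start, end, bars_per_request=1000, bar_minutes=15):
    """Split [start, end) into windows that fit in one OHLCV request."""
    span = timedelta(minutes=bars_per_request * bar_minutes)
    chunks = []
    cursor = start
    while cursor < end:
        nxt = min(cursor + span, end)
        chunks.append((cursor, nxt))
        cursor = nxt
    return chunks

# January 2023 at 15m bars splits into three ~10.4-day windows
chunks = date_chunks(datetime(2023, 1, 1), datetime(2023, 1, 31))
```

Each `(start, end)` pair can then be passed as `fromdate`/`todate` to a separate fetch call.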

### Backfill with Storage

```python
def backfill_and_store(symbol, start_date, end_date, storage_path):
    """Fetch historical data and store to file."""
    import pandas as pd

    # Fetch data
    store = bt.stores.CCXTStore(exchange='binance', currency='USDT')
    data = store.getdata(
        dataname=symbol,
        timeframe=bt.TimeFrame.Minutes,
        compression=15,
        fromdate=start_date,
        todate=end_date,
        historical=True
    )

    # Run cerebro to load data
    cerebro = bt.Cerebro()
    cerebro.adddata(data)

    # Extract data and save
    # (This depends on your specific implementation)
    # data_df = extract_dataframe(data)
    # data_df.to_parquet(storage_path)

    print(f"Backfilled {symbol} to {storage_path}")

# Usage

backfill_and_store(
    'BTC/USDT',
    datetime(2020, 1, 1),
    datetime.now(),
    'data/btc_usdt_15m.parquet'
)

```

## Data Quality Validation

### Validation Checklist

```python
def validate_ohlcv_data(df):
    """Comprehensive OHLCV data validation."""
    issues = []

    # 1. Check for missing values
    missing = df.isnull().sum()
    if missing.any():
        issues.append(f"Missing values: {missing[missing > 0].to_dict()}")

    # 2. Check for duplicate timestamps
    duplicates = df.index.duplicated()
    if duplicates.sum() > 0:
        issues.append(f"Duplicate timestamps: {duplicates.sum()} found")

    # 3. Check OHLC relationships
    invalid_ohlc = (
        (df['high'] < df['low']) |
        (df['high'] < df['open']) |
        (df['high'] < df['close']) |
        (df['low'] > df['open']) |
        (df['low'] > df['close'])
    )
    if invalid_ohlc.sum() > 0:
        issues.append(f"Invalid OHLC relationships: {invalid_ohlc.sum()} bars")

    # 4. Check for negative values
    negative = (df[['open', 'high', 'low', 'close', 'volume']] < 0).any()
    if negative.any():
        issues.append(f"Negative values: {negative[negative].index.tolist()}")

    # 5. Check for zero prices
    zero_prices = (df['close'] == 0).sum()
    if zero_prices > 0:
        issues.append(f"Zero close prices: {zero_prices} bars")

    # 6. Check time sequence
    if not df.index.is_monotonic_increasing:
        issues.append("Timestamps not monotonically increasing")

    # 7. Check for outliers (extreme price changes)
    price_change = df['close'].pct_change().abs()
    extreme_changes = price_change > 0.5  # More than 50% change
    if extreme_changes.sum() > 0:
        issues.append(f"Extreme price changes: {extreme_changes.sum()} bars")

    return issues

# Usage

df = pd.read_parquet('market_data.parquet')
issues = validate_ohlcv_data(df)

if issues:
    print("Data validation issues found:")
    for issue in issues:
        print(f"  - {issue}")
else:
    print("Data validation passed!")

```

### Statistical Summary

```python
import pandas as pd

def data_quality_report(df):
    """Generate a comprehensive data quality report."""
    report = {
        'total_bars': len(df),
        'date_range': f"{df.index.min()} to {df.index.max()}",
        'missing_values': df.isnull().sum().to_dict(),
        'duplicate_bars': df.index.duplicated().sum(),
        'price_stats': {
            'min_close': df['close'].min(),
            'max_close': df['close'].max(),
            'mean_close': df['close'].mean(),
            'std_close': df['close'].std()
        },
        'volume_stats': {
            'min_volume': df['volume'].min(),
            'max_volume': df['volume'].max(),
            'mean_volume': df['volume'].mean()
        },
        'gaps': detect_time_gaps(df)
    }

    return report

def detect_time_gaps(df, expected_freq='15min'):
    """Detect time gaps in the data."""
    expected_delta = pd.Timedelta(expected_freq)
    actual_deltas = df.index.to_series().diff()
    gaps = actual_deltas[actual_deltas > expected_delta * 1.5]

    return len(gaps)

# Usage

report = data_quality_report(df)
import json
print(json.dumps(report, indent=2, default=str))

```

## Complete Examples

### Example 1: Crypto Trading System

```python
import backtrader as bt
from datetime import datetime

class CryptoTradingSystem:
    def __init__(self, exchange='binance'):
        self.exchange = exchange
        self.cerebro = bt.Cerebro()

    def setup_data(self, symbols, timeframe='15m', use_ws=True):
        """Setup data feeds for multiple symbols."""
        store = bt.stores.CCXTStore(
            exchange=self.exchange,
            currency='USDT',
            config={'enableRateLimit': True}
        )

        for symbol in symbols:
            data = store.getdata(
                dataname=symbol,
                timeframe=bt.TimeFrame.Minutes,
                compression=int(timeframe[:-1]),
                use_websocket=use_ws,
                backfill_start=True,
                drop_newest=True
            )
            self.cerebro.adddata(data, name=symbol)

        return self

    def add_strategy(self, strategy_class, **kwargs):
        """Add trading strategy."""
        self.cerebro.addstrategy(strategy_class, **kwargs)
        return self

    def setup_broker(self, initial_cash=10000, commission=0.001):
        """Configure broker."""
        self.cerebro.broker.setcash(initial_cash)
        self.cerebro.broker.setcommission(commission=commission)
        return self

    def run(self):
        """Execute the backtest or live trading."""
        return self.cerebro.run()

# Usage

system = CryptoTradingSystem('binance')
system.setup_data(['BTC/USDT', 'ETH/USDT'], use_ws=False)
system.setup_broker(initial_cash=10000)
system.add_strategy(MyStrategy)
results = system.run()

```

### Example 2: Data Pipeline

```python
import pandas as pd
import backtrader as bt
from datetime import datetime, timedelta

class DataPipeline:
    """Complete data pipeline for Backtrader."""

    def __init__(self, storage_path='./data'):
        self.storage_path = storage_path

    def fetch(self, exchange, symbol, start, end, timeframe='15m'):
        """Fetch data from exchange."""
        store = bt.stores.CCXTStore(exchange=exchange, currency='USDT')
        compression = int(timeframe[:-1])

        data = store.getdata(
            dataname=symbol,
            timeframe=bt.TimeFrame.Minutes,
            compression=compression,
            fromdate=start,
            todate=end,
            historical=True,
            ohlcv_limit=1000
        )

        return data

    def clean(self, df):
        """Clean and validate data."""

        # Remove duplicates
        df = df[~df.index.duplicated(keep='first')]

        # Validate OHLC
        df = df[
            (df['high'] >= df['low']) &
            (df['high'] >= df['open']) &
            (df['high'] >= df['close']) &
            (df['low'] <= df['open']) &
            (df['low'] <= df['close']) &
            (df['volume'] >= 0)
        ]

        # Forward fill small gaps
        df = df.ffill(limit=3)

        return df

    def store(self, df, symbol, timeframe):
        """Store data efficiently."""
        filename = f"{self.storage_path}/{symbol.replace('/', '_')}_{timeframe}.parquet"
        df.to_parquet(filename, compression='snappy')
        print(f"Stored data to {filename}")
        return filename

    def load(self, symbol, timeframe):
        """Load stored data."""
        filename = f"{self.storage_path}/{symbol.replace('/', '_')}_{timeframe}.parquet"
        df = pd.read_parquet(filename)
        return df

    def create_feed(self, df):
        """Create Backtrader data feed."""
        return bt.feeds.PandasData(dataname=df)

# Usage

pipeline = DataPipeline()

# Fetch and store

start_date = datetime(2023, 1, 1)
end_date = datetime.now()

data = pipeline.fetch('binance', 'BTC/USDT', start_date, end_date)

# ... convert data to DataFrame ...

df = convert_to_dataframe(data)
df_clean = pipeline.clean(df)
pipeline.store(df_clean, 'BTC/USDT', '15m')

# Load and create feed

df_loaded = pipeline.load('BTC/USDT', '15m')
feed = pipeline.create_feed(df_loaded)

```
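
The pipeline above leaves `convert_to_dataframe` undefined. One possible shape for that hypothetical helper, taking the OHLCV arrays you would first read off the feed rather than the feed object itself, is:

```python
import pandas as pd

def convert_to_dataframe(timestamps, opens, highs, lows, closes, volumes):
    """Hypothetical helper: assemble OHLCV arrays read off a feed into a frame."""
    return pd.DataFrame(
        {'open': opens, 'high': highs, 'low': lows,
         'close': closes, 'volume': volumes},
        index=pd.DatetimeIndex(pd.to_datetime(timestamps), name='datetime'),
    )

# Toy example with two 15-minute bars
df = convert_to_dataframe(
    ['2023-01-01 00:00', '2023-01-01 00:15'],
    [100.0, 101.0], [102.0, 103.0], [99.0, 100.5], [101.0, 102.0], [10.0, 12.0],
)
```

The resulting frame drops straight into `pipeline.clean()` and `pipeline.store()`.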

### Example 3: Multi-Source Data Aggregator

```python
class MultiSourceAggregator:
    """Aggregate data from multiple sources."""

    def __init__(self):
        self.sources = []

    def add_csv_source(self, path, symbol):
        """Add CSV data source."""
        df = pd.read_csv(path, parse_dates=['datetime'], index_col='datetime')
        self.sources.append({'symbol': symbol, 'data': df, 'type': 'csv'})
        return self

    def add_exchange_source(self, exchange, symbol, start, end):
        """Add exchange data source."""
        store = bt.stores.CCXTStore(exchange=exchange, currency='USDT')
        data = store.getdata(
            dataname=symbol,
            timeframe=bt.TimeFrame.Minutes,
            compression=15,
            fromdate=start,
            todate=end,
            historical=True
        )
        self.sources.append({'symbol': symbol, 'data': data, 'type': 'exchange'})
        return self

    def normalize(self):
        """Normalize all data sources to common format."""
        normalized = []
        for source in self.sources:
            if source['type'] == 'csv':
                df = source['data']
            else:
                # Convert exchange data to DataFrame
                df = self._exchange_to_df(source['data'])

            # Apply standard resampling
            df = self._resample_to_common(df, '15min')
            normalized.append({'symbol': source['symbol'], 'data': df})

        return normalized

    def merge(self, normalized_data):
        """Merge multiple data sources."""
        merged = pd.DataFrame()
        for item in normalized_data:
            if merged.empty:
                merged = item['data'].copy()
            else:
                merged = merged.join(item['data'], how='outer', rsuffix=f"_{item['symbol']}")

        return merged

# Usage

aggregator = MultiSourceAggregator()
aggregator.add_csv_source('data/BTC.csv', 'BTC')
aggregator.add_exchange_source('binance', 'ETH/USDT', datetime(2023, 1, 1), datetime.now())

normalized = aggregator.normalize()
merged = aggregator.merge(normalized)

```

## Best Practices

### Data Sourcing

1. **Use multiple data sources** for critical data to verify accuracy
2. **Check data frequency** matches your strategy requirements
3. **Consider survivorship bias** when using stock data
4. **Include dividend and split adjustments** for equity data
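
Point 4 can be sketched with pandas, assuming the frame carries a hypothetical `adj_close` column alongside the raw OHLC:

```python
import pandas as pd

def back_adjust_ohlc(df):
    """Scale raw OHLC by the adj_close/close ratio so corporate actions
    don't appear as artificial price jumps."""
    factor = df['adj_close'] / df['close']
    out = df.copy()
    for col in ['open', 'high', 'low', 'close']:
        out[col] = df[col] * factor
    return out

# Toy frame: after a 2-for-1 split, adj_close is half the raw close
bars = pd.DataFrame({'open': [10.0], 'high': [11.0], 'low': [9.0],
                     'close': [10.0], 'adj_close': [5.0]})
adjusted = back_adjust_ohlc(bars)
```

This is a sketch of back-adjustment only; whether your data vendor supplies an adjusted-close column, and under what name, varies by source.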

### Data Storage

1. **Use Parquet format** for large datasets (best performance/compression ratio)
2. **Organize by symbol and timeframe** in directory structure
3. **Keep raw data separate** from processed data
4. **Version your datasets** for reproducibility
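
Points 2 and 3 amount to a predictable path convention; a small helper (names illustrative) makes it hard to mix raw and processed files:

```python
from pathlib import Path

def dataset_path(root, symbol, timeframe, stage='raw'):
    """Build a storage path like data/raw/BTC_USDT/15m.parquet."""
    return Path(root) / stage / symbol.replace('/', '_') / f"{timeframe}.parquet"

raw_file = dataset_path('data', 'BTC/USDT', '15m')
clean_file = dataset_path('data', 'BTC/USDT', '15m', stage='clean')
```

Versioning can then be layered on by adding a date or version segment to the same scheme.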

### Real-time Data

1. **Always implement reconnection logic** for live feeds
2. **Use WebSocket** for lower latency when available
3. **Backfill missing bars** on reconnection
4. **Monitor data quality** in real-time
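
For point 1, a common pattern is exponential backoff between reconnection attempts, so a flapping exchange isn't hammered with requests. A minimal generator sketch (values are illustrative defaults, not a Backtrader API):

```python
import itertools

def backoff_delays(base=1.0, factor=2.0, cap=60.0):
    """Yield reconnection wait times: 1s, 2s, 4s, ... capped at 60s."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor

first_five = list(itertools.islice(backoff_delays(), 5))
```

In a live loop you would `time.sleep()` on each yielded value before retrying, and reset the generator after a successful reconnect.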

### Data Validation

1. **Validate before backtesting** - catch issues early
2. **Log data quality metrics** for each backtest
3. **Set up alerts** for unusual data patterns
4. **Document known data issues** (e.g., exchange downtime)
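
Points 2 and 3 can be wired together with a small helper that consumes the dict produced by `data_quality_report` earlier in this guide (the thresholds here are illustrative):

```python
def quality_alerts(report, max_duplicates=0, max_gaps=5, max_missing=0):
    """Turn a data-quality report dict into human-readable alert strings."""
    alerts = []
    if report.get('duplicate_bars', 0) > max_duplicates:
        alerts.append(f"duplicate bars: {report['duplicate_bars']}")
    if report.get('gaps', 0) > max_gaps:
        alerts.append(f"time gaps: {report['gaps']}")
    missing = sum(report.get('missing_values', {}).values())
    if missing > max_missing:
        alerts.append(f"missing values: {missing}")
    return alerts

# Example: 3 duplicate bars and 2 missing closes trip two alerts
alerts = quality_alerts({'duplicate_bars': 3, 'gaps': 1,
                         'missing_values': {'close': 2}})
```

The returned list can be logged alongside each backtest run or forwarded to whatever alerting channel you use.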

## Troubleshooting

### Common Issues

| Issue | Solution |
|-------|----------|
| Data gaps in series | Use `ffill()` or detect and mark gap periods |
| Duplicate timestamps | `df.drop_duplicates(subset=['datetime'])` |
| Wrong timezone | Standardize all data to UTC |
| Memory errors | Use `qbuffer()` or process in chunks |
| Slow loading | Convert CSV to Parquet format |
| Missing bars | Implement backfill logic |
| Invalid OHLC | Validate and filter/correct |
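
For the first row of the table, reindexing to a complete time grid makes gaps explicit instead of silently absent:

```python
import pandas as pd

def mark_gaps(df, freq='15min'):
    """Reindex to a full grid so missing bars surface as NaN rows."""
    full_index = pd.date_range(df.index.min(), df.index.max(), freq=freq)
    reindexed = df.reindex(full_index)
    return reindexed, int(reindexed['close'].isna().sum())

# Toy series missing the 00:30 bar
idx = pd.to_datetime(['2023-01-01 00:00', '2023-01-01 00:15', '2023-01-01 00:45'])
bars = pd.DataFrame({'close': [1.0, 2.0, 3.0]}, index=idx)
filled, n_missing = mark_gaps(bars)
```

From there you can either forward-fill the NaN rows or leave them marked so the strategy knows the bar was never traded.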

## Next Steps

- [Performance Optimization](performance-optimization.md) - Speed up your backtests
- [TS Mode Guide](ts-mode.md) - Time series optimization for large datasets
- [Live Trading Guide](../CCXT_LIVE_TRADING_GUIDE.md) - Real trading with CCXT