title: 多策略回测 description: 在 backtrader 中运行和管理多个策略的指南


多策略回测

同时运行多个策略可以分散交易方法、比较绩效并构建稳健的交易系统。本指南介绍在 backtrader 中进行多策略组合管理的技术。

快速开始

基础多策略设置

import backtrader as bt

cerebro = bt.Cerebro()

# 添加多个策略

cerebro.addstrategy(MomentumStrategy, period=20)
cerebro.addstrategy(MeanReversionStrategy, period=10)
cerebro.addstrategy(BreakoutStrategy, period=50)

# 运行 - 所有策略共享同一个经纪商

results = cerebro.run()

# 每个策略结果单独返回

for i, strat in enumerate(results):
    print(f"策略 {i}: 最终价值 {strat.broker.getvalue()}")

```bash

## 策略组合管理

### 等权重分配

```python
class EqualWeightStrategy(bt.Strategy):
    """等权重多策略组合的基础类。"""

    params = (
        ('weight', 0.33),  # 3 个策略等权重分配
        ('max_position', 0.95),
    )

    def __init__(self):
        self.order = None
        self.target_value = self.broker.getvalue() *self.p.weight

    def next(self):
        current_value = self.broker.getvalue()*self.p.weight

        if self.signal() and not self.position:

# 使用分配的资金买入
            size = int(current_value / self.data.close[0])
            self.buy(size=size)
        elif not self.signal() and self.position:
            self.close()

    def signal(self):

# 在子类中重写
        return False

```bash

### 风险平价分配

```python
class RiskParityStrategy(bt.Strategy):
    """根据策略波动率分配资金。"""

    params = (
        ('lookback', 20),
        ('target_risk', 0.02),  # 2% 日风险
    )

    def __init__(self):
        self.atr = bt.indicators.ATR(self.data, period=self.p.lookback)
        self.volatility = self.atr / self.data.close

    def get_position_size(self):
        """根据波动率计算持仓大小。"""
        risk_per_share = self.atr[0]
        account_risk = self.broker.getvalue()*self.p.target_risk
        return int(account_risk / risk_per_share) if risk_per_share > 0 else 0

```bash

## 资源分配

### 资金分配策略

```python
class CapitalAllocator(bt.Strategy):
    """在策略之间动态分配资金。"""

    params = (
        ('rebalance_freq', 20),  # 每 20 根 K 线再平衡
        ('min_allocation', 0.1),  # 最小 10%分配
    )

    def __init__(self):
        self.strategies = []
        self.allocations = []
        self.last_rebalance = 0

    def add_strategy(self, strategy, allocation):
        """添加策略及其目标分配比例。"""
        self.strategies.append(strategy)
        self.allocations.append(allocation)

    def next(self):
        if len(self.data) - self.last_rebalance >= self.p.rebalance_freq:
            self.rebalance()
            self.last_rebalance = len(self.data)

    def rebalance(self):
        """根据绩效再平衡资金。"""

# 实现取决于分配方法
        pass

```bash

### 佣金分摊

```python
class CommissionSplitter(bt.CommissionInfo):
    """在多个策略之间按比例分摊佣金。"""

    params = (('strategies', []),)

    def getcommission(self, size, price):
        comm = super().getcommission(size, price)

# 如果涉及多个策略,分摊佣金
        return comm / len(self.p.strategies) if self.p.strategies else comm

```bash

## 结果聚合

### 组合层面分析

```python
class PortfolioAnalyzer(bt.Analyzer):
    """分析所有策略的合并绩效。"""

    def __init__(self):
        self.returns = []
        self.drawdowns = []

    def next(self):
        total_value = self.strategy.broker.getvalue()
        self.returns.append(total_value)

    def get_analysis(self):
        import numpy as np

        returns_array = np.array(self.returns)
        cumulative_returns = (returns_array / returns_array[0]) - 1

# 计算滚动最大值
        running_max = np.maximum.accumulate(returns_array)
        drawdowns = (returns_array - running_max) / running_max

        return {
            'total_return': cumulative_returns[-1],
            'max_drawdown': drawdowns.min(),
            'final_value': returns_array[-1],
            'returns_series': self.returns,
        }

# 使用方法

cerebro.addanalyzer(PortfolioAnalyzer, _name='portfolio')
results = cerebro.run()
portfolio_analysis = results[0].analyzers.portfolio.get_analysis()

```bash

### 多策略比较

```python
def compare_strategies(strategies, data_path):
    """运行并比较多个策略。"""
    results_summary = []

    for strat_class in strategies:
        cerebro = bt.Cerebro()
        cerebro.adddata(bt.feeds.CSVData(dataname=data_path))
        cerebro.addstrategy(strat_class)
        cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
        cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
        cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')

        result = cerebro.run()[0]

        summary = {
            'strategy': strat_class.__name__,
            'sharpe': result.analyzers.sharpe.get_analysis().get('sharperatio'),
            'max_dd': result.analyzers.drawdown.get_analysis()['max']['drawdown'],
            'return': result.analyzers.returns.get_analysis()['rnorm'],
        }
        results_summary.append(summary)

# 打印比较表格
    print(f"{'策略':<20} {'夏普':>10} {'最大回撤':>10} {'收益':>10}")
    print("-"* 52)
    for s in results_summary:
        print(f"{s['strategy']:<20} {s['sharpe']:>10.2f} {s['max_dd']:>10.2f} {s['return']:>10.2%}")

    return results_summary

```bash

## 策略相关性分析

### 计算相关性

```python
def calculate_strategy_correlations(strategies, data_path):
    """计算策略之间的收益率相关性。"""
    from scipy.stats import pearsonr
    import pandas as pd

# 收集每个策略的收益
    all_returns = {}

    for strat_class in strategies:
        cerebro = bt.Cerebro()
        cerebro.adddata(bt.feeds.CSVData(dataname=data_path))
        cerebro.addstrategy(strat_class)
        cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='returns')

        result = cerebro.run()[0]
        returns_dict = result.analyzers.returns.get_analysis()
        all_returns[strat_class.__name__] = pd.Series(returns_dict)

# 计算相关矩阵
    returns_df = pd.DataFrame(all_returns)
    correlation_matrix = returns_df.corr()

    return correlation_matrix

# 使用方法

strategies = [MomentumStrategy, MeanReversionStrategy, BreakoutStrategy]
corr_matrix = calculate_strategy_correlations(strategies, 'data.csv')
print(corr_matrix)

```bash

### 低相关性组合

```python
class LowCorrelationSelector(bt.Strategy):
    """选择彼此相关性低的策略。"""

    params = (
        ('max_correlation', 0.7),
        ('min_strategies', 2),
    )

    def __init__(self):
        self.selected_strategies = []
        self.returns_history = {s: [] for s in self.p.strategies}

    def calculate_correlation(self, returns1, returns2):
        """计算两个收益率序列之间的相关性。"""
        import numpy as np
        return np.corrcoef(returns1, returns2)[0, 1]

    def select_strategies(self):
        """选择相关性低于阈值的策略。"""
        selected = [self.p.strategies[0]]  # 从第一个策略开始

        for candidate in self.p.strategies[1:]:

# 检查与所有已选策略的相关性
            correlations = [
                self.calculate_correlation(
                    self.returns_history[candidate],
                    self.returns_history[selected_strat]
                )
                for selected_strat in selected
            ]

            if all(c < self.p.max_correlation for c in correlations):
                selected.append(candidate)

        return selected[:self.p.max_strategies]

```bash

## 并行执行

### 多进程优化

```python
from multiprocessing import Pool
import itertools

def run_strategy_backtest(params):
    """使用给定参数运行单个回测。"""
    strat_class, data_path, strat_params = params

    cerebro = bt.Cerebro(stdstats=False)
    cerebro.adddata(bt.feeds.CSVData(dataname=data_path))
    cerebro.addstrategy(strat_class, **strat_params)

    result = cerebro.run()[0]
    return {
        'params': strat_params,
        'final_value': cerebro.broker.getvalue(),
        'sharpe': result.analyzers.sharpe.get_analysis().get('sharperatio', 0),
    }

def parallel_optimize(strat_class, data_path, param_grid, n_workers=4):
    """并行优化策略参数。"""

# 生成所有参数组合
    param_combinations = list(itertools.product(*param_grid.values()))
    param_dicts = [dict(zip(param_grid.keys(), combo)) for combo in param_combinations]

# 为每个 worker 创建参数元组
    params_list = [(strat_class, data_path, p) for p in param_dicts]

# 并行运行
    with Pool(n_workers) as pool:
        results = pool.map(run_strategy_backtest, params_list)

# 按夏普比率排序
    results.sort(key=lambda x: x['sharpe'], reverse=True)
    return results

```bash

### 独立策略执行

```python
def run_strategies_independent(strategies_config):
    """独立运行策略并合并结果。"""
    import concurrent.futures

    def run_single(config):
        cerebro = bt.Cerebro()
        cerebro.adddata(bt.feeds.CSVData(dataname=config['data']))
        cerebro.addstrategy(config['strategy'], **config.get('params', {}))
        cerebro.broker.setcash(config.get('cash', 100000))

        result = cerebro.run()[0]
        return {
            'name': config['name'],
            'return': cerebro.broker.getvalue() / config.get('cash', 100000) - 1,
        }

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(run_single, config) for config in strategies_config]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]

    return results

```bash

## 跨策略风险管理

### 组合层面止损

```python
class PortfolioStopLoss(bt.Strategy):
    """在所有策略中实现组合层面止损。"""

    params = (
        ('max_drawdown', 0.15),  # 15% 最大回撤
        ('stop_trading', False),
    )

    def __init__(self):
        self.peak_value = self.broker.getvalue()
        self.trading_stopped = False

    def next(self):
        current_value = self.broker.getvalue()

# 更新峰值
        if current_value > self.peak_value:
            self.peak_value = current_value

# 计算回撤
        drawdown = (self.peak_value - current_value) / self.peak_value

# 如果超过最大回撤则停止交易
        if drawdown >= self.p.max_drawdown and not self.trading_stopped:
            self.trading_stopped = True
            self.close()  # 平掉所有仓位

        if self.trading_stopped:
            return  # 跳过所有交易逻辑

# 正常策略逻辑
        self.execute_strategy()

    def execute_strategy(self):
        """在子类中重写。"""
        pass

```bash

### 持仓层面风险控制

```python
class MultiStrategyPositionSizer(bt.Sizer):
    """考虑所有策略持仓的仓位管理。"""

    params = (
        ('max_total_exposure', 0.95),  # 最大 95%组合敞口
        ('max_single_position', 0.20),  # 最大 20%单仓位
    )

    def _getsizing(self, comminfo, cash, data, isbuy):
        total_value = self.strategy.broker.getvalue()
        current_exposure = abs(self.strategy.broker.getvalue() -
                               self.strategy.broker.get_cash()) / total_value

# 计算可用容量
        available = self.p.max_total_exposure - current_exposure
        if available <= 0:
            return 0  # 新仓位无容量

# 计算持仓大小
        max_size = (total_value * min(available, self.p.max_single_position))
        price = data.close[0]
        return int(max_size / price) if price > 0 else 0

```bash

## 完整示例

### 多策略组合系统

```python
import backtrader as bt
import pandas as pd
from datetime import datetime

# 策略 1: 动量策略

class MomentumStrategy(bt.Strategy):
    """基于 RSI 的动量策略。"""

    params = (('rsi_period', 14), ('oversold', 30), ('overbought', 70))

    def __init__(self):
        self.rsi = bt.indicators.RSI(self.data.close, period=self.p.rsi_period)
        self.signal = 0  # 1=买入, -1=卖出, 0=持有

    def next(self):
        if self.rsi[0] < self.p.oversold and not self.position:
            self.buy(size=self.sizer.get_size(self))
            self.signal = 1
        elif self.rsi[0] > self.p.overbought and self.position:
            self.close()
            self.signal = -1
        else:
            self.signal = 0


# 策略 2: 均值回归策略

class MeanReversionStrategy(bt.Strategy):
    """使用布林带的均值回归策略。"""

    params = (('period', 20), ('devfactor', 2.0))

    def __init__(self):
        self.boll = bt.indicators.BollingerBands(
            self.data.close,
            period=self.p.period,
            devfactor=self.p.devfactor
        )
        self.signal = 0

    def next(self):
        if self.data.close[0] < self.boll.lines.bot[0] and not self.position:
            self.buy(size=self.sizer.get_size(self))
            self.signal = 1
        elif self.data.close[0] > self.boll.lines.top[0] and self.position:
            self.close()
            self.signal = -1
        else:
            self.signal = 0


# 策略 3: 趋势跟踪策略

class TrendFollowingStrategy(bt.Strategy):
    """使用均线交叉的趋势跟踪策略。"""

    params = (('fast_period', 10), ('slow_period', 30))

    def __init__(self):
        self.fast_ma = bt.indicators.SMA(self.data.close, period=self.p.fast_period)
        self.slow_ma = bt.indicators.SMA(self.data.close, period=self.p.slow_period)
        self.crossover = bt.ind.CrossOver(self.fast_ma, self.slow_ma)
        self.signal = 0

    def next(self):
        if self.crossover[0] > 0 and not self.position:
            self.buy(size=self.sizer.get_size(self))
            self.signal = 1
        elif self.crossover[0] < 0 and self.position:
            self.close()
            self.signal = -1
        else:
            self.signal = 0


# 组合管理器

class MultiStrategyPortfolio(bt.Strategy):
    """组合多个策略的组合管理器。"""

    params = (
        ('strategies', []),
        ('weights', None),  # None = 等权重
        ('rebalance_freq', 5),
    )

    def __init__(self):

# 存储策略实例
        self.strategy_instances = []
        for strat_params in self.p.strategies:
            strat_class = strat_params['class']
            strat_instance = strat_class(**strat_params.get('params', {}))
            self.strategy_instances.append(strat_instance)

# 设置权重
        if self.p.weights is None:
            self.weights = [1.0 / len(self.strategy_instances)] *len(self.strategy_instances)
        else:
            self.weights = self.p.weights

# 跟踪分配
        self.allocations = [0.0]*len(self.strategy_instances)
        self.last_rebalance = 0

    def next(self):

# 获取所有策略的信号
        signals = []
        for i, strat in enumerate(self.strategy_instances):

# 执行策略逻辑
            strat.next()
            signals.append(strat.signal)

# 如需则再平衡
        if len(self.data) - self.last_rebalance >= self.p.rebalance_freq:
            self.rebalance()
            self.last_rebalance = len(self.data)

    def rebalance(self):
        """根据目标权重再平衡组合。"""
        total_value = self.broker.getvalue()

        for i, weight in enumerate(self.weights):
            target_value = total_value*weight
            current_value = self.get_strategy_value(i)

            if current_value < target_value*0.95:  # 低配

# 买入达到目标
                pass
            elif current_value > target_value*1.05:  # 超配

# 卖出达到目标
                pass

    def get_strategy_value(self, index):
        """获取指定索引策略的当前价值。"""

# 实现取决于跟踪方法
        return self.broker.getvalue() / len(self.strategy_instances)


# 自定义仓位管理

class EqualWeightSizer(bt.Sizer):
    """多策略组合的等权重仓位管理。"""

    params = (('num_strategies', 3), ('target_weight', 0.33))

    def _getsizing(self, comminfo, cash, data, isbuy):
        total_value = self.strategy.broker.getvalue()
        target_value = total_value*self.p.target_weight
        return int(target_value / data.close[0]) if data.close[0] > 0 else 0


# 运行组合

def run_multi_strategy_portfolio(data_path):
    """运行多策略组合回测。"""

    cerebro = bt.Cerebro()

# 添加数据
    data = bt.feeds.CSVData(dataname=data_path)
    cerebro.adddata(data)

# 添加组合策略
    strategies_config = [
        {'class': MomentumStrategy, 'params': {'rsi_period': 14}},
        {'class': MeanReversionStrategy, 'params': {'period': 20}},
        {'class': TrendFollowingStrategy, 'params': {'fast_period': 10, 'slow_period': 30}},
    ]

    cerebro.addstrategy(
        MultiStrategyPortfolio,
        strategies=strategies_config,
        weights=[0.3, 0.3, 0.4],  # 自定义权重
    )

# 设置经纪商
    cerebro.broker.setcash(100000)
    cerebro.broker.setcommission(commission=0.001)

# 添加仓位管理
    cerebro.addsizer(EqualWeightSizer, num_strategies=3, target_weight=0.33)

# 添加分析器
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

# 运行
    results = cerebro.run()
    strat = results[0]

# 打印结果
    print("\n" + "="*50)
    print("多策略组合回测结果")
    print("="*50)
    print(f"最终价值: {cerebro.broker.getvalue():.2f}")
    print(f"夏普比率: {strat.analyzers.sharpe.get_analysis().get('sharperatio', 'N/A')}")
    print(f"最大回撤: {strat.analyzers.drawdown.get_analysis()['max']['drawdown']:.2f}%")
    print(f"年化收益: {strat.analyzers.returns.get_analysis().get('rnorm', 0):.2%}")
    print("="* 50)

    return results


if __name__ == '__main__':

# 运行组合
    results = run_multi_strategy_portfolio('data.csv')

```bash

## 最佳实践

### 策略选择

1. **多元化**: 组合适应不同市场条件的策略
2. **低相关性**: 选择走势不完全同步的策略
3. **互补信号**: 使用相互确认的策略

### 风险管理

1. **组合层面控制**: 实施最大回撤限制
2. **仓位管理**: 在策略间使用一致的仓位管理
3. **资金分配**: 不要过度分配给相似策略

### 绩效监控

1. **单独指标**: 分别跟踪每个策略
2. **组合指标**: 监控组合层面绩效
3. **归因分析**: 了解哪些策略贡献最大

## 相关阅读

- [性能优化](performance-optimization_zh.md) - 加速回测
- [TS 模式指南](ts-mode_zh.md) - 时间序列优化
- [CS 模式指南](cs-mode_zh.md) - 组合的横截面模式