Backtrader 故障排除指南¶
本指南提供了在使用 Backtrader 过程中可能遇到的常见问题的诊断技术和解决方案。
目录¶
错误诊断技术¶
1. 错误类型识别¶
Backtrader 中的错误通常分为以下几类:
1.1 初始化错误¶
症状*:在创建策略或指标时出现错误
常见原因*:
参数未正确初始化
依赖对象(如数据源)未正确加载
继承结构问题
诊断步骤*:
# 检查参数是否正确初始化
class MyStrategy(bt.Strategy):
params = (
('period', 20),
('printlog', False),
)
def __init__(self):
# 确保在访问参数前调用父类 __init__
super().__init__()
# 现在可以安全访问参数
print(f"参数值: {self.p.period}")
```bash
#### 1.2 数据加载错误
- *症状**:IndexError、KeyError 或数据缺失
- *常见原因**:
- CSV 文件格式不正确
- 数据列名称不匹配
- 时间索引问题
- *诊断步骤**:
```python
# 在加载数据后进行验证
data = bt.feeds.CSVFeed(
dataname='data.csv',
datetime=0,
open=1,
high=2,
low=3,
close=4,
volume=5,
)
# 添加到 Cerebro 前验证
cerebro = bt.Cerebro()
cerebro.adddata(data)
# 预运行检查
cerebro.run(precheck=True) # 仅验证数据完整性
```bash
#### 1.3 执行时错误
- *症状**:策略运行时崩溃或产生意外结果
- *常见原因**:
- 订单参数无效
- 指标计算错误
- 资金不足
- *诊断步骤**:
```python
class MyStrategy(bt.Strategy):
def next(self):
# 添加安全检查
if len(self.data) < self.p.period:
return # 数据不足
# 检查是否有足够资金
if self.broker.get_cash() < self.data.close[0] * self.p.size:
return # 资金不足
# 执行订单前验证
if not self.position:
self.buy(size=self.p.size)
```bash
- --
## 调试技巧和工具
### 1. 策略调试 (使用 pdb)
#### 1.1 基本调试设置
```python
import bt
import pdb
class DebugStrategy(bt.Strategy):
def __init__(self):
# 设置断点
pdb.set_trace()
self.sma = bt.indicators.SMA(self.data.close, period=20)
def next(self):
# 条件断点
if len(self) == 100: # 在第 100 根 K 线时暂停
pdb.set_trace()
# 正常逻辑
if self.sma[0] > self.data.close[0]:
self.sell()
```bash
#### 1.2 调试命令
| 命令 | 说明 |
|------|------|
| `n(ext)` | 执行下一行 |
| `s(tep)` | 进入函数内部 |
| `c(ontinue)` | 继续执行直到下一个断点 |
| `p(rint) var` | 打印变量值 |
| `pp var` | 美化打印变量 |
| `l(ist)` | 显示当前位置的代码 |
| `w(here)` | 显示调用栈 |
| `b(reak) line` | 在指定行设置断点 |
| `cl(ear)` | 清除断点 |
| `q(uit)` | 退出调试器 |
#### 1.3 高级调试技巧
```python
class AdvancedDebugStrategy(bt.Strategy):
def __init__(self):
# 导入调试器
import pdb
# 条件断点示例
self.debug_condition = False
# 打印所有指标
for obj in self._lineiterators[bt.Indicator.IndType]:
print(f"注册的指标: {obj.__class__.__name__}")
def next(self):
# 使用条件断点
if self.debug_condition:
import pdb
pdb.set_trace()
# 检查策略状态
if len(self) % 100 == 0: # 每 100 根 K 线打印一次
self.log_debug()
def log_debug(self):
"""打印当前状态的调试信息"""
print(f"日期: {self.data.datetime.date(0)}")
print(f"收盘价: {self.data.close[0]:.2f}")
print(f"持仓: {self.position.size}")
print(f"现金: {self.broker.get_cash():.2f}")
print(f"资产: {self.broker.get_value():.2f}")
```bash
### 2. 日志记录
#### 2.1 基本日志设置
```python
import backtrader as bt
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('backtrader.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class LoggingStrategy(bt.Strategy):
params = (
('printlog', True),
)
def log(self, txt, dt=None):
"""统一的日志输出方法"""
if self.p.printlog:
dt = dt or self.data.datetime.datetime(0)
logger.info(f'{dt.isoformat()} {txt}')
def __init__(self):
self.log('策略初始化')
def next(self):
self.log(f'收盘价: {self.data.close[0]:.2f}')
def notify_order(self, order):
self.log(f'订单状态: {order.getstatusname()}')
```bash
#### 2.2 结构化日志记录
```python
import json
import logging
class StructuredLogger:
"""结构化日志记录器"""
def __init__(self, name='backtrader'):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
# JSON 格式处理器
handler = logging.FileHandler('backtrader_structured.log')
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def log_event(self, event_type, **kwargs):
"""记录结构化事件"""
event = {
'timestamp': datetime.datetime.now().isoformat(),
'event_type': event_type,
- *kwargs
}
self.logger.info(json.dumps(event))
# 使用示例
class StrategyWithStructuredLogging(bt.Strategy):
def __init__(self):
self.logger = StructuredLogger()
self.logger.log_event('strategy_init', strategy=self.__class__.__name__)
def next(self):
self.logger.log_event(
'bar_data',
close=self.data.close[0],
volume=self.data.volume[0],
position=self.position.size
)
```bash
#### 2.3 性能日志记录
```python
import time
from functools import wraps
def log_performance(func):
"""性能日志装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
print(f"{func.__name__} 耗时: {(end - start) * 1000:.2f}ms")
return result
return wrapper
class PerformanceLoggedStrategy(bt.Strategy):
@log_performance
def next(self):
# 策略逻辑
pass
@log_performance
def notify_order(self, order):
# 订单处理逻辑
pass
```bash
### 3. 数据源问题 (缺失柱、时区问题)
#### 3.1 缺失数据柱诊断
```python
class DataIntegrityCheck(bt.Strategy):
"""数据完整性检查策略"""
def start(self):
self.expected_bars = []
self.actual_bars = []
self.gap_threshold = 1 # 允许的最大间隔
def next(self):
current_date = self.data.datetime.date(0)
# 记录实际数据
self.actual_bars.append(current_date)
# 检查数据间隔
if len(self) > 1:
prev_date = self.data.datetime.date(-1)
gap = (current_date - prev_date).days
# 如果间隔超过阈值,记录警告
if gap > self.gap_threshold:
print(f"警告: 检测到数据间隔 {gap} 天")
print(f"从 {prev_date} 到 {current_date}")
def stop(self):
"""报告数据完整性"""
print(f"\n 数据完整性报告:")
print(f"预期数据点: {len(self.expected_bars)}")
print(f"实际数据点: {len(self.actual_bars)}")
# 计算缺失天数
missing = set(self.expected_bars) - set(self.actual_bars)
if missing:
print(f"缺失日期: {sorted(missing)}")
```bash
#### 3.2 时区问题处理
```python
import pytz
class TimeZoneAwareData(bt.feeds.CSVFeed):
"""时区感知的数据源"""
params = (
('tz', 'Asia/Shanghai'), # 输入数据时区
('tz_out', 'UTC'), # 输出时区
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 设置时区
self.p.tz = pytz.timezone(self.p.tz)
self.p.tz_out = pytz.timezone(self.p.tz_out)
def _loadline(self, line_no):
"""加载单行数据并转换时区"""
ret = super()._loadline(line_no)
if ret:
# 转换时区
dt = self.lines.datetime[0]
if dt.tzinfo is None:
dt = self.p.tz.localize(dt)
self.lines.datetime[0] = dt.astimezone(self.p.tz_out)
return ret
# 使用示例
cerebro = bt.Cerebro()
data = TimeZoneAwareData(
dataname='data.csv',
tz='US/Eastern', # 数据原始时区
tz_out='UTC' # 转换为 UTC
)
cerebro.adddata(data)
```bash
#### 3.3 数据填充处理
```python
# 方法 1: 使用数据填充器
cerebro = bt.Cerebro()
data = bt.feeds.CSVFeed(dataname='data.csv')
# 添加填充器以处理缺失数据
cerebro.adddata(data)
cerebro.run(runfill=True) # 自动填充缺失数据
# 方法 2: 使用数据过滤器
from backtrader.filters import DataFiller
data = bt.feeds.CSVFeed(dataname='data.csv')
data.addfilter(DataFiller) # 添加填充过滤器
cerebro.adddata(data)
```bash
### 4. 订单执行问题 (拒绝订单、部分成交)
#### 4.1 订单拒绝诊断
```python
class OrderDiagnosticStrategy(bt.Strategy):
"""订单诊断策略"""
def notify_order(self, order):
"""订单状态通知"""
order_status = order.getstatusname()
if order_status in ['Rejected', 'Canceled', 'Margin']:
print(f"\n 订单被拒绝/取消:")
print(f"状态: {order_status}")
print(f"类型: {order.ordtypename()}")
print(f"价格: {order.created.price:.2f}")
print(f"数量: {order.created.size}")
print(f"当前现金: {self.broker.get_cash():.2f}")
print(f"当前持仓: {self.position.size}")
# 检查拒绝原因
if order_status == 'Rejected':
self.diagnose_rejection(order)
elif order_status == 'Completed':
print(f"订单完成: {order.ordtypename()} "
f"{abs(order.executed.size)} @ "
f"{order.executed.price:.2f}")
def diagnose_rejection(self, order):
"""诊断订单被拒绝的原因"""
# 检查资金
cash = self.broker.get_cash()
required = order.created.price *order.created.size
if cash < required:
print(f"原因: 资金不足 (需要: {required:.2f}, 可用: {cash:.2f})")
# 检查持仓限制
if order.ordtype == order.Buy and self.position:
print(f"原因: 已有持仓 {self.position.size}")
# 检查数据有效性
if len(self.data) == 0:
print(f"原因: 无有效数据")
```bash
#### 4.2 部分成交处理
```python
class PartialFillHandling(bt.Strategy):
"""处理部分成交订单"""
def notify_order(self, order):
if order.status == order.Partial:
print(f"\n 部分成交:")
print(f"已成交: {order.executed.size} / {order.created.size}")
print(f"成交价: {order.executed.price:.2f}")
# 处理部分成交
self.handle_partial_fill(order)
elif order.status == order.Completed:
self.on_order_complete(order)
def handle_partial_fill(self, order):
"""处理部分成交逻辑"""
remaining = order.created.size - order.executed.size
if remaining > 0:
print(f"剩余数量: {remaining}")
# 可以选择:
# 1. 取消剩余部分
# self.cancel(order)
# 2. 调整剩余订单
# 3. 等待完全成交
def on_order_complete(self, order):
"""订单完全成交"""
print(f"订单完全成交: {order.executed.size} @ {order.executed.price:.2f}")
```bash
#### 4.3 订单执行验证
```python
class OrderValidationStrategy(bt.Strategy):
"""订单验证策略"""
def validate_order_params(self, order_type, size, price=None):
"""验证订单参数"""
errors = []
# 检查数量
if size <= 0:
errors.append("订单数量必须大于 0")
# 检查价格
if price is not None and price <= 0:
errors.append("订单价格必须大于 0")
# 检查资金
cash = self.broker.get_cash()
if order_type == 'buy' and price:
required = price* size
if cash < required:
errors.append(f"资金不足: 需要 {required:.2f}, 可用 {cash:.2f}")
# 检查持仓
if order_type == 'sell' and abs(self.position.size) < size:
errors.append(f"持仓不足: 需要 {size}, 可用 {abs(self.position.size)}")
return errors
def safe_buy(self, size, price=None):
"""安全的买入订单"""
errors = self.validate_order_params('buy', size, price)
if errors:
print(f"买入订单验证失败:")
for error in errors:
print(f" - {error}")
return None
return self.buy(size=size, price=price)
def safe_sell(self, size, price=None):
"""安全的卖出订单"""
errors = self.validate_order_params('sell', size, price)
if errors:
print(f"卖出订单验证失败:")
for error in errors:
print(f" - {error}")
return None
return self.sell(size=size, price=price)
```bash
### 5. 性能瓶颈 (分析、优化)
#### 5.1 性能分析工具
```python
import cProfile
import pstats
import io
def profile_backtest(cerebro):
"""分析回测性能"""
# 创建性能分析器
pr = cProfile.Profile()
pr.enable()
# 运行回测
result = cerebro.run()
# 停止性能分析
pr.disable()
# 输出结果
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(20) # 打印前 20 个耗时最多的函数
print(s.getvalue())
return result
# 使用示例
cerebro = bt.Cerebro()
# ... 添加数据和策略 ...
# 运行性能分析
profile_backtest(cerebro)
```bash
#### 5.2 内存使用分析
```python
import tracemalloc
def profile_memory(cerebro):
"""分析内存使用"""
# 开始内存跟踪
tracemalloc.start()
# 运行回测
result = cerebro.run()
# 获取内存快照
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("\n 内存使用前 10:")
for stat in top_stats[:10]:
print(stat)
tracemalloc.stop()
return result
```bash
#### 5.3 性能优化技巧
```python
class OptimizedStrategy(bt.Strategy):
"""性能优化策略示例"""
params = (
('use_once', True), # 使用 once() 模式
('preload', True), # 预加载数据
('qbuffer', 100), # 限制缓冲区大小
)
def __init__(self):
# 缓存频繁访问的数据
self._close = self.data.close
self._volume = self.data.volume
# 使用更高效的指标
self.sma = bt.indicators.SMA(self._close, period=20)
# 方法 1: 使用 once() 模式 (批量处理)
def next(self):
"""标准 next() 模式 - 逐根 K 线处理"""
pass
# 如果计算允许,使用 once() 可以大幅提升性能
# def once(self, start, end):
# """批量处理模式 - 一次性处理多根 K 线"""
# # 向量化操作
# pass
def next(self):
"""优化后的 next() 方法"""
# 避免重复计算
current_close = self._close[0]
current_sma = self.sma[0]
# 使用局部变量减少属性访问
position_size = self.position.size
# 简化条件判断
if current_close > current_sma and position_size == 0:
self.buy()
# 使用预加载和缓冲限制
cerebro = bt.Cerebro()
# 预加载数据到内存
cerebro.adddata(data, preload=True)
# 限制缓冲区大小 (节省内存)
cerebro.adddata(data, qbuffer=100)
# 运行优化
results = cerebro.run(maxcpu=1) # 单进程运行
```bash
#### 5.4 多进程优化
```python
from multiprocessing import cpu_count
# 参数优化时使用多进程
cerebro = bt.Cerebro()
# 添加策略和数据
cerebro.optstrategy(
MyStrategy,
period=[10, 20, 30, 50],
devfactor=[1, 2, 3]
)
# 使用所有 CPU 核心
cerebro.run(maxcpu=cpu_count())
```bash
### 6. 平台特定问题 (Windows, macOS, Linux)
#### 6.1 Windows 特定问题
```python
# Windows 路径处理
import os
from pathlib import Path
class WindowsCompatibleFeed(bt.feeds.CSVFeed):
"""Windows 兼容的数据源"""
def __init__(self, **kwargs):
# 处理 Windows 路径
if 'dataname' in kwargs:
dataname = kwargs['dataname']
if isinstance(dataname, str):
# 使用 Path 处理路径
kwargs['dataname'] = str(Path(dataname))
super().__init__(**kwargs)
# Windows 文件编码问题
data = bt.feeds.CSVFeed(
dataname='data.csv',
encoding='utf-8', # 显式指定编码
csvcustomdelimiter=',', # Windows 可能使用分号
)
```bash
#### 6.2 macOS 特定问题
```python
# macOS 图形后端问题
import matplotlib
matplotlib.use('TkAgg') # 或 'Qt5Agg'
import matplotlib.pyplot as plt
# macOS 多进程问题
if __name__ == '__main__':
# Windows/macOS 需要此保护
cerebro = bt.Cerebro()
# ... 添加策略和数据 ...
cerebro.run()
```bash
#### 6.3 Linux 特定问题
```python
# Linux 文件权限
import os
def ensure_file_accessible(filepath):
"""确保文件可访问"""
if not os.access(filepath, os.R_OK):
raise PermissionError(f"无法读取文件: {filepath}")
return filepath
# 使用
data = bt.feeds.CSVFeed(
dataname=ensure_file_accessible('/path/to/data.csv')
)
```bash
### 7. 内存泄漏和资源管理
#### 7.1 内存泄漏检测
```python
import gc
import sys
class MemoryLeakDetector(bt.Strategy):
"""内存泄漏检测策略"""
def __init__(self):
self.initial_objects = len(gc.get_objects())
def next(self):
# 每 1000 根 K 线检查一次
if len(self) % 1000 == 0:
gc.collect()
current_objects = len(gc.get_objects())
increase = current_objects - self.initial_objects
if increase > 10000: # 对象增加超过 10000
print(f"警告: 可能存在内存泄漏")
print(f"对象增加: {increase}")
# 打印对象类型统计
obj_types = {}
for obj in gc.get_objects():
obj_type = type(obj).__name__
obj_types[obj_type] = obj_types.get(obj_type, 0) + 1
# 打印前 10 个最多的对象类型
sorted_types = sorted(obj_types.items(),
key=lambda x: x[1],
reverse=True)[:10]
for obj_type, count in sorted_types:
print(f" {obj_type}: {count}")
```bash
#### 7.2 资源清理
```python
class ResourceManagedStrategy(bt.Strategy):
"""资源管理策略"""
def __init__(self):
# 跟踪打开的资源
self._files = []
self._connections = []
def open_file(self, filepath):
"""安全打开文件"""
f = open(filepath, 'r')
self._files.append(f)
return f
def stop(self):
"""清理资源"""
# 关闭所有打开的文件
for f in self._files:
try:
f.close()
except Exception as e:
print(f"关闭文件错误: {e}")
# 关闭所有连接
for conn in self._connections:
try:
conn.close()
except Exception as e:
print(f"关闭连接错误: {e}")
print("资源已清理")
```bash
#### 7.3 循环引用处理
```python
class CircularReferenceSafe(bt.Strategy):
"""避免循环引用的策略"""
def __init__(self):
# 使用弱引用避免循环引用
import weakref
# 不要直接存储对父对象的引用
# 错误示例:
# self.parent_ref = self.datas[0]
# 正确示例 - 使用弱引用
self._data_ref = weakref.ref(self.datas[0])
def cleanup(self):
"""清理引用"""
self._data_ref = None
```bash
- --
## 日志分析
### 1. 日志级别
| 级别 | 用途 | 示例 |
|------|------|------|
| DEBUG | 详细调试信息 | 指标计算中间值 |
| INFO | 常规信息 | 订单状态变化 |
| WARNING | 警告 | 数据间隔检测 |
| ERROR | 错误 | 订单被拒绝 |
| CRITICAL | 严重错误 | 系统崩溃 |
### 2. 日志格式
```python
import logging
# 配置结构化日志
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - '
'[%(filename)s:%(lineno)d] - %(message)s'
)
handler = logging.FileHandler('backtrader.log')
handler.setFormatter(formatter)
logger = logging.getLogger('backtrader')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
```bash
### 3. 日志分析工具
```python
import re
from collections import Counter
def analyze_log_file(logfile='backtrader.log'):
"""分析日志文件"""
with open(logfile, 'r') as f:
lines = f.readlines()
# 统计日志级别
level_pattern = r'-(DEBUG|INFO|WARNING|ERROR|CRITICAL)-'
levels = Counter()
for line in lines:
match = re.search(level_pattern, line)
if match:
levels[match.group(1)] += 1
print("日志级别统计:")
for level, count in levels.most_common():
print(f" {level}: {count}")
# 提取错误信息
errors = []
for line in lines:
if '-ERROR-' in line or '-CRITICAL-' in line:
errors.append(line.strip())
if errors:
print(f"\n 发现 {len(errors)} 个错误:")
for error in errors[:10]: # 显示前 10 个
print(f" {error}")
# 使用
analyze_log_file()
```bash
- --
## 常见错误模式
### 1. IndexError: array index out of range
- *原因**:访问不存在的历史数据
```python
# 错误示例
def next(self):
value = self.data.close[-100] # 可能不存在
# 正确示例
def next(self):
if len(self) > 100:
value = self.data.close[-100]
```bash
### 2. AttributeError: 'NoneType' object has no attribute
- *原因**:访问未初始化的对象
```python
# 错误示例
def __init__(self):
self.indicator = bt.indicators.SMA(self.data.close, period=20)
self.signal = self.indicator crossover self.data.close # 错误语法
# 正确示例
def __init__(self):
self.sma = bt.indicators.SMA(self.data.close, period=20)
self.crossover = bt.indicators.CrossOver(self.sma, self.data.close)
```bash
### 3. ZeroDivisionError
- *原因**:除以零
```python
# 错误示例
def next(self):
returns = (self.data.close[0] - self.data.close[-1]) / self.data.close[-1]
# 正确示例
def next(self):
if self.data.close[-1] != 0:
returns = (self.data.close[0] - self.data.close[-1]) / self.data.close[-1]
else:
returns = 0
```bash
### 4. TypeError: 'LineSeries' object is not subscriptable
- *原因**:错误的数据访问方式
```python
# 错误示例
def next(self):
value = self.data.close(0) # 括号而非方括号
# 正确示例
def next(self):
value = self.data.close[0] # 方括号
```bash
- --
## 问题报告模板
在报告问题时,请使用以下模板:
```markdown
## 问题描述
简要描述遇到的问题
## 环境信息
- Python 版本: [例如: 3.9.7]
- Backtrader 版本: [例如: 1.9.78.123]
- 操作系统: [例如: Windows 10, macOS 12, Ubuntu 20.04]
- 安装方式: [pip install / git clone]
## 复现步骤
1. 第一步
2. 第二步
3. 第三步
## 预期行为
描述预期的正确行为
## 实际行为
描述实际发生的行为
## 最小复现代码
```python
# 提供可以复现问题的最小代码示例
import backtrader as bt
class TestStrategy(bt.Strategy):
pass
cerebro = bt.Cerebro()
cerebro.addstrategy(TestStrategy)
# ...
```bash
## 错误信息
```bash
粘贴完整的错误堆栈跟踪
```bash
## 日志文件
如果适用,提供相关日志文件
## 截图
如果适用,提供截图
## 附加信息
任何其他有助于解决问题的信息
```bash
- --
## 获取帮助资源
### 1. 官方资源
- **官方文档**: <https://www.backtrader.com/docu/>
- **GitHub 仓库**: <https://github.com/mementum/backtrader>
- **社区论坛**: <https://community.backtrader.com/>
### 2. 中文资源
- **中文文档**: (本项目的 docs/zh/ 目录)
- **中文教程**: (本项目的 docs/tutorials/zh/ 目录)
- **常见问题**: (本项目的 docs/support/faq_zh.md)
### 3. 学习资源
- **示例代码**: `samples/` 目录
- **测试用例**: `tests/` 目录
- **指标参考**: `docs/reference/indicators_zh.md`
### 4. 社区支持
- **GitHub Issues**: 报告 Bug 和功能请求
- **Stack Overflow**: 使用 `backtrader` 标签
- **Reddit**: r/algotrading
### 5. 联系方式
- **项目主页**: <https://github.com/your-org/backtrader>
- **邮件列表**: (待添加)
- **Discord/Slack**: (待添加)
- --
## 调试检查清单
在寻求帮助前,请检查以下项目:
- [ ] 确认使用的是最新版本
- [ ] 检查数据格式是否正确
- [ ] 验证参数值是否合理
- [ ] 查看完整的错误堆栈跟踪
- [ ] 尝试最小化复现代码
- [ ] 搜索现有问题是否已有解决方案
- [ ] 提供详细的日志信息
- [ ] 说明尝试过的解决方法
- --
## 附录: 常用调试代码片段
### 1. 打印策略状态
```python
def print_status(self):
"""打印当前策略状态"""
print(f"""
状态报告:
- 日期: {self.data.datetime.date(0)}
- 收盘价: {self.data.close[0]:.2f}
- 持仓: {self.position.size}
- 现金: {self.broker.get_cash():.2f}
- 资产: {self.broker.get_value():.2f}
- 未实现盈亏: {self.position.pnl:.2f}
""")
```bash
### 2. 数据验证
```python
def validate_data(data):
"""验证数据源"""
print(f"数据源名称: {data._name}")
print(f"数据点数量: {len(data)}")
print(f"时间范围: {data.datetime.date(0)} 到 {data.datetime.date(-1)}")
print(f"价格范围: {data.close.lowest(0)} 到 {data.close.highest(0)}")
```bash
### 3. 订单跟踪
```python
class OrderTracker(bt.Strategy):
def __init__(self):
self.orders = {} # 订单跟踪字典
def notify_order(self, order):
order_id = order.ref
self.orders[order_id] = {
'status': order.getstatusname(),
'type': order.ordtypename(),
'size': order.created.size,
'price': order.created.price,
}
print(f"订单 {order_id}: {self.orders[order_id]}")
```bash
- --
- 最后更新: 2025 年*
- 版本: 1.0*