backtrader.indicators.zlema 源代码
#!/usr/bin/env python
"""ZLEMA Indicator Module - Zero Lag Exponential Moving Average.
This module provides the ZLEMA (Zero Lag Exponential Moving Average)
indicator which aims to reduce lag in the standard EMA.
Classes:
ZeroLagExponentialMovingAverage: ZLEMA indicator (aliases: ZLEMA, ZeroLagEma).
Example:
class MyStrategy(bt.Strategy):
def __init__(self):
self.zlema = bt.indicators.ZLEMA(self.data.close, period=20)
def next(self):
# Price above ZLEMA indicates uptrend
if self.data.close[0] > self.zlema[0]:
self.buy()
# Price below ZLEMA indicates downtrend
elif self.data.close[0] < self.zlema[0]:
self.sell()
"""
import math
from . import MovingAverageBase
from .ema import EMA
[文档]
class ZeroLagExponentialMovingAverage(MovingAverageBase):
"""
The zero-lag exponential moving average (ZLEMA) is a variation of the EMA
which adds a momentum term aiming to reduce lag in the average to
track current prices more closely.
Formula:
- lag = (period - 1) / 2
- zlema = ema(2 * data - data(-lag))
See also:
- http://user42.tuxfamily.org/chart/manual/Zero_002dLag-Exponential-Moving-Average.html
"""
alias = (
"ZLEMA",
"ZeroLagEma",
)
lines = ("zlema",)
params = (("_movav", EMA),)
def __init__(self):
"""Initialize the ZLEMA indicator.
Calculates lag and alpha values for zero-lag EMA.
"""
super().__init__()
self.lag = (self.p.period - 1) // 2
self.alpha = 2.0 / (1.0 + self.p.period)
self.alpha1 = 1.0 - self.alpha
self.addminperiod(self.lag + self.p.period)
[文档]
def nextstart(self):
"""Seed ZLEMA calculation with SMA on first valid bar.
Uses SMA of lag-adjusted data for initial seed value.
"""
# Seed with SMA of adjusted data
period = self.p.period
lag = self.lag
data_sum = 0.0
for i in range(period):
adjusted = 2.0 * self.data[-i] - self.data[-i - lag]
data_sum += adjusted
self.lines.zlema[0] = data_sum / period
[文档]
def next(self):
"""Calculate ZLEMA for the current bar.
Applies EMA to lag-adjusted data: 2 * data - data(-lag).
"""
lag = self.lag
adjusted = 2.0 * self.data[0] - self.data[-lag]
self.lines.zlema[0] = self.lines.zlema[-1] * self.alpha1 + adjusted * self.alpha
[文档]
def once(self, start, end):
"""Calculate ZLEMA in runonce mode.
Applies EMA to lag-adjusted data across all bars.
"""
darray = self.data.array
larray = self.lines.zlema.array
period = self.p.period
lag = self.lag
alpha = self.alpha
alpha1 = self.alpha1
while len(larray) < end:
larray.append(0.0)
minperiod = lag + period
for i in range(min(minperiod - 1, len(darray))):
if i < len(larray):
larray[i] = float("nan")
# Seed value
seed_idx = minperiod - 1
if seed_idx < len(darray) and seed_idx >= lag:
seed_sum = 0.0
for j in range(period):
idx = seed_idx - j
if idx >= lag and idx < len(darray) and idx - lag >= 0:
adjusted = 2.0 * darray[idx] - darray[idx - lag]
seed_sum += adjusted
prev = seed_sum / period
if seed_idx < len(larray):
larray[seed_idx] = prev
else:
prev = 0.0
# Calculate ZLEMA
for i in range(minperiod, min(end, len(darray))):
if i >= lag and i - lag >= 0:
adjusted = 2.0 * darray[i] - darray[i - lag]
else:
adjusted = darray[i]
if i > 0 and i - 1 < len(larray):
prev_val = larray[i - 1]
if not (isinstance(prev_val, float) and math.isnan(prev_val)):
prev = prev_val
prev = prev * alpha1 + adjusted * alpha
if i < len(larray):
larray[i] = prev