backtrader.indicators.basicops 源代码

#!/usr/bin/env python
"""Basic Operations Indicator Module - Fundamental calculation indicators.

This module provides basic mathematical operations and calculations for
indicator development, including period-based operations and statistics.

Classes:
    PeriodN: Base class for period-based indicators.
    OperationN: Base class for function-based period calculations.
    BaseApplyN: Base class for applying a function over a period.
    ApplyN: Applies a function over a period.
    Highest: Calculates highest value (alias: MaxN).
    Lowest: Calculates lowest value (alias: MinN).
    ReduceN: Applies reduce function over a period.
    SumN: Calculates sum over a period.
    AnyN: Returns True if any value is True.
    AllN: Returns True only if all values are True.
    FindFirstIndex: Finds first index matching condition.
    FindFirstIndexHighest: Index of first highest value.
    FindFirstIndexLowest: Index of first lowest value.
    FindLastIndex: Finds last index matching condition.
    FindLastIndexHighest: Index of last highest value.
    FindLastIndexLowest: Index of last lowest value.
    Accum: Cumulative sum (aliases: CumSum, CumulativeSum).
    Average: Arithmetic mean (aliases: ArithmeticMean, Mean).
    ExponentialSmoothing: EMA-style smoothing (alias: ExpSmoothing).
    ExponentialSmoothingDynamic: Dynamic alpha smoothing (alias: ExpSmoothingDynamic).
    WeightedAverage: Weighted average (alias: AverageWeighted).

Example:
    class MyStrategy(bt.Strategy):
        def __init__(self):
            # Calculate highest and lowest prices over 20 periods
            self.highest = bt.indicators.Highest(self.data.close, period=20)
            self.lowest = bt.indicators.Lowest(self.data.close, period=20)

            # Calculate average price
            self.avg = bt.indicators.Average(self.data.close, period=20)

        def next(self):
            # Buy when price breaks above highest of last 20 bars
            if self.data.close[0] > self.highest[-1]:
                self.buy()

            # Sell when price breaks below lowest of last 20 bars
            elif self.data.close[0] < self.lowest[-1]:
                self.sell()
"""

import functools
import math
import operator

from ..utils.py3 import map, range
from . import Indicator


[文档] class PeriodN(Indicator): """ Base class for indicators which take a period (__init__ has to be called either via supper or explicitly) This class has no defined lines """ params = (("period", 1),) def __init__(self): """Initialize the period-based indicator. Sets minimum period based on the period parameter. """ super().__init__() self.addminperiod(self.p.period)
# Calculate data for past N periods using func, func is a callable function
[文档] class OperationN(PeriodN): """ Calculates "func" for a given period Serves as a base for classes that work with a period and can express the logic in a callable object Note: Base classes must provide a "func" attribute which is callable Formula: - line = func(data, period) """
[文档] def next(self): """Calculate function value for the current bar. Applies func to the last 'period' data values. """ # CRITICAL FIX: Use proper line assignment instead of direct array manipulation # The line[0] assignment will handle the buffer correctly value = self.func(self.data.get(size=self.p.period)) self.lines[0][0] = value
[文档] def once(self, start, end): """Optimized batch calculation for runonce mode - same approach as SMA""" try: # Get arrays for efficient calculation - use same approach as SMA dst = self.lines[0].array src = self.data.array period = self.p.period func = self.func # CRITICAL FIX: Handle case where start >= end (not enough data for this indicator) # This can happen when nested indicators have larger minperiod than available data if start >= end: # Still need to pre-fill the array with NaN while len(dst) < end: dst.append(float("nan")) return # No data to process # CRITICAL FIX: Pre-fill warmup period with NaN instead of 0.0 # This ensures that accessing indicator values before minperiod returns nan # instead of 0.0, which could trigger incorrect buy/sell signals while len(dst) < end: dst.append(float("nan")) # Calculate for each index from start to end for i in range(start, end): if i >= period - 1: # Calculate SMA-style: get last 'period' values start_idx = i - period + 1 end_idx = i + 1 if end_idx <= len(src): # Get slice of data slice_data = src[start_idx:end_idx] # Apply function (min, max, etc.) if len(slice_data) == period: try: result = func(slice_data) dst[i] = float(result) if result is not None else float("nan") except (ValueError, TypeError): dst[i] = float("nan") else: dst[i] = float("nan") else: dst[i] = float("nan") else: # Not enough data yet dst[i] = float("nan") except Exception: # Fallback to once_via_next if once() fails super().once_via_next(start, end)
# Set callable function when calculating indicators
[文档] class BaseApplyN(OperationN): """ Base class for ApplyN and others which may take a ``func`` as a parameter but want to define the lines in the indicator. Calculates ``func`` for a given period where func is given as a parameter, aka named argument or ``kwarg`` Formula: - lines[0] = func(data, period) Any extra lines defined beyond the first (index 0) are not calculated """ params = (("func", None),) def __init__(self): """Initialize the base apply indicator. Sets func from parameter and initializes parent. """ self.func = self.p.func super().__init__()
# Calculate specific line based on the set callable function
[文档] class ApplyN(BaseApplyN): """ Calculates ``func`` for a given period Formula: - line = func(data, period) """ lines = ("apply",)
# Calculate highest price in past N periods
[文档] class Highest(OperationN): """ Calculates the highest value for the data in a given period Uses the built-in ``max`` for the calculation Formula: - highest = max(data, period) """ alias = ("MaxN",) lines = ("highest",) func = max
# Calculate lowest price in past N periods
[文档] class Lowest(OperationN): """ Calculates the lowest value for the data in a given period Uses the built-in ``min`` for the calculation Formula: - lowest = min(data, period) """ alias = ("MinN",) lines = ("lowest",) func = min
# Mimic Python's reduce functionality
[文档] class ReduceN(OperationN): """ Calculates the Reduced value of the ``period`` data points applying ``function`` Uses the built-in ``reduce`` for the calculation plus the ``func`` that subclassess define Formula: - reduced = reduce (function(data, period)), initializer=initializer) Notes: - In order to mimic the python `reduce`, this indicator takes a ``function`` non-named argument as the 1st argument, unlike other Indicators which take only named arguments """ lines = ("reduced",) func = functools.reduce def __init__(self, function, **kwargs): """Initialize the ReduceN indicator. Sets up reduce function with optional initializer. Args: function: The reduce function to apply. **kwargs: Optional 'initializer' parameter. """ if "initializer" not in kwargs: self.func = functools.partial(self.func, function) else: self.func = functools.partial(self.func, function, initializer=kwargs["initializer"]) super().__init__()
# Calculate sum of past N periods
[文档] class SumN(OperationN): """ Calculates the Sum of the data values over a given period Uses ``math.fsum`` for the calculation rather than the built-in ``sum`` to avoid precision errors Formula: - sumn = sum(data, period) """ lines = ("sumn",) func = math.fsum
# Return True if any value in past N periods is True
[文档] class AnyN(OperationN): """ Has a value of ``True`` (stored as ``1.0`` in the lines) if *any* of the values in the ``period`` evaluates to non-zero (ie: ``True``) Uses the built-in `any` for the calculation Formula: - anyn = any(data, period) """ lines = ("anyn",) func = any
# Return True only if all values in past N periods are True
[文档] class AllN(OperationN): """ Has a value of ``True`` (stored as ``1.0`` in the lines) if *all* of the values in the ``period`` evaluates to non-zero (ie: ``True``) Uses the built-in `all` for the calculation Formula: - alln = all(data, period) """ lines = ("alln",) func = all
# Return the first data point that satisfies the condition
[文档] class FindFirstIndex(OperationN): """ Returns the index of the last data that satisfies equality with the condition generated by the parameter _evalfunc Note: Returned indexes look backwards. 0 is the current index and 1 is the previous bar. Formula: - index = first for which data[index] == _evalfunc(data) """ lines = ("index",) params = (("_evalfunc", None),)
[文档] def func(self, iterable): """Find first index where value matches eval function result. Args: iterable: Data values to search. Returns: Index of first matching value (looking backwards). """ m = self.p._evalfunc(iterable) return next(i for i, v in enumerate(reversed(iterable)) if v == m)
# Get the earliest occurrence of the highest price in the past
[文档] class FindFirstIndexHighest(FindFirstIndex): """ Returns the index of the first data that is the highest in the period Note: Returned indexes look backwards. 0 is the current index and 1 is the previous bar. Formula: - index = index of first data which is the highest """ params = (("_evalfunc", max),)
# Get the earliest occurrence of the lowest price in the past
[文档] class FindFirstIndexLowest(FindFirstIndex): """ Returns the index of the first data that is the lowest in the period Note: Returned indexes look backwards. 0 is the current index and 1 is the previous bar. Formula: - index = index of first data which is the lowest """ params = (("_evalfunc", min),)
# Get the index of the last data point that satisfies the condition
[文档] class FindLastIndex(OperationN): """ Returns the index of the last data that satisfies equality with the condition generated by the parameter _evalfunc Note: Returned indexes look backwards. 0 is the current index and 1 is the previous bar. Formula: - index = last for which data[index] == _evalfunc(data) """ lines = ("index",) params = (("_evalfunc", None),)
[文档] def func(self, iterable): """Find last index where value matches eval function result. Args: iterable: Data values to search. Returns: Index of last matching value (looking backwards). """ m = self.p._evalfunc(iterable) index = next(i for i, v in enumerate(iterable) if v == m) # The iterable goes from 0 -> period - 1. If the last element # which is the current bar is returned and without the -1 then # period - index = 1 ... and must be zero! return self.p.period - index - 1
# Get the latest occurrence of the highest price in the past
[文档] class FindLastIndexHighest(FindLastIndex): """ Returns the index of the last data that is the highest in the period Note: Returned indexes look backwards. 0 is the current index and 1 is the previous bar. Formula: - index = index of last data which is the highest """ params = (("_evalfunc", max),)
# Get the latest occurrence of the lowest price in the past
[文档] class FindLastIndexLowest(FindLastIndex): """ Returns the index of the last data that is the lowest in the period Note: Returned indexes look backwards. 0 is the current index and 1 is the previous bar. Formula: - index = index of last data which is the lowest """ params = (("_evalfunc", min),)
# Calculate cumulative sum
[文档] class Accum(Indicator): """ Cummulative sum of the data values Formula: - accum += data """ alias = ( "CumSum", "CumulativeSum", ) lines = ("accum",) params = (("seed", 0.0),) # xxxstart methods use the seed (starting value) and passed data to # construct the first value keeping the minperiod to 1 since no # initial look-back value is needed
[文档] def nextstart(self): """Start accumulation with seed value. accum = seed + data[0] """ self.lines[0][0] = self.p.seed + self.data[0]
[文档] def next(self): """Add current data value to accumulation. accum += data """ self.lines[0][0] = self.lines[0][-1] + self.data[0]
[文档] def oncestart(self, start, end): """Start accumulation in runonce mode. accum = seed + data for each bar. """ dst = self.lines[0].array src = self.data.array prev = self.p.seed for i in range(start, end): dst[i] = prev = prev + src[i]
[文档] def once(self, start, end): """Continue accumulation in runonce mode. accum = prev_accum + data for each bar. """ dst = self.lines[0].array src = self.data.array prev = dst[start - 1] for i in range(start, end): dst[i] = prev = prev + src[i]
# Calculate arithmetic mean
[文档] class Average(PeriodN): """ Averages a given data arithmetically over a period Formula: - av = data(period) / period See also: - https://en.wikipedia.org/wiki/Arithmetic_mean """ alias = ( "ArithmeticMean", "Mean", ) lines = ("av",)
[文档] def next(self): """Calculate arithmetic mean for the current bar. av = sum(data, period) / period """ data_values = self.data.get(size=self.p.period) avg_value = math.fsum(data_values) / self.p.period self.lines[0][0] = avg_value
[文档] def once(self, start, end): """Calculate Average (SMA) in runonce mode""" src = self.data.array dst = self.lines[0].array period = self.p.period # Ensure destination array is large enough while len(dst) < end: dst.append(0.0) for i in range(start, end): if i >= period - 1: start_idx = i - period + 1 end_idx = i + 1 if end_idx <= len(src): dst[i] = sum(src[start_idx:end_idx]) / period else: dst[i] = float("nan") else: dst[i] = float("nan")
# Calculate exponential moving average
[文档] class ExponentialSmoothing(Average): """ Averages a given data over a period using exponential smoothing A regular ArithmeticMean (Average) is used as the seed value considering the first period values of data Formula: - av = prev * (1 - alpha) + data * alpha See also: - https://en.wikipedia.org/wiki/Exponential_smoothing """ alias = ("ExpSmoothing",) params = (("alpha", None),) def __init__(self): """Initialize the exponential smoothing indicator. Calculates alpha and alpha1 for smoothing calculation. """ self.alpha = self.p.alpha if self.alpha is None: self.alpha = 2.0 / (1.0 + self.p.period) # def EMA value self.alpha1 = 1.0 - self.alpha super().__init__()
[文档] def nextstart(self): """Seed exponential smoothing with SMA value. Uses parent's SMA calculation for initial seed. """ # Fetch the seed value from the base class calculation super().next()
[文档] def next(self): """Calculate EMA for the current bar. av = prev * alpha1 + data * alpha """ self.lines[0][0] = self.lines[0][-1] * self.alpha1 + self.data[0] * self.alpha
[文档] def oncestart(self, start, end): """Calculate seed value in runonce mode. Uses parent's SMA calculation for initial seed. """ # Calculate seed value using parent's once method (SMA of first period values) # Call parent's once method to populate seed at index period-1 if start == self.p.period - 1: super().once(start, end)
[文档] def once(self, start, end): """Calculate EMA in runonce mode""" darray = self.data.array larray = self.lines[0].array alpha = self.alpha alpha1 = self.alpha1 period = self.p.period # CRITICAL FIX: Ensure array is properly sized while len(larray) < end: larray.append(0.0) # CRITICAL FIX: Pre-fill warmup period with NaN to match expected behavior # This prevents invalid comparisons during prenext when strategy calls next() for i in range(0, min(period - 1, len(darray))): larray[i] = float("nan") # CRITICAL FIX: Calculate seed value (SMA of first period values) # EMA starts at index period-1 with seed = SMA of first period values seed_idx = period - 1 # Calculate seed as SMA of first period values prev = None if seed_idx < len(darray) and seed_idx >= 0: seed_start = max(0, seed_idx - period + 1) seed_end = seed_idx + 1 if seed_end <= len(darray) and seed_end > seed_start: seed_data = darray[seed_start:seed_end] if len(seed_data) >= period: prev = sum(seed_data) / period elif len(seed_data) > 0: prev = sum(seed_data) / len(seed_data) # Fallback: use first data point if seed calculation failed if prev is None or prev <= 0.0 or (isinstance(prev, float) and math.isnan(prev)): if len(darray) > 0: prev = float(darray[0]) else: prev = 0.0 # Set seed value at index period-1 if within calculation range if seed_idx >= start and seed_idx < end: larray[seed_idx] = prev # Calculate EMA for indices from period to end calc_start = max(start, period) for i in range(calc_start, end): if i < len(darray) and i >= 0: # Use previous EMA value if available, otherwise use seed if i > calc_start: prev_ema = larray[i - 1] if prev_ema > 0.0 and not ( isinstance(prev_ema, float) and math.isnan(prev_ema) ): prev = prev_ema # EMA formula: prev * alpha1 + current * alpha current_val = float(darray[i]) prev = prev * alpha1 + current_val * alpha larray[i] = prev elif i >= len(darray): break
# Dynamic exponential moving average
[文档] class ExponentialSmoothingDynamic(ExponentialSmoothing): """ Averages a given data over a period using exponential smoothing A regular ArithmeticMean (Average) is used as the seed value considering the first period values of data Note: - alpha is an array of values which can be calculated dynamically Formula: - av = prev * (1 - alpha) + data * alpha See also: - https://en.wikipedia.org/wiki/Exponential_smoothing """ alias = ("ExpSmoothingDynamic",) def __init__(self): """Initialize the dynamic exponential smoothing indicator. Sets up alpha1 line for dynamic alpha values. """ super().__init__() # CRITICAL FIX: Handle cases where alpha is a float instead of a LineBuffer # The parent class sets self.alpha to a float value, but ExponentialSmoothingDynamic # expects it to be a line-like object with _minperiod and array access if hasattr(self.alpha, "_minperiod"): # alpha is a LineBuffer or similar object minperioddiff = max(0, self.alpha._minperiod - self.p.period) self.lines[0].incminperiod(minperioddiff) # Set up alpha1 as a line that computes 1 - alpha from . import Indicator class Alpha1Line(Indicator): """Helper class to compute 1 - alpha dynamically.""" lines = ("alpha1",) params = (("alpha_source", None),) def __init__(self): """Initialize with alpha source reference.""" self.alpha_source = self.p.alpha_source super().__init__() def next(self): """Calculate 1 - alpha for current bar.""" self.lines.alpha1[0] = 1.0 - self.alpha_source[0] def once(self, start, end): """Calculate 1 - alpha in runonce mode.""" alpha_array = self.alpha_source.array alpha1_array = self.lines.alpha1.array for i in range(start, end): alpha1_array[i] = 1.0 - alpha_array[i] self.alpha1 = Alpha1Line(alpha_source=self.alpha) else: # alpha is a float value - convert it to work with dynamic smoothing # In this case, we can't do true dynamic smoothing, so we fall back to static # print(f"WARNING: ExponentialSmoothingDynamic received float alpha={self.alpha}, falling back to static smoothing") # Removed for performance pass # No additional minperiod adjustment needed for static alpha # self.alpha1 is already set in parent class as a float
[文档] def next(self): """Calculate dynamic EMA for the current bar. Handles both float and LineBuffer alpha sources. """ # CRITICAL FIX: Handle both float and LineBuffer cases for alpha if hasattr(self.alpha, "__getitem__"): # alpha is a LineBuffer - use array access self.lines[0][0] = self.lines[0][-1] * self.alpha1[0] + self.data[0] * self.alpha[0] else: # alpha is a float - use regular arithmetic (fall back to parent behavior) self.lines[0][0] = self.lines[0][-1] * self.alpha1 + self.data[0] * self.alpha
[文档] def once(self, start, end): """Calculate dynamic EMA in runonce mode. Handles both float and LineBuffer alpha sources. """ # CRITICAL FIX: Handle both float and LineBuffer cases for alpha darray = self.data.array larray = self.line.array if hasattr(self.alpha, "array"): # alpha is a LineBuffer - use array access alpha = self.alpha.array alpha1 = self.alpha1.array # Seed value from SMA calculated with the call to oncestart prev = larray[start - 1] for i in range(start, end): larray[i] = prev = prev * alpha1[i] + darray[i] * alpha[i] else: # alpha is a float - use regular arithmetic (fall back to parent behavior) alpha = self.alpha alpha1 = self.alpha1 # Seed value from SMA calculated with the call to oncestart prev = larray[start - 1] for i in range(start, end): larray[i] = prev = prev * alpha1 + darray[i] * alpha
# Calculate weighted moving average
[文档] class WeightedAverage(PeriodN): """ Calculates the weighted average of the given data over a period The default weights (if none are provided) are linear to assigne more weight to the most recent data The result will be multiplied by a given "coef" Formula: - av = coef * sum(mul(data, period), weights) See: - https://en.wikipedia.org/wiki/Weighted_arithmetic_mean """ alias = ("AverageWeighted",) lines = ("av",) params = ( ("coef", 1.0), ("weights", tuple()), ) def __init__(self): """Initialize the Weighted Average indicator. Sets up parameters for weighted average calculation. """ super().__init__()
[文档] def next(self): """Calculate weighted average for the current bar. Multiplies data by weights and sums, then applies coefficient. """ data = self.data.get(size=self.p.period) dataweighted = map(operator.mul, data, self.p.weights) self.lines[0][0] = self.p.coef * math.fsum(dataweighted)
[文档] def once(self, start, end): """Calculate weighted average in runonce mode. Computes weighted averages across all bars efficiently. """ darray = self.data.array larray = self.line.array period = self.p.period coef = self.p.coef weights = self.p.weights for i in range(start, end): data = darray[i - period + 1 : i + 1] larray[i] = coef * math.fsum(map(operator.mul, data, weights))
AverageWeighted = WeightedAverage