#!/usr/bin/env python
"""Data Feed Module - Financial data feed implementations.

This module provides the base classes and implementations for data feeds
in backtrader. Data feeds are the source of price/volume data for strategies
and indicators.

Key Classes:
    AbstractDataBase: Base class for all data feeds with core functionality
        (loading, filters, resample/replay hooks, notifications).
    DataBase: Concrete data feed base class; inherits everything from
        AbstractDataBase.
    CSVDataBase: Base class for CSV file data feeds.
    FeedBase: Container that creates data feeds on demand and starts/stops
        them together.

Data feeds provide:
    - OHLCV (Open, High, Low, Close, Volume) data
    - Timeline management and session handling
    - Replay and resampling capabilities
    - Live data support for trading

Example:
    Creating a custom data feed:

    >>> class MyDataFeed(CSVDataBase):
    ...     params = (('dataname', 'data.csv'),)
"""
import collections
import datetime
import inspect
import os.path
from . import dataseries, metabase
from .dataseries import SimpleFilterWrapper, TimeFrame
from .resamplerfilter import Replayer, Resampler
from .tradingcal import PandasMarketCalendar
from .utils import date2num, num2date, time2num, tzparse
from .utils.date import Localizer
from .utils.py3 import range, string_types, zip
# Refactor: Remove metaclass, use normal class and initialization method
class AbstractDataBase(dataseries.OHLCDateTime):
    """Base class for all data feed implementations.

    Provides the core functionality for data feeds including:

    - Data loading and preprocessing (filters, fromdate/todate limits)
    - Timeline management
    - Session handling
    - Live data support
    - Notification system for data status changes

    States:
        CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED, LIVE,
        NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN

    Params:
        dataname: Data source identifier (filename, URL, etc.).
        name: Display name for the data feed.
        compression: Timeframe compression factor.
        timeframe: TimeFrame period (Days, Minutes, etc.).
        fromdate: Start date for data filtering.
        todate: End date for data filtering.
        sessionstart: Session start time.
        sessionend: Session end time.
        filters: List of data filters to apply.
        tz: Output timezone.
        tzinput: Input timezone.
        qcheck: Timeout in seconds for live event checking.
        calendar: Trading calendar to use.

    Example::

        data = AbstractDataBase(dataname='data.csv')
        cerebro.adddata(data)
    """

    # Class-level registry dictionary, replacing metaclass _indcol functionality
    _registry = {}
    # Parameter definitions - _params_tuple keeps the original definition
    _params_tuple = (
        ("dataname", None),
        ("name", ""),
        ("compression", 1),
        ("timeframe", TimeFrame.Days),
        ("fromdate", None),
        ("todate", None),
        ("sessionstart", None),
        ("sessionend", None),
        ("filters", []),
        ("tz", None),
        ("tzinput", None),
        ("qcheck", 0.0),  # timeout in seconds (float) to check for events
        ("calendar", None),
    )
    # Keep original params definition for compatibility with metaclass system
    params = _params_tuple
    # The eight data states reported through put_notification
    (
        CONNECTED,
        DISCONNECTED,
        CONNBROKEN,
        DELAYED,
        LIVE,
        NOTSUBSCRIBED,
        NOTSUPPORTED_TF,
        UNKNOWN,
    ) = range(8)
    # Notification names, indexed by the state constants above
    _NOTIFNAMES = [
        "CONNECTED",
        "DISCONNECTED",
        "CONNBROKEN",
        "DELAYED",
        "LIVE",
        "NOTSUBSCRIBED",
        "NOTSUPPORTED_TIMEFRAME",
        "UNKNOWN",
    ]

    def __init__(self, *args, **kwargs):
        """Initialize the data feed.

        Runs ``_init_preinit`` before and ``_init_postinit`` after the parent
        initializer (the steps the removed metaclass used to perform).

        Args:
            *args: Positional arguments forwarded to the parent class.
            **kwargs: Keyword arguments for data feed parameters.
        """
        # Former metaclass dopreinit step
        self._init_preinit(*args, **kwargs)
        super().__init__(*args, **kwargs)
        # Former metaclass dopostinit step
        self._init_postinit(*args, **kwargs)
        # Mark all lines as belonging to a data feed. This must run AFTER
        # _init_postinit so the lines are fully initialized. The flag lets
        # LineSeries.__getitem__ / LineBuffer.__getitem__ raise IndexError on
        # out-of-range access, which expire_order_close() relies on to detect
        # insufficient data. Marking is best effort: failures are ignored.
        if hasattr(self, "lines") and self.lines is not None:
            try:
                for line in self.lines:
                    if hasattr(line, "__dict__"):
                        line._is_data_feed_line = True
            except Exception:
                pass
        # The datetime line may be accessed on its own (e.g.
        # self.datas[0].datetime), so mark it explicitly as well.
        if hasattr(self, "datetime") and self.datetime is not None:
            try:
                if hasattr(self.datetime, "__dict__"):
                    self.datetime._is_data_feed_line = True
            except Exception:
                pass
        self._env = None
        self._laststatus = None
        # _init_postinit already created the filter stacks as deques. Do not
        # reset them to None (that made _add2stack/_save2stack fail until
        # start() re-created them); only fill them in if a subclass override
        # of _init_postinit skipped that step.
        if getattr(self, "_barstash", None) is None:
            self._barstash = collections.deque()
        if getattr(self, "_barstack", None) is None:
            self._barstack = collections.deque()

    def _init_preinit(self, *args, **kwargs):
        """Replace the original MetaAbstractDataBase.dopreinit."""
        # Find the owning FeedBase (if any) and store it
        self._feed = self._find_feed_owner()
        # Queue of notifications to be collected by cerebro
        self.notifs = collections.deque()
        self._dataname = getattr(self.p, "dataname", None)
        # Default _name is empty; _init_postinit fills it in
        self._name = ""

    def _init_postinit(self, *args, **kwargs):
        """Replace the original MetaAbstractDataBase.dopostinit.

        Resolves the feed name, copies compression/timeframe, normalizes the
        session and from/to date parameters and instantiates the filters
        given in the ``filters`` parameter.
        """
        # Name: set by a subclass, else the ``name`` param, else the
        # dataname when it is a string (e.g. a ticker or file path)
        self._name = self._name or getattr(self.p, "name", "")
        if not self._name and isinstance(getattr(self.p, "dataname", None), string_types):
            self._name = self.p.dataname
        self._compression = getattr(self.p, "compression", 1)
        self._timeframe = getattr(self.p, "timeframe", TimeFrame.Days)
        # sessionstart: a datetime is reduced to its time; None becomes the
        # minimum time of day
        sessionstart = getattr(self.p, "sessionstart", None)
        if isinstance(sessionstart, datetime.datetime):
            self.p.sessionstart = sessionstart.time()
        elif sessionstart is None:
            self.p.sessionstart = datetime.time.min
        # sessionend: a datetime is reduced to its time; None becomes
        # 23:59:59.999990 (last digit removed to avoid precision rounding
        # errors)
        sessionend = getattr(self.p, "sessionend", None)
        if isinstance(sessionend, datetime.datetime):
            self.p.sessionend = sessionend.time()
        elif sessionend is None:
            self.p.sessionend = datetime.time(23, 59, 59, 999990)
        # A plain date (no ``hour`` attribute) as fromdate is combined with
        # sessionstart to make it a datetime
        fromdate = getattr(self.p, "fromdate", None)
        if isinstance(fromdate, datetime.date):
            if not hasattr(fromdate, "hour"):
                self.p.fromdate = datetime.datetime.combine(fromdate, self.p.sessionstart)
        # A plain date as todate is pushed to sessionend, or else intraday
        # values before the end of the day would be dropped
        todate = getattr(self.p, "todate", None)
        if isinstance(todate, datetime.date):
            if not hasattr(todate, "hour"):
                self.p.todate = datetime.datetime.combine(todate, self.p.sessionend)
        # Stacks used by filters to inject/stash bars
        self._barstack = collections.deque()
        self._barstash = collections.deque()
        self._filters = list()
        self._ffilters = list()
        # Filter classes are instantiated with the feed. Every filter goes
        # into _filters; those with a ``last`` attribute are also registered
        # in _ffilters so that _last() can flush them.
        filters = getattr(self.p, "filters", [])
        for fp in filters:
            if inspect.isclass(fp):
                fp = fp(self)
            if hasattr(fp, "last"):
                self._ffilters.append((fp, [], {}))
            self._filters.append((fp, [], {}))

    def _find_feed_owner(self):
        """Find the feed owner using metabase.findowner.

        This method delegates to metabase.findowner, which uses
        OwnerContext for explicit owner management.
        """
        return metabase.findowner(self, FeedBase)

    @classmethod
    def _getstatusname(cls, status):
        """Return the notification name for the ``status`` constant."""
        return cls._NOTIFNAMES[status]

    # Attributes that may be set/used by live trading feeds
    _compensate = None
    _feed = None
    _store = None
    _clone = False
    _qcheck = 0.0
    # Time offset returned by _timeoffset
    _tmoffset = datetime.timedelta()
    # Set to non 0 if resampling/replaying
    resampling = 0
    replaying = 0
    # Whether _start_finish has run
    _started = False

    def _start_finish(self):
        """Resolve timezones, date limits and calendar after start().

        A live feed may have learnt something about the timezones after the
        start, which is why the date/time related parameters are converted
        at this late stage.
        """
        # Output timezone, also applied to the datetime line
        self._tz = self._gettz()
        self.lines.datetime._settz(self._tz)
        # Localizer for the input timezone
        self._tzinput = Localizer(self._gettzinput())
        # Convert user input limits to numbers; missing limits become
        # -inf / +inf so they never filter anything
        if self.p.fromdate is None:
            self.fromdate = float("-inf")
        else:
            self.fromdate = self.date2num(self.p.fromdate)
        if self.p.todate is None:
            self.todate = float("inf")
        else:
            self.todate = self.date2num(self.p.todate)
        # FIXME: These two are never used and could be removed
        self.sessionstart = time2num(self.p.sessionstart)
        self.sessionend = time2num(self.p.sessionend)
        # Calendar: the ``calendar`` param; None falls back to the
        # environment's _tradingcal; a string selects a PandasMarketCalendar
        self._calendar = cal = self.p.calendar
        if cal is None:
            self._calendar = self._env._tradingcal if self._env else None
        elif isinstance(cal, string_types):
            self._calendar = PandasMarketCalendar(calendar=cal)
        self._started = True

    def _start(self):
        """Call start() and, the first time only, _start_finish()."""
        self.start()
        if not self._started:
            self._start_finish()

    def _timeoffset(self):
        """Return the configured time offset (a ``timedelta``)."""
        return self._tmoffset

    def _getnexteos(self):
        """Return the next end of session as ``(datetime, float)``.

        When ``_clone`` is set the call is delegated to ``self.data``. With
        no bars loaded, ``(datetime.datetime.min, 0.0)`` is returned. Without
        a trading calendar, ``sessionend`` is combined with the current bar's
        date and moved forward one day at a time until it is not before the
        current bar; with a calendar, its ``schedule`` result is used.
        """
        if self._clone:
            return self.data._getnexteos()
        if not len(self):
            return datetime.datetime.min, 0.0
        dt = self.lines.datetime[0]
        dtime = num2date(dt)
        if self._calendar is None:
            nexteos = datetime.datetime.combine(dtime, self.p.sessionend)
            nextdteos = self.date2num(nexteos)  # localized -> utc-like
            nexteos = num2date(nextdteos)  # utc
            while dtime > nexteos:
                nexteos += datetime.timedelta(days=1)  # already utc-like
            nextdteos = date2num(nexteos)  # -> utc-like
        else:
            # returns times in utc
            _, nexteos = self._calendar.schedule(dtime, self._tz)
            nextdteos = date2num(nexteos)  # nexteos is already utc
        return nexteos, nextdteos

    def _gettzinput(self):
        """Can be overridden by classes to return a timezone for input."""
        return tzparse(self.p.tzinput)

    def _gettz(self):
        """To be overridden by subclasses which may auto-calculate the
        timezone."""
        return tzparse(self.p.tz)

    def date2num(self, dt):
        """Convert datetime to internal numeric format.

        If an output timezone is set (``self._tz``, resolved in
        ``_start_finish``), ``dt`` is localized to it before conversion.

        Args:
            dt: datetime object to convert.

        Returns:
            float: Internal numeric representation of the datetime.
        """
        if self._tz is not None:
            return date2num(self._tz.localize(dt))
        return date2num(dt)

    def num2date(self, dt=None, tz=None, naive=True):
        """Convert internal numeric format to datetime.

        Args:
            dt: Numeric datetime value (uses the current bar if None).
            tz: Timezone to use (uses the feed tz if None).
            naive: Return naive datetime if True.

        Returns:
            datetime: Converted datetime object.
        """
        if dt is None:
            return num2date(self.lines.datetime[0], tz or self._tz, naive)
        return num2date(dt, tz or self._tz, naive)

    def haslivedata(self):
        """Check if this data feed has live data.

        Returns:
            bool: False for base class, override for live data feeds.
        """
        return False  # must be overridden for those that can

    def do_qcheck(self, onoff, qlapse):
        """Calculate the wait interval for queue checking.

        Stores ``max(0, qcheck - qlapse)`` in ``self._qcheck`` when
        ``onoff`` is true, else 0.0.

        Args:
            onoff: Whether queue checking is enabled.
            qlapse: Time elapsed since last check.
        """
        # if onoff is True, the data will wait p.qcheck for incoming live data
        # on its queue.
        qwait = self.p.qcheck if onoff else 0.0
        qwait = max(0.0, qwait - qlapse)
        self._qcheck = qwait

    def islive(self):
        """If this returns True, ``Cerebro`` will deactivate ``preload`` and
        ``runonce`` because a live data source must be fetched tick by tick (or
        bar by bar)"""
        return False

    def put_notification(self, status, *args, **kwargs):
        """Add arguments to notification queue.

        Only queued when ``status`` differs from the last recorded status.
        """
        if self._laststatus != status:
            self.notifs.append((status, args, kwargs))
            self._laststatus = status

    def get_notifications(self):
        """Return the pending "store" notifications."""
        # The background thread could keep on adding notifications. The None
        # mark identifies the last notification to deliver in this call.
        self.notifs.append(None)  # put a mark
        notifs = list()
        while True:
            notif = self.notifs.popleft()
            if notif is None:  # mark is reached
                break
            notifs.append(notif)
        return notifs

    def getfeed(self):
        """Get the parent feed object.

        Returns:
            FeedBase or None: The parent feed instance.
        """
        return self._feed

    def qbuffer(self, savemem=0, replaying=False):
        """Apply queued buffering to all lines.

        Args:
            savemem: Memory saving mode.
            replaying: Whether replaying is active; together with
                ``self.resampling`` it sets each line's ``extrasize``.
        """
        extrasize = self.resampling or replaying
        for line in self.lines:
            line.qbuffer(savemem=savemem, extrasize=extrasize)

    def start(self):
        """Start the data feed.

        Resets the filter stacks and sets the last status to CONNECTED.
        """
        self._barstack = collections.deque()
        self._barstash = collections.deque()
        self._laststatus = self.CONNECTED

    def stop(self):
        """Stop the data feed.

        Override in subclasses for cleanup.
        """
        pass

    def clone(self, **kwargs):
        """Create a clone of this data feed.

        Args:
            **kwargs: Additional keyword arguments for the clone.

        Returns:
            DataClone: A cloned data feed.
        """
        return DataClone(dataname=self, **kwargs)

    def copyas(self, _dataname, **kwargs):
        """Copy the data feed with a different name.

        Args:
            _dataname: New name for the data feed.
            **kwargs: Additional keyword arguments.

        Returns:
            DataClone: A cloned data feed with the new name.
        """
        d = DataClone(dataname=self, **kwargs)
        d._dataname = _dataname
        d._name = _dataname
        return d

    def setenvironment(self, env):
        """Keep a reference to the environment."""
        self._env = env

    def getenvironment(self):
        """Get the cerebro environment reference.

        Returns:
            The cerebro environment instance.
        """
        return self._env

    def addfilter_simple(self, f, *args, **kwargs):
        """Add a simple filter wrapper to this data feed.

        Args:
            f: Filter function to apply.
            *args: Positional arguments for the filter.
            **kwargs: Keyword arguments for the filter.
        """
        fp = SimpleFilterWrapper(self, f, *args, **kwargs)
        self._filters.append((fp, fp.args, fp.kwargs))

    def addfilter(self, p, *args, **kwargs):
        """Add a filter to this data feed.

        A class is instantiated with ``(self, *args, **kwargs)`` and, if it
        has a ``last`` attribute, also registered for ``_last``. Anything
        else is stored together with ``args``/``kwargs`` for each call.

        Args:
            p: Filter class or instance.
            *args: Positional arguments for filter creation.
            **kwargs: Keyword arguments for filter creation.
        """
        if inspect.isclass(p):
            pobj = p(self, *args, **kwargs)
            self._filters.append((pobj, [], {}))
            if hasattr(pobj, "last"):
                self._ffilters.append((pobj, [], {}))
        else:
            self._filters.append((p, args, kwargs))

    def compensate(self, other):
        """Call it to let the broker know that actions on this asset will
        compensate open positions in another"""
        self._compensate = other

    def _tick_nullify(self):
        """Reset every ``tick_<alias>`` attribute (except datetime) to None.

        These are the updating prices in case the new bar is "updated" and
        the length doesn't change, like during replay or when a real-time
        feed builds 1-minute bars from 5-second updates.
        """
        # Cache the attribute names to avoid repeated string concatenation
        tick_cache = getattr(self, "_tick_cache", None)
        if tick_cache is None:
            tick_cache = ["tick_" + alias for alias in self.getlinealiases() if alias != "datetime"]
            self._tick_cache = tick_cache
        for tick_name in tick_cache:
            setattr(self, tick_name, None)
        self.tick_last = None

    def _tick_fill(self, force=False):
        """Fill the ``tick_<alias>`` attributes from the current bar.

        Done when ``force`` is true or nothing filled the tick attribute of
        line 0 yet: in that case the bar is the tick.
        """
        # Cache (tick name, line) pairs for faster access
        tick_line_cache = getattr(self, "_tick_line_cache", None)
        if tick_line_cache is None:
            tick_line_cache = []
            alias0 = self._getlinealias(0)
            self._tick_alias0 = "tick_" + alias0
            self._line_alias0 = getattr(self.lines, alias0)
            for lalias in self.getlinealiases():
                if lalias != "datetime":
                    tick_line_cache.append(("tick_" + lalias, getattr(self.lines, lalias)))
            self._tick_line_cache = tick_line_cache
        if force or getattr(self, self._tick_alias0, None) is None:
            for tick_name, line in self._tick_line_cache:
                setattr(self, tick_name, line[0])
            self.tick_last = self._line_alias0[0]

    # Cached positive infinity (a class attribute) returned by advance_peek
    # when no future bar is available
    _INF = float("inf")

    def advance_peek(self):
        """Peek at the datetime of the next bar.

        Returns:
            float: Numeric datetime of next bar, or inf if unavailable
            (including non-positive/None values and any lookup error).
        """
        _inf = self._INF
        try:
            if len(self) < self.buflen():
                try:
                    next_dt = self.lines.datetime[1]
                    # A None or non-positive value is treated as no bar
                    if next_dt is None or next_dt <= 0:
                        return _inf
                    return next_dt
                except (IndexError, KeyError):
                    # Accessing datetime[1] failed: we're at the end
                    return _inf
            return _inf  # max date else
        except Exception:
            # Best effort: any failure means "no next bar known"
            return _inf

    def advance(self, size=1, datamaster=None, ticks=True):
        """Advance the data feed by the specified size.

        Args:
            size: Number of bars to advance (default: 1).
            datamaster: Master data feed for synchronization.
            ticks: Whether to process tick data.
        """
        if ticks:
            self._tick_nullify()
        # Need intercepting this call to support datas with
        # different lengths (timeframes)
        self.lines.advance(size)
        if datamaster is not None:
            if len(self) > self.buflen():
                # if no bar can be delivered, fill with an empty bar
                self.rewind()
                self.lines.forward()
                return
            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                self.lines.rewind()
            else:
                if ticks:
                    self._tick_fill()
        elif len(self) < self.buflen():
            # a resampler may have advanced us past the last point
            if ticks:
                self._tick_fill()

    def next(self, datamaster=None, ticks=True):
        """Move to the next bar.

        If all buffered bars have been consumed, a new bar is requested via
        ``load``; otherwise the feed advances over preloaded data.

        Args:
            datamaster: Master data feed for synchronization.
            ticks: Whether to process tick data.

        Returns:
            The result of ``load`` when it produced no bar or when there is
            no master; False when the new bar is ahead of ``datamaster`` (it
            is rewound); True otherwise.
        """
        if len(self) >= self.buflen():
            if ticks:
                self._tick_nullify()
            # not preloaded - request next bar
            ret = self.load()
            if not ret:
                # if the load cannot produce bars - forward the result
                return ret
            if datamaster is None:
                # bar is there and no master ... return load's result
                if ticks:
                    self._tick_fill()
                return ret
        else:
            self.advance(ticks=ticks)
        # a bar is "loaded" or was preloaded - index has been moved to it
        if datamaster is not None and self.lines.datetime[0] > datamaster.lines.datetime[0]:
            # can't deliver new bar, too early, go back
            self.rewind()
            return False
        if ticks:
            self._tick_fill()
        # a new bar is available at the current index
        return True

    def preload(self):
        """Preload all available data from the data feed.

        Loads all bars, gives filters a last chance via ``_last`` and resets
        position to the beginning.
        """
        while self.load():
            pass
        self._last()
        self.home()

    def _last(self, datamaster=None):
        """Give filters with a ``last`` method a final chance to emit bars.

        Returns:
            bool: True if any filter's ``last`` returned a truthy count.
        """
        ret = 0
        for ff, fargs, fkwargs in self._ffilters:
            ret += ff.last(self, *fargs, **fkwargs)
        doticks = False
        if datamaster is not None and self._barstack:
            doticks = True
        while self._fromstack(forward=True):
            # consume bar(s) produced by "last"s - adding room
            pass
        if doticks:
            self._tick_fill()
        return bool(ret)

    def _check(self, forcedata=None):
        """Call ``check`` on every filter that defines it."""
        for ff, fargs, fkwargs in self._filters:
            if not hasattr(ff, "check"):
                continue
            ff.check(self, _forcedata=forcedata, *fargs, **fkwargs)

    def load(self):
        """Load the next bar from the data feed.

        Returns:
            True if a bar was loaded; False when there are no more bars or
            ``todate`` was passed; otherwise whatever ``_load`` returned when
            it produced no bar (None signals "no bar now, feed not done").

        This method handles:
            - Forwarding the data pointer
            - Taking bars from the filter stacks before calling ``_load``
            - Input timezone localization
            - Checking date boundaries
            - Processing filters
        """
        while True:
            # move the data pointer forward for a new bar
            self.forward()
            # A bar pushed onto _barstack by a filter is delivered as-is
            if self._fromstack():
                return True
            # Otherwise take a stashed bar, or finally load a new one
            if not self._fromstack(stash=True):
                _loadret = self._load()
                if not _loadret:  # no bar use force to make sure in exactbars
                    # the pointer is undone this covers especially (but not
                    # uniquely) the case in which the last bar has been seen
                    # and a backwards would ruin pointer accounting in the
                    # "stop" method of the strategy
                    self.backwards(force=True)  # undo data pointer
                    # Return the actual returned value which may be None to
                    # signal no bar is available, but the data feed is not
                    # done. False means game over
                    return _loadret
            # A bar has been loaded (or unstashed): adapt the time
            dt = self.lines.datetime[0]
            if self._tzinput:
                # Input has been converted at face value, but it's not UTC in
                # the input stream
                dtime = num2date(dt)  # get it in a naive datetime
                dtime = self._tzinput.localize(dtime)  # pytz compatible-ized
                self.lines.datetime[0] = dt = date2num(dtime)  # keep UTC val
            # Check standard date from/to filters
            if dt < self.fromdate:
                # discard loaded bar and carry on
                self.backwards()
                continue
            if dt > self.todate:
                # discard loaded bar and break out
                self.backwards(force=True)
                break
            # Pass through filters
            retff = False
            for ff, fargs, fkwargs in self._filters:
                # previous filter may have put things onto the stack
                if self._barstack:
                    for i in range(len(self._barstack)):
                        self._fromstack(forward=True)
                        retff = ff(self, *fargs, **fkwargs)
                else:
                    retff = ff(self, *fargs, **fkwargs)
                if retff:  # bar removed from system
                    break  # out of the inner loop
            if retff:  # bar removed from system - loop to get new bar
                continue  # in the greater loop
            # Checks let the bar through ... notify it
            return True
        # Out of the loop ... past todate
        return False

    def _load(self):
        """Load one raw bar onto the lines; subclasses override this.

        The base implementation has no source and always returns False.
        """
        return False

    def _add2stack(self, bar, stash=False):
        """Saves given bar (list of values) to the stack for later retrieval"""
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

    def _save2stack(self, erase=False, force=False, stash=False):
        """Saves current bar to the bar stack for later retrieval

        Parameter ``erase`` determines removal from the data stream
        """
        bar = [line[0] for line in self.itersize()]
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)
        if erase:  # remove bar if requested
            self.backwards(force=force)

    def _updatebar(self, bar, forward=False, ago=0):
        """Write the values of ``bar`` onto the lines at offset ``ago``.

        The lines are moved forward first when ``forward`` is true. Values
        are paired with ``self.itersize()`` in order. Returns None.
        """
        if forward:
            self.forward()
        for line, val in zip(self.itersize(), bar):
            line[0 + ago] = val

    def _fromstack(self, forward=False, stash=False):
        """Load a value from the stack onto the lines to form the new bar

        Uses ``_barstash`` when ``stash`` is true, else ``_barstack``.
        Returns True if values are present, False otherwise
        """
        coll = self._barstack if not stash else self._barstash
        if coll:
            if forward:
                self.forward()
            for line, val in zip(self.itersize(), coll.popleft()):
                line[0] = val
            return True
        return False

    def resample(self, **kwargs):
        """Add a resampling filter to this data feed.

        Resampling converts data to a different timeframe (e.g., minutes to days).

        Args:
            **kwargs: Arguments for the Resampler filter.
        """
        self.addfilter(Resampler, **kwargs)

    def replay(self, **kwargs):
        """Add a replay filter to this data feed.

        Args:
            **kwargs: Arguments for the Replayer filter.
        """
        self.addfilter(Replayer, **kwargs)

    @classmethod
    def _gettuple(cls):
        """For compatibility, provide _gettuple method"""
        return cls._params_tuple if hasattr(cls, "_params_tuple") else cls.params
# DataBase class, directly inherits from AbstractDataBase
class DataBase(AbstractDataBase):
    """Full-featured data feed class.

    Adds nothing to ``AbstractDataBase``. It is the base class of
    ``CSVDataBase``, and its ``params`` provide the defaults used by
    ``FeedBase``.
    """
# Refactor: Remove MetaParams metaclass, use normal parameter processing
class FeedBase:
    """Base class for feed containers.

    A feed container creates data feeds on demand (``getdata``) and keeps
    them in ``self.datas`` so they can be started and stopped together.
    Subclasses must provide a ``DataCls`` attribute: the data feed class
    instantiated by ``_getdata``.

    Parameters are processed without a metaclass (see ``_create_params``).
    """

    def __init__(self, **kwargs):
        """Initialize the feed base.

        Args:
            **kwargs: Parameter values; they override the ``DataBase``
                defaults, and unknown keys are stored as-is.
        """
        self.p = self._create_params(**kwargs)
        self.datas = list()

    def _create_params(self, **kwargs):
        """Build the parameter object, replacing metaclass processing.

        Every ``(name, default)`` pair of ``DataBase.params`` becomes an
        attribute (overridden by ``kwargs``); remaining ``kwargs`` are added
        as extra attributes.
        """

        class Params:
            """Parameter container for FeedBase."""

            def _getitems(self):
                """Return ``(name, value)`` pairs of public, non-callable attributes."""
                return [
                    (attr_name, value)
                    for attr_name, value in self.__dict__.items()
                    if not attr_name.startswith("_") and not callable(value)
                ]

        params_obj = Params()
        # Defaults come from DataBase
        if hasattr(DataBase, "params"):
            base_params = DataBase.params
            if isinstance(base_params, (tuple, list)):
                for param_tuple in base_params:
                    if isinstance(param_tuple, (tuple, list)) and len(param_tuple) >= 2:
                        param_name, param_default = param_tuple[0], param_tuple[1]
                        setattr(params_obj, param_name, kwargs.get(param_name, param_default))
        # Any other passed parameters
        for key, value in kwargs.items():
            if not hasattr(params_obj, key):
                setattr(params_obj, key, value)
        return params_obj

    def start(self):
        """Start all managed data feeds."""
        for data in self.datas:
            data.start()

    def stop(self):
        """Stop all managed data feeds."""
        for data in self.datas:
            data.stop()

    def _merged_kwargs(self, dataname, kwargs):
        """Merge container parameters, call-site kwargs and ``dataname``.

        Precedence, lowest to highest: the container's parameters, then
        ``kwargs``, then ``dataname``.
        """
        merged = {}
        if hasattr(self.p, "_getitems"):
            merged.update(self.p._getitems())
        elif hasattr(self.p, "__dict__"):
            merged.update(self.p.__dict__)
        merged.update(kwargs)
        merged["dataname"] = dataname
        return merged

    def getdata(self, dataname, name=None, **kwargs):
        """Create a data feed and add it to the managed datas.

        Args:
            dataname: Data source identifier (filename, URL, etc.).
            name: Display name for the data feed. When None, the name the
                feed derived for itself is kept instead of being replaced
                with None.
            **kwargs: Additional parameters for the data feed; they override
                the container's parameters.

        Returns:
            The created data feed instance.
        """
        data = self._getdata(**self._merged_kwargs(dataname, kwargs))
        if name is not None:
            data._name = name
        self.datas.append(data)
        return data

    def _getdata(self, dataname, **kwargs):
        """Instantiate ``self.DataCls`` with the merged parameters."""
        return self.DataCls(**self._merged_kwargs(dataname, kwargs))
# Refactor: Remove MetaCSVDataBase metaclass, use normal initialization method
class CSVDataBase(DataBase):
    """Base class for classes implementing CSV DataFeeds.

    The class takes care of opening the file, reading the lines and
    tokenizing them.

    Subclasses only need to override:

      - ``_loadline(tokens)``

    The return value of ``_loadline`` (True/False) becomes the return value
    of ``_load``, which this base class overrides.

    Params:
        headers: When true, one line is skipped at start (the header row).
        separator: String used to split each line into tokens.
    """

    # File-like object being read; None until start() opens or assigns it
    f = None

    # Own parameter definitions, kept as a plain tuple of (name, default)
    _params_tuple = (
        ("headers", True),
        ("separator", ","),
    )
    # Keep original params definition for compatibility with metaclass system
    params = _params_tuple

    def __init__(self, *args, **kwargs):
        """Initialize the CSV data base.

        Args:
            *args: Positional arguments forwarded to the parent initializer.
            **kwargs: Keyword arguments for parameters.
        """
        # Replacement for the original MetaCSVDataBase.dopostinit logic
        self._csv_postinit(**kwargs)
        super().__init__(*args, **kwargs)
        # Set for real in start() from the ``separator`` parameter
        self.separator = None

    def _csv_postinit(self, **kwargs):
        """Derive a default feed name from the data file name.

        Replaces the original ``MetaCSVDataBase.dopostinit``. When no
        ``name`` was given and ``_name`` is still empty, ``_name`` becomes
        the base name of ``dataname`` without its extension. This only
        happens when ``dataname`` is a string (a path).

        Values are read from ``self.p`` when it already exists, falling back
        to *kwargs* otherwise.
        """
        params = getattr(self, "p", None)
        dataname = params and getattr(params, "dataname", None)
        if not dataname:
            dataname = kwargs.get("dataname", "")
        name = params and getattr(params, "name", None)
        if not name:
            name = kwargs.get("name", "")
        if not name and not getattr(self, "_name", ""):
            if isinstance(dataname, string_types):
                self._name, _ = os.path.splitext(os.path.basename(dataname))

    def start(self):
        """Start the CSV data feed.

        If no file is open yet, ``dataname`` is used directly when it is a
        file-like object (has ``readline``). Otherwise it is opened as a
        path, and errors from ``open`` propagate to the caller. One header
        line is then skipped when the ``headers`` parameter is true, and the
        separator is cached.
        """
        super().start()
        if self.f is None:
            if hasattr(self.p.dataname, "readline"):
                self.f = self.p.dataname
            else:
                # Let an exception propagate to let the caller know
                self.f = open(self.p.dataname)
        if self.p.headers:
            self.f.readline()  # skip the headers
        self.separator = self.p.separator

    def stop(self):
        """Stop the CSV data feed, closing and releasing ``self.f`` if set."""
        super().stop()
        if self.f is not None:
            self.f.close()
            self.f = None

    def preload(self):
        """Preload all data from the CSV source and release the file handle.

        Calls ``load()`` until it reports no more data, then ``_last()`` and
        ``home()``, and finally closes ``self.f`` if one is open.
        """
        while self.load():
            pass
        self._last()
        self.home()
        # preloaded - no need to keep the object around - breaks multip in 3.x
        # Guard: preload may run without an open file (e.g. after stop())
        if self.f is not None:
            self.f.close()
            self.f = None

    def _csv_readtokens(self):
        """Read the next line from ``self.f`` and split it into tokens.

        Trailing ``"\\r"``/``"\\n"`` characters are stripped, so CRLF input
        from file-like objects that do not translate newlines (for example
        ``io.StringIO``) does not leave a stray ``"\\r"`` on the last token.

        Returns:
            list of str, or None when no file is open or at end of input.
        """
        if self.f is None:
            return None
        # Let an exception propagate to let the caller know
        line = self.f.readline()
        if not line:
            return None
        return line.rstrip("\r\n").split(self.separator)

    def _load(self):
        """Load one line: tokenize it and hand the tokens to ``_loadline``.

        Returns:
            The result of ``_loadline(tokens)``, or False when no file is
            open or the input is exhausted.
        """
        tokens = self._csv_readtokens()
        if tokens is None:
            return False
        return self._loadline(tokens)

    def _getnextline(self):
        """Return the next line's tokens without calling ``_loadline``.

        Returns:
            list of str, or None when no file is open or at end of input.
        """
        return self._csv_readtokens()
[文档]
class CSVFeedBase(FeedBase):
    """Base class for CSV feed containers.

    Manages CSV data feeds with support for base path prefixing.
    """

    def __init__(self, basepath="", **kwargs):
        """Initialize the CSV feed base.

        Args:
            basepath: String prepended verbatim (no separator is inserted)
                to every string ``dataname`` handed to ``_getdata``.
            **kwargs: Additional keyword arguments for parameters. The
                ``CSVDataBase`` parameters (``headers``, ``separator``) get
                their defaults filled in when not supplied.
        """
        self.basepath = basepath
        # Fill in CSVDataBase parameter defaults without overriding values
        # the caller passed explicitly
        csv_base_params = getattr(CSVDataBase, "params", None)
        if isinstance(csv_base_params, (tuple, list)):
            for param_tuple in csv_base_params:
                if isinstance(param_tuple, (tuple, list)) and len(param_tuple) >= 2:
                    kwargs.setdefault(param_tuple[0], param_tuple[1])
        super().__init__(**kwargs)

    def _getdata(self, dataname, **kwargs):
        """Create a CSV data feed, prefixing *dataname* with ``basepath``.

        The container's parameters are used as defaults and are overridden
        by *kwargs*. The (possibly prefixed) ``dataname`` is written into
        the merged mapping last. An inherited ``dataname`` entry therefore
        cannot collide with an explicit ``dataname=`` keyword.

        String datanames are prefixed. Other values, such as file-like
        objects accepted by ``CSVDataBase.start``, are passed through
        unchanged.
        """
        params = self.p
        if hasattr(params, "_getitems"):
            final_kwargs = dict(params._getitems())
        elif hasattr(params, "__dict__"):
            final_kwargs = dict(params.__dict__)
        else:
            final_kwargs = {}
        final_kwargs.update(kwargs)
        if isinstance(dataname, string_types):
            dataname = self.basepath + dataname
        final_kwargs["dataname"] = dataname
        return self.DataCls(**final_kwargs)
# Data clone
[文档]
class DataClone(AbstractDataBase):
    """Clones an existing data feed.

    Creates a new data feed that references an existing data feed.
    Useful for creating multiple views of the same data with
    different parameters or filters.
    """

    # Marks instances of this class as clones
    _clone = True

    def __init__(self, *args, **kwargs):
        """Initialize the data clone.

        The source feed (``dataname``) is stored as ``self.data``. When it
        has ``p``, its ``fromdate``, ``todate``, ``sessionstart``,
        ``sessionend``, ``timeframe`` and ``compression`` parameters become
        defaults for this clone unless they were passed explicitly.

        Args:
            *args: Positional arguments forwarded to the parent initializer.
            **kwargs: Keyword arguments; must include ``dataname`` (the
                source data feed).

        Raises:
            ValueError: If ``dataname`` is not provided (or is None).
        """
        # Create these before super().__init__ so they exist if parent
        # initialization code accesses them
        self._dlen = None
        self._preloading = None

        dataname = kwargs.get("dataname")
        if dataname is None:
            raise ValueError("DataClone requires 'dataname' parameter")
        # Bypass any custom __setattr__ when storing the source feed
        object.__setattr__(self, "data", dataname)
        self._dataname = getattr(self.data, "_dataname", None)

        # Inherit date/session/timeframe parameters from the source feed
        if hasattr(self.data, "p"):
            for pname in (
                "fromdate",
                "todate",
                "sessionstart",
                "sessionend",
                "timeframe",
                "compression",
            ):
                kwargs.setdefault(pname, getattr(self.data.p, pname, None))

        super().__init__(*args, **kwargs)
        # Restore the source-feed reference in case parent initialization
        # removed or reset it
        if not hasattr(self, "data") or object.__getattribute__(self, "data") is None:
            object.__setattr__(self, "data", dataname)

    def _start(self):
        """Start the clone and copy timezone, calendar, date and session info.

        Redefined (the parent ``_start`` is not called) so that these values
        are copied from the source feed instead of being computed here.
        """
        self.start()
        # Copy timezone info
        if hasattr(self.data, "_tz"):
            self._tz = self.data._tz
            self.lines.datetime._settz(self._tz)
        if hasattr(self.data, "_calendar"):
            self._calendar = self.data._calendar
        # The source feed has already converted its input; no further
        # conversion is needed
        self._tzinput = None
        # Copy dates/session infos
        if hasattr(self.data, "fromdate"):
            self.fromdate = self.data.fromdate
        if hasattr(self.data, "todate"):
            self.todate = self.data.todate
        # FIXME: if removed from guest, remove here too
        if hasattr(self.data, "sessionstart"):
            self.sessionstart = self.data.sessionstart
        if hasattr(self.data, "sessionend"):
            self.sessionend = self.data.sessionend

    def start(self):
        """Start the data clone.

        Resets the count of source bars seen (``_dlen``) and clears the
        preloading flag.
        """
        super().start()
        self._dlen = 0
        self._preloading = False

    def preload(self):
        """Preload data from the source feed.

        While the parent ``preload`` runs, ``_preloading`` is True so that
        ``_load`` steps the source feed itself. Afterwards the source feed
        is rewound with ``home()`` when it provides one. The flag is cleared
        even if preloading raises.
        """
        self._preloading = True
        try:
            super().preload()
            if hasattr(self.data, "home"):
                self.data.home()  # preloading pushed the source feed forward
        finally:
            self._preloading = False

    def _load(self):
        """Copy the next bar from the source feed into this clone's lines.

        While preloading, the source feed is advanced one bar (when it has
        ``advance``). The bar is copied until the source's length exceeds its
        buffer length. Otherwise a bar is copied only when the source feed is
        longer than the number of bars this clone has seen (``_dlen``).

        Returns:
            bool: True if a bar was copied, False otherwise.
        """
        if self._preloading:
            # Source is preloaded too: step it forward and copy until its
            # buffer is exhausted
            if hasattr(self.data, "advance"):
                self.data.advance()
            if len(self.data) > self.data.buflen():
                return False
            for line, dline in zip(self.lines, self.data.lines):
                line[0] = dline[0]
            return True

        # Not preloading: nothing new unless the source has moved beyond the
        # last bar this clone has seen
        if len(self.data) <= self._dlen:
            return False
        self._dlen += 1
        for line, dline in zip(self.lines, self.data.lines):
            line[0] = dline[0]
        return True

    def advance(self, size=1, datamaster=None, ticks=True):
        """Advance the data clone by *size* bars.

        Adds *size* to the count of source bars seen (``_dlen``), then
        delegates to the parent ``advance``.

        Args:
            size: Number of bars to advance.
            datamaster: Master data feed for synchronization; forwarded to
                the parent ``advance``.
            ticks: Forwarded to the parent ``advance``.
        """
        self._dlen += size
        super().advance(size, datamaster, ticks=ticks)