backtrader.feeds.rollover 源代码

#!/usr/bin/env python
"""Rollover Data Feed Module - Futures contract rollover.

This module provides the RollOver feed for automatically rolling over
to the next futures contract when conditions are met.

Classes:
    RollOver: Rolls over to the next future when conditions are met.

Example:
    >>> data_old = bt.feeds.BacktraderCSVData(dataname='contract_old.csv')
    >>> data_new = bt.feeds.BacktraderCSVData(dataname='contract_new.csv')
    >>> data = bt.feeds.RollOver(data_old, data_new, checkdate=my_check_func)
    >>> cerebro.adddata(data)
"""

from datetime import datetime

from ..feed import DataBase
from ..utils.date import Localizer


[文档] class RollOver(DataBase): # Roll over to the next future when conditions are met """Class that rolls over to the next future when a condition is met Params: - ``checkdate`` (default: ``None``) This must be a *callable* with the following signature:: Checkdate(dt, d): Where: - ``dt`` is a ``datetime.datetime`` object - ``d`` is the current data feed for the active future Expected Return Values: - ``True``: as long as the callable returns this, a switchover can happen to the next future If a commodity expires on the 3rd Friday of March, `checkdate` could return `True` for the entire week in which the expiration takes place. - ``False``: the expiration cannot take place # This parameter is a callable object checkdate(dt,d), where dt is a time object, d is current active data, # If return value is True, will switch to next contract; if False, will not switch to next contract - ``checkcondition`` (default: ``None``) **Note**: This will only be called if ``checkdate`` has returned ``True`` If ``None`` this will evaluate to ``True`` (execute roll over) internally Else this must be a *callable* with this signature:: Checkcondition(d0, d1) Where: - ``d0`` is the current data feed for the active future - ``d1`` is the data feed for the next expiration Expected Return Values: - ``True``: roll-over to the next future Following with the example from ``checkdate``, this could say that the roll-over can only happen if the *volume* from ``d0`` is already less than the volume from ``d1`` - ``False``: the expiration cannot take place # When checkdate returns True, this function will be called, this must be a callable object, checkcondition(d0,d1) # Where d0 is current active futures contract, d1 is next expiring contract, if True, will switch from d0 to d1, if not, switch will not happen. """ params = ( # ('rolls', []), # array of futures to roll over ("checkdate", None), # callable ("checkcondition", None), # callable )
[文档] def islive(self): # Make data live form, will avoid preloading and runonce """Returns ``True`` to notify ``Cerebro`` that preloading and runonce should be deactivated""" return True
def __init__(self, *args, **kwargs): """Initialize the RollOver data feed. Args: *args: Data feeds to roll over between. **kwargs: Keyword arguments for data feed configuration. """ # Handle timeframe and compression parameters, originally handled by metaclass if args: # Copy timeframe and compression from first data source kwargs.setdefault("timeframe", getattr(args[0], "_timeframe", None)) kwargs.setdefault("compression", getattr(args[0], "_compression", None)) super().__init__(**kwargs) # Prepare futures contracts for rollover self._dts = None self._dexp = None self._d = None self._ds = None self._rolls = args
[文档] def start(self): """Start the RollOver data feed. Initializes all data feeds for rollover functionality. """ super().start() # Loop through all data, prepare to start for d in self._rolls: d.setenvironment(self._env) d._start() # put the references in a separate list to have pops # todo Using list again here seems not very useful, because self._rolls is already list format self._ds = list(self._rolls) # First data self._d = self._ds.pop(0) if self._ds else None # Expiration data self._dexp = None # Here defaults a minimum time, when comparing with any time, will move self._dts = [datetime.min for xx in self._ds]
[文档] def stop(self): """Stop the RollOver data feed. Stops all underlying data feeds. """ # End data super().stop() for d in self._rolls: d.stop()
def _gettz(self): # Get specific timezone """To be overriden by subclasses which may auto-calculate the timezone""" if self._rolls: return self._rolls[0]._gettz() return Localizer(self.p.tz) def _checkdate(self, dt, d): # Calculate if current rollover conditions are met if self.p.checkdate is not None: return self.p.checkdate(dt, d) return False def _checkcondition(self, d0, d1): # Prepare to start rollover if self.p.checkcondition is not None: return self.p.checkcondition(d0, d1) return True def _load(self): # Method to load data while self._d is not None: # When self._d is not None, call next _next = self._d.next() # If _next value is None, continue calling next if _next is None: # no values yet, more will come continue # If _next value is False, current data switches to next data, if _next is False: # no values from current data src if self._ds: self._d = self._ds.pop(0) self._dts.pop(0) else: self._d = None continue # Current time of current data dt0 = self._d.datetime.datetime() # current dt for active data # Synchronize other datas using dt0 # Synchronize other data based on current time for i, d_dt in enumerate(zip(self._ds, self._dts)): d, dt = d_dt # If other data's time is less than current time, move other data forward, increase time, and save time to self._dts while dt < dt0: if d.next() is None: continue self._dts[i] = dt = d.datetime.datetime() # Move expired future as much as needed # Move expired data while self._dexp is not None: if not self._dexp.next(): self._dexp = None break if self._dexp.datetime.datetime() < dt0: continue if self._dexp is None and self._checkdate(dt0, self._d): # rule has been met ... check other factors only if 2 datas # still there if self._ds and self._checkcondition(self._d, self._ds[0]): # Time to switch to next data self._dexp = self._d self._d = self._ds.pop(0) self._dts.pop(0) # Fill the line and tell we die self.lines.datetime[0] = self._d.lines.datetime[0] self.lines.open[0] = self._d.lines.open[0] self.lines.high[0] = self._d.lines.high[0] self.lines.low[0] = self._d.lines.low[0] self.lines.close[0] = self._d.lines.close[0] self.lines.volume[0] = self._d.lines.volume[0] self.lines.openinterest[0] = self._d.lines.openinterest[0] return True # Out of the loop -> self._d is None, no data feed to return from return False