# Source code for backtrader.feeds.csvgeneric

#!/usr/bin/env python
"""Generic CSV Data Feed Module - CSV file parsing.

This module provides the GenericCSVData feed for parsing CSV files
with customizable column mappings for backtesting.

Classes:
    GenericCSVData: Parses CSV files with configurable column mappings.

Example:
    >>> data = bt.feeds.GenericCSVData(
    ...     dataname='data.csv',
    ...     datetime=0,
    ...     open=1,
    ...     high=2,
    ...     low=3,
    ...     close=4,
    ...     volume=5
    ... )
    >>> cerebro.adddata(data)
"""

import math
from datetime import date, datetime, timezone

from .. import feed
from ..dataseries import TimeFrame
from ..utils import date2num
from ..utils.py3 import integer_types, string_types

# ``timezone.utc`` is available on every Python 3 release, whereas the
# ``datetime.UTC`` alias only exists from Python 3.11 onwards.
UTC = timezone.utc

# Infinity sentinels; parsed field values equal to either one are replaced by
# the owning line's default value further down in this module.
_INF = float("inf")
_NEG_INF = float("-inf")

# Module-level aliases used by the fast loading paths below.
_FLOAT = float
_OBJECT_SETATTR = object.__setattr__

# Divisors used to turn hours / minutes / seconds into fractions of a day.
_HOURS_PER_DAY = 24.0
_MINUTES_PER_DAY = _HOURS_PER_DAY * 60.0
_SECONDS_PER_DAY = _MINUTES_PER_DAY * 60.0

# Calendar tables for a non-leap year, indexed by month number (1..12);
# index 0 is padding so that the month can be used as the index directly.
_DAYS_BEFORE_MONTH = (
    0,
    0,
    31,
    59,
    90,
    120,
    151,
    181,
    212,
    243,
    273,
    304,
    334,
)
_DAYS_IN_MONTH = (
    0,
    31,
    28,
    31,
    30,
    31,
    30,
    31,
    31,
    30,
    31,
    30,
    31,
)


def _parse_ymd_compact(
    date_text,
    time_text=None,
    fallback_format="%Y%m%d",
    time_has_seconds=True,
):
    """Parse a compact ``YYYYMMDD`` date (plus optional time) into a datetime.

    Args:
        date_text: Date field text.
        time_text: Optional time field text; ``None`` for date-only parsing.
        fallback_format: ``strptime`` format used whenever the hand-written
            parser does not apply. With a time field it is matched against
            ``date_text + "T" + time_text``.
        time_has_seconds: Forwarded to ``_parse_time`` to select between the
            ``HH:MM:SS`` and ``HH:MM`` layouts.

    Returns:
        A naive ``datetime`` instance.

    Raises:
        ValueError: If the text cannot be parsed or is not a valid date/time.
    """
    is_compact = len(date_text) == 8 and date_text.isdigit()
    if not is_compact:
        # Unexpected layout: defer entirely to strptime.
        text = date_text if time_text is None else date_text + "T" + time_text
        return datetime.strptime(text, fallback_format)

    ymd = (int(date_text[:4]), int(date_text[4:6]), int(date_text[6:]))
    if time_text is None:
        return datetime(*ymd)

    return _parse_time(
        *ymd,
        time_text,
        date_text + "T" + time_text,
        fallback_format,
        time_has_seconds,
    )


def _parse_ymd_separated(
    date_text,
    time_text=None,
    separator="-",
    fallback_format="%Y-%m-%d",
    time_has_seconds=True,
):
    """Parse a ``YYYY<sep>MM<sep>DD`` date (plus optional time) into a datetime.

    Args:
        date_text: Date field text.
        time_text: Optional time field text; ``None`` for date-only parsing.
        separator: Single character expected at offsets 4 and 7.
        fallback_format: ``strptime`` format used whenever the hand-written
            parser does not apply. With a time field it is matched against
            ``date_text + "T" + time_text``.
        time_has_seconds: Forwarded to ``_parse_time`` to select between the
            ``HH:MM:SS`` and ``HH:MM`` layouts.

    Returns:
        A naive ``datetime`` instance.

    Raises:
        ValueError: If the text cannot be parsed or is not a valid date/time.
    """
    has_layout = (
        len(date_text) == 10
        and date_text[4] == separator
        and date_text[7] == separator
    )
    if not has_layout:
        # Unexpected layout: defer entirely to strptime.
        text = date_text if time_text is None else date_text + "T" + time_text
        return datetime.strptime(text, fallback_format)

    ymd = (int(date_text[:4]), int(date_text[5:7]), int(date_text[8:]))
    if time_text is None:
        return datetime(*ymd)

    return _parse_time(
        *ymd,
        time_text,
        date_text + "T" + time_text,
        fallback_format,
        time_has_seconds,
    )


def _parse_ymd_hms(date_text):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` timestamp into a datetime.

    Text that has exactly this 19-character layout is sliced and converted by
    hand; anything else is handed to ``datetime.strptime``.

    Args:
        date_text: Combined date and time text.

    Returns:
        A naive ``datetime`` instance.

    Raises:
        ValueError: If the text cannot be parsed or is not a valid date/time.
    """
    layout_ok = len(date_text) == 19 and (
        date_text[4],
        date_text[7],
        date_text[10],
        date_text[13],
        date_text[16],
    ) == ("-", "-", " ", ":", ":")
    if not layout_ok:
        return datetime.strptime(date_text, "%Y-%m-%d %H:%M:%S")

    # (start, stop) offsets of year, month, day, hour, minute and second.
    spans = ((0, 4), (5, 7), (8, 10), (11, 13), (14, 16), (17, 19))
    return datetime(*[int(date_text[start:stop]) for start, stop in spans])


def _parse_time(
    year,
    month,
    day,
    time_text,
    fallback_text,
    fallback_format,
    time_has_seconds,
):
    """Combine an already parsed date with a textual time of day.

    Args:
        year: Year number.
        month: Month number.
        day: Day of the month.
        time_text: Time field text, expected as ``HH:MM:SS`` when
            ``time_has_seconds`` is true and ``HH:MM`` otherwise.
        fallback_text: Full text handed to ``strptime`` when ``time_text``
            does not have the expected layout.
        fallback_format: ``strptime`` format matching ``fallback_text``.
        time_has_seconds: Selects which of the two layouts is expected.

    Returns:
        A naive ``datetime`` instance.

    Raises:
        ValueError: If the text cannot be parsed or is not a valid date/time.
    """
    size = len(time_text)
    if time_has_seconds:
        if size == 8 and time_text[2] == ":" and time_text[5] == ":":
            return datetime(
                year,
                month,
                day,
                int(time_text[:2]),
                int(time_text[3:5]),
                int(time_text[6:]),
            )
    elif size == 5 and time_text[2] == ":":
        return datetime(year, month, day, int(time_text[:2]), int(time_text[3:]))

    # Layout mismatch: let strptime deal with the complete text.
    return datetime.strptime(fallback_text, fallback_format)


def _parse_time_num(time_text, time_has_seconds):
    """Split a time of day into ``(hour, minute, second)`` integers.

    Args:
        time_text: Time field text, expected as ``HH:MM:SS`` when
            ``time_has_seconds`` is true and ``HH:MM`` otherwise.
        time_has_seconds: Selects which of the two layouts is expected.

    Returns:
        A ``(hour, minute, second)`` tuple, with ``second`` being ``0`` for
        the ``HH:MM`` layout. ``None`` is returned when the text does not have
        the expected layout or when a component is outside the valid clock
        range, so that the caller can fall back to a stricter parser.

    Raises:
        ValueError: If a component of a correctly laid out text is not an
            integer.
    """
    if time_has_seconds:
        if not (len(time_text) == 8 and time_text[2] == ":" and time_text[5] == ":"):
            return None
        hour = int(time_text[0:2])
        minute = int(time_text[3:5])
        second = int(time_text[6:8])
    else:
        if not (len(time_text) == 5 and time_text[2] == ":"):
            return None
        hour = int(time_text[0:2])
        minute = int(time_text[3:5])
        second = 0

    # Values such as "25:61:99" or "-1:30" convert to integers just fine but
    # are not a time of day; decline them instead of letting them produce a
    # silently shifted timestamp.
    if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
        return None

    return hour, minute, second


def _ordinal_to_num(ordinal, hour=0, minute=0, second=0):
    """Convert a date ordinal plus a time of day into a fractional day number.

    Args:
        ordinal: Day ordinal (as produced by ``date.toordinal()``).
        hour: Hours to add.
        minute: Minutes to add.
        second: Seconds to add.

    Returns:
        The ordinal as a float with the time of day added as a fraction of a
        day. The four terms are added with ``math.fsum``.
    """
    whole_days = float(ordinal)
    hour_part = hour / _HOURS_PER_DAY
    minute_part = minute / _MINUTES_PER_DAY
    second_part = second / _SECONDS_PER_DAY
    return math.fsum([whole_days, hour_part, minute_part, second_part])


def _parse_ymd_compact_num(date_text, time_text=None, time_has_seconds=True):
    """Convert a compact ``YYYYMMDD`` date (plus optional time) to a day number.

    Args:
        date_text: Date field text.
        time_text: Optional time field text; ``None`` for date-only parsing.
        time_has_seconds: Forwarded to ``_parse_time_num``.

    Returns:
        The date ordinal as a float, including the time of day as a fraction
        when ``time_text`` is given. ``None`` when either field does not have
        the layout handled here.

    Raises:
        ValueError: If the digits do not form a valid calendar date.
    """
    if not (len(date_text) == 8 and date_text.isdigit()):
        return None

    year, month, day = int(date_text[:4]), int(date_text[4:6]), int(date_text[6:])
    day_number = date(year, month, day).toordinal()
    if time_text is None:
        return float(day_number)

    hms = _parse_time_num(time_text, time_has_seconds)
    return None if hms is None else _ordinal_to_num(day_number, *hms)


def _parse_ymd_separated_num(date_text, time_text=None, separator="-", time_has_seconds=True):
    """Convert a ``YYYY<sep>MM<sep>DD`` date (plus optional time) to a day number.

    Args:
        date_text: Date field text.
        time_text: Optional time field text; ``None`` for date-only parsing.
        separator: Single character expected at offsets 4 and 7.
        time_has_seconds: Forwarded to ``_parse_time_num``.

    Returns:
        The date ordinal as a float, including the time of day as a fraction
        when ``time_text`` is given. ``None`` when either field does not have
        the layout handled here.

    Raises:
        ValueError: If a component is not an integer or the components do not
            form a valid calendar date.
    """
    has_layout = (
        len(date_text) == 10
        and date_text[4] == separator
        and date_text[7] == separator
    )
    if not has_layout:
        return None

    year, month, day = int(date_text[:4]), int(date_text[5:7]), int(date_text[8:])
    day_number = date(year, month, day).toordinal()
    if time_text is None:
        return float(day_number)

    hms = _parse_time_num(time_text, time_has_seconds)
    return None if hms is None else _ordinal_to_num(day_number, *hms)


def _parse_ymd_hms_num(date_text):
    """Convert a ``YYYY-MM-DD HH:MM:SS`` timestamp to a fractional day number.

    Args:
        date_text: Combined date and time text.

    Returns:
        The date ordinal as a float with the time of day added as a fraction
        of a day. ``None`` is returned when the text does not have the
        19-character layout or when the time of day is outside the valid clock
        range, so that the caller can fall back to a stricter parser.

    Raises:
        ValueError: If a component is not an integer or the date components do
            not form a valid calendar date.
    """
    if not (
        len(date_text) == 19
        and date_text[4] == "-"
        and date_text[7] == "-"
        and date_text[10] == " "
        and date_text[13] == ":"
        and date_text[16] == ":"
    ):
        return None

    hour = int(date_text[11:13])
    minute = int(date_text[14:16])
    second = int(date_text[17:19])
    # "25:61:99" and similar convert to integers just fine but are not a time
    # of day; decline them instead of producing a silently shifted timestamp.
    if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
        return None

    ordinal = date(
        int(date_text[0:4]),
        int(date_text[5:7]),
        int(date_text[8:10]),
    ).toordinal()
    return _ordinal_to_num(ordinal, hour, minute, second)


def _build_datetime_parser(dtformat, tmformat, has_time):
    """Build the callable that turns CSV date/time text into a ``datetime``.

    Args:
        dtformat: ``strptime`` format of the date field.
        tmformat: ``strptime`` format of the time field; only used when
            ``has_time`` is true.
        has_time: Whether the time of day comes in a field of its own.

    Returns:
        A callable ``(date_text, time_text) -> datetime``. A few common format
        combinations are served by the hand-written parsers of this module;
        every other combination goes through ``datetime.strptime``. Without a
        separate time field the second argument is ignored.
    """
    # Separator character of the ``YYYY<sep>MM<sep>DD`` formats served by
    # ``_parse_ymd_separated``.
    separators = {"%Y-%m-%d": "-", "%Y.%m.%d": ".", "%Y/%m/%d": "/"}

    if not has_time:
        if dtformat == "%Y%m%d":
            return lambda date_text, _: _parse_ymd_compact(date_text)
        if dtformat in separators:
            sep = separators[dtformat]
            return lambda date_text, _: _parse_ymd_separated(
                date_text,
                separator=sep,
                fallback_format=dtformat,
            )
        if dtformat == "%Y-%m-%d %H:%M:%S":
            return lambda date_text, _: _parse_ymd_hms(date_text)
        return lambda date_text, _: datetime.strptime(date_text, dtformat)

    # Date and time are glued together with a "T" for the strptime fallback.
    fallback_format = dtformat + "T" + tmformat
    if tmformat in ("%H:%M", "%H:%M:%S"):
        with_seconds = tmformat == "%H:%M:%S"
        if dtformat == "%Y%m%d":
            return lambda date_text, time_text: _parse_ymd_compact(
                date_text,
                time_text,
                fallback_format,
                with_seconds,
            )
        if dtformat in separators:
            sep = separators[dtformat]
            return lambda date_text, time_text: _parse_ymd_separated(
                date_text,
                time_text,
                sep,
                fallback_format,
                with_seconds,
            )

    return lambda date_text, time_text: datetime.strptime(
        date_text + "T" + time_text,
        fallback_format,
    )


def _build_datetime_num_parser(dtformat, tmformat, has_time):
    """Build the callable that turns CSV date/time text into a day number.

    Args:
        dtformat: ``strptime`` format of the date field.
        tmformat: ``strptime`` format of the time field; only used when
            ``has_time`` is true.
        has_time: Whether the time of day comes in a field of its own.

    Returns:
        A callable ``(date_text, time_text) -> float or None`` for the format
        combinations served by the numeric parsers of this module, or ``None``
        when there is no such parser for the given formats. Without a separate
        time field the second argument is ignored.
    """
    # Separator character of the ``YYYY<sep>MM<sep>DD`` formats served by
    # ``_parse_ymd_separated_num``.
    separators = {"%Y-%m-%d": "-", "%Y.%m.%d": ".", "%Y/%m/%d": "/"}

    if not has_time:
        if dtformat == "%Y%m%d":
            return lambda date_text, _: _parse_ymd_compact_num(date_text)
        if dtformat in separators:
            sep = separators[dtformat]
            return lambda date_text, _: _parse_ymd_separated_num(date_text, separator=sep)
        if dtformat == "%Y-%m-%d %H:%M:%S":
            return lambda date_text, _: _parse_ymd_hms_num(date_text)
        return None

    if tmformat not in ("%H:%M", "%H:%M:%S"):
        return None

    with_seconds = tmformat == "%H:%M:%S"
    if dtformat == "%Y%m%d":
        return lambda date_text, time_text: _parse_ymd_compact_num(
            date_text,
            time_text,
            with_seconds,
        )
    if dtformat in separators:
        sep = separators[dtformat]
        return lambda date_text, time_text: _parse_ymd_separated_num(
            date_text,
            time_text,
            sep,
            with_seconds,
        )
    return None


class GenericCSVData(feed.CSVDataBase):
    """Parses a CSV file according to the order and field presence defined by the
    parameters

    Specific parameters (or specific meaning):

      - ``dataname``: The filename to parse or a file-like object

      - The lines parameters (datetime, open, high ...) take numeric values

        A value of -1 indicates absence of that field in the CSV source

      - If ``time`` is present (parameter time >=0), the source contains
        separated fields for date and time, which will be combined

      - ``nullvalue``

        Value that will be used if a value which should be there is missing
        (the CSV field is empty)

      - ``dtformat``: Format used to parse the datetime CSV field. See the
        python strptime/strftime documentation for the format.

        If a numeric value is specified, it will be interpreted as follows

          - ``1``: The value is a Unix timestamp of a type ``int`` representing
            the number of seconds since Jan 1st, 1970

          - ``2``: The value is a Unix timestamp of a type ``float``

        If a **callable** is passed

          - It will accept a string and return a `datetime.datetime` python
            instance

      - ``tmformat``: Format used to parse the time CSV field if "present"
        (the default for the "time" CSV field is not to be present)
    """

    # Common parameters for csv data
    params = (
        ("nullvalue", float("NaN")),
        ("dtformat", "%Y-%m-%d %H:%M:%S"),
        ("tmformat", "%H:%M:%S"),
        ("datetime", 0),
        ("time", -1),
        ("open", 1),
        ("high", 2),
        ("low", 3),
        ("close", 4),
        ("volume", 5),
        ("openinterest", 6),
    )

    def __init__(self, *args, **kwargs):
        """Initialize the Generic CSV data feed.

        Args:
            *args: Positional arguments for data feed configuration.
            **kwargs: Keyword arguments for data feed configuration.
        """
        super().__init__(*args, **kwargs)
        self._dtconvert = None
        self._dtstr = None
        self._has_time = None

    def start(self):
        """Start the Generic CSV data feed.

        Caches the parameters and line objects used for every row and sets up
        the datetime conversion based on the ``dtformat`` parameter.
        """
        super().start()
        p = self.p
        self._datetime_idx = p.datetime
        self._time_idx = p.time
        self._timeframe = p.timeframe
        self._sessionend = p.sessionend
        self._datetime_line = self.lines.datetime
        self._nullvalue = p.nullvalue

        # Per-line lookup tables: fields present in the CSV are stored with
        # their column index, absent ones with the constant value to write.
        # The "direct" variants additionally carry the name of the matching
        # ``tick_*`` attribute and whether the line is the first line alias.
        field_cache = []
        missing_field_cache = []
        direct_field_cache = []
        direct_missing_field_cache = []
        last_alias = self._getlinealias(0)
        for linefield in self.getlinealiases():
            if linefield == "datetime":
                continue

            csvidx = getattr(p, linefield)
            line = getattr(self.lines, linefield)
            tick_name = "tick_" + linefield
            is_last = linefield == last_alias
            if csvidx is None or csvidx < 0:
                value = float(p.nullvalue)
                if value in (_INF, _NEG_INF):
                    value = line._default_value
                missing_field_cache.append((line, value))
                direct_missing_field_cache.append((line, value, tick_name, is_last))
            else:
                field_cache.append((csvidx, line))
                direct_field_cache.append((csvidx, line, tick_name, is_last))

        self._field_cache = tuple(field_cache)
        self._missing_field_cache = tuple(missing_field_cache)
        self._direct_field_cache = tuple(direct_field_cache)
        self._direct_missing_field_cache = tuple(direct_missing_field_cache)

        # If string type, set self._dtstr to True, otherwise default is False
        self._dtstr = False
        if isinstance(p.dtformat, string_types):
            self._dtstr = True
            self._has_time = self._time_idx >= 0
            # 1 -> "YYYYMMDD" + "HH:MM:SS", 2 -> "YYYYMMDD" + "HH:MM",
            # 0 -> no inline numeric conversion for this format combination.
            if self._has_time and p.dtformat == "%Y%m%d" and p.tmformat == "%H:%M:%S":
                self._dt_num_fast = 1
            elif self._has_time and p.dtformat == "%Y%m%d" and p.tmformat == "%H:%M":
                self._dt_num_fast = 2
            else:
                self._dt_num_fast = 0
            self._dtconvert = _build_datetime_parser(
                p.dtformat,
                p.tmformat,
                self._has_time,
            )
            self._dtconvert_num = _build_datetime_num_parser(
                p.dtformat,
                p.tmformat,
                self._has_time,
            )
        # If integer, set time conversion method based on different integer values
        elif isinstance(p.dtformat, integer_types):
            self._dtconvert_num = None
            self._dt_num_fast = 0
            idt = int(p.dtformat)
            if idt == 1:
                # Unix timestamp given as an integer number of seconds
                self._dtconvert = lambda x, _: datetime.fromtimestamp(int(x), UTC)
            elif idt == 2:
                # Unix timestamp given as a float
                self._dtconvert = lambda x, _: datetime.fromtimestamp(float(x), UTC)
        # If dtformat is callable, conversion method is itself
        else:  # assume callable
            dtformat = p.dtformat
            self._dtconvert = lambda x, _: dtformat(x)
            self._dtconvert_num = None
            self._dt_num_fast = 0

    def _runnext_direct_load_ready(self):
        """Return whether Cerebro can call load() directly in single-data runnext.

        The answer is computed once and cached on the instance. When it is
        true, ``_use_direct_csv_load`` is set and, if the narrow OHLCV loader
        is usable too, ``_runnext_direct_load`` is pointed at it.
        """
        try:
            return object.__getattribute__(self, "_runnext_direct_load_ready_cache")
        except AttributeError:
            pass

        try:
            ready = (
                type(self) is GenericCSVData
                and self.f is not None
                and object.__getattribute__(self, "_tzinput") is None
                and object.__getattribute__(self, "fromdate") == _NEG_INF
                and object.__getattribute__(self, "todate") == _INF
                and not self._filters
                and not self._barstack
                and not self._barstash
                and not self.resampling
                and not self.replaying
                and not self._clone
            )
        except AttributeError:
            ready = False

        object.__setattr__(self, "_runnext_direct_load_ready_cache", ready)
        if ready:
            object.__setattr__(self, "_use_direct_csv_load", True)
            try:
                if self._runnext_direct_ymdhms_ohlcv_ready():
                    object.__setattr__(
                        self,
                        "_runnext_direct_load",
                        self._load_direct_ymdhms_ohlcv,
                    )
            except AttributeError:
                pass
        return ready

    def _runnext_direct_ymdhms_ohlcv_ready(self):
        """Return whether the narrow runnext CSV loader can be used.

        The answer is computed once and cached on the instance. When it is
        true, the line objects, their bound ``append`` methods and the other
        state read by ``_load_direct_ymdhms_ohlcv`` are cached as well.
        """
        try:
            return object.__getattribute__(self, "_runnext_direct_ymdhms_ohlcv_ready_cache")
        except AttributeError:
            pass

        p = self.p
        try:
            lines = (
                self.lines.open,
                self.lines.high,
                self.lines.low,
                self.lines.close,
                self.lines.volume,
                self.lines.openinterest,
                self.lines.datetime,
            )
            line0 = lines[0]
            line0_idx = line0._idx
            line0_lencount = line0.lencount
            ready = (
                type(self) is GenericCSVData
                and self.separator == ","
                and self._dt_num_fast == 1
                and self._timeframe < TimeFrame.Days
                and self._datetime_idx == 0
                and self._time_idx == 1
                and p.open == 2
                and p.high == 3
                and p.low == 4
                and p.close == 5
                and p.volume == 6
                and (p.openinterest is None or p.openinterest < 0)
                and all(line.mode != line.QBuffer and line._clock is None for line in lines)
                and all(not line.bindings for line in lines)
                # All lines must share the same position and length because
                # the loader advances them with a single pair of counters.
                and all(
                    line._idx == line0_idx and line.lencount == line0_lencount
                    for line in lines[1:]
                )
            )
        except AttributeError:
            ready = False
            lines = None

        object.__setattr__(self, "_runnext_direct_ymdhms_ohlcv_ready_cache", ready)
        if ready:
            object.__setattr__(self, "_fast_ymdhms_lines", lines)
            object.__setattr__(
                self,
                "_fast_ymdhms_appends",
                tuple(line.array.append for line in lines),
            )
            object.__setattr__(self, "_fast_ymdhms_ohlcv_lines", lines[:6])
            object.__setattr__(
                self,
                "_fast_ymdhms_ohlcv_appends",
                tuple(line.array.append for line in lines[:6]),
            )
            object.__setattr__(self, "_fast_ymdhms_datetime_append", lines[6].array.append)
            object.__setattr__(self, "_fast_ymdhms_readline", self.f.readline)
            object.__setattr__(self, "_fast_ymdhms_openinterest_default", lines[5]._default_value)
            object.__setattr__(self, "_fast_ymdhms_tick_dict", self.__dict__)
            object.__setattr__(self, "_load_forward_lines", lines)
            object.__setattr__(self, "_use_direct_ymdhms_load", True)
            object.__setattr__(self, "_fast_ymdhms_ohlcv_fields", True)
            object.__setattr__(self, "_direct_ymdhms_last_datefield", "")
            object.__setattr__(self, "_direct_ymdhms_last_ordinal", 0)
            object.__setattr__(self, "_direct_ymdhms_time_fractions", {})
        return ready

    def _load_direct_ymdhms_ohlcv(self, _float=_FLOAT, _inf=_INF, _neg_inf=_NEG_INF):
        """Load a standard YMD/HMS OHLCV CSV row for single-data runnext.

        Rows laid out as ``YYYYMMDD,HH:MM:SS,open,high,low,close,volume`` are
        parsed by fixed offsets; any other row is forwarded to ``_loadline``.
        The keyword parameters only bind module constants as locals and are
        not meant to be passed by callers.

        Returns:
            ``False`` when the file is exhausted, otherwise ``True`` (or the
            falsy result of ``_loadline`` for a row handled by it).

        Raises:
            ValueError: If a directly parsed row contains an invalid date, an
                out-of-range time or a non-numeric value.
        """
        fast_lines = self._fast_ymdhms_lines
        fast_appends = self._fast_ymdhms_appends
        (
            open_line,
            high_line,
            low_line,
            close_line,
            volume_line,
            openinterest_line,
            datetime_line,
        ) = fast_lines
        (
            open_append,
            high_append,
            low_append,
            close_append,
            volume_append,
            openinterest_append,
            datetime_append,
        ) = fast_appends

        line = self._fast_ymdhms_readline()
        if not line:
            return False

        try:
            direct_layout = (
                line[8] == "," and line[17] == "," and line[11] == ":" and line[14] == ":"
            )
        except IndexError:
            direct_layout = False

        if not direct_layout:
            # Unexpected layout: advance every line by one default-valued slot
            # and let the generic row parser fill it in.
            for fwd_line, fwd_append in zip(fast_lines, fast_appends):
                fwd_line._idx += 1
                fwd_line.lencount += 1
                fwd_append(fwd_line._default_value)
            loadret = self._loadline(line.rstrip("\n").split(self.separator))
            if not loadret:
                self.backwards(force=True)
                return loadret
            return True

        linetokens = line.split(",", 7)

        # The ordinal of the previous row is reused while the date text stays
        # the same.
        datefield = line[:8]
        if datefield == self._direct_ymdhms_last_datefield:
            ordinal = self._direct_ymdhms_last_ordinal
        else:
            year = int(line[0:4])
            month = int(line[4:6])
            day = int(line[6:8])
            year_minus_one = year - 1
            leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
            if (
                year < 1
                or month < 1
                or month > 12
                or day < 1
                or day > (_DAYS_IN_MONTH[month] + (1 if month == 2 and leap else 0))
            ):
                # Invalid date: let ``date`` raise the ValueError.
                date(year, month, day).toordinal()
            ordinal = (
                year_minus_one * 365
                + year_minus_one // 4
                - year_minus_one // 100
                + year_minus_one // 400
                + _DAYS_BEFORE_MONTH[month]
                + day
            )
            if month > 2 and leap:
                ordinal += 1
            _OBJECT_SETATTR(self, "_direct_ymdhms_last_datefield", datefield)
            _OBJECT_SETATTR(self, "_direct_ymdhms_last_ordinal", ordinal)

        # Day fractions are memoized per distinct time text.
        timefield = line[9:17]
        time_fractions = self._direct_ymdhms_time_fractions
        try:
            day_fraction = time_fractions[timefield]
        except KeyError:
            hour = int(line[9:11])
            minute = int(line[12:14])
            second = int(line[15:17])
            # Reject values such as "25:61:99" instead of silently shifting
            # the timestamp.
            if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
                raise ValueError(f"time data {timefield!r} is out of range")
            seconds = hour * 3600 + minute * 60 + second
            day_fraction = seconds / _SECONDS_PER_DAY
            time_fractions[timefield] = day_fraction
        dtnum = ordinal + day_fraction

        nullvalue = self._nullvalue
        try:
            open_value = _float(linetokens[2])
            high_value = _float(linetokens[3])
            low_value = _float(linetokens[4])
            close_value = _float(linetokens[5])
            volume_value = _float(linetokens[6])
        except ValueError:
            # At least one field is empty or malformed. Split again without
            # the line terminator so that an empty trailing column is "" (and
            # replaced by ``nullvalue``) rather than "\n".
            linetokens = line.rstrip("\n").split(",", 7)
            open_value = _float(linetokens[2] or nullvalue)
            high_value = _float(linetokens[3] or nullvalue)
            low_value = _float(linetokens[4] or nullvalue)
            close_value = _float(linetokens[5] or nullvalue)
            volume_value = _float(linetokens[6] or nullvalue)

        if open_value in (_inf, _neg_inf):
            open_value = open_line._default_value
        if high_value in (_inf, _neg_inf):
            high_value = high_line._default_value
        if low_value in (_inf, _neg_inf):
            low_value = low_line._default_value
        if close_value in (_inf, _neg_inf):
            close_value = close_line._default_value
        if volume_value in (_inf, _neg_inf):
            volume_value = volume_line._default_value
        openinterest_value = self._fast_ymdhms_openinterest_default
        datetime_value = dtnum if dtnum >= 1.0 else 1.0

        # Advance all lines in lockstep and append the parsed values.
        next_idx = open_line._idx + 1
        next_lencount = open_line.lencount + 1
        open_line._idx = next_idx
        open_line.lencount = next_lencount
        open_append(open_value)
        high_line._idx = next_idx
        high_line.lencount = next_lencount
        high_append(high_value)
        low_line._idx = next_idx
        low_line.lencount = next_lencount
        low_append(low_value)
        close_line._idx = next_idx
        close_line.lencount = next_lencount
        close_append(close_value)
        volume_line._idx = next_idx
        volume_line.lencount = next_lencount
        volume_append(volume_value)
        openinterest_line._idx = next_idx
        openinterest_line.lencount = next_lencount
        openinterest_append(openinterest_value)
        datetime_line._idx = next_idx
        datetime_line.lencount = next_lencount
        datetime_append(datetime_value)

        # The tick_* attributes are written straight into the instance dict.
        tick_values = self._fast_ymdhms_tick_dict
        tick_values["tick_open"] = open_value
        tick_values["tick_high"] = high_value
        tick_values["tick_low"] = low_value
        tick_values["tick_close"] = close_value
        tick_values["tick_volume"] = volume_value
        tick_values["tick_openinterest"] = openinterest_value
        tick_values["tick_last"] = close_value
        tick_values["_tick_direct_filled"] = True
        return True

    def load(self):
        """Load one CSV bar through a narrow no-filter fast path.

        Falls back to the base class ``load`` whenever an input timezone, a
        date range, filters or stacked/stashed bars are involved. Otherwise
        the lines are advanced here, one row is read and either parsed inline
        (``YYYYMMDD`` + ``HH:MM:SS`` feeds below the daily timeframe) or
        handed to ``_loadline``.

        Returns:
            ``True`` when a bar was loaded, a falsy value otherwise.
        """
        try:
            use_direct_csv_load = object.__getattribute__(self, "_use_direct_csv_load")
        except AttributeError:
            use_direct_csv_load = (
                object.__getattribute__(self, "_tzinput") is None
                and object.__getattribute__(self, "fromdate") == _NEG_INF
                and object.__getattribute__(self, "todate") == _INF
                and not self._filters
                and not self._barstack
                and not self._barstash
            )
            object.__setattr__(self, "_use_direct_csv_load", use_direct_csv_load)

        if not use_direct_csv_load or self._filters or self._barstack or self._barstash:
            return super().load()

        # Lines that can be advanced by hand (no QBuffer mode, no clock);
        # ``None`` means the regular ``forward()`` has to be used.
        try:
            forward_lines = self._load_forward_lines
        except AttributeError:
            try:
                lines = self.lines.lines
                if any(line.mode == line.QBuffer or line._clock is not None for line in lines):
                    self._load_forward_lines = None
                    forward_lines = None
                else:
                    forward_lines = tuple(lines)
                    self._load_forward_lines = forward_lines
            except AttributeError:
                self._load_forward_lines = None
                forward_lines = None

        if forward_lines is None:
            self.forward()
        else:
            for line in forward_lines:
                line._idx += 1
                line.lencount += 1
                line.array.append(line._default_value)

        f = self.f
        if f is None:
            self.backwards(force=True)
            return False

        line = f.readline()
        if not line:
            self.backwards(force=True)
            return False

        linetokens = None
        if forward_lines is not None:
            try:
                use_direct_ymdhms_load = object.__getattribute__(self, "_use_direct_ymdhms_load")
            except AttributeError:
                use_direct_ymdhms_load = (
                    type(self) is GenericCSVData
                    and self._dt_num_fast == 1
                    and self._timeframe < TimeFrame.Days
                    and not self._datetime_line.bindings
                    and all(not line.bindings for _, line, _, _ in self._direct_field_cache)
                    and all(not line.bindings for line, _, _, _ in self._direct_missing_field_cache)
                )
                object.__setattr__(self, "_use_direct_ymdhms_load", use_direct_ymdhms_load)

            if use_direct_ymdhms_load:
                # Strip the line terminator before splitting: otherwise an
                # empty trailing column is "\n" instead of "" and is not
                # replaced by ``nullvalue``.
                linetokens = line.rstrip("\n").split(self.separator)
                dtfield = linetokens[self._datetime_idx]
                timefield = linetokens[self._time_idx]
                if (
                    dtfield[8:9] == ""
                    and timefield[8:9] == ""
                    and timefield[2:3] == ":"
                    and timefield[5:6] == ":"
                ):
                    try:
                        year = int(dtfield[0:4])
                        month = int(dtfield[4:6])
                        day = int(dtfield[6:8])
                        hour = int(timefield[0:2])
                        minute = int(timefield[3:5])
                        second = int(timefield[6:8])
                        if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
                            # Handled right below: the row goes through
                            # ``_loadline`` and its strptime-based parser.
                            raise ValueError("time out of range")
                    except ValueError:
                        pass
                    else:
                        year_minus_one = year - 1
                        leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
                        if (
                            year < 1
                            or month < 1
                            or month > 12
                            or day < 1
                            or day > (_DAYS_IN_MONTH[month] + (1 if month == 2 and leap else 0))
                        ):
                            # Invalid date: let ``date`` raise the ValueError.
                            date(year, month, day).toordinal()
                        ordinal = (
                            year_minus_one * 365
                            + year_minus_one // 4
                            - year_minus_one // 100
                            + year_minus_one // 400
                            + _DAYS_BEFORE_MONTH[month]
                            + day
                        )
                        if month > 2 and leap:
                            ordinal += 1
                        dtnum = (
                            float(ordinal)
                            + (hour * 3600 + minute * 60 + second) / _SECONDS_PER_DAY
                        )

                        line_datetime = self._datetime_line
                        datetime_idx = line_datetime._idx
                        if datetime_idx < 0:
                            line_datetime[0] = dtnum
                        else:
                            try:
                                line_datetime.array[datetime_idx] = dtnum if dtnum >= 1.0 else 1.0
                            except IndexError:
                                line_datetime[0] = dtnum

                        nullvalue = self._nullvalue
                        set_attr = object.__setattr__
                        try:
                            fast_ymdhms_ohlcv_fields = self._fast_ymdhms_ohlcv_fields
                        except AttributeError:
                            p = self.p
                            fast_ymdhms_ohlcv_fields = (
                                self.separator == ","
                                and self._datetime_idx == 0
                                and self._time_idx == 1
                                and p.open == 2
                                and p.high == 3
                                and p.low == 4
                                and p.close == 5
                                and p.volume == 6
                                and (p.openinterest is None or p.openinterest < 0)
                            )
                            object.__setattr__(
                                self,
                                "_fast_ymdhms_ohlcv_fields",
                                fast_ymdhms_ohlcv_fields,
                            )
                            if fast_ymdhms_ohlcv_fields:
                                object.__setattr__(
                                    self,
                                    "_fast_ymdhms_ohlcv_lines",
                                    (
                                        self.lines.open,
                                        self.lines.high,
                                        self.lines.low,
                                        self.lines.close,
                                        self.lines.volume,
                                        self.lines.openinterest,
                                    ),
                                )

                        if fast_ymdhms_ohlcv_fields:
                            try:
                                open_value = float(linetokens[2] or nullvalue)
                                high_value = float(linetokens[3] or nullvalue)
                                low_value = float(linetokens[4] or nullvalue)
                                close_value = float(linetokens[5] or nullvalue)
                                volume_value = float(linetokens[6] or nullvalue)
                            except (IndexError, ValueError, TypeError):
                                # Fall through to the per-field loop below.
                                pass
                            else:
                                (
                                    open_line,
                                    high_line,
                                    low_line,
                                    close_line,
                                    volume_line,
                                    openinterest_line,
                                ) = self._fast_ymdhms_ohlcv_lines
                                if open_value in (_INF, _NEG_INF):
                                    open_value = open_line._default_value
                                if high_value in (_INF, _NEG_INF):
                                    high_value = high_line._default_value
                                if low_value in (_INF, _NEG_INF):
                                    low_value = low_line._default_value
                                if close_value in (_INF, _NEG_INF):
                                    close_value = close_line._default_value
                                if volume_value in (_INF, _NEG_INF):
                                    volume_value = volume_line._default_value
                                openinterest_value = openinterest_line._default_value

                                open_line.array[open_line._idx] = open_value
                                high_line.array[high_line._idx] = high_value
                                low_line.array[low_line._idx] = low_value
                                close_line.array[close_line._idx] = close_value
                                volume_line.array[volume_line._idx] = volume_value
                                openinterest_line.array[openinterest_line._idx] = openinterest_value

                                set_attr(self, "tick_open", open_value)
                                set_attr(self, "tick_high", high_value)
                                set_attr(self, "tick_low", low_value)
                                set_attr(self, "tick_close", close_value)
                                set_attr(self, "tick_volume", volume_value)
                                set_attr(self, "tick_openinterest", openinterest_value)
                                set_attr(self, "tick_last", close_value)
                                set_attr(self, "_tick_direct_filled", True)
                                return True

                        tick_last = None
                        for csvidx, field_line, tick_name, is_last in self._direct_field_cache:
                            csvfield = linetokens[csvidx]
                            if csvfield == "":
                                csvfield = nullvalue
                            value = float(csvfield)
                            if value in (_INF, _NEG_INF):
                                value = field_line._default_value
                            field_idx = field_line._idx
                            if field_idx < 0:
                                field_line[0] = value
                            else:
                                try:
                                    field_line.array[field_idx] = value
                                except IndexError:
                                    field_line[0] = value
                            set_attr(self, tick_name, value)
                            if is_last:
                                tick_last = value

                        for (
                            field_line,
                            value,
                            tick_name,
                            is_last,
                        ) in self._direct_missing_field_cache:
                            field_idx = field_line._idx
                            if field_idx < 0:
                                field_line[0] = value
                            else:
                                try:
                                    field_line.array[field_idx] = value
                                except IndexError:
                                    field_line[0] = value
                            set_attr(self, tick_name, value)
                            if is_last:
                                tick_last = value

                        if tick_last is None:
                            tick_last = self._datetime_line.array[datetime_idx]
                        set_attr(self, "tick_last", tick_last)
                        set_attr(self, "_tick_direct_filled", True)
                        return True

        # Generic path; the tokens are reused when the attempt above already
        # split the row.
        if linetokens is None:
            linetokens = line.rstrip("\n").split(self.separator)
        loadret = self._loadline(linetokens)
        if not loadret:
            self.backwards(force=True)
            return loadret
        return True

    # After reading csv file line, split line's data into linetokens, then further processing
    def _loadline(self, linetokens):
        """Fill the current bar from the tokens of one CSV row.

        Args:
            linetokens: The fields of the row, already split.

        Returns:
            ``True``. Rows whose timestamp is outside ``fromdate``/``todate``
            (when no input timezone is set) return early, with only the
            datetime line written.

        Raises:
            ValueError: If the date/time or a numeric field cannot be parsed.
        """
        line_datetime = self._datetime_line
        # Datetime needs special treatment
        # First get specific date based on datetime order
        dtfield = linetokens[self._datetime_idx]
        timefield = linetokens[self._time_idx] if self._has_time else None

        dtnum = None
        if not self._tzinput and self._timeframe < TimeFrame.Days:
            # Direct text -> number conversions; whenever one of them does not
            # apply, ``dtnum`` stays ``None`` and the datetime based
            # conversion below takes over.
            dt_num_fast = self._dt_num_fast
            if dt_num_fast == 1:
                if (
                    dtfield[8:9] == ""
                    and timefield[8:9] == ""
                    and timefield[2:3] == ":"
                    and timefield[5:6] == ":"
                ):
                    try:
                        year = int(dtfield[0:4])
                        month = int(dtfield[4:6])
                        day = int(dtfield[6:8])
                        hour = int(timefield[0:2])
                        minute = int(timefield[3:5])
                        second = int(timefield[6:8])
                        if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
                            # Handled right below: the regular parser then
                            # reports the bad time.
                            raise ValueError("time out of range")
                    except ValueError:
                        pass
                    else:
                        year_minus_one = year - 1
                        leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
                        if (
                            year < 1
                            or month < 1
                            or month > 12
                            or day < 1
                            or day > (_DAYS_IN_MONTH[month] + (1 if month == 2 and leap else 0))
                        ):
                            # Invalid date: let ``date`` raise the ValueError.
                            date(year, month, day).toordinal()
                        ordinal = (
                            year_minus_one * 365
                            + year_minus_one // 4
                            - year_minus_one // 100
                            + year_minus_one // 400
                            + _DAYS_BEFORE_MONTH[month]
                            + day
                        )
                        if month > 2 and leap:
                            ordinal += 1
                        seconds = hour * 3600 + minute * 60 + second
                        dtnum = float(ordinal) + seconds / _SECONDS_PER_DAY
            elif dt_num_fast == 2:
                if dtfield[8:9] == "" and timefield[5:6] == "" and timefield[2:3] == ":":
                    try:
                        year = int(dtfield[0:4])
                        month = int(dtfield[4:6])
                        day = int(dtfield[6:8])
                        hour = int(timefield[0:2])
                        minute = int(timefield[3:5])
                        if not (0 <= hour <= 23 and 0 <= minute <= 59):
                            # Handled right below: the regular parser then
                            # reports the bad time.
                            raise ValueError("time out of range")
                    except ValueError:
                        pass
                    else:
                        year_minus_one = year - 1
                        leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
                        if (
                            year < 1
                            or month < 1
                            or month > 12
                            or day < 1
                            or day > (_DAYS_IN_MONTH[month] + (1 if month == 2 and leap else 0))
                        ):
                            # Invalid date: let ``date`` raise the ValueError.
                            date(year, month, day).toordinal()
                        ordinal = (
                            year_minus_one * 365
                            + year_minus_one // 4
                            - year_minus_one // 100
                            + year_minus_one // 400
                            + _DAYS_BEFORE_MONTH[month]
                            + day
                        )
                        if month > 2 and leap:
                            ordinal += 1
                        seconds = hour * 3600 + minute * 60
                        dtnum = float(ordinal) + seconds / _SECONDS_PER_DAY
            else:
                dtconvert_num = self._dtconvert_num
                if dtconvert_num is not None:
                    dtnum = dtconvert_num(dtfield, timefield)

        if dtnum is None:
            dt = self._dtconvert(dtfield, timefield)

            # If trading interval is greater than or equal to day
            if self._timeframe >= TimeFrame.Days:
                # check if the expected end of session is larger than parsed
                # If _tzinput is set, the date is localized, otherwise it
                # remains as parsed
                if self._tzinput:
                    dtin = self._tzinput.localize(dt)  # pytz compatible-ized
                else:
                    dtin = dt

                # Use date2num to convert date to number
                dtnum = date2num(dtin)  # utc'ize

                # Combine date and sessionend, convert to number
                dteos = datetime.combine(dt.date(), self._sessionend)
                dteosnum = self.date2num(dteos)  # utc'ize

                # Use the session end when it lies after the parsed time
                if dteosnum > dtnum:
                    dtnum = dteosnum
                else:
                    # Avoid reconversion if already converted dtin == dt
                    dtnum = date2num(dt) if self._tzinput else dtnum
            # If trading cycle is less than day, convert time directly
            else:
                dtnum = date2num(dt)

        if line_datetime.bindings:
            line_datetime[0] = dtnum
        else:
            idx = line_datetime._idx
            if idx < 0:
                line_datetime[0] = dtnum
            else:
                try:
                    line_datetime.array[idx] = dtnum if dtnum >= 1.0 else 1.0
                except IndexError:
                    line_datetime[0] = dtnum

        if not self._tzinput and (dtnum < self.fromdate or dtnum > self.todate):
            return True

        # Process cached fields
        nullvalue = self._nullvalue
        for csvidx, line in self._field_cache:
            csvfield = linetokens[csvidx]
            if csvfield == "":
                csvfield = nullvalue
            value = float(csvfield)
            if value in (_INF, _NEG_INF):
                value = line._default_value
            if line.bindings:
                line[0] = value
                continue
            idx = line._idx
            if idx < 0:
                line[0] = value
                continue
            try:
                line.array[idx] = value
            except IndexError:
                line[0] = value

        for line, value in self._missing_field_cache:
            if line.bindings:
                line[0] = value
                continue
            idx = line._idx
            if idx < 0:
                line[0] = value
                continue
            try:
                line.array[idx] = value
            except IndexError:
                line[0] = value

        return True


class GenericCSV(feed.CSVFeedBase):
    """Generic CSV feed class.

    Wrapper class for GenericCSVData feed functionality.
    """

    # Data class associated with this feed.
    DataCls = GenericCSVData