Commit

VER: Release 0.23.1
See release notes.
nmacholl authored Nov 10, 2023
2 parents 2479d10 + 17f1a02 commit 91fe40a
Showing 24 changed files with 528 additions and 49 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,14 @@
 # Changelog
 
+## 0.23.1 - 2023-11-10
+
+#### Enhancements
+- Added new publishers for consolidated DBEQ.BASIC and DBEQ.PLUS
+
+#### Bug fixes
+- Fixed an issue where `Live.block_for_close` and `Live.wait_for_close` would not flush streams if the timeout was reached
+- Fixed a performance regression when reading a historical DBN file into a numpy array
+
 ## 0.23.0 - 2023-10-26
 
 #### Enhancements
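To make the `Live` fix concrete, here is a minimal sketch of the affected code path, assuming a valid `DATABENTO_API_KEY` in the environment; the dataset, symbol, and output path are illustrative:

```python
import databento as db

live = db.Live()  # reads DATABENTO_API_KEY from the environment
live.subscribe(
    dataset="GLBX.MDP3",  # illustrative dataset
    schema="trades",
    stype_in="raw_symbol",
    symbols=["ESZ3"],  # illustrative symbol
)
live.add_stream(open("trades.dbn", "wb"))  # persist records as DBN
live.start()

# Before 0.23.1, hitting the timeout here could leave buffered records
# unwritten; the fix flushes the stream whether or not a close arrives
# in time.
live.block_for_close(timeout=10.0)
```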
139 changes: 119 additions & 20 deletions databento/common/dbnstore.py
@@ -10,7 +10,16 @@
 from io import BytesIO
 from os import PathLike
 from pathlib import Path
-from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Literal, overload
+from typing import (
+    IO,
+    TYPE_CHECKING,
+    Any,
+    BinaryIO,
+    Callable,
+    Literal,
+    Protocol,
+    overload,
+)
 
 import databento_dbn
 import numpy as np
@@ -638,7 +647,7 @@ def from_file(cls, path: PathLike[str] | str) -> DBNStore:
         Raises
         ------
         FileNotFoundError
-            If a non-existant file is specified.
+            If a non-existent file is specified.
         ValueError
             If an empty file is specified.
@@ -1072,20 +1081,43 @@ def to_ndarray(
         """
         schema = validate_maybe_enum(schema, Schema, "schema")
-        if schema is None:
-            if self.schema is None:
+        ndarray_iter: NDArrayIterator
+
+        if self.schema is None:
+            # If schema is None, we're handling heterogeneous data from the live client.
+            # This is less performant because the records of a given schema are not contiguous in memory.
+            if schema is None:
                 raise ValueError("a schema must be specified for mixed DBN data")
-            schema = self.schema
 
-        dtype = SCHEMA_DTYPES_MAP[schema]
-        ndarray_iter = NDArrayIterator(
-            filter(lambda r: isinstance(r, SCHEMA_STRUCT_MAP[schema]), self),
-            dtype,
-            count,
-        )
+            schema_struct = SCHEMA_STRUCT_MAP[schema]
+            schema_dtype = SCHEMA_DTYPES_MAP[schema]
+            schema_filter = filter(lambda r: isinstance(r, schema_struct), self)
+
+            ndarray_iter = NDArrayBytesIterator(
+                records=map(bytes, schema_filter),
+                dtype=schema_dtype,
+                count=count,
+            )
+        else:
+            # If schema is set, we're handling homogeneous historical data.
+            schema_dtype = SCHEMA_DTYPES_MAP[self.schema]
+
+            if self._metadata.ts_out:
+                schema_dtype.append(("ts_out", "u8"))
+
+            if schema is not None and schema != self.schema:
+                # This is to maintain identical behavior with NDArrayBytesIterator
+                ndarray_iter = iter([np.empty([0, 1], dtype=schema_dtype)])
+            else:
+                ndarray_iter = NDArrayStreamIterator(
+                    reader=self.reader,
+                    dtype=schema_dtype,
+                    offset=self._metadata_length,
+                    count=count,
+                )
 
         if count is None:
-            return next(ndarray_iter, np.empty([0, 1], dtype=dtype))
+            return next(ndarray_iter, np.empty([0, 1], dtype=schema_dtype))
 
         return ndarray_iter

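For context, a short sketch of how `to_ndarray` is driven from user code; the path is illustrative and `process` is a hypothetical per-batch handler defined here only to keep the snippet self-contained:

```python
import numpy as np
from databento import DBNStore


def process(batch: np.ndarray) -> None:
    print(len(batch), "records")  # stand-in for real per-batch work


store = DBNStore.from_file("trades.dbn")  # illustrative path

# Homogeneous historical data takes the new NDArrayStreamIterator fast
# path and is decoded directly from the byte stream.
trades: np.ndarray = store.to_ndarray()

# With count, an iterator of arrays of at most `count` records is
# returned instead, bounding memory use on large files.
for batch in store.to_ndarray(schema="trades", count=10_000):
    process(batch)
```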
@@ -1124,10 +1156,66 @@ def _transcode(
         transcoder.flush()
 
 
-class NDArrayIterator:
+class NDArrayIterator(Protocol):
+    @abc.abstractmethod
+    def __iter__(self) -> NDArrayIterator:
+        ...
+
+    @abc.abstractmethod
+    def __next__(self) -> np.ndarray[Any, Any]:
+        ...
+
+
+class NDArrayStreamIterator(NDArrayIterator):
+    """
+    Iterator for homogeneous byte streams of DBN records.
+    """
+
+    def __init__(
+        self,
+        reader: IO[bytes],
+        dtype: list[tuple[str, str]],
+        offset: int = 0,
+        count: int | None = None,
+    ) -> None:
+        self._reader = reader
+        self._dtype = np.dtype(dtype)
+        self._offset = offset
+        self._count = count
+
+        self._reader.seek(offset)
+
+    def __iter__(self) -> NDArrayStreamIterator:
+        return self
+
+    def __next__(self) -> np.ndarray[Any, Any]:
+        if self._count is None:
+            read_size = -1
+        else:
+            read_size = self._dtype.itemsize * max(self._count, 1)
+
+        if buffer := self._reader.read(read_size):
+            try:
+                return np.frombuffer(
+                    buffer=buffer,
+                    dtype=self._dtype,
+                )
+            except ValueError:
+                raise BentoError(
+                    "DBN file is truncated or contains an incomplete record",
+                )
+
+        raise StopIteration
+
+
+class NDArrayBytesIterator(NDArrayIterator):
+    """
+    Iterator for heterogeneous streams of DBN records.
+    """
+
     def __init__(
         self,
-        records: Iterator[DBNRecord],
+        records: Iterator[bytes],
         dtype: list[tuple[str, str]],
         count: int | None,
     ):
@@ -1144,22 +1232,33 @@ def __next__(self) -> np.ndarray[Any, Any]:
         num_records = 0
         for record in itertools.islice(self._records, self._count):
             num_records += 1
-            record_bytes.write(bytes(record))
+            record_bytes.write(record)
 
         if num_records == 0:
             if self._first_next:
                 return np.empty([0, 1], dtype=self._dtype)
             raise StopIteration
 
         self._first_next = False
-        return np.frombuffer(
-            record_bytes.getvalue(),
-            dtype=self._dtype,
-            count=num_records,
-        )
+
+        try:
+            return np.frombuffer(
+                record_bytes.getbuffer(),
+                dtype=self._dtype,
+                count=num_records,
+            )
+        except ValueError:
+            raise BentoError(
+                "DBN file is truncated or contains an incomplete record",
+            )
 
 
 class DataFrameIterator:
     """
     Iterator for DataFrames that supports batching and column formatting for
     DBN records.
     """
 
     def __init__(
         self,
         records: Iterator[np.ndarray[Any, Any]],
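The two NDArray iterators above decode fixed-size records via NumPy structured dtypes and surface short reads as `BentoError`. The self-contained snippet below shows the underlying `np.frombuffer` behavior; the three-field record layout is invented for the demo, not the real DBN layout:

```python
import numpy as np

# Invented 20-byte record layout, for demonstration purposes only.
dtype = np.dtype([("ts_event", "u8"), ("price", "i8"), ("size", "u4")])
data = np.array([(1, 10050, 3), (2, 10055, 7)], dtype=dtype).tobytes()

records = np.frombuffer(data, dtype=dtype)
print(records["price"])  # [10050 10055]

# A buffer that is not a whole number of records raises ValueError,
# which the iterators above re-raise as BentoError.
try:
    np.frombuffer(data[:-1], dtype=dtype)
except ValueError as exc:
    print(exc)  # buffer size must be a multiple of element size
```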
35 changes: 35 additions & 0 deletions databento/common/publishers.py
@@ -95,6 +95,8 @@ class Venue(StringyMixin, str, Enum):
         ICE Futures Europe (Commodities).
     NDEX
         ICE Endex.
+    DBEQ
+        Databento Equities - Consolidated.
 
     """

@@ -137,6 +139,7 @@ class Venue(StringyMixin, str, Enum):
     MXOP = "MXOP"
     IFEU = "IFEU"
     NDEX = "NDEX"
+    DBEQ = "DBEQ"
 
     @classmethod
     def from_int(cls, value: int) -> Venue:
@@ -221,6 +224,8 @@ def from_int(cls, value: int) -> Venue:
             return Venue.IFEU
         if value == 39:
             return Venue.NDEX
+        if value == 40:
+            return Venue.DBEQ
         raise ValueError(f"Integer value {value} does not correspond with any Venue variant")
 
     def to_int(self) -> int:
@@ -305,6 +310,8 @@ def to_int(self) -> int:
             return 38
         if self == Venue.NDEX:
             return 39
+        if self == Venue.DBEQ:
+            return 40
         raise ValueError("Invalid Venue")
 
     @property
@@ -390,6 +397,8 @@ def description(self) -> str:
             return "ICE Futures Europe (Commodities)"
         if self == Venue.NDEX:
             return "ICE Endex"
+        if self == Venue.DBEQ:
+            return "Databento Equities - Consolidated"
         raise ValueError("Unexpected Venue value")
 
 @unique
@@ -805,6 +814,10 @@ class Publisher(StringyMixin, str, Enum):
         ICE Futures Europe (Commodities).
     NDEX_IMPACT_NDEX
         ICE Endex.
+    DBEQ_BASIC_DBEQ
+        DBEQ Basic - Consolidated.
+    DBEQ_PLUS_DBEQ
+        DBEQ Plus - Consolidated.
 
     """

@@ -866,6 +879,8 @@ class Publisher(StringyMixin, str, Enum):
     DBEQ_PLUS_FINC = "DBEQ.PLUS.FINC"
     IFEU_IMPACT_IFEU = "IFEU.IMPACT.IFEU"
     NDEX_IMPACT_NDEX = "NDEX.IMPACT.NDEX"
+    DBEQ_BASIC_DBEQ = "DBEQ.BASIC.DBEQ"
+    DBEQ_PLUS_DBEQ = "DBEQ.PLUS.DBEQ"
 
     @classmethod
     def from_int(cls, value: int) -> Publisher:
@@ -988,6 +1003,10 @@ def from_int(cls, value: int) -> Publisher:
             return Publisher.IFEU_IMPACT_IFEU
         if value == 58:
             return Publisher.NDEX_IMPACT_NDEX
+        if value == 59:
+            return Publisher.DBEQ_BASIC_DBEQ
+        if value == 60:
+            return Publisher.DBEQ_PLUS_DBEQ
         raise ValueError(f"Integer value {value} does not correspond with any Publisher variant")
 
     def to_int(self) -> int:
@@ -1110,6 +1129,10 @@ def to_int(self) -> int:
             return 57
         if self == Publisher.NDEX_IMPACT_NDEX:
             return 58
+        if self == Publisher.DBEQ_BASIC_DBEQ:
+            return 59
+        if self == Publisher.DBEQ_PLUS_DBEQ:
+            return 60
         raise ValueError("Invalid Publisher")
 
     @property
     def venue(self) -> Venue:
@@ -1232,6 +1255,10 @@ def venue(self) -> Venue:
             return Venue.IFEU
         if self == Publisher.NDEX_IMPACT_NDEX:
             return Venue.NDEX
+        if self == Publisher.DBEQ_BASIC_DBEQ:
+            return Venue.DBEQ
+        if self == Publisher.DBEQ_PLUS_DBEQ:
+            return Venue.DBEQ
         raise ValueError("Unexpected Publisher value")
 
     @property
     def dataset(self) -> Dataset:
@@ -1354,6 +1381,10 @@ def dataset(self) -> Dataset:
             return Dataset.IFEU_IMPACT
         if self == Publisher.NDEX_IMPACT_NDEX:
             return Dataset.NDEX_IMPACT
+        if self == Publisher.DBEQ_BASIC_DBEQ:
+            return Dataset.DBEQ_BASIC
+        if self == Publisher.DBEQ_PLUS_DBEQ:
+            return Dataset.DBEQ_PLUS
         raise ValueError("Unexpected Publisher value")
 
     @property
@@ -1477,4 +1508,8 @@ def description(self) -> str:
             return "ICE Futures Europe (Commodities)"
         if self == Publisher.NDEX_IMPACT_NDEX:
             return "ICE Endex"
+        if self == Publisher.DBEQ_BASIC_DBEQ:
+            return "DBEQ Basic - Consolidated"
+        if self == Publisher.DBEQ_PLUS_DBEQ:
+            return "DBEQ Plus - Consolidated"
         raise ValueError("Unexpected Publisher value")
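The new variants round-trip through their integer codes and resolve to the consolidated venue and their respective datasets. A quick sketch, assuming `Dataset` is importable from the same module, as the diff suggests:

```python
from databento.common.publishers import Dataset, Publisher, Venue

pub = Publisher.from_int(59)
assert pub is Publisher.DBEQ_BASIC_DBEQ
assert pub.to_int() == 59
assert pub.venue is Venue.DBEQ
assert pub.dataset is Dataset.DBEQ_BASIC
print(pub.description)  # DBEQ Basic - Consolidated
```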
8 changes: 4 additions & 4 deletions databento/common/symbology.py
@@ -264,7 +264,7 @@ def insert_metadata(self, metadata: Metadata) -> None:
             stype_out=stype_out,
         )
 
-        self._insert_inverval(
+        self._insert_interval(
             instrument_id,
             MappingInterval(
                 start_date=start_date,
@@ -308,7 +308,7 @@ def insert_symbol_mapping_msg(
         else:
             symbol = msg.stype_out_symbol
 
-        self._insert_inverval(
+        self._insert_interval(
             msg.hd.instrument_id,
             MappingInterval(
                 start_date=pd.Timestamp(start_ts, unit="ns", tz="utc").date(),
@@ -383,7 +383,7 @@ def insert_json(
             stype_out=stype_out,
         )
 
-        self._insert_inverval(
+        self._insert_interval(
             instrument_id,
             MappingInterval(
                 start_date=start_date,
@@ -540,7 +540,7 @@ def map_symbols_json(
 
         return out_file_valid
 
-    def _insert_inverval(self, instrument_id: int, interval: MappingInterval) -> None:
+    def _insert_interval(self, instrument_id: int, interval: MappingInterval) -> None:
         """
         Insert a SymbolInterval into the map.