VER: Release 0.43.1
See release notes.
nmacholl authored Oct 15, 2024
2 parents dba53d7 + f7cf43b commit fd8276b
Showing 7 changed files with 56 additions and 11 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,14 @@
# Changelog

## 0.43.1 - 2024-10-15

#### Enhancements
- Keyword arguments to `DBNStore.to_parquet` will now allow `where` and `schema` to be specified
- Improved record processing time for the `Live` client

#### Bug fixes
- Fixed an issue where validating the checksum of a batch file loaded the entire file into memory

## 0.43.0 - 2024-10-09

This release drops support for Python 3.8 which has reached end-of-life.
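To see the first enhancement from the caller's side, here is a minimal sketch (the file paths are hypothetical, and it assumes a local DBN file): keyword arguments that `to_parquet` does not consume itself are forwarded to `pyarrow.parquet.ParquetWriter`, so writer options such as the compression codec can be supplied directly.

from databento import DBNStore

store = DBNStore.from_file("glbx-mdp3-20241015.mbo.dbn.zst")  # hypothetical path
store.to_parquet(
    "glbx-mdp3-20241015.mbo.parquet",
    compression="zstd",      # forwarded to pyarrow.parquet.ParquetWriter
    write_statistics=False,  # likewise forwarded
)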
15 changes: 10 additions & 5 deletions databento/common/dbnstore.py
@@ -963,7 +963,7 @@ def to_df(
def to_parquet(
self,
path: PathLike[str] | str,
-price_type: Literal["fixed", "float"] = "float",
+price_type: PriceType | str = PriceType.FLOAT,
pretty_ts: bool = True,
map_symbols: bool = True,
schema: Schema | str | None = None,
@@ -992,6 +992,9 @@ def to_parquet(
This is only required when reading a DBN stream with mixed record types.
mode : str, default "w"
The file write mode to use, either "x" or "w".
+**kwargs : Any
+Keyword arguments to pass to the `pyarrow.parquet.ParquetWriter`.
+These can be used to override the default behavior of the writer.
Raises
------
@@ -1000,10 +1003,12 @@
If the DBN schema is unspecified and cannot be determined.
"""
if price_type == "decimal":
file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
price_type = validate_enum(price_type, PriceType, "price_type")

if price_type == PriceType.DECIMAL:
raise ValueError("the 'decimal' price type is not currently supported")

file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
schema = validate_maybe_enum(schema, Schema, "schema")
if schema is None:
if self.schema is None:
@@ -1025,8 +1030,8 @@
# Initialize the writer using the first DataFrame
parquet_schema = pa.Schema.from_pandas(frame)
writer = pq.ParquetWriter(
-where=file_path,
-schema=parquet_schema,
+where=kwargs.pop("where", file_path),
+schema=kwargs.pop("schema", parquet_schema),
**kwargs,
)
writer.write_table(
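The change above pops `where` and `schema` out of `**kwargs` with a default, so caller-supplied values win while every remaining keyword argument still reaches the writer untouched. A standalone sketch of the same pattern, assuming a pandas DataFrame as input (the helper name is illustrative, not library API; overriding `schema` would assume the frame actually conforms to it):

import pyarrow as pa
import pyarrow.parquet as pq

def write_parquet(frame, file_path, **kwargs):
    # Defaults are popped from kwargs so the caller may override them;
    # anything left over goes straight to ParquetWriter.
    parquet_schema = pa.Schema.from_pandas(frame)
    with pq.ParquetWriter(
        where=kwargs.pop("where", file_path),
        schema=kwargs.pop("schema", parquet_schema),
        **kwargs,
    ) as writer:
        writer.write_table(pa.Table.from_pandas(frame, schema=parquet_schema))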
6 changes: 5 additions & 1 deletion databento/historical/api/batch.py
@@ -431,7 +431,11 @@ def _download_batch_file(
hash_algo, _, hash_hex = batch_download_file.hash_str.partition(":")

if hash_algo == "sha256":
-output_hash = hashlib.sha256(output_path.read_bytes())
+output_hash = hashlib.new(hash_algo)
+with open(output_path, "rb") as fd:
+    while chunk := fd.read(32_000_000):
+        output_hash.update(chunk)

if output_hash.hexdigest() != hash_hex:
warn_msg = f"Downloaded file failed checksum validation: {output_path.name}"
logger.warning(warn_msg)
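For context on the bug fix: the new code streams the download through the hash in fixed-size chunks instead of materializing the whole file with `read_bytes()`, so memory use is bounded by the chunk size rather than by the size of the batch file. A self-contained sketch of the same approach (the helper name is illustrative; the 32 MB chunk size mirrors the value used above):

import hashlib
from pathlib import Path

def file_sha256(path: Path, chunk_size: int = 32_000_000) -> str:
    # Hash incrementally; only one chunk is held in memory at a time.
    digest = hashlib.new("sha256")
    with open(path, "rb") as fd:
        while chunk := fd.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()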
6 changes: 3 additions & 3 deletions databento/live/session.py
@@ -199,7 +199,7 @@ def __init__(
self._metadata: SessionMetadata = metadata
self._user_callbacks = user_callbacks
self._user_streams = user_streams
-self._last_ts_event: pd.Timestamp | None = None
+self._last_ts_event: int | None = None

def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
if self._metadata:
@@ -228,7 +228,7 @@ def received_record(self, record: DBNRecord) -> None:
self._dispatch_callbacks(record)
if self._dbn_queue.is_enabled():
self._queue_for_iteration(record)
-self._last_ts_event = record.pretty_ts_event
+self._last_ts_event = record.ts_event

return super().received_record(record)

@@ -653,7 +653,7 @@ async def _reconnect(self) -> None:
if self._protocol._last_ts_event is not None:
gap_start = pd.Timestamp(self._protocol._last_ts_event, tz="UTC")
elif self._metadata.data is not None:
-gap_start = self._metadata.data.start
+gap_start = pd.Timestamp(self._metadata.data.start, tz="UTC")
else:
gap_start = pd.Timestamp.utcnow()

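The `Live` client changes keep `record.ts_event` as a raw integer (UNIX epoch nanoseconds) on the per-record path and only construct a `pd.Timestamp` when a reconnect needs to compute the start of the gap. A minimal sketch of that trade-off, using illustrative module-level names rather than the actual session internals:

import pandas as pd

last_ts_event: int | None = None  # raw nanoseconds, cheap to store per record

def on_record(ts_event: int) -> None:
    global last_ts_event
    last_ts_event = ts_event  # no Timestamp construction on the hot path

def gap_start() -> pd.Timestamp:
    # Build the Timestamp lazily, only when a reconnect actually needs it.
    if last_ts_event is not None:
        return pd.Timestamp(last_ts_event, tz="UTC")
    return pd.Timestamp.utcnow()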
2 changes: 1 addition & 1 deletion databento/version.py
@@ -1 +1 @@
__version__ = "0.43.0"
__version__ = "0.43.1"
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "databento"
version = "0.43.0"
version = "0.43.1"
description = "Official Python client library for Databento"
authors = [
"Databento <[email protected]>",
27 changes: 27 additions & 0 deletions tests/test_historical_bento.py
@@ -731,6 +731,33 @@ def test_to_parquet(
pd.testing.assert_frame_equal(actual, expected)


def test_to_parquet_kwargs(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
    test_data: Callable[[Dataset, Schema], bytes],
) -> None:
    # Arrange
    monkeypatch.setattr(databento.common.dbnstore, "PARQUET_CHUNK_SIZE", 1)
    stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO)
    data = DBNStore.from_bytes(data=stub_data)
    parquet_file = tmp_path / "test.parquet"

    # Act
    expected = data.to_df()
    data.to_parquet(
        parquet_file,
        compression="zstd",
        write_statistics="false",
    )
    actual = pd.read_parquet(parquet_file)

    # Replace None values with np.nan
    actual.fillna(value=np.nan)

    # Assert
    pd.testing.assert_frame_equal(actual, expected)


@pytest.mark.parametrize(
"expected_schema",
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
Expand Down
