VER: Release 0.34.0

See release notes.

nmacholl authored May 14, 2024
2 parents 290b82e + 2aaef36 commit 7237474
Showing 11 changed files with 309 additions and 106 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,16 @@
# Changelog

## 0.34.0 - 2024-05-14

#### Enhancements
- Added `pip-system-certs` dependency for Windows platforms to prevent a connection issue in `requests` when behind a proxy
- Iteration of the `Live` client will now automatically call `Live.stop` when the iterator is destroyed, such as when a for loop is escaped with an exception or `break` statement (see the sketch below)
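
As an illustration of the new behavior, a minimal sketch (an API key is assumed in the `DATABENTO_API_KEY` environment variable; the dataset, schema, and symbol are illustrative, not part of this release):

```python
from databento import Live

client = Live()  # reads DATABENTO_API_KEY from the environment
client.subscribe(
    dataset="GLBX.MDP3",    # illustrative dataset
    schema="trades",
    stype_in="raw_symbol",
    symbols="ESM4",         # illustrative symbol
)

for record in client:
    print(record)
    break  # escaping the loop now calls `Live.stop` automatically
```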

#### Bug fixes
- Fixed an issue where `batch.download` and `batch.download_async` would fail if requested files already existed in the output directory
- Fixed an issue where `batch.download`, `batch.download_async`, and `timeseries.get_range` could use a lot of memory while streaming data
- Fixed an issue where reusing a `Live` client with an open output stream would drop DBN records when received at the same time as the `Metadata` header

## 0.33.0 - 2024-04-16

#### Enhancements
1 change: 1 addition & 0 deletions README.md
@@ -35,6 +35,7 @@ The minimum dependencies as found in the `pyproject.toml` are also listed below:
- databento-dbn = "0.17.1"
- numpy = ">=1.23.5"
- pandas = ">=1.5.3"
- pip-system-certs = ">=4.0" (Windows only)
- pyarrow = ">=13.0.0"
- requests = ">=2.24.0"
- zstandard = ">=0.21.0"
2 changes: 2 additions & 0 deletions databento/common/constants.py
@@ -25,6 +25,8 @@
x[0]: np.iinfo(x[1]).max for x in InstrumentDefMsg._dtypes if not isinstance(x[1], str)
}

HTTP_STREAMING_READ_SIZE: Final = 2**12

SCHEMA_STRUCT_MAP: Final[dict[Schema, type[DBNRecord]]] = {
Schema.DEFINITION: InstrumentDefMsg,
Schema.IMBALANCE: ImbalanceMsg,
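
For context, `2**12` is 4 KiB. A minimal sketch of how a fixed read size bounds memory when streaming a large HTTP response with `requests` (the URL and filename are placeholders):

```python
import requests

HTTP_STREAMING_READ_SIZE = 2**12  # 4096 bytes per chunk

# Reading in fixed-size chunks keeps memory flat regardless of body size;
# chunk_size=None can yield much larger pieces as data arrives.
with requests.get("https://example.com/large-file", stream=True) as response:
    response.raise_for_status()
    with open("large-file.bin", "wb") as f:
        for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE):
            f.write(chunk)
```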
2 changes: 1 addition & 1 deletion databento/common/enums.py
@@ -198,7 +198,7 @@ class RecordFlags(StringyMixin, IntFlag):  # type: ignore
Represents record flags.
F_LAST
Last message in the packet from the venue for a given `instrument_id`.
Marks the last record in a single event for a given `instrument_id`.
F_TOB
Indicates a top-of-book message, not an individual order.
F_SNAPSHOT
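
Since `RecordFlags` is an `IntFlag`, individual flags are tested with bitwise operators. A small sketch (the flags value is fabricated for illustration):

```python
from databento.common.enums import RecordFlags

flags = RecordFlags.F_LAST | RecordFlags.F_TOB  # fabricated example value

# IntFlag members compose and test with bitwise operators.
if flags & RecordFlags.F_LAST:
    print("last record in this event for the instrument_id")
```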
47 changes: 29 additions & 18 deletions databento/historical/api/batch.py
Expand Up @@ -24,6 +24,7 @@
from databento_dbn import SType
from requests.auth import HTTPBasicAuth

from databento.common.constants import HTTP_STREAMING_READ_SIZE
from databento.common.enums import Delivery
from databento.common.enums import Packaging
from databento.common.enums import SplitDuration
@@ -394,8 +395,16 @@ def _download_batch_file(
headers: dict[str, str] = self._headers.copy()
if output_path.exists():
existing_size = output_path.stat().st_size
headers["Range"] = f"bytes={existing_size}-{batch_download_file.size - 1}"
mode = "ab"
if existing_size < batch_download_file.size:
headers["Range"] = f"bytes={existing_size}-{batch_download_file.size - 1}"
mode = "ab"
elif existing_size == batch_download_file.size:
# File exists and is complete
break
else:
raise FileExistsError(
f"Batch file {output_path.name} already exists and has a larger than expected size.",
)
else:
mode = "wb"
try:
@@ -408,7 +417,7 @@
) as response:
check_http_error(response)
with open(output_path, mode=mode) as f:
for chunk in response.iter_content(chunk_size=None):
for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE):
f.write(chunk)
except BentoHttpError as exc:
if exc.http_status == 429:
@@ -424,24 +433,26 @@
attempts += 1
continue # try again
raise BentoError(f"Error downloading file: {exc}") from None
else:
break

logger.debug("Download of %s completed", output_path.name)
hash_algo, _, hash_hex = batch_download_file.hash_str.partition(":")
logger.debug("Download of %s completed", output_path.name)
hash_algo, _, hash_hex = batch_download_file.hash_str.partition(":")

if hash_algo == "sha256":
output_hash = hashlib.sha256(output_path.read_bytes())
if output_hash.hexdigest() != hash_hex:
warn_msg = f"Downloaded file failed checksum validation: {output_path.name}"
logger.warning(warn_msg)
warnings.warn(warn_msg, category=BentoWarning)
else:
logger.warning(
"Skipping %s checksum because %s is not supported",
output_path.name,
hash_algo,
)
if hash_algo == "sha256":
output_hash = hashlib.sha256(output_path.read_bytes())
if output_hash.hexdigest() != hash_hex:
warn_msg = f"Downloaded file failed checksum validation: {output_path.name}"
logger.warning(warn_msg)
warnings.warn(warn_msg, category=BentoWarning)
else:
logger.warning(
"Skipping %s checksum because %s is not supported",
output_path.name,
hash_algo,
)

return output_path
return output_path


@dataclass
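
The resume logic above amounts to a standard HTTP range request plus a checksum pass. A minimal sketch, assuming a server that honors `Range` headers (the URL, sizes, and hash are placeholders, and `resume_download` is a hypothetical helper, not part of the client):

```python
import hashlib
from pathlib import Path

import requests


def resume_download(url: str, output_path: Path, expected_size: int, expected_sha256: str) -> None:
    """Sketch of a range-based resume followed by checksum validation."""
    headers: dict[str, str] = {}
    mode = "wb"
    if output_path.exists():
        existing_size = output_path.stat().st_size
        if existing_size == expected_size:
            return  # file is already complete
        if existing_size > expected_size:
            raise FileExistsError(f"{output_path.name} is larger than expected")
        # Request only the missing tail and append to the partial file.
        headers["Range"] = f"bytes={existing_size}-{expected_size - 1}"
        mode = "ab"
    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()
        with open(output_path, mode=mode) as f:
            for chunk in response.iter_content(chunk_size=4096):
                f.write(chunk)
    if hashlib.sha256(output_path.read_bytes()).hexdigest() != expected_sha256:
        raise ValueError(f"checksum mismatch for {output_path.name}")
```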
6 changes: 4 additions & 2 deletions databento/historical/http.py
Expand Up @@ -8,6 +8,7 @@
from os import PathLike
from typing import IO
from typing import Any
from typing import Final

import aiohttp
import requests
@@ -16,6 +17,7 @@
from requests import Response
from requests.auth import HTTPBasicAuth

from databento.common.constants import HTTP_STREAMING_READ_SIZE
from databento.common.dbnstore import DBNStore
from databento.common.error import BentoClientError
from databento.common.error import BentoDeprecationWarning
@@ -25,7 +27,7 @@
from databento.common.system import USER_AGENT


WARNING_HEADER_FIELD: str = "X-Warning"
WARNING_HEADER_FIELD: Final = "X-Warning"


class BentoHttpAPI:
@@ -137,7 +139,7 @@ def _stream(
writer = open(path, "x+b")

try:
for chunk in response.iter_content(chunk_size=None):
for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE):
writer.write(chunk)
except Exception as exc:
raise BentoError(f"Error streaming response: {exc}") from None
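
The change from an explicit `str` annotation to `Final` is purely for type checkers. A small sketch of what it buys:

```python
from typing import Final

WARNING_HEADER_FIELD: Final = "X-Warning"

# mypy and similar checkers reject any reassignment of a Final name:
# WARNING_HEADER_FIELD = "X-Other"  # error: Cannot assign to final name
```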
157 changes: 93 additions & 64 deletions databento/live/client.py
@@ -115,78 +115,17 @@ def factory() -> _SessionProtocol:
if not Live._thread.is_alive():
Live._thread.start()

def __aiter__(self) -> Live:
def __aiter__(self) -> LiveIterator:
return iter(self)

async def __anext__(self) -> DBNRecord:
if not self._dbn_queue.is_enabled():
raise ValueError("iteration has not started")

loop = asyncio.get_running_loop()

try:
return self._dbn_queue.get_nowait()
except queue.Empty:
while True:
try:
return await loop.run_in_executor(
None,
self._dbn_queue.get,
True,
0.1,
)
except queue.Empty:
if self._session.is_disconnected():
break
finally:
if not self._dbn_queue.is_full() and not self._session.is_reading():
logger.debug(
"resuming reading with %d pending records",
self._dbn_queue.qsize(),
)
self._session.resume_reading()

self._dbn_queue.disable()
await self.wait_for_close()
logger.debug("completed async iteration")
raise StopAsyncIteration

def __iter__(self) -> Live:
def __iter__(self) -> LiveIterator:
logger.debug("starting iteration")
if self._session.is_started():
logger.error("iteration started after session has started")
raise ValueError(
"Cannot start iteration after streaming has started, records may be missed. Don't call `Live.start` before iterating.",
)
elif self.is_connected():
self.start()
self._dbn_queue._enabled.set()
return self

def __next__(self) -> DBNRecord:
if not self._dbn_queue.is_enabled():
raise ValueError("iteration has not started")

while True:
try:
record = self._dbn_queue.get(timeout=0.1)
except queue.Empty:
if self._session.is_disconnected():
break
else:
return record
finally:
if not self._dbn_queue.is_full() and not self._session.is_reading():
logger.debug(
"resuming reading with %d pending records",
self._dbn_queue.qsize(),
)
self._session.resume_reading()

self._dbn_queue.disable()
self.block_for_close()
logger.debug("completed iteration")
raise StopIteration
return LiveIterator(self)

def __repr__(self) -> str:
name = self.__class__.__name__
@@ -661,3 +600,93 @@ def _map_symbol(self, record: DBNRecord) -> None:
instrument_id = record.instrument_id
self._symbology_map[instrument_id] = record.stype_out_symbol
logger.info("added symbology mapping %s to %d", out_symbol, instrument_id)


class LiveIterator:
"""
Iterator class for the `Live` client. Automatically starts the client when
created and will stop it when destroyed. This provides context-manager-like
behavior for `for` loops.

Parameters
----------
client : Live
The Live client that spawned this LiveIterator.

"""

def __init__(self, client: Live):
client._dbn_queue._enabled.set()
client.start()
self._client = client

@property
def client(self) -> Live:
return self._client

def __iter__(self) -> LiveIterator:
return self

def __del__(self) -> None:
if self.client.is_connected():
self.client.stop()
self.client.block_for_close()
logger.debug("iteration aborted")

async def __anext__(self) -> DBNRecord:
if not self.client._dbn_queue.is_enabled():
raise ValueError("iteration has not started")

loop = asyncio.get_running_loop()

try:
return self.client._dbn_queue.get_nowait()
except queue.Empty:
while True:
try:
return await loop.run_in_executor(
None,
self.client._dbn_queue.get,
True,
0.1,
)
except queue.Empty:
if self.client._session.is_disconnected():
break
finally:
if not self.client._dbn_queue.is_full() and not self.client._session.is_reading():
logger.debug(
"resuming reading with %d pending records",
self.client._dbn_queue.qsize(),
)
self.client._session.resume_reading()

self.client._dbn_queue.disable()
await self.client.wait_for_close()
logger.debug("async iteration completed")
raise StopAsyncIteration

def __next__(self) -> DBNRecord:
if not self.client._dbn_queue.is_enabled():
raise ValueError("iteration has not started")

while True:
try:
record = self.client._dbn_queue.get(timeout=0.1)
except queue.Empty:
if self.client._session.is_disconnected():
break
else:
return record
finally:
if not self.client._dbn_queue.is_full() and not self.client._session.is_reading():
logger.debug(
"resuming reading with %d pending records",
self.client._dbn_queue.qsize(),
)
self.client._session.resume_reading()

self.client._dbn_queue.disable()
self.client.block_for_close()
logger.debug("iteration completed")
raise StopIteration
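
The repeated `finally` blocks above implement a small backpressure handshake: the session pauses socket reads when the record queue fills, and the consumer resumes them once it has drained some records. A generic sketch of the pattern (the names and queue bound are illustrative, not the client's real internals):

```python
import queue
import threading

MAX_PENDING = 1024  # illustrative bound

records: queue.Queue = queue.Queue(maxsize=MAX_PENDING)
is_reading = threading.Event()  # producer reads from the socket while set
is_reading.set()


def consume_one() -> object:
    """Pop one record, resuming the paused producer if space has freed up."""
    try:
        return records.get(timeout=0.1)
    finally:
        if not records.full() and not is_reading.is_set():
            is_reading.set()  # analogous to _session.resume_reading()
```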
40 changes: 21 additions & 19 deletions databento/live/session.py
@@ -11,6 +11,7 @@
from typing import Final

import databento_dbn
from databento_dbn import Metadata
from databento_dbn import Schema
from databento_dbn import SType

@@ -195,28 +196,29 @@ def __init__(
self._user_streams = user_streams

def _process_dbn(self, data: bytes) -> None:
# Do not re-write the metadata to the stream to avoid corruption
if not self._metadata or not data.startswith(b"DBN"):
for stream, exc_callback in self._user_streams.items():
try:
stream.write(data)
except Exception as exc:
stream_name = getattr(stream, "name", str(stream))
logger.error(
"error writing %d bytes to `%s` stream",
len(data),
stream_name,
exc_info=exc,
)
if exc_callback is not None:
exc_callback(exc)
start_index = 0
if data.startswith(b"DBN") and self._metadata:
# We have already received metadata for the stream
# Set the start index just past the repeated metadata block
start_index = int.from_bytes(data[4:8], byteorder="little") + 8
self._metadata.check(Metadata.decode(bytes(data[:start_index])))
for stream, exc_callback in self._user_streams.items():
try:
stream.write(data[start_index:])
except Exception as exc:
stream_name = getattr(stream, "name", str(stream))
logger.error(
"error writing %d bytes to `%s` stream",
len(data[start_index:]),
stream_name,
exc_info=exc,
)
if exc_callback is not None:
exc_callback(exc)
return super()._process_dbn(data)

def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
if not self._metadata:
self._metadata.data = metadata
else:
self._metadata.check(metadata)
self._metadata.data = metadata
return super().received_metadata(metadata)

def received_record(self, record: DBNRecord) -> None:
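
The `start_index` arithmetic follows from DBN framing: a metadata block begins with the 3-byte magic `DBN`, a 1-byte version, and a little-endian `u32` holding the length of the metadata that follows, so the block to skip is that length plus the 8-byte prefix. A small sketch of the same parse (the frame is fabricated, not real DBN data):

```python
def metadata_block_length(data: bytes) -> int:
    """Sketch: length of a leading DBN metadata block, or 0 if absent."""
    if not data.startswith(b"DBN"):
        return 0
    # Bytes 4-7 hold the metadata length as a little-endian u32; the
    # metadata itself follows the 8-byte prefix (magic, version, length).
    return int.from_bytes(data[4:8], byteorder="little") + 8


# Fabricated frame: an 8-byte prefix claiming zero-length metadata.
frame = b"DBN" + b"\x02" + (0).to_bytes(4, "little") + b"...records..."
assert metadata_block_length(frame) == 8
```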
2 changes: 1 addition & 1 deletion databento/version.py
@@ -1 +1 @@
__version__ = "0.33.0"
__version__ = "0.34.0"