From dac70e4d3ca67a4cd5c64a8ccbf110bcd4d636bd Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Tue, 16 Apr 2024 11:05:02 -0700
Subject: [PATCH 1/8] MOD: Add pip-system-certs to Python client

---
 CHANGELOG.md   | 5 +++++
 README.md      | 1 +
 pyproject.toml | 1 +
 3 files changed, 7 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc142bf..4a63630 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog

+## 0.34.0 - TBD
+
+#### Enhancements
+- Added `pip-system-certs` dependency for Windows platforms to prevent a connection issue in `requests` when behind a proxy
+
 ## 0.33.0 - 2024-04-16

 #### Enhancements
diff --git a/README.md b/README.md
index 76e41f4..cc86dbc 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,7 @@ The minimum dependencies as found in the `pyproject.toml` are also listed below:
 - databento-dbn = "0.17.1"
 - numpy= ">=1.23.5"
 - pandas = ">=1.5.3"
+- pip-system-certs = ">=4.0" (Windows only)
 - pyarrow = ">=13.0.0"
 - requests = ">=2.24.0"
 - zstandard = ">=0.21.0"
diff --git a/pyproject.toml b/pyproject.toml
index 7dcda22..531d934 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,6 +38,7 @@ numpy = [
     {version = "^1.26.0", python = "^3.12"}
 ]
 pandas = ">=1.5.3"
+pip-system-certs = {version=">=4.0", markers="platform_system == 'Windows'"}
 pyarrow = ">=13.0.0"
 requests = ">=2.24.0"
 zstandard = ">=0.21.0"
From e2fd6fe6cec8a0b0cfeaa291bfbdcc278565d430 Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Thu, 2 May 2024 10:44:50 -0700
Subject: [PATCH 2/8] ADD: Automatic Live client stop on iterator del

---
 CHANGELOG.md             |   1 +
 databento/live/client.py | 157 +++++++++++++++++++++++----------------
 2 files changed, 94 insertions(+), 64 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a63630..1cc76db 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@

 #### Enhancements
 - Added `pip-system-certs` dependency for Windows platforms to prevent a connection issue in `requests` when behind a proxy
+- Iteration of the `Live` client will now automatically call `Live.stop` when the iterator is destroyed, such as when a for loop is escaped with an exception or `break` statement.

 ## 0.33.0 - 2024-04-16

diff --git a/databento/live/client.py b/databento/live/client.py
index 57692e5..68b1eb8 100644
--- a/databento/live/client.py
+++ b/databento/live/client.py
@@ -115,78 +115,17 @@ def factory() -> _SessionProtocol:
         if not Live._thread.is_alive():
             Live._thread.start()

-    def __aiter__(self) -> Live:
+    def __aiter__(self) -> LiveIterator:
         return iter(self)

-    async def __anext__(self) -> DBNRecord:
-        if not self._dbn_queue.is_enabled():
-            raise ValueError("iteration has not started")
-
-        loop = asyncio.get_running_loop()
-
-        try:
-            return self._dbn_queue.get_nowait()
-        except queue.Empty:
-            while True:
-                try:
-                    return await loop.run_in_executor(
-                        None,
-                        self._dbn_queue.get,
-                        True,
-                        0.1,
-                    )
-                except queue.Empty:
-                    if self._session.is_disconnected():
-                        break
-                finally:
-                    if not self._dbn_queue.is_full() and not self._session.is_reading():
-                        logger.debug(
-                            "resuming reading with %d pending records",
-                            self._dbn_queue.qsize(),
-                        )
-                        self._session.resume_reading()
-
-        self._dbn_queue.disable()
-        await self.wait_for_close()
-        logger.debug("completed async iteration")
-        raise StopAsyncIteration
-
-    def __iter__(self) -> Live:
+    def __iter__(self) -> LiveIterator:
         logger.debug("starting iteration")
         if self._session.is_started():
             logger.error("iteration started after session has started")
             raise ValueError(
                 "Cannot start iteration after streaming has started, records may be missed. Don't call `Live.start` before iterating.",
             )
-        elif self.is_connected():
-            self.start()
-        self._dbn_queue._enabled.set()
-        return self
-
-    def __next__(self) -> DBNRecord:
-        if not self._dbn_queue.is_enabled():
-            raise ValueError("iteration has not started")
-
-        while True:
-            try:
-                record = self._dbn_queue.get(timeout=0.1)
-            except queue.Empty:
-                if self._session.is_disconnected():
-                    break
-            else:
-                return record
-            finally:
-                if not self._dbn_queue.is_full() and not self._session.is_reading():
-                    logger.debug(
-                        "resuming reading with %d pending records",
-                        self._dbn_queue.qsize(),
-                    )
-                    self._session.resume_reading()
-
-        self._dbn_queue.disable()
-        self.block_for_close()
-        logger.debug("completed iteration")
-        raise StopIteration
+        return LiveIterator(self)

     def __repr__(self) -> str:
         name = self.__class__.__name__
@@ -661,3 +600,93 @@ def _map_symbol(self, record: DBNRecord) -> None:
         instrument_id = record.instrument_id
         self._symbology_map[instrument_id] = record.stype_out_symbol
         logger.info("added symbology mapping %s to %d", out_symbol, instrument_id)
+
+
+class LiveIterator:
+    """
+    Iterator class for the `Live` client. Automatically starts the client when
+    created and will stop it when destroyed. This provides context-manager-like
+    behavior to `for` loops.
+
+    Parameters
+    ----------
+    client : Live
+        The Live client that spawned this LiveIterator.
+
+    """
+
+    def __init__(self, client: Live):
+        client._dbn_queue._enabled.set()
+        client.start()
+        self._client = client
+
+    @property
+    def client(self) -> Live:
+        return self._client
+
+    def __iter__(self) -> LiveIterator:
+        return self
+
+    def __del__(self) -> None:
+        if self.client.is_connected():
+            self.client.stop()
+            self.client.block_for_close()
+            logger.debug("iteration aborted")
+
+    async def __anext__(self) -> DBNRecord:
+        if not self.client._dbn_queue.is_enabled():
+            raise ValueError("iteration has not started")
+
+        loop = asyncio.get_running_loop()
+
+        try:
+            return self.client._dbn_queue.get_nowait()
+        except queue.Empty:
+            while True:
+                try:
+                    return await loop.run_in_executor(
+                        None,
+                        self.client._dbn_queue.get,
+                        True,
+                        0.1,
+                    )
+                except queue.Empty:
+                    if self.client._session.is_disconnected():
+                        break
+                finally:
+                    if not self.client._dbn_queue.is_full() and not self.client._session.is_reading():
+                        logger.debug(
+                            "resuming reading with %d pending records",
+                            self.client._dbn_queue.qsize(),
+                        )
+                        self.client._session.resume_reading()
+
+        self.client._dbn_queue.disable()
+        await self.client.wait_for_close()
+        logger.debug("async iteration completed")
+        raise StopAsyncIteration
+
+    def __next__(self) -> DBNRecord:
+        if not self.client._dbn_queue.is_enabled():
+            raise ValueError("iteration has not started")
+
+        while True:
+            try:
+                record = self.client._dbn_queue.get(timeout=0.1)
+            except queue.Empty:
+                if self.client._session.is_disconnected():
+                    break
+            else:
+                return record
+            finally:
+                if not self.client._dbn_queue.is_full() and not self.client._session.is_reading():
+                    logger.debug(
+                        "resuming reading with %d pending records",
+                        self.client._dbn_queue.qsize(),
+                    )
+                    self.client._session.resume_reading()
+
+        self.client._dbn_queue.disable()
+        self.client.block_for_close()
+        logger.debug("iteration completed")
+        raise StopIteration

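With this patch, escaping a `for` loop no longer leaves the session streaming in the background. A minimal usage sketch follows; the API key, dataset, schema, and symbol are illustrative placeholders, not values taken from this patch series:

```python
import databento as db

client = db.Live(key="YOUR_API_KEY")  # placeholder key
client.subscribe(
    dataset="GLBX.MDP3",  # placeholder dataset
    schema="mbo",         # placeholder schema
    symbols="ESM4",       # placeholder symbol
)

# Iterating constructs a LiveIterator, which calls Live.start. Escaping the
# loop below destroys the iterator, and its __del__ now calls Live.stop and
# blocks until the session closes, so no explicit cleanup is required.
for record in client:
    print(record)
    break
```
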
From e34c724d0472139ab5769ff001ad7e588af2cea7 Mon Sep 17 00:00:00 2001
From: Chris Sellers
Date: Wed, 8 May 2024 07:46:34 +1000
Subject: [PATCH 3/8] DOC: Improve F_LAST flag description

---
 databento/common/enums.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/databento/common/enums.py b/databento/common/enums.py
index 4498a3e..556022d 100644
--- a/databento/common/enums.py
+++ b/databento/common/enums.py
@@ -198,7 +198,7 @@ class RecordFlags(StringyMixin, IntFlag):  # type: ignore
     Represents record flags.

     F_LAST
-        Last message in the packet from the venue for a given `instrument_id`.
+        Marks the last record in a single event for a given `instrument_id`.
     F_TOB
         Indicates a top-of-book message, not an individual order.
     F_SNAPSHOT

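Because `RecordFlags` is an `IntFlag`, a consumer can test `F_LAST` with a bitwise AND to detect event boundaries. A hedged sketch, assuming the record type in use exposes a `flags` attribute:

```python
from databento.common.enums import RecordFlags

def is_event_end(record) -> bool:
    # F_LAST marks the last record in a single event for the record's
    # instrument_id, so the book is in a consistent state once it is seen.
    return bool(record.flags & RecordFlags.F_LAST)
```
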
From 6f2671d09b86f6bccec3454ecb40edc23d71bca1 Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Tue, 7 May 2024 11:14:51 -0700
Subject: [PATCH 4/8] FIX: Python batch download for existing files

---
 CHANGELOG.md                      |   3 +
 databento/historical/api/batch.py |  44 +++++----
 tests/test_historical_batch.py    | 144 ++++++++++++++++++++++++++++++
 3 files changed, 174 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1cc76db..41354a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@
 - Added `pip-system-certs` dependency for Windows platforms to prevent a connection issue in `requests` when behind a proxy
 - Iteration of the `Live` client will now automatically call `Live.stop` when the iterator is destroyed, such as when a for loop is escaped with an exception or `break` statement.

+#### Bug fixes
+- Fixed an issue where `batch.download` and `batch.download_async` would fail if requested files already existed in the output directory
+
 ## 0.33.0 - 2024-04-16

 #### Enhancements
diff --git a/databento/historical/api/batch.py b/databento/historical/api/batch.py
index f9fdabc..ed67038 100644
--- a/databento/historical/api/batch.py
+++ b/databento/historical/api/batch.py
@@ -394,8 +394,16 @@ def _download_batch_file(
             headers: dict[str, str] = self._headers.copy()
             if output_path.exists():
                 existing_size = output_path.stat().st_size
-                headers["Range"] = f"bytes={existing_size}-{batch_download_file.size - 1}"
-                mode = "ab"
+                if existing_size < batch_download_file.size:
+                    headers["Range"] = f"bytes={existing_size}-{batch_download_file.size - 1}"
+                    mode = "ab"
+                elif existing_size == batch_download_file.size:
+                    # File exists and is complete
+                    break
+                else:
+                    raise FileExistsError(
+                        f"Batch file {output_path.name} already exists and has a larger than expected size.",
+                    )
             else:
                 mode = "wb"
             try:
@@ -424,24 +432,26 @@ def _download_batch_file(
                     attempts += 1
                     continue  # try again
                 raise BentoError(f"Error downloading file: {exc}") from None
+            else:
+                break

-            logger.debug("Download of %s completed", output_path.name)
-            hash_algo, _, hash_hex = batch_download_file.hash_str.partition(":")
+        logger.debug("Download of %s completed", output_path.name)
+        hash_algo, _, hash_hex = batch_download_file.hash_str.partition(":")

-            if hash_algo == "sha256":
-                output_hash = hashlib.sha256(output_path.read_bytes())
-                if output_hash.hexdigest() != hash_hex:
-                    warn_msg = f"Downloaded file failed checksum validation: {output_path.name}"
-                    logger.warning(warn_msg)
-                    warnings.warn(warn_msg, category=BentoWarning)
-            else:
-                logger.warning(
-                    "Skipping %s checksum because %s is not supported",
-                    output_path.name,
-                    hash_algo,
-                )
+        if hash_algo == "sha256":
+            output_hash = hashlib.sha256(output_path.read_bytes())
+            if output_hash.hexdigest() != hash_hex:
+                warn_msg = f"Downloaded file failed checksum validation: {output_path.name}"
+                logger.warning(warn_msg)
+                warnings.warn(warn_msg, category=BentoWarning)
+        else:
+            logger.warning(
+                "Skipping %s checksum because %s is not supported",
+                output_path.name,
+                hash_algo,
+            )

-            return output_path
+        return output_path


 @dataclass
diff --git a/tests/test_historical_batch.py b/tests/test_historical_batch.py
index 553536b..efbe35c 100644
--- a/tests/test_historical_batch.py
+++ b/tests/test_historical_batch.py
@@ -276,3 +276,147 @@
     assert mocked_batch_list_files.call_args.args == (job_id,)
     assert len(downloaded_files) == 1
     assert downloaded_files[0].read_bytes() == file_content
+
+
+def test_batch_download_file_exists(
+    monkeypatch: pytest.MonkeyPatch,
+    historical_client: Historical,
+    tmp_path: Path,
+) -> None:
+    """
+    Tests batch download by setting up a MagicMock which will return the
+    content "unittest".
+
+    A subsequent call to batch.download should not fail.
+
+    """
+    # Arrange
+    job_id = "GLBX-20220610-5DEFXVTMSM"
+    filename = "glbx-mdp3-20220610.mbo.csv.zst"
+    file_content = b"unittest"
+    file_hash = f"sha256:{hashlib.sha256(file_content).hexdigest()}"
+    file_size = len(file_content)
+
+    # Mock the call to list files so it returns a test manifest
+    monkeypatch.setattr(
+        historical_client.batch,
+        "list_files",
+        mocked_batch_list_files := MagicMock(
+            return_value=[
+                {
+                    "filename": filename,
+                    "hash": file_hash,
+                    "size": file_size,
+                    "urls": {
+                        "https": f"localhost:442/v0/batch/download/TESTUSER/{job_id}/{filename}",
+                        "ftp": "",
+                    },
+                },
+            ],
+        ),
+    )
+
+    # Mock the call for get, so we can simulate a 200 response
+    ok_response = MagicMock()
+    ok_response.__enter__.return_value = MagicMock(
+        status_code=200,
+        iter_content=MagicMock(return_value=iter([file_content])),
+    )
+    monkeypatch.setattr(
+        requests,
+        "get",
+        MagicMock(
+            side_effect=[ok_response],
+        ),
+    )
+
+    # Act
+    historical_client.batch.download(
+        job_id=job_id,
+        output_dir=tmp_path,
+        filename_to_download=filename,
+    )
+
+    downloaded_files = historical_client.batch.download(
+        job_id=job_id,
+        output_dir=tmp_path,
+        filename_to_download=filename,
+    )
+
+    # Assert
+    assert mocked_batch_list_files.call_args.args == (job_id,)
+    assert len(downloaded_files) == 1
+    assert downloaded_files[0].read_bytes() == file_content
+
+
+def test_batch_download_file_larger_than_expected(
+    monkeypatch: pytest.MonkeyPatch,
+    historical_client: Historical,
+    tmp_path: Path,
+) -> None:
+    """
+    Tests batch download by setting up a MagicMock which will return the
+    content "unittest".
+
+    Then, write some extra bytes to that file, and ensure a subsequent
+    call to batch.download will raise an exception.
+
+    """
+    # Arrange
+    job_id = "GLBX-20220610-5DEFXVTMSM"
+    filename = "glbx-mdp3-20220610.mbo.csv.zst"
+    file_content = b"unittest"
+    file_hash = f"sha256:{hashlib.sha256(file_content).hexdigest()}"
+    file_size = len(file_content)
+
+    # Mock the call to list files so it returns a test manifest
+    monkeypatch.setattr(
+        historical_client.batch,
+        "list_files",
+        MagicMock(
+            return_value=[
+                {
+                    "filename": filename,
+                    "hash": file_hash,
+                    "size": file_size,
+                    "urls": {
+                        "https": f"localhost:442/v0/batch/download/TESTUSER/{job_id}/{filename}",
+                        "ftp": "",
+                    },
+                },
+            ],
+        ),
+    )
+
+    # Mock the call for get, so we can simulate a 200 response
+    ok_response = MagicMock()
+    ok_response.__enter__.return_value = MagicMock(
+        status_code=200,
+        iter_content=MagicMock(return_value=iter([file_content])),
+    )
+    monkeypatch.setattr(
+        requests,
+        "get",
+        MagicMock(
+            side_effect=[ok_response],
+        ),
+    )
+
+    # Act
+    downloaded_files = historical_client.batch.download(
+        job_id=job_id,
+        output_dir=tmp_path,
+        filename_to_download=filename,
+    )
+
+    # Increase the existing file size with some junk
+    with downloaded_files[-1].open(mode="ab") as out:
+        out.write(b"junk")
+
+    # Assert
+    with pytest.raises(FileExistsError):
+        historical_client.batch.download(
+            job_id=job_id,
+            output_dir=tmp_path,
+            filename_to_download=filename,
+        )

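The new branch makes a repeated `batch.download` idempotent: complete files are skipped, partial files are resumed with an HTTP `Range` request, and oversized files raise. A simplified, self-contained restatement of that decision; `plan_download` is a hypothetical helper written for illustration, not a function in the client:

```python
from pathlib import Path

def plan_download(output_path: Path, expected_size: int) -> tuple[str | None, dict[str, str]]:
    """Return (open mode, extra headers); a mode of None means nothing to download."""
    headers: dict[str, str] = {}
    if not output_path.exists():
        return "wb", headers
    existing_size = output_path.stat().st_size
    if existing_size < expected_size:
        # Partial file: request only the missing bytes and append to the file.
        headers["Range"] = f"bytes={existing_size}-{expected_size - 1}"
        return "ab", headers
    if existing_size == expected_size:
        # File exists and is complete: skip it instead of failing.
        return None, headers
    raise FileExistsError(
        f"Batch file {output_path.name} already exists and has a larger than expected size.",
    )
```
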
From 64978df0b99ad27273f1d2de10207d8f271d6d19 Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Wed, 8 May 2024 14:25:01 -0700
Subject: [PATCH 5/8] MOD: Define a chunk size for requests streaming

---
 CHANGELOG.md                      | 1 +
 databento/common/constants.py     | 2 ++
 databento/historical/api/batch.py | 3 ++-
 databento/historical/http.py      | 6 ++++--
 4 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 41354a5..b552a61 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@

 #### Bug fixes
 - Fixed an issue where `batch.download` and `batch.download_async` would fail if requested files already existed in the output directory
+- Fixed an issue where `batch.download`, `batch.download_async`, and `timeseries.get_range` could use a lot of memory while streaming data

 ## 0.33.0 - 2024-04-16

diff --git a/databento/common/constants.py b/databento/common/constants.py
index 4d176e6..5b023b2 100644
--- a/databento/common/constants.py
+++ b/databento/common/constants.py
@@ -25,6 +25,8 @@
     x[0]: np.iinfo(x[1]).max for x in InstrumentDefMsg._dtypes if not isinstance(x[1], str)
 }

+HTTP_STREAMING_READ_SIZE: Final = 2**12
+
 SCHEMA_STRUCT_MAP: Final[dict[Schema, type[DBNRecord]]] = {
     Schema.DEFINITION: InstrumentDefMsg,
     Schema.IMBALANCE: ImbalanceMsg,
diff --git a/databento/historical/api/batch.py b/databento/historical/api/batch.py
index ed67038..98200d9 100644
--- a/databento/historical/api/batch.py
+++ b/databento/historical/api/batch.py
@@ -24,6 +24,7 @@
 from databento_dbn import SType
 from requests.auth import HTTPBasicAuth

+from databento.common.constants import HTTP_STREAMING_READ_SIZE
 from databento.common.enums import Delivery
 from databento.common.enums import Packaging
 from databento.common.enums import SplitDuration
@@ -416,7 +417,7 @@ def _download_batch_file(
             ) as response:
                 check_http_error(response)
                 with open(output_path, mode=mode) as f:
-                    for chunk in response.iter_content(chunk_size=None):
+                    for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE):
                         f.write(chunk)
         except BentoHttpError as exc:
             if exc.http_status == 429:
diff --git a/databento/historical/http.py b/databento/historical/http.py
index 20bc760..13e2ff8 100644
--- a/databento/historical/http.py
+++ b/databento/historical/http.py
@@ -8,6 +8,7 @@
 from os import PathLike
 from typing import IO
 from typing import Any
+from typing import Final

 import aiohttp
 import requests
@@ -16,6 +17,7 @@
 from requests import Response
 from requests.auth import HTTPBasicAuth

+from databento.common.constants import HTTP_STREAMING_READ_SIZE
 from databento.common.dbnstore import DBNStore
 from databento.common.error import BentoClientError
 from databento.common.error import BentoDeprecationWarning
@@ -25,7 +27,7 @@
 from databento.common.system import USER_AGENT


-WARNING_HEADER_FIELD: str = "X-Warning"
+WARNING_HEADER_FIELD: Final = "X-Warning"


 class BentoHttpAPI:
@@ -137,7 +139,7 @@ def _stream(
         writer = open(path, "x+b")

         try:
-            for chunk in response.iter_content(chunk_size=None):
+            for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE):
                 writer.write(chunk)
         except Exception as exc:
             raise BentoError(f"Error streaming response: {exc}") from None

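With `stream=True`, `requests` only bounds memory when `iter_content` is given an explicit chunk size; `chunk_size=None` yields chunks of whatever size the transport delivers, which can be very large. A small sketch of the pattern the patch adopts, with a placeholder URL and filename:

```python
import requests

HTTP_STREAMING_READ_SIZE = 2**12  # 4 KiB, matching the new constant

# Placeholder URL for illustration only.
with requests.get("https://example.com/large.dbn.zst", stream=True) as response:
    response.raise_for_status()
    with open("large.dbn.zst", "wb") as out:
        # A fixed chunk size caps the buffer held per iteration.
        for chunk in response.iter_content(chunk_size=HTTP_STREAMING_READ_SIZE):
            out.write(chunk)
```
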
From 364365b49e690327cceefae452934a0aba491a0a Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Thu, 9 May 2024 07:36:32 -0700
Subject: [PATCH 6/8] VER: Release 0.34.0

---
 CHANGELOG.md         | 2 +-
 databento/version.py | 2 +-
 pyproject.toml       | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b552a61..467ab13 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,6 @@
 # Changelog

-## 0.34.0 - TBD
+## 0.34.0 - 2024-05-09

 #### Enhancements
 - Added `pip-system-certs` dependency for Windows platforms to prevent a connection issue in `requests` when behind a proxy
diff --git a/databento/version.py b/databento/version.py
index 571d8cc..eab0e9c 100644
--- a/databento/version.py
+++ b/databento/version.py
@@ -1 +1 @@
-__version__ = "0.33.0"
+__version__ = "0.34.0"
diff --git a/pyproject.toml b/pyproject.toml
index 531d934..9402e81 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "databento"
-version = "0.33.0"
+version = "0.34.0"
 description = "Official Python client library for Databento"
 authors = [
     "Databento",

From 736ad0c7a8c70598646907d94acfe57e27d135af Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Mon, 13 May 2024 12:58:46 -0700
Subject: [PATCH 7/8] FIX: Live client metadata reconnect handling

---
 CHANGELOG.md              |  1 +
 databento/live/session.py | 40 ++++++++++++++++++++------------------
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 467ab13..5e0c6f4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 #### Bug fixes
 - Fixed an issue where `batch.download` and `batch.download_async` would fail if requested files already existed in the output directory
 - Fixed an issue where `batch.download`, `batch.download_async`, and `timeseries.get_range` could use a lot of memory while streaming data
+- Fixed an issue where reusing a `Live` client with an open output stream would drop DBN records when received at the same time as the `Metadata` header

 ## 0.33.0 - 2024-04-16

diff --git a/databento/live/session.py b/databento/live/session.py
index 70240b7..dad2474 100644
--- a/databento/live/session.py
+++ b/databento/live/session.py
@@ -11,6 +11,7 @@
 from typing import Final

 import databento_dbn
+from databento_dbn import Metadata
 from databento_dbn import Schema
 from databento_dbn import SType

@@ -195,28 +196,29 @@ def __init__(
         self._user_streams = user_streams

     def _process_dbn(self, data: bytes) -> None:
-        # Do no re-write the metadata to the stream to avoid corruption
-        if not self._metadata or not data.startswith(b"DBN"):
-            for stream, exc_callback in self._user_streams.items():
-                try:
-                    stream.write(data)
-                except Exception as exc:
-                    stream_name = getattr(stream, "name", str(stream))
-                    logger.error(
-                        "error writing %d bytes to `%s` stream",
-                        len(data),
-                        stream_name,
-                        exc_info=exc,
-                    )
-                    if exc_callback is not None:
-                        exc_callback(exc)
+        start_index = 0
+        if data.startswith(b"DBN") and self._metadata:
+            # We have already received metadata for the stream
+            # Set start index to metadata length
+            start_index = int.from_bytes(data[4:8], byteorder="little") + 8
+            self._metadata.check(Metadata.decode(bytes(data[:start_index])))
+        for stream, exc_callback in self._user_streams.items():
+            try:
+                stream.write(data[start_index:])
+            except Exception as exc:
+                stream_name = getattr(stream, "name", str(stream))
+                logger.error(
+                    "error writing %d bytes to `%s` stream",
+                    len(data[start_index:]),
+                    stream_name,
+                    exc_info=exc,
+                )
+                if exc_callback is not None:
+                    exc_callback(exc)
         return super()._process_dbn(data)

     def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
-        if not self._metadata:
-            self._metadata.data = metadata
-        else:
-            self._metadata.check(metadata)
+        self._metadata.data = metadata
         return super().received_metadata(metadata)

     def received_record(self, record: DBNRecord) -> None:

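The fix relies on DBN framing: a buffer that begins with the `b"DBN"` magic carries a metadata header whose byte length is stored little-endian at bytes 4:8, exclusive of the 8 header bytes themselves, hence the `+ 8` in the patch. A hedged sketch of the split that the patched `_process_dbn` performs; `split_metadata` is a hypothetical helper, not part of the client:

```python
def split_metadata(data: bytes) -> tuple[bytes, bytes]:
    """Split a DBN buffer into (metadata bytes, record bytes)."""
    assert data.startswith(b"DBN"), "expected a buffer starting with the DBN magic"
    # Bytes 4:8 hold the metadata length; add 8 for the magic and length field.
    end = int.from_bytes(data[4:8], byteorder="little") + 8
    return data[:end], data[end:]
```

Records that arrive in the same buffer as the `Metadata` header are now forwarded to user streams rather than dropped along with the header.
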
From 2aaef363a2d8e86904fda4263bfa5ffc902d8b58 Mon Sep 17 00:00:00 2001
From: Nick Macholl
Date: Mon, 13 May 2024 12:35:35 -0700
Subject: [PATCH 8/8] VER: Release 0.34.0

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e0c6f4..36bb258 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,6 @@
 # Changelog

-## 0.34.0 - 2024-05-09
+## 0.34.0 - 2024-05-14

 #### Enhancements
 - Added `pip-system-certs` dependency for Windows platforms to prevent a connection issue in `requests` when behind a proxy