Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VER: Release 0.36.0 #58

Merged
merged 7 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 0.36.0 - 2024-06-11

#### Enhancements
- Upgraded `databento-dbn` to 0.18.1

#### Bug fixes
- Fixed an issue where `heartbeat_interval_s` was not being sent to the gateway
- Fixed an issue where a truncated DBN stream could be written by the `Live` client in the event of an ungraceful disconnect

#### Breaking changes
- Output streams of the `Live` client added with `Live.add_stream` will now upgrade to the latest DBN version before being written

## 0.35.0 - 2024-06-04

#### Enhancements
Expand All @@ -9,7 +21,7 @@
- Added new off-market publisher values for `IFEU.IMPACT` and `NDEX.IMPACT`

#### Breaking changes
- Renamed `CbboMsg` to `CBBOMsg`.
- Renamed `CbboMsg` to `CBBOMsg`
- Renamed `use_snapshot` parameter in `Live.subscribe` function to `snapshot`
- All Python exceptions raised by `databento-dbn` have been changed to use the `DBNError` type

Expand Down Expand Up @@ -244,7 +256,7 @@ In some cases, DBN v1 records will be converted to their v2 counterparts:
- Fixed an issue where `DBNStore.from_bytes` did not rewind seekable buffers
- Fixed an issue where the `DBNStore` would not map symbols with input symbology of `SType.INSTRUMENT_ID`
- Fixed an issue with `DBNStore.request_symbology` when the DBN metadata's start date and end date were the same
- Fixed an issue where closed streams were not removed from a `Live` client on shutdown.
- Fixed an issue where closed streams were not removed from a `Live` client on shutdown

## 0.20.0 - 2023-09-21

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The library is fully compatible with the latest distribution of Anaconda 3.8 and
The minimum dependencies as found in the `pyproject.toml` are also listed below:
- python = "^3.8"
- aiohttp = "^3.8.3"
- databento-dbn = "0.18.0"
- databento-dbn = "0.18.1"
- numpy= ">=1.23.5"
- pandas = ">=1.5.3"
- pip-system-certs = ">=4.0" (Windows only)
Expand Down
3 changes: 2 additions & 1 deletion databento/common/cram.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import hashlib
import os
import sys
from typing import Final


BUCKET_ID_LENGTH = 5
BUCKET_ID_LENGTH: Final = 5


def get_challenge_response(challenge: str, key: str) -> str:
Expand Down
21 changes: 15 additions & 6 deletions databento/live/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from io import BytesIO
from operator import attrgetter
from typing import SupportsBytes
from typing import TypeVar

from databento_dbn import Encoding
Expand All @@ -20,16 +21,21 @@


@dataclasses.dataclass
class GatewayControl:
class GatewayControl(SupportsBytes):
"""
Base class for gateway control messages.
"""

@classmethod
def parse(cls: type[T], line: str) -> T:
def parse(cls: type[T], line: str | bytes) -> T:
"""
Parse a message of type `T` from a string.

Parameters
----------
line : str | bytes
The data to parse into a GatewayControl message.

Returns
-------
T
Expand All @@ -40,17 +46,20 @@ def parse(cls: type[T], line: str) -> T:
If the line fails to parse.

"""
if isinstance(line, bytes):
line = line.decode("utf-8")

if not line.endswith("\n"):
raise ValueError(f"`{line.strip()}` does not end with a newline")
raise ValueError(f"'{line!r}' does not end with a newline")

split_tokens = [t.partition("=") for t in line[:-1].split("|")]
split_tokens = [t.partition("=") for t in line.strip().split("|")]
data_dict = {k: v for k, _, v in split_tokens}

try:
return cls(**data_dict)
except TypeError:
raise ValueError(
f"`{line.strip()} is not a parsible {cls.__name__}",
f"'{line!r}'is not a parsible {cls.__name__}",
) from None

def __str__(self) -> str:
Expand Down Expand Up @@ -154,7 +163,7 @@ def parse_gateway_message(line: str) -> GatewayControl:
return message_cls.parse(line)
except ValueError:
continue
raise ValueError(f"`{line.strip()}` is not a parsible gateway message")
raise ValueError(f"'{line.strip()}' is not a parsible gateway message")


class GatewayDecoder:
Expand Down
91 changes: 57 additions & 34 deletions databento/live/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import dataclasses
import logging
import queue
import struct
import threading
from collections.abc import Iterable
from typing import IO
from typing import Callable
from typing import Final

import databento_dbn
from databento_dbn import Metadata
from databento_dbn import Schema
from databento_dbn import SType

Expand Down Expand Up @@ -188,41 +188,45 @@ def __init__(
ts_out: bool = False,
heartbeat_interval_s: int | None = None,
):
super().__init__(api_key, dataset, ts_out)
super().__init__(api_key, dataset, ts_out, heartbeat_interval_s)

self._dbn_queue = dbn_queue
self._loop = loop
self._metadata: SessionMetadata = metadata
self._user_callbacks = user_callbacks
self._user_streams = user_streams

def _process_dbn(self, data: bytes) -> None:
start_index = 0
if data.startswith(b"DBN") and self._metadata:
# We have already received metata for the stream
# Set start index to metadata length
start_index = int.from_bytes(data[4:8], byteorder="little") + 8
self._metadata.check(Metadata.decode(bytes(data[:start_index])))
for stream, exc_callback in self._user_streams.items():
try:
stream.write(data[start_index:])
except Exception as exc:
stream_name = getattr(stream, "name", str(stream))
logger.error(
"error writing %d bytes to `%s` stream",
len(data[start_index:]),
stream_name,
exc_info=exc,
)
if exc_callback is not None:
exc_callback(exc)
return super()._process_dbn(data)

def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
self._metadata.data = metadata
if self._metadata:
self._metadata.check(metadata)
else:
metadata_bytes = metadata.encode()
for stream, exc_callback in self._user_streams.items():
try:
stream.write(metadata_bytes)
except Exception as exc:
stream_name = getattr(stream, "name", str(stream))
logger.error(
"error writing %d bytes to `%s` stream",
len(metadata_bytes),
stream_name,
exc_info=exc,
)
if exc_callback is not None:
exc_callback(exc)

self._metadata.data = metadata
return super().received_metadata(metadata)

def received_record(self, record: DBNRecord) -> None:
self._dispatch_writes(record)
self._dispatch_callbacks(record)
if self._dbn_queue.is_enabled():
self._queue_for_iteration(record)

return super().received_record(record)

def _dispatch_callbacks(self, record: DBNRecord) -> None:
for callback, exc_callback in self._user_callbacks.items():
try:
callback(record)
Expand All @@ -236,18 +240,37 @@ def received_record(self, record: DBNRecord) -> None:
if exc_callback is not None:
exc_callback(exc)

if self._dbn_queue.is_enabled():
self._dbn_queue.put(record)
def _dispatch_writes(self, record: DBNRecord) -> None:
if hasattr(record, "ts_out"):
ts_out_bytes = struct.pack("Q", record.ts_out)
else:
ts_out_bytes = b""

# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
if self._dbn_queue.is_full():
logger.warning(
"record queue is full; %d record(s) to be processed",
self._dbn_queue.qsize(),
record_bytes = bytes(record) + ts_out_bytes

for stream, exc_callback in self._user_streams.items():
try:
stream.write(record_bytes)
except Exception as exc:
stream_name = getattr(stream, "name", str(stream))
logger.error(
"error writing %d bytes to `%s` stream",
len(record_bytes),
stream_name,
exc_info=exc,
)
self.transport.pause_reading()
if exc_callback is not None:
exc_callback(exc)

return super().received_record(record)
def _queue_for_iteration(self, record: DBNRecord) -> None:
self._dbn_queue.put(record)
# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
if self._dbn_queue.is_full():
logger.warning(
"record queue is full; %d record(s) to be processed",
self._dbn_queue.qsize(),
)
self.transport.pause_reading()


class Session:
Expand Down
2 changes: 1 addition & 1 deletion databento/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.35.0"
__version__ = "0.36.0"
9 changes: 5 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "databento"
version = "0.35.0"
version = "0.36.0"
description = "Official Python client library for Databento"
authors = [
"Databento <[email protected]>",
Expand Down Expand Up @@ -32,7 +32,7 @@ aiohttp = [
{version = "^3.8.3", python = "<3.12"},
{version = "^3.9.0", python = "^3.12"}
]
databento-dbn = "0.18.0"
databento-dbn = "0.18.1"
numpy = [
{version = ">=1.23.5", python = "<3.12"},
{version = "^1.26.0", python = "^3.12"}
Expand All @@ -47,12 +47,13 @@ zstandard = ">=0.21.0"
black = "^23.9.1"
mypy = "1.5.1"
pytest = "^7.4.2"
pytest-asyncio = ">=0.21.0"
pytest-asyncio = "==0.21.1"
ruff = "^0.0.291"
types-requests = "^2.30.0.0"
tomli = "^2.0.1"
teamcity-messages = "^1.32"
types-pytz = "^2024.1.0.20240203"
types-aiofiles = "^23.2.0.20240403"

[build-system]
requires = ["poetry-core"]
Expand All @@ -75,4 +76,4 @@ warn_unused_ignores = true

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "--asyncio-mode auto"
asyncio_mode = "auto"
Loading
Loading