Skip to content

Commit

Permalink
Add TTL option to DriftClient.walk method (#38)
Browse files Browse the repository at this point in the history
* add ttl option to DriftClient.walk method

* refactor tests

* update tests

* update CHANGELOG

* fix test names
  • Loading branch information
atimin authored Dec 14, 2023
1 parent b4525d7 commit 04d2929
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 22 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## Added

- TTL option to `DriftClient.walk` method, [PR-38](https://github.com/panda-official/DriftPythonClient/pull/38)

### Fixed

- Error handling in DriftClient.walk, [PR-37](https://github.com/panda-official/DriftPythonClient/pull/37)
- Error handling in `DriftClient.walk`, [PR-37](https://github.com/panda-official/DriftPythonClient/pull/37)

## 0.8.1 - 2023-09-14

Expand Down
6 changes: 4 additions & 2 deletions pkg/drift_client/drift_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def walk(
topic: str,
start: Union[float, datetime, str],
stop: Union[float, datetime, str],
**kwargs,
) -> Iterator[DriftDataPackage]:
"""Walks through history data for selected topic
Expand All @@ -227,7 +228,8 @@ def walk(
Format: ISO string, datetime or float timestamp
stop: End of request timeframe,
Format: ISO string, datetime or float timestamp
KwArgs:
ttl: Time to live for the query only for ReductStore
Returns:
Iterator with DriftDataPackage
Raises:
Expand All @@ -247,7 +249,7 @@ def walk(
else:
start = _convert_type(start)
stop = _convert_type(stop)
for package in self._blob_storage.walk(topic, start, stop):
for package in self._blob_storage.walk(topic, start, stop, **kwargs):
yield DriftDataPackage(package)

def subscribe_data(self, topic: str, handler: Callable[[DriftDataPackage], None]):
Expand Down
7 changes: 5 additions & 2 deletions pkg/drift_client/reduct_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,23 @@ def fetch_data(self, path: str) -> Optional[bytes]:
except ReductError as err:
raise DriftClientError(f"Could not read item at {path}") from err

def walk(self, entry: str, start: int, stop: int) -> Iterator[bytes]:
def walk(self, entry: str, start: int, stop: int, **kwargs) -> Iterator[bytes]:
"""
Walk through the records of an entry between start and stop.
Args:
entry: entry name
start: start timestamp UNIX in seconds
stop: stop timestamp UNIX in seconds
Keyword Args:
ttl: time to live for the query
Raises:
DriftClientError: if failed to fetch data
"""

bucket: Bucket = self._run(self._client.get_bucket(self._bucket))

ait = bucket.query(entry, start * 1000_000, stop * 1000_000, ttl=60)
ttl = kwargs.get("ttl", 60)
ait = bucket.query(entry, start * 1000_000, stop * 1000_000, ttl=ttl)

async def get_next():
try:
Expand Down
43 changes: 26 additions & 17 deletions tests/reduct_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ def test__check_server_available():
_ = ReductStoreClient("http://localhost:8383", "password")


@pytest.mark.usefixtures("reduct_client")
def test__check_packages_names_available(bucket):
@pytest.fixture(name="drift_client")
def _make_drift_client(reduct_client):
_ = reduct_client
return ReductStoreClient("http://localhost:8383", "password")


def test__check_packages_names_available(bucket, drift_client):
"""should check if server is available"""
bucket.get_entry_list.return_value = [
EntryInfo(
Expand All @@ -64,15 +69,13 @@ def test__check_packages_names_available(bucket):
)
]

client = ReductStoreClient("http://localhost:8383", "password")
assert client.check_package_list(
assert drift_client.check_package_list(
["topic/1.dp", "topic/2.dp", "topic/3.dp", "topic/4.dp"]
) == ["topic/2.dp", "topic/3.dp"]
assert client.check_package_list(["unknown/3.dp", "unknown/4.dp"]) == []
assert drift_client.check_package_list(["unknown/3.dp", "unknown/4.dp"]) == []


@pytest.mark.usefixtures("reduct_client")
def test__fetch_package(mocker, bucket):
def test__fetch_package(mocker, bucket, drift_client):
"""should fetch package from reduct storage"""

def make_record(timestamp: int, data: bytes) -> Record:
Expand All @@ -97,8 +100,7 @@ async def read(_n: int):
ctx.__aexit__.return_value = mocker.Mock()
bucket.read.return_value = ctx

client = ReductStoreClient("http://localhost:8383", "password")
assert client.fetch_data("topic/1.dp") == b"test"
assert drift_client.fetch_data("topic/1.dp") == b"test"


class _Rec: # pylint: disable=too-few-public-methods
Expand All @@ -110,8 +112,7 @@ async def read_all(self):
return self.data


@pytest.mark.usefixtures("reduct_client")
def test__walk_records(bucket):
def test__walk_records(bucket, drift_client):
"""should walk records"""

items = [b"1", b"2", b"3", b"4", b"5"]
Expand All @@ -122,21 +123,29 @@ async def _iter():

bucket.query.return_value = _iter()

client = ReductStoreClient("http://localhost:8383", "password")
assert list(client.walk("topic", 0, 1)) == items
assert list(drift_client.walk("topic", 0, 1)) == items

bucket.query.assert_called_with("topic", 0, 1000_000, ttl=60)


@pytest.mark.usefixtures("reduct_client")
def test___walk_with_error(bucket):
def test__walk_with_error(bucket, drift_client):
"""should raise error if failed to walk records"""

async def _iter():
yield _Rec(b"1")
raise ReductError(400, "test")

bucket.query.return_value = _iter()
client = ReductStoreClient("http://localhost:8383", "password")
with pytest.raises(DriftClientError):
list(client.walk("topic", 0, 1))
list(drift_client.walk("topic", 0, 1))


def test__walk_with_ttl(bucket, drift_client):
"""should walk records with ttl"""

async def _iter():
yield _Rec(b"1")

bucket.query.return_value = _iter()
list(drift_client.walk("topic", 0, 1, ttl=10))
bucket.query.assert_called_with("topic", 0, 1000_000, ttl=10)

0 comments on commit 04d2929

Please sign in to comment.