From 9dac0977e710ff1d2ca50b817a05c441c239ad5d Mon Sep 17 00:00:00 2001
From: Alexey Timin
Date: Sat, 27 Jul 2024 06:13:20 +0200
Subject: [PATCH] RS-31: Add Bucket.update and Bucket.update_batch methods for changing labels (#113)

* implement changing labels
* update CHANGELOG
* fix pylint error
---
 CHANGELOG.md         |   4 ++
 README.md            |   2 +-
 reduct/bucket.py     | 106 +++++++++++++++++++++++++++++++++++--------
 reduct/record.py     |   5 +-
 tests/bucket_test.py |  32 +++++++++++++
 5 files changed, 128 insertions(+), 21 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5522332..eb1609a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added:
+
+- RS-31: `Bucket.update` and `Bucket.update_batch` methods for changing labels, [PR-113](https://github.com/reductstore/reduct-py/pull/113)
+
 ## [1.10.0] - 2024-06-11
 
 ### Added:
diff --git a/README.md b/README.md
index 2f9301b..ef3ca05 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ This package provides an asynchronous HTTP client for interacting with [ReductS
 
 ## Features
 
-* Supports the [ReductStore HTTP API v1.10](https://www.reduct.store/docs/http-api)
+* Supports the [ReductStore HTTP API v1.11](https://www.reduct.store/docs/http-api)
 * Bucket management
 * API Token management
 * Write, read and query data
diff --git a/reduct/bucket.py b/reduct/bucket.py
index 8e5bd0c..147c504 100644
--- a/reduct/bucket.py
+++ b/reduct/bucket.py
@@ -12,6 +12,7 @@
     AsyncIterator,
     Union,
     Dict,
+    Tuple,
 )
 
 from pydantic import BaseModel
@@ -271,23 +272,11 @@ async def write_batch(
         Raises:
             ReductError: if there is an HTTP or communication error
         """
-        record_headers = {}
-        content_length = 0
-        for time_stamp, record in batch.items():
-            content_length += record.size
-            header = f"{record.size},{record.content_type}"
-            for label, value in record.labels.items():
-                if "," in label or "=" in label:
-                    header += f',{label}="{value}"'
-                else:
-                    header += f",{label}={value}"
-
-            record_headers[f"{TIME_PREFIX}{time_stamp}"] = header
-
         async def iter_body():
             for _, rec in batch.items():
                 yield await rec.read_all()
 
+        content_length, record_headers = self._make_headers(batch)
         _, headers = await self._http.request_all(
             "POST",
             f"/b/{self.name}/{entry_name}/batch",
@@ -296,12 +285,66 @@ async def iter_body():
             content_length=content_length,
         )
 
-        errors = {}
-        for key, value in headers.items():
-            if key.startswith(ERROR_PREFIX):
-                errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)
+        return self._parse_errors_from_headers(headers)
 
-        return errors
+    async def update(
+        self,
+        entry_name: str,
+        timestamp: Union[int, datetime, float, str],
+        labels: Dict[str, str],
+    ):
+        """Update labels of an existing record
+        If a label doesn't exist, it will be created.
+        If a label is empty, it will be removed.
+
+        Args:
+            entry_name: name of entry in the bucket
+            timestamp: timestamp of record in microseconds
+            labels: new labels
+        Raises:
+            ReductError: if there is an HTTP error
+
+        Examples:
+            >>> await bucket.update("entry-1", "2022-01-01T01:00:00",
+                {"label1": "value1", "label2": ""})
+
+        """
+        timestamp = unix_timestamp_from_any(timestamp)
+        await self._http.request_all(
+            "PATCH", f"/b/{self.name}/{entry_name}?ts={timestamp}", labels=labels
+        )
+
+    async def update_batch(
+        self, entry_name: str, batch: Batch
+    ) -> Dict[int, ReductError]:
+        """Update labels of existing records
+        If a label doesn't exist, it will be created.
+        If a label is empty, it will be removed.
+
+        Args:
+            entry_name: name of entry in the bucket
+            batch: dict of timestamps as keys and labels as values
+        Returns:
+            dict of errors with timestamps as keys
+        Raises:
+            ReductError: if there is an HTTP error
+
+        Examples:
+            >>> batch = Batch()
+            >>> batch.add(1640995200000000, labels={"label1": "value1", "label2": ""})
+            >>> await bucket.update_batch("entry-1", batch)
+
+        """
+
+        content_length, record_headers = self._make_headers(batch)
+        _, headers = await self._http.request_all(
+            "PATCH",
+            f"/b/{self.name}/{entry_name}/batch",
+            extra_headers=record_headers,
+            content_length=content_length,
+        )
+
+        return self._parse_errors_from_headers(headers)
 
     async def query(
         self,
@@ -478,3 +521,30 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs):
         )
         query_id = json.loads(data)["id"]
         return query_id
+
+    @staticmethod
+    def _make_headers(batch: Batch) -> Tuple[int, Dict[str, str]]:
+        """Make headers for batch"""
+        record_headers = {}
+        content_length = 0
+        for time_stamp, record in batch.items():
+            content_length += record.size
+            header = f"{record.size},{record.content_type}"
+            for label, value in record.labels.items():
+                if "," in label or "=" in label:
+                    header += f',{label}="{value}"'
+                else:
+                    header += f",{label}={value}"
+
+            record_headers[f"{TIME_PREFIX}{time_stamp}"] = header
+
+        record_headers["Content-Type"] = "application/octet-stream"
+        return content_length, record_headers
+
+    @staticmethod
+    def _parse_errors_from_headers(headers):
+        errors = {}
+        for key, value in headers.items():
+            if key.startswith(ERROR_PREFIX):
+                errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)
+        return errors
diff --git a/reduct/record.py b/reduct/record.py
index 337327a..d8bf9e6 100644
--- a/reduct/record.py
+++ b/reduct/record.py
@@ -57,7 +57,7 @@ def __init__(self):
     def add(
         self,
         timestamp: Union[int, datetime, float, str],
-        data: bytes,
+        data: bytes = b"",
         content_type: Optional[str] = None,
         labels: Optional[Dict[str, str]] = None,
     ):
@@ -70,7 +70,8 @@ def add(
             labels: labels of record (default: {})
         """
         if content_type is None:
-            content_type = "application/octet-stream"
+            content_type = ""
+
         if labels is None:
             labels = {}
 
diff --git a/tests/bucket_test.py b/tests/bucket_test.py
index 5e500c5..0198193 100644
--- a/tests/bucket_test.py
+++ b/tests/bucket_test.py
@@ -508,3 +508,35 @@ async def test_query_records_each_n(bucket_1):
     ]
     assert len(records) == 1
     assert records[0].timestamp == 3000000
+
+
+@pytest.mark.asyncio
+@requires_api("1.11")
+async def test_update_labels(bucket_1):
+    """Should update labels of a record"""
+    await bucket_1.update(
+        "entry-2", 3000000, {"label1": "new-value", "label2": "", "label3": "value3"}
+    )
+
+    async with bucket_1.read("entry-2", timestamp=3000000) as record:
+        assert record.labels == {"label1": "new-value", "label3": "value3"}
+
+
+@pytest.mark.asyncio
+@requires_api("1.11")
+async def test_update_labels_batch(bucket_1):
+    """Should update labels of records in a batch"""
+    batch = Batch()
+    batch.add(3000000, labels={"label1": "new-value", "label2": "", "label3": "value3"})
+    batch.add(4000000, labels={"label1": "new-value", "label2": "", "label4": "value4"})
+    batch.add(8000000)
+
+    errors = await bucket_1.update_batch("entry-2", batch)
+    assert len(errors) == 1
+    assert errors[8000000] == ReductError(404, "No record with timestamp 8000000")
+
+    async with bucket_1.read("entry-2", timestamp=3000000) as record:
+        assert record.labels == {"label1": "new-value", "label3": "value3"}
+
+    async with bucket_1.read("entry-2", timestamp=4000000) as record:
+        assert record.labels == {"label1": "new-value", "label4": "value4"}
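
For reference, a minimal usage sketch of the new methods, based on the docstring examples above. The server URL, API token, bucket and entry names, and timestamps are illustrative placeholders; a ReductStore server with API v1.11 or later is assumed.

    import asyncio

    from reduct import Batch, Client


    async def main():
        # Illustrative connection settings; adjust URL, token and names for your setup.
        client = Client("http://127.0.0.1:8383", api_token="my-token")
        bucket = await client.get_bucket("my-bucket")

        # Update labels of a single record: new keys are added,
        # keys set to an empty string are removed.
        await bucket.update(
            "entry-1", "2022-01-01T01:00:00", {"label1": "value1", "label2": ""}
        )

        # Update labels of several records in one request. No payload is needed,
        # so Batch.add() is called with labels only (data now defaults to b"").
        batch = Batch()
        batch.add(1640995200000000, labels={"label1": "value1", "label2": ""})
        batch.add(1640995201000000, labels={"label3": "value3"})
        errors = await bucket.update_batch("entry-1", batch)

        # Per-record failures are returned as a dict keyed by timestamp.
        for timestamp, err in errors.items():
            print(f"failed to update {timestamp}: {err}")


    asyncio.run(main())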