Skip to content

Commit

Permalink
RS-31: Add Bucket.update and Bucket.update_batch method for changing …
Browse files Browse the repository at this point in the history
…labels (#113)

* implement changing labels

* update CHANGELOG

* fix pylint error
  • Loading branch information
atimin authored Jul 27, 2024
1 parent 48f517b commit 9dac097
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added:

- RS-31: `Bucket.update` and `Bucket.update_batch` method for changing labels, [PR-113](https://github.com/reductstore/reduct-py/pull/113)

## [1.10.0] - 2024-06-11

### Added:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This package provides an asynchronous HTTP client for interacting with [ReductS

## Features

* Supports the [ReductStore HTTP API v1.10](https://www.reduct.store/docs/http-api)
* Supports the [ReductStore HTTP API v1.11](https://www.reduct.store/docs/http-api)
* Bucket management
* API Token management
* Write, read and query data
Expand Down
106 changes: 88 additions & 18 deletions reduct/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AsyncIterator,
Union,
Dict,
Tuple,
)

from pydantic import BaseModel
Expand Down Expand Up @@ -271,23 +272,11 @@ async def write_batch(
ReductError: if there is an HTTP or communication error
"""

record_headers = {}
content_length = 0
for time_stamp, record in batch.items():
content_length += record.size
header = f"{record.size},{record.content_type}"
for label, value in record.labels.items():
if "," in label or "=" in label:
header += f',{label}="{value}"'
else:
header += f",{label}={value}"

record_headers[f"{TIME_PREFIX}{time_stamp}"] = header

async def iter_body():
for _, rec in batch.items():
yield await rec.read_all()

content_length, record_headers = self._make_headers(batch)
_, headers = await self._http.request_all(
"POST",
f"/b/{self.name}/{entry_name}/batch",
Expand All @@ -296,12 +285,66 @@ async def iter_body():
content_length=content_length,
)

errors = {}
for key, value in headers.items():
if key.startswith(ERROR_PREFIX):
errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)
return self._parse_errors_from_headers(headers)

return errors
async def update(
self,
entry_name: str,
timestamp: Union[int, datetime, float, str],
labels: Dict[str, str],
):
"""Update labels of an existing record
If a label doesn't exist, it will be created.
If a label is empty, it will be removed.
Args:
entry_name: name of entry in the bucket
timestamp: timestamp of record in microseconds
labels: new labels
Raises:
ReductError: if there is an HTTP error
Examples:
>>> await bucket.update("entry-1", "2022-01-01T01:00:00",
{"label1": "value1", "label2": ""})
"""
timestamp = unix_timestamp_from_any(timestamp)
await self._http.request_all(
"PATCH", f"/b/{self.name}/{entry_name}?ts={timestamp}", labels=labels
)

async def update_batch(
self, entry_name: str, batch: Batch
) -> Dict[int, ReductError]:
"""Update labels of existing records
If a label doesn't exist, it will be created.
If a label is empty, it will be removed.
Args:
entry_name: name of entry in the bucket
batch: dict of timestamps as keys and labels as values
Returns:
dict of errors with timestamps as keys
Raises:
ReductError: if there is an HTTP error
Examples:
>>> batch = Batch()
>>> batch.add(1640995200000000, labels={"label1": "value1", "label2": ""})
>>> await bucket.update_batch("entry-1", batch)
"""

content_length, record_headers = self._make_headers(batch)
_, headers = await self._http.request_all(
"PATCH",
f"/b/{self.name}/{entry_name}/batch",
extra_headers=record_headers,
content_length=content_length,
)

return self._parse_errors_from_headers(headers)

async def query(
self,
Expand Down Expand Up @@ -478,3 +521,30 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs):
)
query_id = json.loads(data)["id"]
return query_id

@staticmethod
def _make_headers(batch: Batch) -> Tuple[int, Dict[str, str]]:
"""Make headers for batch"""
record_headers = {}
content_length = 0
for time_stamp, record in batch.items():
content_length += record.size
header = f"{record.size},{record.content_type}"
for label, value in record.labels.items():
if "," in label or "=" in label:
header += f',{label}="{value}"'
else:
header += f",{label}={value}"

record_headers[f"{TIME_PREFIX}{time_stamp}"] = header

record_headers["Content-Type"] = "application/octet-stream"
return content_length, record_headers

@staticmethod
def _parse_errors_from_headers(headers):
errors = {}
for key, value in headers.items():
if key.startswith(ERROR_PREFIX):
errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)
return errors
5 changes: 3 additions & 2 deletions reduct/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self):
def add(
self,
timestamp: Union[int, datetime, float, str],
data: bytes,
data: bytes = b"",
content_type: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
):
Expand All @@ -70,7 +70,8 @@ def add(
labels: labels of record (default: {})
"""
if content_type is None:
content_type = "application/octet-stream"
content_type = ""

if labels is None:
labels = {}

Expand Down
32 changes: 32 additions & 0 deletions tests/bucket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,35 @@ async def test_query_records_each_n(bucket_1):
]
assert len(records) == 1
assert records[0].timestamp == 3000000


@pytest.mark.asyncio
@requires_api("1.11")
async def test_update_labels(bucket_1):
"""Should update labels of a record"""
await bucket_1.update(
"entry-2", 3000000, {"label1": "new-value", "label2": "", "label3": "value3"}
)

async with bucket_1.read("entry-2", timestamp=3000000) as record:
assert record.labels == {"label1": "new-value", "label3": "value3"}


@pytest.mark.asyncio
@requires_api("1.11")
async def test_update_labels_batch(bucket_1):
"""Should update labels of records in a batch"""
batch = Batch()
batch.add(3000000, labels={"label1": "new-value", "label2": "", "label3": "value3"})
batch.add(4000000, labels={"label1": "new-value", "label2": "", "label4": "value4"})
batch.add(8000000)

errors = await bucket_1.update_batch("entry-2", batch)
assert len(errors) == 1
assert errors[8000000] == ReductError(404, "No record with timestamp 8000000")

async with bucket_1.read("entry-2", timestamp=3000000) as record:
assert record.labels == {"label1": "new-value", "label3": "value3"}

async with bucket_1.read("entry-2", timestamp=4000000) as record:
assert record.labels == {"label1": "new-value", "label4": "value4"}

0 comments on commit 9dac097

Please sign in to comment.