From d60941eaa97a4db6ffb3d0cdf90f344b0da04dcc Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Sun, 1 Oct 2023 11:02:40 +0200 Subject: [PATCH 1/7] support for provisioned resources --- reduct/bucket.py | 3 +++ reduct/client.py | 3 +++ tests/bucket_test.py | 1 + tests/client_test.py | 1 + 4 files changed, 8 insertions(+) diff --git a/reduct/bucket.py b/reduct/bucket.py index 8a61ad6..12a35ee 100644 --- a/reduct/bucket.py +++ b/reduct/bucket.py @@ -58,6 +58,9 @@ class BucketInfo(BaseModel): latest_record: int """UNIX timestamp of the latest record in microseconds""" + is_provisioned: bool = False + """bucket is provisioned amd you can't remove it or change its settings""" + class EntryInfo(BaseModel): """Entry of bucket""" diff --git a/reduct/client.py b/reduct/client.py index fc5ee6f..e56b3f8 100644 --- a/reduct/client.py +++ b/reduct/client.py @@ -70,6 +70,9 @@ class Token(BaseModel): created_at: datetime """creation time of token""" + is_provisioned: bool = False + """token is provisioned and can't be deleted or changed""" + class FullTokenInfo(Token): """Full information about token with permissions""" diff --git a/tests/bucket_test.py b/tests/bucket_test.py index cf3dc65..602abd7 100644 --- a/tests/bucket_test.py +++ b/tests/bucket_test.py @@ -59,6 +59,7 @@ async def test__get_info(bucket_2): "name": "bucket-2", "oldest_record": 5000000, "size": 22, + "is_provisioned": False, } diff --git a/tests/client_test.py b/tests/client_test.py index 24118a8..6f756fe 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -174,6 +174,7 @@ async def test__get_token(client, with_token): """Should get a token by name""" token = await client.get_token(with_token) assert token.name == with_token + assert not token.is_provisioned assert token.permissions.dict() == { "full_access": True, "read": ["bucket-1"], From faff06c561b642c6bcb438781567d170c5a5f6a7 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Mon, 2 Oct 2023 19:32:15 +0200 Subject: [PATCH 2/7] implement batched write --- reduct/bucket.py | 60 ++++++++++++++++++++++++++++++++++++++++++++---- reduct/client.py | 32 ++++++++++++-------------- reduct/error.py | 5 ++++ reduct/http.py | 18 ++++++++++++--- reduct/record.py | 47 +++++++++++++++++++++++++++++++++---- 5 files changed, 132 insertions(+), 30 deletions(-) diff --git a/reduct/bucket.py b/reduct/bucket.py index 12a35ee..b31c9fb 100644 --- a/reduct/bucket.py +++ b/reduct/bucket.py @@ -9,12 +9,21 @@ List, AsyncIterator, Union, + Dict, ) from pydantic import BaseModel +from reduct.error import ReductError from reduct.http import HttpClient -from reduct.record import Record, parse_batched_records, parse_record +from reduct.record import ( + Record, + parse_batched_records, + parse_record, + Batch, + TIME_PREFIX, + ERROR_PREFIX, +) class QuotaType(Enum): @@ -234,6 +243,48 @@ async def write( **kwargs, ) + async def write_batch( + self, entry_name: str, batch: Batch + ) -> Dict[int, ReductError]: + """ + Write a batch of records to entries in a sole request + + Args: + entry_name: name of entry in the bucket + batch: list of records + Returns: + dict of errors with timestamps as keys + Raises: + ReductError: if there is an HTTP or communication error + """ + + record_headers = {} + body = b"" + for time_stamp, record in batch.items(): + header = f"{record.size},{record.content_type}" + body += record.data + for label, value in record.labels.items(): + if "," in label or "=" in label: + header += f',{label}="{value}"' + else: + header += f",{label}={value}" + + record_headers[f"{TIME_PREFIX}{time_stamp}"] = header + + _, headers = await self._http.request_all( + "POST", + f"/b/{self.name}/{entry_name}/batch", + data=body, + headers=record_headers, + ) + + errors = {} + for key, value in headers.items(): + if key.startswith(ERROR_PREFIX): + errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value) + + return errors + async def query( self, entry_name: str, @@ -293,9 +344,8 @@ async def get_full_info(self) -> BucketFullInfo: """ Get full information about bucket (settings, statistics, entries) """ - return BucketFullInfo.parse_raw( - await self._http.request_all("GET", f"/b/{self.name}") - ) + body, _ = await self._http.request_all("GET", f"/b/{self.name}") + return BucketFullInfo.model_validate_json(body) async def subscribe( self, entry_name: str, start: Optional[int] = None, poll_interval=1.0, **kwargs @@ -371,7 +421,7 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs): params["limit"] = kwargs["limit"] url = f"/b/{self.name}/{entry_name}" - data = await self._http.request_all( + data, _ = await self._http.request_all( "GET", f"{url}/q", params=params, diff --git a/reduct/client.py b/reduct/client.py index e56b3f8..6cf4f0b 100644 --- a/reduct/client.py +++ b/reduct/client.py @@ -140,7 +140,8 @@ async def info(self) -> ServerInfo: Raises: ReductError: if there is an HTTP error """ - return ServerInfo.parse_raw(await self._http.request_all("GET", "/info")) + body, _ = await self._http.request_all("GET", "/info") + return ServerInfo.model_validate_json(body) async def list(self) -> List[BucketInfo]: """ @@ -151,9 +152,8 @@ async def list(self) -> List[BucketInfo]: Raises: ReductError: if there is an HTTP error """ - return BucketList.parse_raw( - await self._http.request_all("GET", "/list") - ).buckets + body, _ = await self._http.request_all("GET", "/list") + return BucketList.model_validate_json(body).buckets async def get_bucket(self, name: str) -> Bucket: """ @@ -187,7 +187,7 @@ async def create_bucket( Raises: ReductError: if there is an HTTP error """ - data = settings.json() if settings else None + data = settings.model_dump_json() if settings else None try: await self._http.request_all("POST", f"/b/{name}", data=data) except ReductError as err: @@ -204,9 +204,8 @@ async def get_token_list(self) -> List[Token]: Raises: ReductError: if there is an HTTP error """ - return TokenList.parse_raw( - await self._http.request_all("GET", "/tokens") - ).tokens + body, _ = await self._http.request_all("GET", "/tokens") + return TokenList.model_validate_json(body).tokens async def get_token(self, name: str) -> FullTokenInfo: """ @@ -218,9 +217,8 @@ async def get_token(self, name: str) -> FullTokenInfo: Raises: ReductError: if there is an HTTP error """ - return FullTokenInfo.parse_raw( - await self._http.request_all("GET", f"/tokens/{name}") - ) + body, _ = await self._http.request_all("GET", f"/tokens/{name}") + return FullTokenInfo.model_validate_json(body) async def create_token(self, name: str, permissions: Permissions) -> str: """ @@ -233,11 +231,10 @@ async def create_token(self, name: str, permissions: Permissions) -> str: Raises: ReductError: if there is an HTTP error """ - return TokenCreateResponse.parse_raw( - await self._http.request_all( - "POST", f"/tokens/{name}", data=permissions.json() - ) - ).value + body, _ = await self._http.request_all( + "POST", f"/tokens/{name}", data=permissions.model_dump_json() + ) + return TokenCreateResponse.model_validate_json(body).value async def remove_token(self, name: str) -> None: """ @@ -257,4 +254,5 @@ async def me(self) -> FullTokenInfo: Raises: ReductError: if there is an HTTP error """ - return FullTokenInfo.parse_raw(await self._http.request_all("GET", "/me")) + body, _ = await self._http.request_all("GET", "/me") + return FullTokenInfo.model_validate_json(body) diff --git a/reduct/error.py b/reduct/error.py index da52862..b007641 100644 --- a/reduct/error.py +++ b/reduct/error.py @@ -9,6 +9,11 @@ def __init__(self, code: int, message: str): self._message = message super().__init__(f"Status {self._code}: {self.message}") + @staticmethod + def from_header(header: str) -> "ReductError": + status_code, message = header.split(",", 1) + return ReductError(int(status_code), message) + @property def status_code(self): """Return HTTP status code""" diff --git a/reduct/http.py b/reduct/http.py index f07e753..48e24c0 100644 --- a/reduct/http.py +++ b/reduct/http.py @@ -112,10 +112,22 @@ async def _request( 599, f"Connection failed, server {self._url} cannot be reached" ) from None - async def request_all(self, method: str, path: str = "", **kwargs) -> bytes: - """Http request""" + async def request_all( + self, method: str, path: str = "", **kwargs + ) -> (bytes, Dict[str, str]): + """Http request + Args: + method (str): HTTP method + path (str, optional): Path. Defaults to "". + **kwargs: kwargs for aiohttp.request + Returns: + bytes: response body + Dict[str, str]: response headers + Raises: + ReductError: if request failed + """ async with self.request(method, path, **kwargs) as response: - return await response.read() + return await response.read(), response.headers async def request_chunked( self, method: str, path: str = "", chunk_size=1024, **kwargs diff --git a/reduct/record.py b/reduct/record.py index ec30c15..f81d086 100644 --- a/reduct/record.py +++ b/reduct/record.py @@ -2,7 +2,7 @@ import asyncio from dataclasses import dataclass from functools import partial -from typing import Dict, Callable, AsyncIterator, Awaitable +from typing import Dict, Callable, AsyncIterator, Awaitable, Optional, List, Tuple from aiohttp import ClientResponse @@ -28,7 +28,46 @@ class Record: """labels of record""" +class Batch: + def __init__(self): + self._records: Dict[int, Record] = {} + + def add( + self, + timestamp: int, + data: bytes, + content_type: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + ): + """Add record to batch + Args: + timestamp: UNIX timestamp in microseconds + data: data to store + content_type: content type of data (default: application/octet-stream) + labels: labels of record (default: {}) + """ + if content_type is None: + content_type = "application/octet-stream" + if labels is None: + labels = {} + + record = Record() + record.timestamp = timestamp + record.size = len(data) + record.content_type = content_type + record.labels = labels + record.read_all = lambda: data + + self._records[timestamp] = record + + def items(self) -> List[Tuple[int, Record]]: + """Get records as dict items""" + return sorted(self._records.items()) + + LABEL_PREFIX = "x-reduct-label-" +TIME_PREFIX = "x-reduct-time-" +ERROR_PREFIX = "x-reduct-error-" CHUNK_SIZE = 512_000 @@ -109,14 +148,12 @@ async def _read_all(buffer): async def parse_batched_records(resp: ClientResponse) -> AsyncIterator[Record]: """Parse batched records from response""" - records_total = sum( - 1 for header in resp.headers if header.startswith("x-reduct-time-") - ) + records_total = sum(1 for header in resp.headers if header.startswith(TIME_PREFIX)) records_count = 0 head = resp.method == "HEAD" for name, value in resp.headers.items(): - if name.startswith("x-reduct-time-"): + if name.startswith(TIME_PREFIX): timestamp = int(name[14:]) content_length, content_type, labels = _parse_header_as_csv_row(value) From 26bc5d6be7860f7a4266dba739dffd853f5f9cfc Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Mon, 2 Oct 2023 20:36:04 +0200 Subject: [PATCH 3/7] add tests --- reduct/bucket.py | 13 +++++++--- reduct/error.py | 3 +++ reduct/http.py | 6 ++++- reduct/record.py | 21 +++++++++++----- tests/bucket_test.py | 58 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 11 deletions(-) diff --git a/reduct/bucket.py b/reduct/bucket.py index b31c9fb..531c072 100644 --- a/reduct/bucket.py +++ b/reduct/bucket.py @@ -259,10 +259,10 @@ async def write_batch( """ record_headers = {} - body = b"" + content_length = 0 for time_stamp, record in batch.items(): + content_length += record.size header = f"{record.size},{record.content_type}" - body += record.data for label, value in record.labels.items(): if "," in label or "=" in label: header += f',{label}="{value}"' @@ -271,11 +271,16 @@ async def write_batch( record_headers[f"{TIME_PREFIX}{time_stamp}"] = header + async def iter_body(): + for _, rec in batch.items(): + yield await rec.read_all() + _, headers = await self._http.request_all( "POST", f"/b/{self.name}/{entry_name}/batch", - data=body, - headers=record_headers, + data=iter_body(), + extra_headers=record_headers, + content_length=content_length, ) errors = {} diff --git a/reduct/error.py b/reduct/error.py index b007641..20b7df0 100644 --- a/reduct/error.py +++ b/reduct/error.py @@ -23,3 +23,6 @@ def status_code(self): def message(self): """Return error message""" return self._message + + def __eq__(self, other: "ReductError"): + return self._code == other._code and self._message == other._message diff --git a/reduct/http.py b/reduct/http.py index 48e24c0..997a7cb 100644 --- a/reduct/http.py +++ b/reduct/http.py @@ -42,7 +42,7 @@ async def request( ) -> AsyncIterator[ClientResponse]: """HTTP request with ReductError exception""" - extra_headers = {} + extra_headers = kwargs.pop("extra_headers", {}) expect100 = False if "content_length" in kwargs: @@ -120,6 +120,10 @@ async def request_all( method (str): HTTP method path (str, optional): Path. Defaults to "". **kwargs: kwargs for aiohttp.request + Kwargs: + data (bytes | AsyncIterator[bytes): request body + extra_headers (Dict[str, str]): extra headers + content_length (int): content length Returns: bytes: response body Dict[str, str]: response headers diff --git a/reduct/record.py b/reduct/record.py index f81d086..a46689a 100644 --- a/reduct/record.py +++ b/reduct/record.py @@ -51,12 +51,21 @@ def add( if labels is None: labels = {} - record = Record() - record.timestamp = timestamp - record.size = len(data) - record.content_type = content_type - record.labels = labels - record.read_all = lambda: data + def read(n: int) -> AsyncIterator[bytes]: + raise NotImplementedError() + + async def read_all(): + return data + + record = Record( + timestamp=timestamp, + size=len(data), + content_type=content_type, + labels=labels, + read_all=read_all, + read=read, + last=False, + ) self._records[timestamp] = record diff --git a/tests/bucket_test.py b/tests/bucket_test.py index 602abd7..072678c 100644 --- a/tests/bucket_test.py +++ b/tests/bucket_test.py @@ -7,6 +7,7 @@ import pytest from reduct import ReductError, BucketSettings, QuotaType, Record, BucketFullInfo +from reduct.record import Batch from tests.conftest import requires_api @@ -382,3 +383,60 @@ async def read_chunks(rec: Record): data = await read_chunks(records[2]) assert data == (b"3" * size) + + +@requires_api("1.7") +@pytest.mark.asyncio +async def test_batched_write(bucket_1): + """Should write batched records""" + batch = Batch() + batch.add(1000, b"Hey,", "plain/text", {"label1": "value1"}) + batch.add(2000, b"how", "plain/text", {"label2": "value2"}) + batch.add( + 3000, + b"are", + "plain/text", + ) + batch.add(4000, b"you?") + + await bucket_1.write_batch("entry-3", batch) + + records = [record async for record in bucket_1.query("entry-3")] + frase = b" ".join( + [await record.read_all() async for record in bucket_1.query("entry-3")] + ) + assert len(records) == 4 + + assert records[0].timestamp == 1000 + assert records[0].content_type == "plain/text" + assert records[0].labels == {"label1": "value1"} + + assert records[1].timestamp == 2000 + assert records[1].content_type == "plain/text" + assert records[1].labels == {"label2": "value2"} + + assert records[2].timestamp == 3000 + assert records[2].content_type == "plain/text" + assert records[2].labels == {} + + assert records[3].timestamp == 4000 + assert records[3].content_type == "application/octet-stream" + assert records[3].labels == {} + + assert frase == b"Hey, how are you?" + + +@requires_api("1.7") +@pytest.mark.asyncio +async def test_batched_write_with_errors(bucket_1): + """Should write batched records and return errors""" + + await bucket_1.write("entry-3", b"1", timestamp=1) + + batch = Batch() + batch.add(1, b"new") + batch.add(2, b"reocrd") + + errors = await bucket_1.write_batch("entry-3", batch) + assert len(errors) == 1 + assert errors[1] == ReductError(409, "A record with timestamp 1 already exists") From 1bfd4016caf830293709984f68c417c7ad8873a0 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Mon, 2 Oct 2023 20:37:28 +0200 Subject: [PATCH 4/7] docs --- reduct/error.py | 1 + reduct/record.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/reduct/error.py b/reduct/error.py index 20b7df0..bbb1aaa 100644 --- a/reduct/error.py +++ b/reduct/error.py @@ -11,6 +11,7 @@ def __init__(self, code: int, message: str): @staticmethod def from_header(header: str) -> "ReductError": + """Create ReductError from HTTP header with status code and message (batched write)""" status_code, message = header.split(",", 1) return ReductError(int(status_code), message) diff --git a/reduct/record.py b/reduct/record.py index a46689a..09a6f29 100644 --- a/reduct/record.py +++ b/reduct/record.py @@ -29,6 +29,8 @@ class Record: class Batch: + """Batch of records to write them in one request""" + def __init__(self): self._records: Dict[int, Record] = {} From 910be4c94603069dd545ab58e3d9ff9d61ada57b Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Mon, 2 Oct 2023 20:39:58 +0200 Subject: [PATCH 5/7] update README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fda2c64..e48655f 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,12 @@ This package provides an asynchronous HTTP client for interacting with the [Redu ## Features -* Supports the [ReductStore HTTP API v1.6](https://docs.reduct.store/http-api) +* Supports the [ReductStore HTTP API v1.7](https://docs.reduct.store/http-api) * Bucket management * API Token management * Write, read and query data * Labels -* Batching records +* Batching records for read and write operations * Subscription on new data ## Install From 4759f96e54ca63d7f685f1482795ad04b5c2e8bc Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Mon, 2 Oct 2023 20:41:30 +0200 Subject: [PATCH 6/7] update CHANGELOG --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 402dbf4..9876350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] + +### Added: + +- Support for ReductStore HTTP API v1.7, see `Bucket.write_batch` method, [PR-95](https://github.com/reductstore/reduct-py/pull/95) + ### Changed: - Update dependencies and migrate to Pydantic v2, [PR-94](https://github.com/reductstore/reduct-py/pull/94) From bb26e7f46f1e578ac65e1909d6bf1f3db7dc8c52 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Mon, 2 Oct 2023 20:42:15 +0200 Subject: [PATCH 7/7] update docs --- reduct/error.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reduct/error.py b/reduct/error.py index bbb1aaa..3a2b261 100644 --- a/reduct/error.py +++ b/reduct/error.py @@ -11,7 +11,9 @@ def __init__(self, code: int, message: str): @staticmethod def from_header(header: str) -> "ReductError": - """Create ReductError from HTTP header with status code and message (batched write)""" + """Create ReductError from HTTP header + with status code and message (batched write + )""" status_code, message = header.split(",", 1) return ReductError(int(status_code), message)