Add support for ReductStore API v1.7 #95

Merged · 7 commits · Oct 2, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]


### Added:

- Support for ReductStore HTTP API v1.7, see `Bucket.write_batch` method, [PR-95](https://github.com/reductstore/reduct-py/pull/95)

### Changed:

- Update dependencies and migrate to Pydantic v2, [PR-94](https://github.com/reductstore/reduct-py/pull/94)
4 changes: 2 additions & 2 deletions README.md
@@ -8,12 +8,12 @@ This package provides an asynchronous HTTP client for interacting with the [Redu

## Features

* Supports the [ReductStore HTTP API v1.6](https://docs.reduct.store/http-api)
* Supports the [ReductStore HTTP API v1.7](https://docs.reduct.store/http-api)
* Bucket management
* API Token management
* Write, read and query data
* Labels
* Batching records
* Batching records for read and write operations
* Subscription on new data

## Install
68 changes: 63 additions & 5 deletions reduct/bucket.py
@@ -9,12 +9,21 @@
List,
AsyncIterator,
Union,
Dict,
)

from pydantic import BaseModel

from reduct.error import ReductError
from reduct.http import HttpClient
from reduct.record import Record, parse_batched_records, parse_record
from reduct.record import (
Record,
parse_batched_records,
parse_record,
Batch,
TIME_PREFIX,
ERROR_PREFIX,
)


class QuotaType(Enum):
@@ -58,6 +67,9 @@ class BucketInfo(BaseModel):
latest_record: int
"""UNIX timestamp of the latest record in microseconds"""

is_provisioned: bool = False
"""bucket is provisioned amd you can't remove it or change its settings"""


class EntryInfo(BaseModel):
"""Entry of bucket"""
@@ -231,6 +243,53 @@ async def write(
**kwargs,
)

async def write_batch(
self, entry_name: str, batch: Batch
) -> Dict[int, ReductError]:
"""
Write a batch of records to an entry in a single request

Args:
entry_name: name of entry in the bucket
batch: batch of records to write
Returns:
dict of errors with timestamps as keys
Raises:
ReductError: if there is an HTTP or communication error
"""

record_headers = {}
content_length = 0
for time_stamp, record in batch.items():
content_length += record.size
header = f"{record.size},{record.content_type}"
for label, value in record.labels.items():
if "," in label or "=" in label:
header += f',{label}="{value}"'
else:
header += f",{label}={value}"

record_headers[f"{TIME_PREFIX}{time_stamp}"] = header

async def iter_body():
for _, rec in batch.items():
yield await rec.read_all()

_, headers = await self._http.request_all(
"POST",
f"/b/{self.name}/{entry_name}/batch",
data=iter_body(),
extra_headers=record_headers,
content_length=content_length,
)

errors = {}
for key, value in headers.items():
if key.startswith(ERROR_PREFIX):
errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)

return errors
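
For reviewers, a minimal usage sketch of the new `write_batch` API. The server URL, token, bucket and entry names are placeholders; `Batch` is imported from `reduct.record`, where this PR defines it (it may also be re-exported from the top-level package):

```python
import asyncio

from reduct import Client
from reduct.record import Batch  # defined in this PR; may also be re-exported as reduct.Batch


async def main():
    # Placeholder URL and API token for this sketch.
    client = Client("http://127.0.0.1:8383", api_token="my-token")
    bucket = await client.create_bucket("demo", exist_ok=True)

    batch = Batch()
    # Timestamps are UNIX microseconds; content_type and labels are optional.
    batch.add(1_000_000, b"first record", labels={"source": "sensor-1"})
    batch.add(2_000_000, b"second record", content_type="text/plain")

    # One HTTP request for the whole batch; per-record failures are returned
    # (not raised) as a dict keyed by timestamp.
    errors = await bucket.write_batch("entry-1", batch)
    for timestamp, err in errors.items():
        print(f"record {timestamp} was not written: {err}")


asyncio.run(main())
```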

async def query(
self,
entry_name: str,
@@ -290,9 +349,8 @@ async def get_full_info(self) -> BucketFullInfo:
"""
Get full information about bucket (settings, statistics, entries)
"""
return BucketFullInfo.parse_raw(
await self._http.request_all("GET", f"/b/{self.name}")
)
body, _ = await self._http.request_all("GET", f"/b/{self.name}")
return BucketFullInfo.model_validate_json(body)

async def subscribe(
self, entry_name: str, start: Optional[int] = None, poll_interval=1.0, **kwargs
@@ -368,7 +426,7 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs):
params["limit"] = kwargs["limit"]

url = f"/b/{self.name}/{entry_name}"
data = await self._http.request_all(
data, _ = await self._http.request_all(
"GET",
f"{url}/q",
params=params,
35 changes: 18 additions & 17 deletions reduct/client.py
@@ -70,6 +70,9 @@ class Token(BaseModel):
created_at: datetime
"""creation time of token"""

is_provisioned: bool = False
"""token is provisioned and can't be deleted or changed"""


class FullTokenInfo(Token):
"""Full information about token with permissions"""
@@ -137,7 +140,8 @@ async def info(self) -> ServerInfo:
Raises:
ReductError: if there is an HTTP error
"""
return ServerInfo.parse_raw(await self._http.request_all("GET", "/info"))
body, _ = await self._http.request_all("GET", "/info")
return ServerInfo.model_validate_json(body)

async def list(self) -> List[BucketInfo]:
"""
@@ -148,9 +152,8 @@ async def list(self) -> List[BucketInfo]:
Raises:
ReductError: if there is an HTTP error
"""
return BucketList.parse_raw(
await self._http.request_all("GET", "/list")
).buckets
body, _ = await self._http.request_all("GET", "/list")
return BucketList.model_validate_json(body).buckets

async def get_bucket(self, name: str) -> Bucket:
"""
@@ -184,7 +187,7 @@ async def create_bucket(
Raises:
ReductError: if there is an HTTP error
"""
data = settings.json() if settings else None
data = settings.model_dump_json() if settings else None
try:
await self._http.request_all("POST", f"/b/{name}", data=data)
except ReductError as err:
@@ -201,9 +204,8 @@ async def get_token_list(self) -> List[Token]:
Raises:
ReductError: if there is an HTTP error
"""
return TokenList.parse_raw(
await self._http.request_all("GET", "/tokens")
).tokens
body, _ = await self._http.request_all("GET", "/tokens")
return TokenList.model_validate_json(body).tokens

async def get_token(self, name: str) -> FullTokenInfo:
"""
@@ -215,9 +217,8 @@ async def get_token(self, name: str) -> FullTokenInfo:
Raises:
ReductError: if there is an HTTP error
"""
return FullTokenInfo.parse_raw(
await self._http.request_all("GET", f"/tokens/{name}")
)
body, _ = await self._http.request_all("GET", f"/tokens/{name}")
return FullTokenInfo.model_validate_json(body)

async def create_token(self, name: str, permissions: Permissions) -> str:
"""
@@ -230,11 +231,10 @@ async def create_token(self, name: str, permissions: Permissions) -> str:
Raises:
ReductError: if there is an HTTP error
"""
return TokenCreateResponse.parse_raw(
await self._http.request_all(
"POST", f"/tokens/{name}", data=permissions.json()
)
).value
body, _ = await self._http.request_all(
"POST", f"/tokens/{name}", data=permissions.model_dump_json()
)
return TokenCreateResponse.model_validate_json(body).value

async def remove_token(self, name: str) -> None:
"""
@@ -254,4 +254,5 @@ async def me(self) -> FullTokenInfo:
Raises:
ReductError: if there is an HTTP error
"""
return FullTokenInfo.parse_raw(await self._http.request_all("GET", "/me"))
body, _ = await self._http.request_all("GET", "/me")
return FullTokenInfo.model_validate_json(body)
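
A short sketch of how a caller might use the new `is_provisioned` flag on tokens (URL and API token are placeholders):

```python
from reduct import Client


async def show_tokens() -> None:
    # Placeholder URL and API token for this sketch.
    client = Client("http://127.0.0.1:8383", api_token="my-token")
    for token in await client.get_token_list():
        if token.is_provisioned:
            # Provisioned tokens come from the server configuration and
            # cannot be removed or changed via the API.
            print(f"{token.name}: provisioned")
        else:
            print(f"{token.name}: created at {token.created_at}")
```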
11 changes: 11 additions & 0 deletions reduct/error.py
@@ -9,6 +9,14 @@ def __init__(self, code: int, message: str):
self._message = message
super().__init__(f"Status {self._code}: {self.message}")

@staticmethod
def from_header(header: str) -> "ReductError":
"""Create ReductError from HTTP header
with status code and message (batched write
)"""
status_code, message = header.split(",", 1)
return ReductError(int(status_code), message)

@property
def status_code(self):
"""Return HTTP status code"""
@@ -18,3 +26,6 @@ def status_code(self):
def message(self):
"""Return error message"""
return self._message

def __eq__(self, other: "ReductError"):
return self._code == other._code and self._message == other._message
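
A quick sketch of the new `from_header` helper and the equality check, based on the `"<status_code>,<message>"` header format used for batched-write errors; the example values are made up:

```python
from reduct.error import ReductError

# The header value is "<status_code>,<message>", as returned by the server
# for each record that could not be written in a batched request.
err = ReductError.from_header("404,Entry not found")
assert err.status_code == 404
assert err.message == "Entry not found"

# __eq__ compares status code and message, which makes errors easy to compare in tests.
assert err == ReductError(404, "Entry not found")
```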
24 changes: 20 additions & 4 deletions reduct/http.py
@@ -42,7 +42,7 @@ async def request(
) -> AsyncIterator[ClientResponse]:
"""HTTP request with ReductError exception"""

extra_headers = {}
extra_headers = kwargs.pop("extra_headers", {})
expect100 = False

if "content_length" in kwargs:
@@ -112,10 +112,26 @@ async def _request(
599, f"Connection failed, server {self._url} cannot be reached"
) from None

async def request_all(self, method: str, path: str = "", **kwargs) -> bytes:
"""Http request"""
async def request_all(
self, method: str, path: str = "", **kwargs
) -> (bytes, Dict[str, str]):
"""Http request
Args:
method (str): HTTP method
path (str, optional): Path. Defaults to "".
**kwargs: kwargs for aiohttp.request
Kwargs:
data (bytes | AsyncIterator[bytes]): request body
extra_headers (Dict[str, str]): extra headers
content_length (int): content length
Returns:
bytes: response body
Dict[str, str]: response headers
Raises:
ReductError: if request failed
"""
async with self.request(method, path, **kwargs) as response:
return await response.read()
return await response.read(), response.headers
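
A small sketch of the updated calling convention for `request_all`, which now returns the response headers alongside the body (the wrapper function below is illustrative, not part of the library):

```python
from typing import Dict, Tuple

from reduct.http import HttpClient


async def fetch_server_info(http: HttpClient) -> Tuple[bytes, Dict[str, str]]:
    # Callers now unpack a (body, headers) tuple instead of receiving bare bytes.
    body, headers = await http.request_all("GET", "/info")
    return body, headers
```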

async def request_chunked(
self, method: str, path: str = "", chunk_size=1024, **kwargs
58 changes: 53 additions & 5 deletions reduct/record.py
@@ -2,7 +2,7 @@
import asyncio
from dataclasses import dataclass
from functools import partial
from typing import Dict, Callable, AsyncIterator, Awaitable
from typing import Dict, Callable, AsyncIterator, Awaitable, Optional, List, Tuple

from aiohttp import ClientResponse

@@ -28,7 +28,57 @@ class Record:
"""labels of record"""


class Batch:
"""Batch of records to write them in one request"""

def __init__(self):
self._records: Dict[int, Record] = {}

def add(
self,
timestamp: int,
data: bytes,
content_type: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
):
"""Add record to batch
Args:
timestamp: UNIX timestamp in microseconds
data: data to store
content_type: content type of data (default: application/octet-stream)
labels: labels of record (default: {})
"""
if content_type is None:
content_type = "application/octet-stream"
if labels is None:
labels = {}

def read(n: int) -> AsyncIterator[bytes]:
raise NotImplementedError()

async def read_all():
return data

record = Record(
timestamp=timestamp,
size=len(data),
content_type=content_type,
labels=labels,
read_all=read_all,
read=read,
last=False,
)

self._records[timestamp] = record

def items(self) -> List[Tuple[int, Record]]:
"""Get records as dict items"""
return sorted(self._records.items())
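
A tiny sketch showing that `Batch` keeps records ordered by timestamp regardless of insertion order (the values are arbitrary):

```python
from reduct.record import Batch

batch = Batch()
batch.add(2_000_000, b"later")
batch.add(1_000_000, b"earlier")

# items() returns (timestamp, Record) tuples sorted by timestamp.
timestamps = [ts for ts, _ in batch.items()]
assert timestamps == [1_000_000, 2_000_000]
```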


LABEL_PREFIX = "x-reduct-label-"
TIME_PREFIX = "x-reduct-time-"
ERROR_PREFIX = "x-reduct-error-"
CHUNK_SIZE = 512_000


@@ -109,14 +159,12 @@ async def _read_all(buffer):
async def parse_batched_records(resp: ClientResponse) -> AsyncIterator[Record]:
"""Parse batched records from response"""

records_total = sum(
1 for header in resp.headers if header.startswith("x-reduct-time-")
)
records_total = sum(1 for header in resp.headers if header.startswith(TIME_PREFIX))
records_count = 0
head = resp.method == "HEAD"

for name, value in resp.headers.items():
if name.startswith("x-reduct-time-"):
if name.startswith(TIME_PREFIX):
timestamp = int(name[len(TIME_PREFIX) :])
content_length, content_type, labels = _parse_header_as_csv_row(value)
