Skip to content

Commit

Permalink
RS-261: Support each_n and each_s query parameters (#110)
Browse files Browse the repository at this point in the history
* refactor

* add tests for new parameters

* update tests

* suppress generator warnings

* update CHANGELOG and README
  • Loading branch information
atimin authored May 15, 2024
1 parent 997ff31 commit f4b9557
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ jobs:
--env RS_API_TOKEN=${{matrix.token}}
--env RS_LOG_LEVEL=DEBUG
--env RS_LICENSE_PATH=${{matrix.license}}
-d reduct/store:main
-d reduct/store:${{matrix.reductstore_version}}

- name: Run Tests
run: PYTHONPATH=. RS_API_TOKEN=${{matrix.token}} RS_LICENSE_PATH=${{matrix.license}} pytest tests
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]


### Added:

- RS-261: Support each_n and each_s query parameters, [PR-110](https://github.com/reductstore/reduct-py/pull/110)

## [1.9.1] - 2024-04-26

## Fixed:
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
[![PyPI - Downloads](https://img.shields.io/pypi/dm/reduct-py)](https://pypi.org/project/reduct-py/)
[![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/reductstore/reduct-py/ci.yml?branch=main)](https://github.com/reductstore/reduct-py/actions)

This package provides an asynchronous HTTP client for interacting with the [ReductStore](https://www.reduct.store) service.
This package provides an asynchronous HTTP client for interacting with [ReductStore](https://www.reduct.store) in Python.

## Features

* Supports the [ReductStore HTTP API v1.9](https://reduct.store/docs/http-api)
* Supports the [ReductStore HTTP API v1.10](https://reduct.store/docs/http-api)
* Bucket management
* API Token management
* Write, read and query data
* Labels
* Labeling records
* Batching records for read and write operations
* Subscription on new data
* Subscription
* Replication management

## Install

Expand Down
29 changes: 23 additions & 6 deletions reduct/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ async def query(
include (dict): query records which have all labels from this dict
exclude (dict): query records which doesn't have all labels from this
head (bool): if True: get only the header of a recod with metadata
each_s(Union[int, float]): return a record for each S seconds
each_n(int): return each N-th record
limit (int): limit the number of records
Returns:
AsyncIterator[Record]: iterator to the records
Expand All @@ -345,7 +347,11 @@ async def query(
last = False
method = "HEAD" if "head" in kwargs and kwargs["head"] else "GET"

if self._http.api_version and self._http.api_version >= "1.5":
if (
self._http.api_version
and self._http.api_version[0] == 1
and self._http.api_version[1] >= 5
):
while not last:
async with self._http.request(
method, f"/b/{self.name}/{entry_name}/batch?q={query_id}"
Expand Down Expand Up @@ -409,7 +415,11 @@ async def subscribe(
)

method = "HEAD" if "head" in kwargs and kwargs["head"] else "GET"
if self._http.api_version and self._http.api_version >= "1.5":
if (
self._http.api_version
and self._http.api_version[0] == 1
and self._http.api_version[1] >= 5
):
while True:
async with self._http.request(
method, f"/b/{self.name}/{entry_name}/batch?q={query_id}"
Expand Down Expand Up @@ -437,8 +447,6 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs):
params["start"] = int(start)
if stop:
params["stop"] = int(stop)
if ttl:
params["ttl"] = int(ttl)

if "include" in kwargs:
for name, value in kwargs["include"].items():
Expand All @@ -447,12 +455,21 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs):
for name, value in kwargs["exclude"].items():
params[f"exclude-{name}"] = str(value)

if "continuous" in kwargs:
params["continuous"] = "true" if kwargs["continuous"] else "false"
if "each_s" in kwargs:
params["each_s"] = float(kwargs["each_s"])

if "each_n" in kwargs:
params["each_n"] = int(kwargs["each_n"])

if "limit" in kwargs:
params["limit"] = kwargs["limit"]

if ttl:
params["ttl"] = int(ttl)

if "continuous" in kwargs:
params["continuous"] = "true" if kwargs["continuous"] else "false"

url = f"/b/{self.name}/{entry_name}"
data, _ = await self._http.request_all(
"GET",
Expand Down
18 changes: 13 additions & 5 deletions reduct/http.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Internal HTTP helper"""

from contextlib import asynccontextmanager
from typing import Optional, AsyncIterator, Dict
from typing import Optional, AsyncIterator, Dict, Tuple

import aiohttp
from aiohttp import ClientTimeout, ClientResponse
Expand Down Expand Up @@ -39,7 +39,7 @@ def __init__(
self._verify_ssl = kwargs.pop("verify_ssl", True)

@asynccontextmanager
async def request(
async def request( # pylint: disable=contextmanager-generator-missing-cleanup
self, method: str, path: str = "", **kwargs
) -> AsyncIterator[ClientResponse]:
"""HTTP request with ReductError exception"""
Expand Down Expand Up @@ -135,7 +135,7 @@ async def request_all(
async with self.request(method, path, **kwargs) as response:
return await response.read(), response.headers

async def request_chunked(
async def request_chunked( # pylint: disable=contextmanager-generator-missing-cleanup
self, method: str, path: str = "", chunk_size=1024, **kwargs
) -> AsyncIterator[bytes]:
"""Http request"""
Expand All @@ -145,6 +145,14 @@ async def request_chunked(
return

@property
def api_version(self) -> Optional[str]:
def api_version(self) -> Optional[Tuple[int, int]]:
"""API version"""
return self._api_version
if self._api_version is None:
return None
return extract_api_version(self._api_version)


def extract_api_version(version: str) -> Tuple[int, int]:
"""Extract version"""
major, minor = version.split(".")
return int(major), int(minor)
55 changes: 44 additions & 11 deletions tests/bucket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ async def test__get_entries(bucket_1):

assert entries[1].model_dump() == {
"block_count": 1,
"latest_record": 4000000,
"latest_record": 5000000,
"name": "entry-2",
"oldest_record": 3000000,
"record_count": 2,
"size": 108,
"record_count": 3,
"size": 157,
}


Expand All @@ -120,7 +120,7 @@ async def test__read_latest(bucket_1):
"""Should read the latest record if no timestamp"""
async with bucket_1.read("entry-2") as record:
data = await record.read_all()
assert data == b"some-data-4"
assert data == b"some-data-5"


@pytest.mark.asyncio
Expand Down Expand Up @@ -284,10 +284,10 @@ async def test_query_records_excluded_labels(bucket_2):
async def test_query_records_last(bucket_1):
"""Should query records for until last record"""
records: List[Record] = [
record async for record in bucket_1.query("entry-2", start=4_000_000)
record async for record in bucket_1.query("entry-2", start=5_000_000)
]
assert len(records) == 1
assert records[0].timestamp == 4_000_000
assert records[0].timestamp == 5_000_000


@pytest.mark.asyncio
Expand All @@ -305,22 +305,32 @@ async def test_query_records_limit(bucket_1):
async def test_query_records_all(bucket_1):
"""Should query records all data"""
records = [record async for record in bucket_1.query("entry-2")]
assert len(records) == 2
assert len(records) == 3


@pytest.mark.asyncio
async def test_read_record_in_chunks(bucket_1):
"""Should provide records with read method and read in chunks"""
data = [await record.read_all() async for record in bucket_1.query("entry-2")]
assert data == [b"some-data-3", b"some-data-4"]
assert data == [b"some-data-3", b"some-data-4", b"some-data-5"]

data = []

async for record in bucket_1.query("entry-2"):
async for chunk in record.read(n=4):
data.append(chunk)

assert data == [b"some", b"-dat", b"a-3", b"some", b"-dat", b"a-4"]
assert data == [
b"some",
b"-dat",
b"a-3",
b"some",
b"-dat",
b"a-4",
b"some",
b"-dat",
b"a-5",
]


@pytest.mark.asyncio
Expand Down Expand Up @@ -360,10 +370,10 @@ async def subscriber():
break

await asyncio.gather(
subscriber(), bucket_1.write("entry-2", b"some-data-5", labels={"stop": "true"})
subscriber(), bucket_1.write("entry-2", b"some-data-6", labels={"stop": "true"})
)

assert data == [b"some-data-3", b"some-data-4", b"some-data-5"]
assert data == [b"some-data-3", b"some-data-4", b"some-data-5", b"some-data-6"]


@pytest.mark.asyncio
Expand Down Expand Up @@ -475,3 +485,26 @@ async def test_batched_write_with_errors(bucket_1):
errors = await bucket_1.write_batch("entry-3", batch)
assert len(errors) == 1
assert errors[1] == ReductError(409, "A record with timestamp 1 already exists")


@pytest.mark.asyncio
@requires_api("1.10")
async def test_query_records_each_s(bucket_1):
"""Should query a record per 2 seconds"""
records: List[Record] = [
record async for record in bucket_1.query("entry-2", start=0, each_s=2.0)
]
assert len(records) == 2
assert records[0].timestamp == 3000000
assert records[1].timestamp == 5000000


@pytest.mark.asyncio
@requires_api("1.10")
async def test_query_records_each_n(bucket_1):
"""Should query each 3d records"""
records: List[Record] = [
record async for record in bucket_1.query("entry-2", start=0, each_n=3)
]
assert len(records) == 1
assert records[0].timestamp == 3000000
4 changes: 2 additions & 2 deletions tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ async def test__info(client):
await sleep(1)

info: ServerInfo = await client.info()
assert info.version >= "1.2.0"
assert info.version >= "1.10.0"
assert info.uptime >= 1
assert info.bucket_count == 2
assert info.usage == 324
assert info.usage == 373
assert info.oldest_record == 1_000_000
assert info.latest_record == 6_000_000

Expand Down
5 changes: 4 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests

from reduct import Client, Bucket, ReplicationSettings
from reduct.http import extract_api_version


def requires_env(key):
Expand All @@ -26,7 +27,7 @@ def requires_api(version):
"x-reduct-api"
]
return pytest.mark.skipif(
version > current_version,
extract_api_version(version)[1] > extract_api_version(current_version)[1],
reason=f"Not suitable API version {current_version} for current test",
)

Expand Down Expand Up @@ -64,6 +65,8 @@ async def _bucket_1(client) -> Bucket:
await bucket.write("entry-1", b"some-data-2", timestamp=2_000_000)
await bucket.write("entry-2", b"some-data-3", timestamp=3_000_000)
await bucket.write("entry-2", b"some-data-4", timestamp=4_000_000)
await bucket.write("entry-2", b"some-data-5", timestamp=5_000_000)

yield bucket
await bucket.remove()

Expand Down

0 comments on commit f4b9557

Please sign in to comment.