Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RS-45: Update client to support replication API #100

Merged
merged 4 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- docs: update link to new website, [PR-98](https://github.com/reductstore/reduct-py/pull/98)
- Optimize batch read for better memory efficiency, [PR-99](https://github.com/reductstore/reduct-py/pull/99)
- Support for ReductStore HTTP API v1.8 with replication endpoints, [PR-100](https://github.com/reductstore/reduct-py/pull/100)

## [1.7.1] - 2023-10-09

Expand Down
91 changes: 91 additions & 0 deletions docs/replication.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Replicate Data Between Buckets

This guide explains how to use the ReductStore Python SDK to work with replications. Replications allow you to synchronize data between two buckets.

The destination bucket can be on the same ReductStore instance or on a different instance (e.g., a ReductStore instance running on a remote server).

## Prerequisites

Before you start working with replications, ensure that you have the following:

1. A running ReductStore instance with version 1.8.0 or higher. You can download the latest version of ReductStore from [here](https://reduct.store/download).
2. A source bucket and a destination bucket. See the [Quick Start](./quick-start.md) guide for instructions on how to create buckets.
4. An instance of the ReductStore Python SDK client. See the [Quick Start](./quick-start.md) guide for instructions on how to create a client instance.

## Replication Concepts

### What is Replication?

Replication is a feature that enables you to create and manage data synchronization tasks between two buckets. It allows you to mirror data from a source bucket to a destination bucket.

### Key Components

To understand how replication works, let's look at its key components:

- **Source Bucket:** The bucket from which data is replicated.
- **Destination Bucket:** The bucket to which data is copied.
- **Destination Host:** The URL of the destination host (e.g., https://play.reduct.store).
- **Replication Name:** A unique name for the replication task.
- **Replication Settings:** Configuration options for the replication, such as specifying which records to replicate and which to exclude.

## Working with Replications

The ReductStore Python SDK provides several methods to work with replications. Here's an overview of these methods:

### 1. Create a New Replication

Create a new replication with the specified settings.

```python
from reduct import ReplicationSettings

replication_settings = ReplicationSettings(
src_bucket="source-bucket",
dst_bucket="destination-bucket",
dst_host="https://play.reduct.store",
)
await client.create_replication("my-replication", replication_settings)
```

### 2. Get a List of Replications

You can retrieve a list of all replications, along with their statuses.

```python
replications = await client.get_replications()
```

### 3. Get Detailed Information about a Replication

Get detailed information about a specific replication using its name.

```python
replication_detail = await client.get_replication_detail("my-replication")
```

### 4. Update an Existing Replication

Update the settings of an existing replication.

```python
from reduct import ReplicationSettings

new_settings = ReplicationSettings(
src_bucket="updated-source-bucket",
dst_bucket="updated-destination-bucket",
dst_host="https://play.reduct.store",
)
await client.update_replication("my-replication", new_settings)
```

### 5. Delete a Replication

Delete a replication by specifying its name.

```python
await client.delete_replication("my-replication")
```

## Next Steps

Refer to the [Reference API Section](./api/client.md) for detailed documentation about the Python Client. For a comprehensive reference of all API calls, visit the [ReductStore HTTP API documentation](https://reduct.store/docs/http-api).
3 changes: 3 additions & 0 deletions reduct/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
Token,
Permissions,
FullTokenInfo,
ReplicationInfo,
ReplicationDetailInfo,
ReplicationSettings,
)

from reduct.error import ReductError
137 changes: 137 additions & 0 deletions reduct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,73 @@ class TokenCreateResponse(BaseModel):
"""token for authentication"""


class ReplicationInfo(BaseModel):
"""Replication information"""

name: str
is_provisioned: bool
is_active: bool
pending_records: int


class ReplicationList(BaseModel):
"""List of replications"""

replications: List[ReplicationInfo]


class ReplicationDiagnosticsError(BaseModel):
"""Error information for replication"""

count: int
last_message: str


class ReplicationDiagnosticsDetail(BaseModel):
"""Diagnostics information for replication"""

ok: int
errored: int
errors: Dict[int, ReplicationDiagnosticsError]


class ReplicationDiagnostics(BaseModel):
"""Detailed diagnostics for replication"""

hourly: ReplicationDiagnosticsDetail


class ReplicationSettingsDetail(BaseModel):
"""Settings for replication"""

src_bucket: str
dst_bucket: str
dst_host: str
entries: List[str]
include: Dict[str, str]
exclude: Dict[str, str]


class ReplicationDetailInfo(BaseModel):
"""Complete information about a replication"""

diagnostics: ReplicationDiagnostics
info: ReplicationInfo
settings: ReplicationSettingsDetail


class ReplicationSettings(BaseModel):
"""Settings for creating a replication"""

src_bucket: str
dst_bucket: str
dst_host: str
dst_token: str = ""
entries: List[str] = []
include: Dict[str, str] = {}
exclude: Dict[str, str] = {}


class Client:
"""HTTP Client for Reduct Storage HTTP API"""

Expand Down Expand Up @@ -256,3 +323,73 @@ async def me(self) -> FullTokenInfo:
"""
body, _ = await self._http.request_all("GET", "/me")
return FullTokenInfo.model_validate_json(body)

async def get_replications(self) -> List[ReplicationInfo]:
"""
Get a list of replications
Returns:
List[ReplicationInfo]: List of replications with their statuses
Raises:
ReductError: if there is an HTTP error
"""
body, _ = await self._http.request_all("GET", "/replications")
return ReplicationList.model_validate_json(body).replications

async def get_replication_detail(
self, replication_name: str
) -> ReplicationDetailInfo:
"""
Get detailed information about a replication
Args:
replication_name: Name of the replication to show details
Returns:
ReplicationDetailInfo: Detailed information about the replication
Raises:
ReductError: if there is an HTTP error
"""
body, _ = await self._http.request_all(
"GET", f"/replications/{replication_name}"
)
return ReplicationDetailInfo.model_validate_json(body)

async def create_replication(
self, replication_name: str, settings: ReplicationSettings
) -> None:
"""
Create a new replication
Args:
replication_name: Name of the new replication
settings: Settings for the new replication
Raises:
ReductError: if there is an HTTP error
"""
data = settings.model_dump_json()
await self._http.request_all(
"POST", f"/replications/{replication_name}", data=data
)

async def update_replication(
self, replication_name: str, settings: ReplicationSettings
) -> None:
"""
Update an existing replication
Args:
replication_name: Name of the replication to update
settings: New settings for the replication
Raises:
ReductError: if there is an HTTP error
"""
data = settings.model_dump_json()
await self._http.request_all(
"PUT", f"/replications/{replication_name}", data=data
)

async def delete_replication(self, replication_name: str) -> None:
"""
Delete a replication
Args:
replication_name: Name of the replication to delete
Raises:
ReductError: if there is an HTTP error
"""
await self._http.request_all("DELETE", f"/replications/{replication_name}")
40 changes: 39 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest_asyncio
import requests

from reduct import Client, Bucket
from reduct import Client, Bucket, ReplicationSettings


def requires_env(key):
Expand Down Expand Up @@ -74,3 +74,41 @@ async def _bucket_2(client) -> Bucket:
await bucket.write("entry-1", b"some-data-2", timestamp=6_000_000)
yield bucket
await bucket.remove()


@pytest_asyncio.fixture(name="replication_1")
async def _replication_1(client) -> str:
replication_name = "replication-1"
replication_settings = ReplicationSettings(
src_bucket="bucket-1",
dst_bucket="bucket-2",
dst_host="http://127.0.0.1:8383",
)
await client.create_replication(replication_name, replication_settings)
yield replication_name
await client.delete_replication(replication_name)


@pytest_asyncio.fixture(name="replication_2")
async def _replication_2(client) -> str:
replication_name = "replication-2"
replication_settings = ReplicationSettings(
src_bucket="bucket-1",
dst_bucket="bucket-2",
dst_host="http://127.0.0.1:8383",
)
await client.create_replication(replication_name, replication_settings)
yield replication_name
await client.delete_replication(replication_name)


@pytest_asyncio.fixture(name="temporary_replication")
async def _temporary_replication(client) -> str:
replication_name = "temp-replication"
replication_settings = ReplicationSettings(
src_bucket="bucket-1",
dst_bucket="bucket-2",
dst_host="http://127.0.0.1:8383",
)
await client.create_replication(replication_name, replication_settings)
yield replication_name
57 changes: 57 additions & 0 deletions tests/replication_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Tests for replication endpoints"""
import pytest
from reduct import (
ReductError,
ReplicationInfo,
ReplicationDetailInfo,
ReplicationSettings,
)


@pytest.mark.asyncio
@pytest.mark.usefixtures("bucket_1", "bucket_2")
async def test__get_replications(client, replication_1, replication_2):
"""Test getting a list of replications"""
replications = await client.get_replications()
assert isinstance(replications, list)
for replication in replications:
assert isinstance(replication, ReplicationInfo)
assert replication.name in [replication_1, replication_2]


@pytest.mark.asyncio
@pytest.mark.usefixtures("bucket_1", "bucket_2")
async def test__get_replication_detail(client, replication_1):
"""Test create a replication and get its details"""
replication_detail = await client.get_replication_detail(replication_1)
assert isinstance(replication_detail, ReplicationDetailInfo)
assert replication_detail.info.name == replication_1


@pytest.mark.asyncio
@pytest.mark.usefixtures("bucket_1", "bucket_2")
async def test__update_replication(client, replication_1):
"""Test updating an existing replication"""
new_settings = ReplicationSettings(
src_bucket="bucket-2",
dst_bucket="bucket-1",
dst_host="https://play.reduct.store",
)
await client.update_replication(replication_1, new_settings)
replication_detail = await client.get_replication_detail(replication_1)
assert replication_detail.settings.src_bucket == new_settings.src_bucket
assert replication_detail.settings.dst_bucket == new_settings.dst_bucket
assert replication_detail.settings.dst_host == new_settings.dst_host


@pytest.mark.asyncio
@pytest.mark.usefixtures("bucket_1", "bucket_2")
async def test_delete_replication(client, temporary_replication):
"""Test deleting a replication"""
await client.delete_replication(temporary_replication)
with pytest.raises(ReductError) as reduct_err:
await client.get_replication_detail(temporary_replication)
assert (
str(reduct_err.value)
== f"Status 404: Replication '{temporary_replication}' does not exist"
)
Loading