Skip to content

Commit

Permalink
DRIFT-563: Integrate reduct storage client (#24)
Browse files Browse the repository at this point in the history
* integrate reduct storage client

* fix current tests and style

* test reduct storage client

* update documentation

* update CHANGELOG

* fix the order
  • Loading branch information
atimin authored Nov 23, 2022
1 parent 9f25e89 commit 7a9c5b6
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 41 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- DRIFT-534: Dependencies compatibility table, [PR-21](https://github.com/panda-official/DriftPythonClient/pull/21)
- DRIFT-550: DriftClientError class, to catch (initially) Minio errors, [PR-16](https://github.com/panda-official/DriftPythonClient/pull/16)
- DRIFT-550: DriftClientError class, to catch (initially) Minio errors, [PR-16](https://github.com/panda-official/DriftPythonClient/pull/16)7
- DRIFT-563: Reduct Storage client, [PR-24](https://github.com/panda-official/DriftPythonClient/pull/24)
- DRIFT-604: Add blob property to DriftDataPackage, [PR-20](https://github.com/panda-official/DriftPythonClient/pull/20)

### Fixed:
Expand Down
5 changes: 2 additions & 3 deletions docs/dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
|-------------------|-------------------------------------------------------------------------------------|---------------|
| DriftPythonClient | [MPL-2.0](https://github.com/panda-official/DriftPythonClient/blob/develop/LICENSE) | |
| influxdb-client | [MIT](https://github.com/influxdata/influxdb-client-python/blob/master/LICENSE) | ✅ |
| minio | [Apache-2.0](https://github.com/minio/minio-py/blob/master/LICENSE) | ✅ |
| minio-py | [Apache-2.0](https://github.com/minio/minio-py/blob/master/LICENSE) | ✅ |
| DriftProtocol | [MPL-2.0](https://github.com/panda-official/DriftProtocol/blob/develop/LICENSE) | ✅ |
| WaveletBuffer | [MPL-2.0](https://github.com/panda-official/WaveletBuffer/blob/develop/LICENSE) | ✅ |
| paho-mqtt | [EPL-2.0](https://www.eclipse.org/legal/epl-2.0/) | ✅ |
| paho-mqtt | [BSD-3 Clause](https://www.eclipse.org/org/documents/edl-v10.php) | ✅ |
| NumPy | [BSD-3-Clause](https://github.com/numpy/numpy/blob/main/LICENSE.txt) | ✅ |
| deprecation | [Apache-2.0](https://github.com/briancurtin/deprecation/blob/master/LICENSE) | ✅ |


| reduct-py | [MIT](https://github.com/reduct-storage/reduct-py/blob/main/LICENSE) | ✅ |
Binary file modified docs/img/DrfitStrutcure.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 4 additions & 4 deletions docs/panda_drift.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ coordinates of detected objects etc.
As you may notice, we have two types of data: processed input and metrics. _Drift Core_ keeps a history for both
of them, but it does it differently for both and for different purposes:

* Metrics are results of work of an AI application and this is data that users usually need. To store it, we use InfluxDB and keep
data for long term storage.
* Input Data are mostly needed for training and model validation. We store it short term as blobs and use Minio
to provide HTTP access to it.
* Metrics are results of work of an AI application and this is data that users usually need. To store it, we use
[InfluxDB](https://www.influxdata.com/products/influxdb-overview/) and keep data for long term storage.
* Input Data are mostly needed for training and model validation. We store it short term as blobs and
use [Minio](https://min.io) or [ReductStorage](https://reduct-storage.dev) to provide HTTP access to it.

## Integration

Expand Down
66 changes: 40 additions & 26 deletions pkg/drift_client/drift_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
"""

import time
import logging
from typing import Dict, List, Callable, Union, Any, Optional
import time
from datetime import datetime
import deprecation
from typing import Dict, List, Callable, Union, Any, Optional

import deprecation
from google.protobuf.message import DecodeError

from drift_client.drift_data_package import DriftDataPackage
from drift_client.influxdb_client import InfluxDBClient
from drift_client.minio_client import MinIOClient
from drift_client.mqtt_client import MQTTClient
from drift_client.reduct_client import ReductStorageClient

logger = logging.getLogger("drift-client")
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Expand Down Expand Up @@ -48,6 +49,7 @@ def __init__(self, host: str, password: str, **kwargs):
org (str): An organisation name. Default: "panda"
secure (bool): Use HTTPS protocol to access data: Default: False
minio_port (int): Minio port. Default: 9000
reduct_port (int): Reduct port. Default: 8383
influx_port (int): InfluxDB port. Default: 8086,
mqtt_port (int): MQTT port. Default: 1883
"""
Expand All @@ -57,21 +59,32 @@ def __init__(self, host: str, password: str, **kwargs):
secure = kwargs["secure"] if "secure" in kwargs else False
influx_port = kwargs["influx_port"] if "influx_port" in kwargs else 8086
minio_port = kwargs["minio_port"] if "minio_port" in kwargs else 9000
reduct_storage_port = (
kwargs["reduct_storage_port"] if "reduct_storage_port" in kwargs else 8383
)
mqtt_port = kwargs["mqtt_port"] if "mqtt_port" in kwargs else 1883

self.__influx_client = InfluxDBClient(
self._influx_client = InfluxDBClient(
f"{('https://' if secure else 'http://')}{host}:{influx_port}",
org,
password,
False,
) # TBD!!! --> SSL handling!
self.__minio_client = MinIOClient(
f"{('https://' if secure else 'http://')}{host}:{minio_port}",
user,
password,
False,
) # TBD!!! --> SSL handling!
self.__mqtt_client = MQTTClient(

try:
self._blob_storage = ReductStorageClient(
f"{('https://' if secure else 'http://')}{host}:{reduct_storage_port}",
password,
)
except Exception: # pylint: disable=broad-except
# Minio as fallback if reduct storage is not available
self._blob_storage = MinIOClient(
f"{('https://' if secure else 'http://')}{host}:{minio_port}",
user,
password,
False,
) # TBD!!! --> SSL handling!
self._mqtt_client = MQTTClient(
f"mqtt://{host}:{mqtt_port}",
client_id=f"drift_client_{int(time.time() * 1000)}",
)
Expand All @@ -87,7 +100,7 @@ def get_topics(self) -> List[str]:
>>> client.get_topics() # => ['topic-1', 'topic-2', ...]
"""

topics = self.__influx_client.query_measurements()
topics = self._influx_client.query_measurements()
return topics

@deprecation.deprecated(
Expand Down Expand Up @@ -116,7 +129,7 @@ def get_list(self, topics: List[str], timeframe: List[str]) -> Dict[str, List[st
"""
data = {}
for topic in topics:
influxdb_values = self.__influx_client.query_data(
influxdb_values = self._influx_client.query_data(
topic, timeframe[0], timeframe[1], fields="status"
)

Expand Down Expand Up @@ -157,16 +170,17 @@ def get_package_names(
start = _convert_type(start)
stop = _convert_type(stop)

data = []
influxdb_values = self.__influx_client.query_data(
package_list = []
influxdb_values = self._influx_client.query_data(
topic, start, stop, fields="status"
)

if influxdb_values:
for timestamp, _ in influxdb_values["status"]:
data.append(f"{topic}/{int(timestamp * 1000)}.dp")
package_list.append(f"{topic}/{int(timestamp * 1000)}.dp")

return data
# Check if package_list is available (works only for Reduct Storage)
return self._blob_storage.check_package_list(package_list)

def get_item(self, path: str) -> DriftDataPackage:
"""Returns requested single historic data from initialised Device
Expand All @@ -181,7 +195,7 @@ def get_item(self, path: str) -> DriftDataPackage:
>>> client = DriftClient("127.0.0.1", "PASSWORD")
>>> client.get_item("topic-1/1644750605291.dp")
"""
blob = self.__minio_client.fetch_data(path)
blob = self._blob_storage.fetch_data(path)
return DriftDataPackage(blob)

def subscribe_data(self, topic: str, handler: Callable[[DriftDataPackage], None]):
Expand All @@ -207,10 +221,10 @@ def package_handler(message):
raise DecodeError("Payload is no Drift Package") from exc
handler(output)

self.__mqtt_client.connect()
self.__mqtt_client.subscribe(topic, package_handler)
self._mqtt_client.connect()
self._mqtt_client.subscribe(topic, package_handler)

self.__mqtt_client.loop_forever()
self._mqtt_client.loop_forever()

def publish_data(self, topic: str, payload: bytes):
"""Publishes payload to selected topic on initialised Device
Expand All @@ -222,11 +236,11 @@ def publish_data(self, topic: str, payload: bytes):
>>> client = DriftClient("127.0.0.1", "PASSWORD")
>>> client.publish_data("topic-2", b"hello")
"""
if not self.__mqtt_client.is_connected():
self.__mqtt_client.connect()
self.__mqtt_client.loop_start()
if not self._mqtt_client.is_connected():
self._mqtt_client.connect()
self._mqtt_client.loop_start()

self.__mqtt_client.publish(topic, payload)
self._mqtt_client.publish(topic, payload)

def get_metrics(
self,
Expand Down Expand Up @@ -257,7 +271,7 @@ def get_metrics(
stop = _convert_type(stop)

aligned_data = {}
influxdb_values = self.__influx_client.query_data(
influxdb_values = self._influx_client.query_data(
topic, start, stop, fields=names
)

Expand Down
7 changes: 6 additions & 1 deletion pkg/drift_client/minio_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
""" Simple MinIO client
"""

from typing import Optional
from typing import Optional, List

from urllib.parse import urlparse
from minio import Minio
Expand Down Expand Up @@ -42,6 +42,11 @@ def __init__(
)
self.__bucket = "data"

def check_package_list(self, package_names: List[str]) -> list:
"""Check if packages exist in Minio"""
# Not possible with Minio
return package_names

def fetch_data(self, path: str) -> Optional[bytes]:
"""Fetch object from Minio
Expand Down
76 changes: 76 additions & 0 deletions pkg/drift_client/reduct_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Reduct Storage client"""
from typing import Tuple, List, Optional, Dict
from asyncio import new_event_loop

from drift_client.error import DriftClientError
from reduct import Client, Bucket, ReductError, EntryInfo


class ReductStorageClient:
"""Wrapper around Reduct Storage client"""

def __init__(self, url: str, token: str):
self._client = Client(url, api_token=token)
self._bucket = "data"
self._loop = new_event_loop()
_ = self._run(self._client.info()) # check connection for fallback to Minio

def check_package_list(self, package_names: List[str]) -> list:
"""Check if packages exist in Reduct Storage"""
entry_map: Dict[str, List[int]] = {}
# bad design, we don't know if all packages belong to the same entry
for package_name in package_names:
entry, timestamp = self._parse_minio_path(package_name)
if entry not in entry_map:
entry_map[entry] = [timestamp]
else:
entry_map[entry].append(timestamp)

# retrieve all entries
try:
bucket: Bucket = self._run(self._client.get_bucket(self._bucket))
entries_in_storage: List[EntryInfo] = self._run(bucket.get_entry_list())
except ReductError as err:
raise DriftClientError("Failed to list entries") from err

# check if the packages are still in storage
for entry in entries_in_storage:
if entry.name in entry_map:
entry_map[entry.name] = sorted(
timestamp
for timestamp in entry_map[entry.name]
if entry.oldest_record <= timestamp * 1000 <= entry.latest_record
)

# remove not existing topics
for entry in list(entry_map.keys()):
if entry not in [e.name for e in entries_in_storage]:
del entry_map[entry]

# restore entry/ts.db format
return [
f"{entry}/{timestamp}.dp"
for entry, timestamps in entry_map.items()
for timestamp in timestamps
]

def fetch_data(self, path: str) -> Optional[bytes]:
"""Fetch data from Reduct Storage via timestamp"""
entry, timestamp = self._parse_minio_path(path)
try:
return self._run(self._read_by_timestamp(entry, timestamp))
except ReductError as err:
raise DriftClientError(f"Could not read item at {path}") from err

async def _read_by_timestamp(self, entry: str, timestamp: int) -> bytes:
bucket: Bucket = await self._client.get_bucket(self._bucket)
async with bucket.read(entry, timestamp * 1000) as record:
return await record.read_all()

@staticmethod
def _parse_minio_path(path: str) -> Tuple[str, int]:
entry, file = path.split("/")
return entry, int(file.replace(".dp", ""))

def _run(self, coro):
return self._loop.run_until_complete(coro)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def get_long_description(base_path: Path):
"paho-mqtt==1.6.1",
"numpy==1.23.1",
"deprecation==2.1.0",
"reduct-py~=1.0",
],
extras_require={
"test": ["pytest==7.1.2", "pytest-mock==3.8.2"],
Expand Down
Loading

0 comments on commit 7a9c5b6

Please sign in to comment.