diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 91610fb..a80d1e7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,7 +16,6 @@ repos: name: black entry: black language: python - language_version: python3.8 args: - . - --check diff --git a/.pylintrc b/.pylintrc index 191448d..d33efd0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -2,3 +2,6 @@ max-line-length = 88 load-plugins=pylint_protobuf extension-pkg-whitelist=pydantic + +[FORMAT] +good-names=dt diff --git a/CHANGELOG.md b/CHANGELOG.md index d4035ee..34b5da5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## Changed: +### Added: +- DRIFT-478: `Client.get_metrics` to get metrics from InfluxDB, [PR-10](https://github.com/panda-official/DriftPythonClient/pull/10) - DRIFT-510: Make package paths by using time from InfluxDB, [PR-2](https://github.com/panda-official/DriftPythonClient/pull/2) +- DRIFT-516: `Client.get_package_names` to get paths in Minio, [PR-5](https://github.com/panda-official/DriftPythonClient/pull/5) + +### Deprecated: + +- DRIFT-516: `Client.get_list`. 
Remove in 1.0.0, [PR-5](https://github.com/panda-official/DriftPythonClient/pull/5) ## [0.1.1] - 2022-07-12 diff --git a/README.md b/README.md index 4d0460c..c626d91 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ from drift_client import DriftClient drift_client = DriftClient("10.0.0.153", os.getenv("DRIFT_PASSWORD")) # Download list of history -packages = drift_client.get_topic_data( +packages = drift_client.get_package_names( "acc-5", datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2022-01-02 00:00:00", "%Y-%m-%d %H:%M:%S") diff --git a/examples/get_list.py b/examples/get_list.py deleted file mode 100644 index 60a9d98..0000000 --- a/examples/get_list.py +++ /dev/null @@ -1,34 +0,0 @@ -import os - -from drift_client import DriftClient - -import logging -import sys -import datetime - -logging.basicConfig(level=logging.INFO) - - -def main(): - # Init - drift_client = DriftClient("drift-test-rig.local", os.getenv("DRIFT_PASSWORD")) - # Download list of history - packages = drift_client.get_list( - ["acc-1"], - [ - (datetime.datetime.utcnow() - datetime.timedelta(minutes=2)).strftime( - "%Y-%m-%d %H:%M:%S" - ), - (datetime.datetime.utcnow() - datetime.timedelta(minutes=1)).strftime( - "%Y-%m-%d %H:%M:%S" - ), - ], - ) - - # Show length of list - for topic in packages: - print(topic, "->", packages[topic]) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/examples/get_metrics.py b/examples/get_metrics.py new file mode 100644 index 0000000..b6d8966 --- /dev/null +++ b/examples/get_metrics.py @@ -0,0 +1,28 @@ +import os + +from drift_client import DriftClient + +import logging +import sys +import datetime + +logging.basicConfig(level=logging.INFO) + + +def main(): + # Init + drift_client = DriftClient("drift-test-rig.local", os.getenv("DRIFT_PASSWORD")) + metrics = drift_client.get_metrics( + "energy-distr-1", + start=datetime.datetime.utcnow() - datetime.timedelta(minutes=15), + stop=datetime.datetime.utcnow(), + 
names=["d1", "d2"], + ) + + print("Loaded %d points", len(metrics)) + for point in metrics: + print(point) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/get_topic_data.py b/examples/get_package_names.py similarity index 91% rename from examples/get_topic_data.py rename to examples/get_package_names.py index e0f91d1..ad1f386 100644 --- a/examples/get_topic_data.py +++ b/examples/get_package_names.py @@ -13,7 +13,7 @@ def main(): # Init drift_client = DriftClient("drift-test-rig.local", os.getenv("DRIFT_PASSWORD")) # Download list of history - packages = drift_client.get_topic_data( + packages = drift_client.get_package_names( "acc-1", start=datetime.datetime.utcnow() - datetime.timedelta(minutes=1), stop=datetime.datetime.utcnow(), diff --git a/pkg/drift_client/drift_client.py b/pkg/drift_client/drift_client.py index 1870055..53ad8b5 100644 --- a/pkg/drift_client/drift_client.py +++ b/pkg/drift_client/drift_client.py @@ -6,7 +6,7 @@ import time import logging -from typing import Dict, List, Callable, Union +from typing import Dict, List, Callable, Union, Any, Optional from datetime import datetime import deprecation @@ -18,6 +18,17 @@ from drift_client.mqtt_client import MQTTClient logger = logging.getLogger("drift-client") +TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" + + +def _convert_type(timestamp: Union[float, datetime, str]) -> str: + if isinstance(timestamp, str): + return datetime.fromisoformat(timestamp).strftime(TIME_FORMAT) + if isinstance(timestamp, float): + return datetime.fromtimestamp(timestamp).strftime(TIME_FORMAT) + if isinstance(timestamp, datetime): + return timestamp.strftime(TIME_FORMAT) + raise TypeError("Timestamp must be str, float or datetime") class DriftClient: @@ -106,19 +117,19 @@ def get_list(self, topics: List[str], timeframe: List[str]) -> Dict[str, List[st data = {} for topic in topics: influxdb_values = self.__influx_client.query_data( - topic, timeframe[0], timeframe[1], field="status" + topic, timeframe[0], 
timeframe[1], fields="status" ) if not influxdb_values: break data[topic] = [] - for timestamp, _ in influxdb_values: + for timestamp, _ in influxdb_values["status"]: data[topic].append(f"{topic}/{int(timestamp * 1000)}.dp") return data - def get_topic_data( + def get_package_names( self, topic: str, start: Union[float, datetime, str], @@ -139,15 +150,25 @@ def get_topic_data( Examples: >>> client = DriftClient("127.0.0.1", "PASSWORD") - >>> client.get_topic_data("topic-1", + >>> client.get_package_names("topic-1", >>> "2022-02-03 10:00:00", "2022-02-03 10:00:10") >>> # => ['topic-1/1644750600291.dp', >>> # 'topic-1/1644750601291.dp', ...] """ - return self._get_topic_data( - topic, self._convert_type(start), self._convert_type(stop) + start = _convert_type(start) + stop = _convert_type(stop) + + data = [] + influxdb_values = self.__influx_client.query_data( + topic, start, stop, fields="status" ) + if influxdb_values: + for timestamp, _ in influxdb_values["status"]: + data.append(f"{topic}/{int(timestamp * 1000)}.dp") + + return data + def get_item(self, path: str) -> DriftDataPackage: """Returns requested single historic data from initialised Device Args: @@ -192,15 +213,15 @@ def package_handler(message): self.__mqtt_client.loop_forever() - def publish_data(self, topic: str, payload: str): + def publish_data(self, topic: str, payload: bytes): """Publishes payload to selected topic on initialised Device - - topic: MQTT topic - payload: Stringified data, defaults to None + Args: + topic: MQTT topic + payload: Message payload as bytes # Examples >>> client = DriftClient("127.0.0.1", "PASSWORD") - >>> client.publish_data("topic-2", "hello") + >>> client.publish_data("topic-2", b"hello") """ if not self.__mqtt_client.is_connected(): self.__mqtt_client.connect() @@ -208,36 +229,44 @@ def publish_data(self, topic: str, payload: str): self.__mqtt_client.publish(topic, payload) - def _convert_type(self, timestamp: Union[float, datetime, str]) -> str: - if 
isinstance(timestamp, str): - return datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M:%S") - if isinstance(timestamp, float): - return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") - if isinstance(timestamp, datetime): - return timestamp.strftime("%Y-%m-%d %H:%M:%S") - raise TypeError("Timestamp must be str, float or datetime") - - def _get_topic_data(self, topic: str, start: str, stop: str) -> List[str]: - """Returns list of history data from initialised Device - + def get_metrics( + self, + topic: str, + start: Union[float, datetime, str], + stop: Union[float, datetime, str], + names: Optional[List[str]] = None, + ) -> List[Dict[str, Any]]: + """Reads history metrics from timeseries database + >>> client = DriftClient("127.0.0.1", "PASSWORD") + >>> client.get_metrics("topic", "2022-02-03 10:00:00", + >>> "2022-02-03 10:00:10", names=["status", "field"]) + >>> #=> [{"status": 0, "field": 0.1231}, ....] Args: - topics: Topic name, e.g. `"sensor-1"` + topic: MQTT topic start: Begin of request timeframe, - Format: `2022-02-07 10:00:00` + Format: ISO string, datetime or float timestamp stop: End of request timeframe, - Format: `2022-02-07 10:00:00` - - Returns: - List with item names available - :rtype: List[str] + Format: ISO string, datetime or float timestamp """ - data = [] + + start = _convert_type(start) + stop = _convert_type(stop) + + aligned_data = {} influxdb_values = self.__influx_client.query_data( - topic, start, stop, field="status" + topic, start, stop, fields=names ) - if influxdb_values: - for timestamp, _ in influxdb_values: - data.append(f"{topic}/{int(timestamp * 1000)}.dp") + for field, values in influxdb_values.items(): + for dt, value in values: + if dt not in aligned_data: + aligned_data[dt] = {} + + aligned_data[dt][field] = value + + data = [] + for dt, fields in aligned_data.items(): + fields["time"] = dt + data.append(fields) return data diff --git a/pkg/drift_client/influxdb_client.py 
b/pkg/drift_client/influxdb_client.py index 72600b7..eb0e29d 100644 --- a/pkg/drift_client/influxdb_client.py +++ b/pkg/drift_client/influxdb_client.py @@ -1,8 +1,7 @@ """ Simple InfluxDB client """ -from typing import List, Tuple, Any -from datetime import datetime +from typing import List, Tuple, Any, Union, Dict from urllib.parse import urlparse from influxdb_client import InfluxDBClient as Client @@ -37,8 +36,6 @@ def __init__( verify_ssl=secure, ) self.__query_api = self.__client.query_api() - self.__input_time_fmt = "%Y-%m-%d %H:%M:%S" - self.__output_time_fmt = "%Y-%m-%dT%H:%M:%SZ" self.__bucket = "data" def query_measurements(self) -> List[str]: @@ -67,33 +64,35 @@ def query_data( measurement: str, start: str, stop: str, - field: str = "src", - ) -> List[Tuple[float, Any]]: + fields: Union[str, List[str], None] = None, + ) -> Dict[str, List[Tuple[float, Any, str]]]: """InfluxDB queries for values""" # Change time format for request - start = datetime.strptime(start, self.__input_time_fmt).strftime( - self.__output_time_fmt - ) - stop = datetime.strptime(stop, self.__input_time_fmt).strftime( - self.__output_time_fmt + if isinstance(fields, str): + fields = [fields] + + filters = "" + if fields is not None: + filters = ( + "and (" + + " or".join([f' r._field == "{field}"' for field in fields]) + + ")" + ) + + query = ( + f'from(bucket:"{self.__bucket}") ' + f"|> range(start:{start}, stop: {stop}) " + f'|> filter(fn: (r) => r._measurement == "{measurement}" {filters})' ) - query = f'\ - from(bucket:"{self.__bucket}")\ - |> range(start:{start}, stop: {stop})\ - |> filter(fn: (r) => r._measurement == "{measurement}"\ - and r._field =~ /{field}/)\ - ' response = self.__query_api.query(query) - data = [] + data = {} for table in response: for record in table.records: - data.append( - ( - record.get_time().timestamp(), - record.get_value(), - ) - ) + field = record.get_field() + if field not in data: + data[field] = [] + 
data[field].append((record.get_time().timestamp(), record.get_value())) return data diff --git a/setup.py b/setup.py index ee1af57..cd97eb9 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ def get_long_description(base_path: Path): "influxdb-client==1.30.0", "minio==7.1.10", "drift-protocol~=0.1.0", - "wavelet-buffer~=0.1.0", + "wavelet-buffer~=0.3.0", "paho-mqtt==1.6.1", "numpy==1.23.1", "deprecation==2.1.0", diff --git a/tests/drift_client_test.py b/tests/drift_client_test.py index bba9f04..af56ccb 100644 --- a/tests/drift_client_test.py +++ b/tests/drift_client_test.py @@ -36,13 +36,13 @@ def test__timestamp_from_influxdb(influxdb_client): """should get timestamp and values for records from influxdb and make paths in minio""" client = DriftClient("host_name", "password") - influxdb_client.query_data.return_value = [(10000.0, 0), (10010.0, 512)] + influxdb_client.query_data.return_value = {"status": [(10000.0, 0), (10010.0, 512)]} data = client.get_list(["topic"], ["2022-01-01 00:00:00", "2022-01-01 00:00:00"]) assert data == {"topic": ["topic/10000000.dp", "topic/10010000.dp"]} influxdb_client.query_data.assert_called_with( - "topic", "2022-01-01 00:00:00", "2022-01-01 00:00:00", field="status" + "topic", "2022-01-01 00:00:00", "2022-01-01 00:00:00", fields="status" ) @@ -63,11 +63,42 @@ def test__get_topic_data(influxdb_client, start_ts, stop_ts): """should get timestamp and values for records using start and stop timestamps from influxdb and make paths in minio""" client = DriftClient("host_name", "password") - influxdb_client.query_data.return_value = [(10000.0, 0), (10010.0, 512)] + influxdb_client.query_data.return_value = {"status": [(10000.0, 0), (10010.0, 512)]} - data = client.get_topic_data("topic", start_ts, stop_ts) + data = client.get_package_names("topic", start_ts, stop_ts) assert data == ["topic/10000000.dp", "topic/10010000.dp"] influxdb_client.query_data.assert_called_with( - "topic", "2022-01-01 00:00:00", "2022-01-01 00:00:00", 
field="status" + "topic", "2022-01-01T00:00:00Z", "2022-01-01T00:00:00Z", fields="status" + ) + + +@pytest.mark.usefixtures("minio_klass") +@pytest.mark.parametrize( + "start_ts, stop_ts", + [ + (start, stop), + (start.isoformat(), stop.isoformat()), + (start.timestamp(), stop.timestamp()), + ], +) +def test__get_metrics(influxdb_client, start_ts, stop_ts): + """Should get metrics from InfluxDB and return them as a list of dictionaries""" + client = DriftClient("host_name", "password") + influxdb_client.query_data.return_value = { + "filed_1": [(10000.0, 1), (10010.0, 2)], + "filed_2": [(10000.0, 3), (10010.0, 4)], + } + + data = client.get_metrics("topic", start_ts, stop_ts, names=["field_1", "field_2"]) + assert data == [ + {"filed_1": 1, "filed_2": 3, "time": 10000.0}, + {"filed_1": 2, "filed_2": 4, "time": 10010.0}, + ] + + influxdb_client.query_data.assert_called_with( + "topic", + "2022-01-01T00:00:00Z", + "2022-01-01T00:00:00Z", + fields=["field_1", "field_2"], ) diff --git a/tests/influxdb_client_test.py b/tests/influxdb_client_test.py new file mode 100644 index 0000000..5526e90 --- /dev/null +++ b/tests/influxdb_client_test.py @@ -0,0 +1,101 @@ +"""InfluxDB Client""" +from datetime import datetime + +import pytest +from influxdb_client.client.flux_table import FluxRecord + +from drift_client.influxdb_client import InfluxDBClient + + +@pytest.fixture(name="query_api") +def _make_query_api(mocker): + client_klass = mocker.patch("drift_client.influxdb_client.Client") + client = mocker.Mock() + client.query_api.return_value = mocker.Mock() + client_klass.return_value = client + + return client.query_api.return_value + + +@pytest.fixture(name="dt") +def _make_dt() -> datetime: + return datetime.now() + + +@pytest.fixture(name="response") +def _make_response(mocker, dt): + record1 = FluxRecord("", values={"_time": dt, "_value": 1, "_field": "field"}) + record2 = FluxRecord("", values={"_time": dt, "_value": "str", "_field": "field"}) + + table = mocker.Mock + 
table.records = [record1, record2] + return [table] + + +@pytest.fixture(name="response_multi") +def _make_response_multy(mocker, dt): + record1 = FluxRecord("", values={"_time": dt, "_value": 1, "_field": "field_1"}) + record2 = FluxRecord("", values={"_time": dt, "_value": "str", "_field": "field_2"}) + + table = mocker.Mock + table.records = [record1, record2] + return [table] + + +def test__query_data_for_one_field(response, dt, query_api): + """Should query data and filter it with a field""" + query_api.query.return_value = response + influxdb_client = InfluxDBClient( + "http://localhost:8086", org="panda", secure=False, token="SECRET" + ) + data = influxdb_client.query_data( + "topic", "2022-08-15", "2022-08-15", fields="field" + ) + + assert data["field"][0] == (dt.timestamp(), 1) + assert data["field"][1] == (dt.timestamp(), "str") + + query_api.query.assert_called_with( + 'from(bucket:"data") |> range(start:2022-08-15, stop: 2022-08-15) |> ' + 'filter(fn: (r) => r._measurement == "topic" and ( r._field == "field"))' + ) + + +def test__query_data_for_some_fields(response_multi, dt, query_api): + """Should query data and filter it with a few fields""" + query_api.query.return_value = response_multi + + influxdb_client = InfluxDBClient( + "http://localhost:8086", org="panda", secure=False, token="SECRET" + ) + data = influxdb_client.query_data( + "topic", "2022-08-15", "2022-08-15", fields=["field_1", "field_2"] + ) + + assert data["field_1"][0] == (dt.timestamp(), 1) + assert data["field_2"][0] == (dt.timestamp(), "str") + + query_api.query.assert_called_with( + 'from(bucket:"data") |> range(start:2022-08-15, stop: 2022-08-15) |> ' + 'filter(fn: (r) => r._measurement == "topic" and ( r._field == "field_1" or ' + 'r._field == "field_2"))', + ) + + +def test__query_data_for_all_fields(response_multi, dt, query_api): + """Should query data for all fields""" + + query_api.query.return_value = response_multi + + influxdb_client = InfluxDBClient( + 
"http://localhost:8086", org="panda", secure=False, token="SECRET" + ) + data = influxdb_client.query_data("topic", "2022-08-15", "2022-08-15") + + assert data["field_1"][0] == (dt.timestamp(), 1) + assert data["field_2"][0] == (dt.timestamp(), "str") + + query_api.query.assert_called_with( + 'from(bucket:"data") |> range(start:2022-08-15, stop: 2022-08-15) |> ' + 'filter(fn: (r) => r._measurement == "topic" )', + )