Skip to content

Commit

Permalink
DRIFT-478: Access to metrics from InfluxDB (#10)
Browse files Browse the repository at this point in the history
* rename get_topic_data -> get_package_names

* refactoring

* refactor and test InfluxDB.query_data

* implement metrics

* add test for Client.get_metrics

* fix pylint errors

* update CHANGELOG

* add example to docstring

* fix pylint errors

* update wavelet buffer

* fix typo
  • Loading branch information
atimin authored Aug 16, 2022
1 parent 5299566 commit 66300a2
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 104 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ repos:
name: black
entry: black
language: python
language_version: python3.8
args:
- .
- --check
3 changes: 3 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
max-line-length = 88
load-plugins=pylint_protobuf
extension-pkg-whitelist=pydantic

[FORMAT]
good-names=dt
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]


## Changed:
### Added:

- DRIFT-478: `Client.get_metrics` to get metrics from InfluxDB, [PR-10](https://github.com/panda-official/DriftPythonClient/pull/10)
- DRIFT-510: Make package paths by using time from InfluxDB, [PR-2](https://github.com/panda-official/DriftPythonClient/pull/2)
- DRIFT-516: `Client.get_package_names` to get paths in Minio, [PR-5](https://github.com/panda-official/DriftPythonClient/pull/5)

### Deprecated:

- DRIFT-516: `Client.get_list`. Remove in 1.0.0, [PR-5](https://github.com/panda-official/DriftPythonClient/pull/5)

## [0.1.1] - 2022-07-12

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ from drift_client import DriftClient
drift_client = DriftClient("10.0.0.153", os.getenv("DRIFT_PASSWORD"))
# Download list of history

packages = drift_client.get_topic_data(
packages = drift_client.get_package_names(
"acc-5",
datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S"),
datetime.strptime("2022-01-02 00:00:00", "%Y-%m-%d %H:%M:%S")
Expand Down
34 changes: 0 additions & 34 deletions examples/get_list.py

This file was deleted.

28 changes: 28 additions & 0 deletions examples/get_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os

from drift_client import DriftClient

import logging
import sys
import datetime

logging.basicConfig(level=logging.INFO)


def main():
    """Print the ``d1`` and ``d2`` metrics of ``energy-distr-1`` for the last 15 minutes.

    Connects to the ``drift-test-rig.local`` device using the password from the
    ``DRIFT_PASSWORD`` environment variable, requests the metrics and prints
    the number of loaded points followed by every point.
    """
    # Init
    drift_client = DriftClient("drift-test-rig.local", os.getenv("DRIFT_PASSWORD"))

    # Take "now" once so that start and stop are measured from the same instant
    # and the requested window is exactly 15 minutes long.
    stop = datetime.datetime.utcnow()
    metrics = drift_client.get_metrics(
        "energy-distr-1",
        start=stop - datetime.timedelta(minutes=15),
        stop=stop,
        names=["d1", "d2"],
    )

    # print() does not apply %-style formatting to its arguments (unlike the
    # logging functions), so the count has to be formatted explicitly.
    print(f"Loaded {len(metrics)} points")
    for point in metrics:
        print(point)


if __name__ == "__main__":
sys.exit(main())
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def main():
# Init
drift_client = DriftClient("drift-test-rig.local", os.getenv("DRIFT_PASSWORD"))
# Download list of history
packages = drift_client.get_topic_data(
packages = drift_client.get_package_names(
"acc-1",
start=datetime.datetime.utcnow() - datetime.timedelta(minutes=1),
stop=datetime.datetime.utcnow(),
Expand Down
101 changes: 65 additions & 36 deletions pkg/drift_client/drift_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import time
import logging
from typing import Dict, List, Callable, Union
from typing import Dict, List, Callable, Union, Any, Optional
from datetime import datetime
import deprecation

Expand All @@ -18,6 +18,17 @@
from drift_client.mqtt_client import MQTTClient

logger = logging.getLogger("drift-client")
# Timestamp layout used for query ranges; the trailing "Z" marks the value as UTC.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def _convert_type(timestamp: Union[float, datetime, str]) -> str:
    """Convert a timestamp into a ``TIME_FORMAT`` string expressed in UTC.

    Args:
        timestamp: One of
            * an ISO 8601 string, e.g. ``"2022-02-03 10:00:00"``; a trailing
              ``"Z"`` or an explicit offset such as ``"+02:00"`` is honoured,
            * seconds since the Unix epoch as ``float`` or ``int``,
            * a ``datetime`` object.

    Returns:
        The moment formatted with ``TIME_FORMAT``. Timezone-aware values are
        converted to UTC first. Naive values are formatted unchanged, i.e.
        they are taken to be in UTC already.

    Raises:
        TypeError: If ``timestamp`` is none of the supported types.
        ValueError: If a string is not a valid ISO 8601 timestamp.
    """
    # Local import: the module itself only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    if isinstance(timestamp, str):
        text = timestamp
        # datetime.fromisoformat() only understands the "Z" suffix since
        # Python 3.11; rewrite it so strings in TIME_FORMAT round-trip.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        moment = datetime.fromisoformat(text)
    elif isinstance(timestamp, datetime):
        moment = timestamp
    elif isinstance(timestamp, (int, float)) and not isinstance(timestamp, bool):
        # Epoch seconds are absolute; interpret them in UTC rather than in the
        # machine's local timezone, because the result is labelled with "Z".
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    else:
        raise TypeError("Timestamp must be str, float or datetime")

    # Normalise aware values so that the literal "Z" in TIME_FORMAT is true.
    if moment.utcoffset() is not None:
        moment = moment.astimezone(timezone.utc)
    return moment.strftime(TIME_FORMAT)


class DriftClient:
Expand Down Expand Up @@ -106,19 +117,19 @@ def get_list(self, topics: List[str], timeframe: List[str]) -> Dict[str, List[st
data = {}
for topic in topics:
influxdb_values = self.__influx_client.query_data(
topic, timeframe[0], timeframe[1], field="status"
topic, timeframe[0], timeframe[1], fields="status"
)

if not influxdb_values:
break

data[topic] = []
for timestamp, _ in influxdb_values:
for timestamp, _ in influxdb_values["status"]:
data[topic].append(f"{topic}/{int(timestamp * 1000)}.dp")

return data

def get_topic_data(
def get_package_names(
self,
topic: str,
start: Union[float, datetime, str],
Expand All @@ -139,15 +150,25 @@ def get_topic_data(
Examples:
>>> client = DriftClient("127.0.0.1", "PASSWORD")
>>> client.get_topic_data("topic-1",
>>> client.get_package_names("topic-1",
>>> "2022-02-03 10:00:00", "2022-02-03 10:00:10")
>>> # => ['topic-1/1644750600291.dp',
>>> # 'topic-1/1644750601291.dp', ...]
"""
return self._get_topic_data(
topic, self._convert_type(start), self._convert_type(stop)
start = _convert_type(start)
stop = _convert_type(stop)

data = []
influxdb_values = self.__influx_client.query_data(
topic, start, stop, fields="status"
)

if influxdb_values:
for timestamp, _ in influxdb_values["status"]:
data.append(f"{topic}/{int(timestamp * 1000)}.dp")

return data

def get_item(self, path: str) -> DriftDataPackage:
"""Returns requested single historic data from initialised Device
Args:
Expand Down Expand Up @@ -192,52 +213,60 @@ def package_handler(message):

self.__mqtt_client.loop_forever()

def publish_data(self, topic: str, payload: str):
def publish_data(self, topic: str, payload: bytes):
"""Publishes payload to selected topic on initialised Device
topic: MQTT topic
payload: Stringified data, defaults to None
Args:
topic: MQTT topic
payload: Raw bytes to publish, e.g. `b"hello"`
# Examples
>>> client = DriftClient("127.0.0.1", "PASSWORD")
>>> client.publish_data("topic-2", "hello")
>>> client.publish_data("topic-2", b"hello")
"""
if not self.__mqtt_client.is_connected():
self.__mqtt_client.connect()
self.__mqtt_client.loop_start()

self.__mqtt_client.publish(topic, payload)

def _convert_type(self, timestamp: Union[float, datetime, str]) -> str:
if isinstance(timestamp, str):
return datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M:%S")
if isinstance(timestamp, float):
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
if isinstance(timestamp, datetime):
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
raise TypeError("Timestamp must be str, float or datetime")

def _get_topic_data(self, topic: str, start: str, stop: str) -> List[str]:
"""Returns list of history data from initialised Device
def get_metrics(
self,
topic: str,
start: Union[float, datetime, str],
stop: Union[float, datetime, str],
names: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Reads history metrics from timeseries database
>>> client = DriftClient("127.0.0.1", "PASSWORD")
>>> client.get_metrics("topic", "2022-02-03 10:00:00",
>>> "2022-02-03 10:00:10", names=["status", "field"])
>>> #=> [{"status": 0, "field": 0.1231, "time": 1643882400.0}, ...]
Args:
topics: Topic name, e.g. `"sensor-1"`
topic: MQTT topic
start: Begin of request timeframe,
Format: `2022-02-07 10:00:00`
Format: ISO string, datetime or float timestamp
stop: End of request timeframe,
Format: `2022-02-07 10:00:00`
Returns:
List with item names available
:rtype: List[str]
Format: ISO string, datetime or float timestamp
"""
data = []

start = _convert_type(start)
stop = _convert_type(stop)

aligned_data = {}
influxdb_values = self.__influx_client.query_data(
topic, start, stop, field="status"
topic, start, stop, fields=names
)

if influxdb_values:
for timestamp, _ in influxdb_values:
data.append(f"{topic}/{int(timestamp * 1000)}.dp")
for field, values in influxdb_values.items():
for dt, value in values:
if dt not in aligned_data:
aligned_data[dt] = {}

aligned_data[dt][field] = value

data = []
for dt, fields in aligned_data.items():
fields["time"] = dt
data.append(fields)

return data
47 changes: 23 additions & 24 deletions pkg/drift_client/influxdb_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
""" Simple InfluxDB client
"""

from typing import List, Tuple, Any
from datetime import datetime
from typing import List, Tuple, Any, Union, Dict
from urllib.parse import urlparse

from influxdb_client import InfluxDBClient as Client
Expand Down Expand Up @@ -37,8 +36,6 @@ def __init__(
verify_ssl=secure,
)
self.__query_api = self.__client.query_api()
self.__input_time_fmt = "%Y-%m-%d %H:%M:%S"
self.__output_time_fmt = "%Y-%m-%dT%H:%M:%SZ"
self.__bucket = "data"

def query_measurements(self) -> List[str]:
Expand Down Expand Up @@ -67,33 +64,35 @@ def query_data(
measurement: str,
start: str,
stop: str,
field: str = "src",
) -> List[Tuple[float, Any]]:
fields: Union[str, List[str], None] = None,
) -> Dict[str, List[Tuple[float, Any, str]]]:
"""InfluxDB queries for values"""
# Change time format for request
start = datetime.strptime(start, self.__input_time_fmt).strftime(
self.__output_time_fmt
)
stop = datetime.strptime(stop, self.__input_time_fmt).strftime(
self.__output_time_fmt
if isinstance(fields, str):
fields = [fields]

filters = ""
if fields is not None:
filters = (
"and ("
+ " or".join([f' r._field == "{field}"' for field in fields])
+ ")"
)

query = (
f'from(bucket:"{self.__bucket}") '
f"|> range(start:{start}, stop: {stop}) "
f'|> filter(fn: (r) => r._measurement == "{measurement}" {filters})'
)

query = f'\
from(bucket:"{self.__bucket}")\
|> range(start:{start}, stop: {stop})\
|> filter(fn: (r) => r._measurement == "{measurement}"\
and r._field =~ /{field}/)\
'
response = self.__query_api.query(query)

data = []
data = {}
for table in response:
for record in table.records:
data.append(
(
record.get_time().timestamp(),
record.get_value(),
)
)
field = record.get_field()
if field not in data:
data[field] = []
data[field].append((record.get_time().timestamp(), record.get_value()))

return data
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_long_description(base_path: Path):
"influxdb-client==1.30.0",
"minio==7.1.10",
"drift-protocol~=0.1.0",
"wavelet-buffer~=0.1.0",
"wavelet-buffer~=0.3.0",
"paho-mqtt==1.6.1",
"numpy==1.23.1",
"deprecation==2.1.0",
Expand Down
Loading

0 comments on commit 66300a2

Please sign in to comment.