Skip to content

Commit

Permalink
Continuous subscription at a specific frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Mar 6, 2024
1 parent 3b8bc86 commit 19d23d8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 63 deletions.
87 changes: 24 additions & 63 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import dataclasses
import datetime
import enum
import http
import logging
import re
from typing import Any
Expand Down Expand Up @@ -163,29 +164,20 @@ def from_message(cls, message: types_pb2.Metadata):
if message.HasField(field):
setattr(metadata, field, getattr(message, field))
if message.HasField('value_restriction'):
restriction_type = message.value_restriction.WhichOneof('type')
# Make sure that a type actually is set
if restriction_type:
value_restriction = getattr(
message.value_restriction, restriction_type)
metadata.value_restriction = ValueRestriction()
# All types except string support min/max
if restriction_type != 'string':
for field in ('min', 'max'):
if value_restriction.HasField(field):
setattr(metadata.value_restriction, field,
getattr(value_restriction, field))
if value_restriction.allowed_values:
metadata.value_restriction.allowed_values = list(
value_restriction.allowed_values)
value_restriction = getattr(
message.value_restriction, message.value_restriction.WhichOneof('type'))
metadata.value_restriction = ValueRestriction()
for field in ('min', 'max'):
if value_restriction.HasField(field):
setattr(metadata.value_restriction, field,
getattr(value_restriction, field))
if value_restriction.allowed_values:
metadata.value_restriction.allowed_values = list(
value_restriction.allowed_values)
return metadata

# pylint: disable=too-many-branches
def to_message(self, value_type: DataType = DataType.UNSPECIFIED) -> types_pb2.Metadata:
"""
to_message/from_message aligned to use None rather than empty list for
representing allowed values in value restrictions
"""
message = types_pb2.Metadata(
data_type=self.data_type.value, entry_type=self.entry_type.value)
for field in ('description', 'comment', 'deprecation', 'unit'):
Expand All @@ -209,7 +201,7 @@ def to_message(self, value_type: DataType = DataType.UNSPECIFIED) -> types_pb2.M
if self.value_restriction.max is not None:
message.value_restriction.signed.max = int(
self.value_restriction.max)
if self.value_restriction.allowed_values:
if self.value_restriction.allowed_values is not None:
message.value_restriction.signed.allowed_values.extend(
(int(value)
for value in self.value_restriction.allowed_values),
Expand All @@ -230,7 +222,7 @@ def to_message(self, value_type: DataType = DataType.UNSPECIFIED) -> types_pb2.M
if self.value_restriction.max is not None:
message.value_restriction.unsigned.max = int(
self.value_restriction.max)
if self.value_restriction.allowed_values:
if self.value_restriction.allowed_values is not None:
message.value_restriction.unsigned.allowed_values.extend(
(int(value)
for value in self.value_restriction.allowed_values),
Expand All @@ -247,7 +239,7 @@ def to_message(self, value_type: DataType = DataType.UNSPECIFIED) -> types_pb2.M
if self.value_restriction.max is not None:
message.value_restriction.floating_point.max = float(
self.value_restriction.max)
if self.value_restriction.allowed_values:
if self.value_restriction.allowed_values is not None:
message.value_restriction.floating_point.allowed_values.extend(
(float(value)
for value in self.value_restriction.allowed_values),
Expand All @@ -256,7 +248,7 @@ def to_message(self, value_type: DataType = DataType.UNSPECIFIED) -> types_pb2.M
DataType.STRING,
DataType.STRING_ARRAY,
):
if self.value_restriction.allowed_values:
if self.value_restriction.allowed_values is not None:
message.value_restriction.string.allowed_values.extend(
(str(value)
for value in self.value_restriction.allowed_values),
Expand Down Expand Up @@ -316,32 +308,11 @@ class Datapoint:

@classmethod
def from_message(cls, message: types_pb2.Datapoint):
"""
Return internal Datapoint representation or None on error
"""
if message.WhichOneof('value') is None:
logger.warning("No value provided in datapoint!")
return None

if message.HasField('timestamp'):
# gRPC timestamp supports date up to including year 9999
# If timestamp by any reason contains a larger number for seconds than supported
# you may get an overflow error
try:
timestamp = message.timestamp.ToDatetime(
tzinfo=datetime.timezone.utc,
)
except OverflowError:

logger.error("Timestamp %d out of accepted range, value ignored!",
message.timestamp.seconds)
return None
else:
timestamp = None

return cls(
value=getattr(message, message.WhichOneof('value')),
timestamp=timestamp,
timestamp=message.timestamp.ToDatetime(
tzinfo=datetime.timezone.utc,
) if message.HasField('timestamp') else None,
)

def cast_array_values(cast, array):
Expand Down Expand Up @@ -664,21 +635,9 @@ def _raise_if_invalid(self, response):
err, preserving_proto_field_name=True) for err in response.errors]
else:
errors = []

raise_error = False
if (error and error.get('code') != 200):
raise_error = True
else:
for sub_error in errors:
if 'error' in sub_error:
if sub_error['error'].get('code') != 200:
logger.debug("Sub-error %d but no top level error", sub_error['error'].get('code'))
raise_error = True
else:
logger.error("No error field for sub-error")
raise_error = True

if raise_error:
if (error and error['code'] is not http.HTTPStatus.OK) or any(
sub_error['error']['code'] is not http.HTTPStatus.OK for sub_error in errors
):
raise VSSClientError(error, errors)

def get_authorization_header(self, token: str):
Expand Down Expand Up @@ -959,7 +918,7 @@ def set(self, updates: Collection[EntryUpdate], **rpc_kwargs) -> None:
self._process_set_response(resp)

@check_connected
def subscribe(self, entries: Iterable[SubscribeEntry], **rpc_kwargs) -> Iterator[List[EntryUpdate]]:
def subscribe(self, entries: Iterable[SubscribeEntry], frequency: Optional[int], **rpc_kwargs) -> Iterator[List[EntryUpdate]]:
"""
Parameters:
rpc_kwargs
Expand All @@ -969,6 +928,8 @@ def subscribe(self, entries: Iterable[SubscribeEntry], **rpc_kwargs) -> Iterator
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata"))
req = self._prepare_subscribe_request(entries)
if frequency is not None:
req.frequency_hertz = frequency
resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
Expand Down
3 changes: 3 additions & 0 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ async def set(self, updates: Collection[EntryUpdate], **rpc_kwargs) -> None:
@check_connected_async_iter
async def subscribe(self,
entries: Iterable[SubscribeEntry],
frequency: Optional[int],
**rpc_kwargs,
) -> AsyncIterator[List[EntryUpdate]]:
"""
Expand All @@ -343,6 +344,8 @@ async def subscribe(self,
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata"))
req = self._prepare_subscribe_request(entries)
if frequency is not None:
req.frequency_hertz = frequency
resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs)
try:
async for resp in resp_stream:
Expand Down

0 comments on commit 19d23d8

Please sign in to comment.