Skip to content

Commit

Permalink
Continuous subscription at a specific frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Mar 6, 2024
1 parent 3b8bc86 commit d6c4c0c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
7 changes: 4 additions & 3 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def from_grpc_error(cls, error: RpcError):
def to_dict(self) -> Dict[str, Any]:
return {'error': self.error, 'errors': self.errors}


@dataclasses.dataclass
class ValueRestriction:
min: Optional[Any] = None
Expand Down Expand Up @@ -959,7 +958,7 @@ def set(self, updates: Collection[EntryUpdate], **rpc_kwargs) -> None:
self._process_set_response(resp)

@check_connected
def subscribe(self, entries: Iterable[SubscribeEntry], **rpc_kwargs) -> Iterator[List[EntryUpdate]]:
def subscribe(self, entries: Iterable[SubscribeEntry], frequency: Optional[int], **rpc_kwargs) -> Iterator[List[EntryUpdate]]:
"""
Parameters:
rpc_kwargs
Expand All @@ -969,6 +968,8 @@ def subscribe(self, entries: Iterable[SubscribeEntry], **rpc_kwargs) -> Iterator
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata"))
req = self._prepare_subscribe_request(entries)
if frequency is not None:
req.frequency_hertz = frequency
resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
Expand Down Expand Up @@ -1034,4 +1035,4 @@ def get_value_types(self, paths: Collection[str], **rpc_kwargs) -> Dict[str, Dat
) for path in paths)
entries = self.get(entries=entry_requests, **rpc_kwargs)
return {entry.path: DataType(entry.metadata.data_type) for entry in entries}
return {}
return {}
3 changes: 3 additions & 0 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ async def set(self, updates: Collection[EntryUpdate], **rpc_kwargs) -> None:
@check_connected_async_iter
async def subscribe(self,
entries: Iterable[SubscribeEntry],
frequency: Optional[int],
**rpc_kwargs,
) -> AsyncIterator[List[EntryUpdate]]:
"""
Expand All @@ -343,6 +344,8 @@ async def subscribe(self,
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata"))
req = self._prepare_subscribe_request(entries)
if frequency is not None:
req.frequency_hertz = frequency
resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs)
try:
async for resp in resp_stream:
Expand Down

0 comments on commit d6c4c0c

Please sign in to comment.