Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add PublishValue and Subscribe from kuksa.val.v2 #34

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[submodule "submodules/kuksa-databroker"]

[submodule "kuksa-databroker"]
path = submodules/kuksa-databroker
url = https://github.com/eclipse-kuksa/kuksa-databroker
url = https://github.com/SoftwareDefinedVehicle/kuksa-databroker.git
branch = rel-0.5.0
lukasmittag marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions docs/building.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ git submodule update --init
cd kuksa-client
```

Hint: If you want to use another branch than master exchange the first command with

```console
git submodule update --recursive --remote
```

First we suggest you create a dedicated [python virtual environment](https://docs.python.org/3/library/venv.html) for kuksa-client:

```console
Expand Down
12 changes: 0 additions & 12 deletions kuksa-client/kuksa/__init__.py
lukasmittag marked this conversation as resolved.
Show resolved Hide resolved

This file was deleted.

12 changes: 0 additions & 12 deletions kuksa-client/kuksa/val/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion kuksa-client/kuksa/val/v1/README.md

This file was deleted.

12 changes: 0 additions & 12 deletions kuksa-client/kuksa/val/v1/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion kuksa-client/kuksa/val/v1/types.proto

This file was deleted.

1 change: 0 additions & 1 deletion kuksa-client/kuksa/val/v1/val.proto

This file was deleted.

13 changes: 7 additions & 6 deletions kuksa-client/kuksa_client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,14 @@ def path_completer(self, text, line, begidx, endidx):

def subscribeCallback(self, logPath, resp):
if logPath is None:
self.async_alert(
highlight(
json.dumps(json.loads(resp), indent=2),
lexers.JsonLexer(),
formatters.TerminalFormatter(),
with self.terminal_lock:
self.async_alert(
highlight(
json.dumps(json.loads(resp), indent=2),
lexers.JsonLexer(),
formatters.TerminalFormatter(),
)
)
)
else:
with logPath.open("a", encoding="utf-8") as logFile:
logFile.write(resp + "\n")
Expand Down
62 changes: 41 additions & 21 deletions kuksa-client/kuksa_client/cli_backend/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,18 @@ def __init__(self, config):
self.run = False

self.AttrDict = {
"value": (kuksa_client.grpc.Field.VALUE, kuksa_client.grpc.View.CURRENT_VALUE),
"targetValue": (kuksa_client.grpc.Field.ACTUATOR_TARGET, kuksa_client.grpc.View.TARGET_VALUE),
"metadata": (kuksa_client.grpc.Field.METADATA, kuksa_client.grpc.View.METADATA),
"value": (
kuksa_client.grpc.Field.VALUE,
kuksa_client.grpc.View.CURRENT_VALUE,
),
"targetValue": (
kuksa_client.grpc.Field.ACTUATOR_TARGET,
kuksa_client.grpc.View.TARGET_VALUE,
),
"metadata": (
kuksa_client.grpc.Field.METADATA,
kuksa_client.grpc.View.METADATA,
),
}

def connection_established(self) -> bool:
Expand Down Expand Up @@ -112,8 +121,10 @@ def getValue(self, path: str, attribute="value", timeout=5):
def getValues(self, paths: Iterable[str], attribute="value", timeout=5):
if attribute in self.AttrDict:
field, view = self.AttrDict[attribute]
entries = [kuksa_client.grpc.EntryRequest(
path=path, view=view, fields=(field,)) for path in paths]
entries = [
kuksa_client.grpc.EntryRequest(path=path, view=view, fields=(field,))
for path in paths
]
requestArgs = {'entries': entries}
return self._sendReceiveMsg(("get", requestArgs), timeout)

Expand All @@ -127,28 +138,33 @@ def setValues(self, updates: Dict[str, Any], attribute="value", timeout=5):
if attribute in self.AttrDict:
field, _ = self.AttrDict[attribute]
entry_updates = []
v1 = True
for path, value in updates.items():

if field is kuksa_client.grpc.Field.VALUE:
entry = kuksa_client.grpc.DataEntry(
path=path, value=kuksa_client.grpc.Datapoint(value=value))
path=path,
value=kuksa_client.grpc.Datapoint(value=value),
)
v1 = False
elif field is kuksa_client.grpc.Field.ACTUATOR_TARGET:
entry = kuksa_client.grpc.DataEntry(
path=path, actuator_target=kuksa_client.grpc.Datapoint(
value=value),
path=path,
actuator_target=kuksa_client.grpc.Datapoint(value=value),
)
elif field is kuksa_client.grpc.Field.METADATA:
try:
metadata_dict = json.loads(value)
except json.JSONDecodeError:
return json.dumps({"error": "Metadata value needs to be a valid JSON object"})
entry = kuksa_client.grpc.DataEntry(
path=path, metadata=kuksa_client.grpc.Metadata.from_dict(
metadata_dict),
path=path,
metadata=kuksa_client.grpc.Metadata.from_dict(metadata_dict),
)
entry_updates.append(kuksa_client.grpc.EntryUpdate(
entry=entry, fields=(field,)))
requestArgs = {'updates': entry_updates}
entry_updates.append(
kuksa_client.grpc.EntryUpdate(entry=entry, fields=(field,))
)
requestArgs = {"updates": entry_updates, "v1": v1}
return self._sendReceiveMsg(("set", requestArgs), timeout)
return json.dumps({"error": "Invalid Attribute"})

Expand All @@ -175,11 +191,14 @@ def subscribe(self, path: str, callback, attribute="value", timeout=5):
def subscribeMultiple(self, paths: Iterable[str], callback, attribute="value", timeout=5):
if attribute in self.AttrDict:
field, view = self.AttrDict[attribute]
entries = [kuksa_client.grpc.SubscribeEntry(
path=path, view=view, fields=(field,)) for path in paths]
entries = [
kuksa_client.grpc.SubscribeEntry(path=path, view=view, fields=(field,))
for path in paths
]
requestArgs = {
'entries': entries,
'callback': callback_wrapper(callback),
"entries": entries,
"v1": False,
"callback": callback_wrapper(callback),
}
return self._sendReceiveMsg(("subscribe", requestArgs), timeout)

Expand Down Expand Up @@ -222,8 +241,7 @@ def _sendReceiveMsg(self, req, timeout):
# Async function to handle the gRPC calls
async def _grpcHandler(self, vss_client: kuksa_client.grpc.aio.VSSClient):
self.run = True
subscriber_manager = kuksa_client.grpc.aio.SubscriberManager(
vss_client)
subscriber_manager = kuksa_client.grpc.aio.SubscriberManager(vss_client)
self.grpc_connection_established = True
while self.run:
try:
Expand Down Expand Up @@ -273,7 +291,9 @@ def updateVSSTree(self, jsonStr, timeout=5):
async def mainLoop(self):
if self.insecure:

async with kuksa_client.grpc.aio.VSSClient(self.serverIP, self.serverPort, token=self.token) as vss_client:
async with kuksa_client.grpc.aio.VSSClient(
self.serverIP, self.serverPort, token=self.token
) as vss_client:
logger.info("gRPC channel connected.")
await self._grpcHandler(vss_client)
else:
Expand All @@ -282,7 +302,7 @@ async def mainLoop(self):
self.serverPort,
root_certificates=self.cacertificate,
tls_server_name=self.tls_server_name,
token=self.token
token=self.token,
) as vss_client:
logger.info("Secure gRPC channel connected.")
await self._grpcHandler(vss_client)
Loading
Loading