Skip to content

Commit

Permalink
Upgrade to zenoh-python 1.0.0-alpha.6 (#16)
Browse files Browse the repository at this point in the history
* Upgrade to zenoh-python 1.0.0-alpha.6

* Incorporate review comment
  • Loading branch information
neelam-kushwah authored Nov 12, 2024
1 parent 516b8af commit c2bda41
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 73 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
up-python==0.2.0-dev
eclipse-zenoh==0.11.0
eclipse-zenoh==1.0.0-alpha.6

6 changes: 3 additions & 3 deletions up_transport_zenoh/examples/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
from up_transport_zenoh.examples.common_uuri import get_zenoh_default_config
from up_transport_zenoh.uptransportzenoh import UPTransportZenoh

source = UUri(authority_name="vehicle1", ue_id=18)
source = UUri(authority_name="publisher", ue_id=1, ue_version_major=1)
publisher = UPTransportZenoh.new(get_zenoh_default_config(), source)


async def publish_to_zenoh():
# create uuri
uuri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)
builder = UMessageBuilder.publish(uuri)
source.resource_id = 0x8001
builder = UMessageBuilder.publish(source)
payload = UPayload.pack(UUri())
umessage = builder.build_from_upayload(payload)
status = await publisher.send(umessage)
Expand Down
4 changes: 2 additions & 2 deletions up_transport_zenoh/examples/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ async def on_receive(self, msg: UMessage) -> None:
return UStatus(message="Received event")


source = UUri(authority_name="vehicle1", ue_id=18)
source = UUri(authority_name="subscriber", ue_id=9)
transport = UPTransportZenoh.new(get_zenoh_default_config(), source)
# create topic uuri
uuri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)
uuri = UUri(authority_name="publisher", ue_id=1, ue_version_major=1, resource_id=0x8001)


async def subscribe_to_zenoh_if_subscription_service_is_not_running():
Expand Down
35 changes: 15 additions & 20 deletions up_transport_zenoh/uptransportzenoh.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from uprotocol.v1.umessage_pb2 import UMessage
from uprotocol.v1.uri_pb2 import UUri
from uprotocol.v1.ustatus_pb2 import UStatus
from zenoh import Config, Query, Queryable, Sample, Session, Subscriber, Value
from zenoh.keyexpr import KeyExpr
from zenoh import Config, Query, Queryable, Sample, Session, Subscriber
from zenoh.zenoh import KeyExpr

from up_transport_zenoh.zenohutils import MessageFlag, ZenohUtils

Expand Down Expand Up @@ -93,7 +93,7 @@ def send_publish_notification(self, zenoh_key: str, payload: bytes, attributes:
logging.debug(f"Priority: {priority}")
logging.debug(f"Attachment: {attachment}")

self.session.put(keyexpr=zenoh_key, value=payload, attachment=attachment, priority=priority)
self.session.put(key_expr=zenoh_key, payload=payload, attachment=attachment, priority=priority)
msg = "Successfully sent data to Zenoh"
logging.debug(f"SUCCESS:{msg}")
return UStatus(code=UCode.OK, message=msg)
Expand All @@ -111,8 +111,8 @@ def send_request(self, zenoh_key: str, payload: bytes, attributes: UAttributes)
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)
resp_callback = None
for saved_zenoh_key, listener in self.rpc_callback_map.items():
keyexpr_zenohkey = KeyExpr.new(zenoh_key)
keyexpr_savedkey = KeyExpr.new(saved_zenoh_key)
keyexpr_zenohkey = KeyExpr(zenoh_key)
keyexpr_savedkey = KeyExpr(saved_zenoh_key)

if keyexpr_zenohkey.intersects(keyexpr_savedkey):
resp_callback = self.rpc_callback_map.get(saved_zenoh_key)
Expand All @@ -138,7 +138,7 @@ def handle_response(reply: Query.reply) -> None:
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
# Create UMessage
msg = UMessage(attributes=u_attribute, payload=sample.payload)
msg = UMessage(attributes=u_attribute, payload=bytes(sample.payload))
asyncio.run(resp_callback.on_receive(msg))
except Exception:
msg = f"Error while parsing Zenoh reply: {reply.error}"
Expand All @@ -148,21 +148,19 @@ def handle_response(reply: Query.reply) -> None:
# Send query
ttl = attributes.ttl / 1000 if attributes.ttl is not None else 1000

value = Value(payload)
# Send the query
get_builder = self.session.get(
zenoh_key,
zenoh.Queue(),
target=zenoh.QueryTarget.BEST_MATCHING(),
replies = self.session.get(
selector=zenoh_key,
target=zenoh.QueryTarget.BEST_MATCHING,
attachment=attachment,
value=value,
payload=payload,
timeout=ttl,
)

def get_response():
try:
for reply in get_builder.receiver:
if reply.is_ok:
for reply in replies:
if reply.ok:
handle_response(reply)
break
except Exception:
Expand Down Expand Up @@ -192,11 +190,9 @@ def send_response(self, payload: bytes, attributes: UAttributes) -> UStatus:
msg = "Query doesn't exist"
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg) # Send back the query
value = Value(payload)
reply = Sample(query.key_expr, value, attachment=attachment)

try:
query.reply(reply)
query.reply(query.key_expr, payload, attachment=attachment)
msg = "Successfully sent rpc response to Zenoh"
logging.debug(f"SUCCESS:{msg}")
return UStatus(code=UCode.OK, message=msg)
Expand All @@ -223,7 +219,7 @@ def callback(sample: Sample) -> None:
msg = "Unable to decode attributes"
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
message = UMessage(attributes=u_attribute, payload=sample.payload)
message = UMessage(attributes=u_attribute, payload=bytes(sample.payload))
asyncio.run(listener.on_receive(message))

# Create Zenoh subscriber
Expand Down Expand Up @@ -260,7 +256,7 @@ def callback(query: Query) -> None:
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

message = UMessage(attributes=u_attribute, payload=query.value.payload if query.value else None)
message = UMessage(attributes=u_attribute, payload=bytes(query.payload) if query.payload else None)
self.query_map[u_attribute.id.SerializeToString()] = query
asyncio.run(listener.on_receive(message))

Expand All @@ -286,7 +282,6 @@ async def send(self, message: UMessage) -> UStatus:
source = attributes.source
sink = attributes.sink
zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, source, sink)

if not source:
return UStatus(code=UCode.INVALID_ARGUMENT, message="attributes.source shouldn't be empty")
payload = message.payload or b''
Expand Down
86 changes: 39 additions & 47 deletions up_transport_zenoh/zenohutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
from uprotocol.uri.factory.uri_factory import UriFactory
from uprotocol.v1.uattributes_pb2 import (
UAttributes,
UPayloadFormat,
UPriority,
)
from uprotocol.v1.ucode_pb2 import UCode
from uprotocol.v1.uri_pb2 import UUri
from uprotocol.v1.ustatus_pb2 import UStatus
from zenoh import Encoding, Priority
from zenoh.value import Attachment
from zenoh import Priority, ZBytes

UATTRIBUTE_VERSION: int = 1

Expand Down Expand Up @@ -72,77 +70,71 @@ def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]:
@staticmethod
def to_zenoh_key_string(authority_name: str, src_uri: UUri, dst_uri: UUri = None) -> str:
src = ZenohUtils.uri_to_zenoh_key(authority_name, src_uri)
dst = ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri else "{}/{}/{}/{}"
dst = ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri and dst_uri != UUri() else "{}/{}/{}/{}"
return f"up/{src}/{dst}"

@staticmethod
def map_zenoh_priority(upriority: UPriority) -> Priority:
mapping = {
UPriority.UPRIORITY_CS0: Priority.BACKGROUND(),
UPriority.UPRIORITY_CS1: Priority.DATA_LOW(),
UPriority.UPRIORITY_CS2: Priority.DATA(),
UPriority.UPRIORITY_CS3: Priority.DATA_HIGH(),
UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW(),
UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH(),
UPriority.UPRIORITY_CS6: Priority.REAL_TIME(),
UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW(),
UPriority.UPRIORITY_CS0: Priority.BACKGROUND,
UPriority.UPRIORITY_CS1: Priority.DATA_LOW,
UPriority.UPRIORITY_CS2: Priority.DATA,
UPriority.UPRIORITY_CS3: Priority.DATA_HIGH,
UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW,
UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH,
UPriority.UPRIORITY_CS6: Priority.REAL_TIME,
UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW,
}
return mapping[upriority]

@staticmethod
def to_upayload_format(encoding: Encoding) -> UPayloadFormat:
try:
value = int(encoding.suffix)
return value if UPayloadFormat.Name(value) else None
except (ValueError, AttributeError):
return None

@staticmethod
def uattributes_to_attachment(uattributes: UAttributes):
attachment = [("", UATTRIBUTE_VERSION.to_bytes(1, byteorder='little')), ("", uattributes.SerializeToString())]
return attachment
# Convert the version number to bytes (assuming 1 as in the Rust example)
version_bytes = UATTRIBUTE_VERSION.to_bytes(1, byteorder='little')

# Serialize the UAttributes to bytes
uattributes_bytes = uattributes.SerializeToString()

# Combine version bytes and uattributes bytes into one list of bytes
attachment_bytes = [version_bytes, uattributes_bytes]

# Convert the combined bytes to ZBytes
return attachment_bytes

@staticmethod
def attachment_to_uattributes(attachment: Attachment) -> UAttributes:
def attachment_to_uattributes(attachment: ZBytes) -> UAttributes:
try:
version = None
version_found = False
uattributes = None

items = attachment.items()
for pair in items:
if not version_found:
version = pair[1]
version_found = True
else:
# Process UAttributes data
uattributes = UAttributes()
uattributes.ParseFromString(pair[1])
break

if version is None:
msg = f"UAttributes version is empty (should be {UATTRIBUTE_VERSION})"
logging.debug(msg)
raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg)
# Convert ZBytes to a list of bytes
attachment_bytes = attachment.deserialize(list)

if not version_found:
msg = "UAttributes version is missing in the attachment"
# Ensure there is at least one byte for the version
if len(attachment_bytes) < 1:
msg = "Unable to get the UAttributes version"
logging.debug(msg)
raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg)

if version != UATTRIBUTE_VERSION.to_bytes(1, byteorder='little'):
# Check the version
version = int.from_bytes(bytes(attachment_bytes[0]), byteorder='big')
if version != UATTRIBUTE_VERSION:
msg = f"UAttributes version is {version} (should be {UATTRIBUTE_VERSION})"
logging.debug(msg)
raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg)

if uattributes is None:
# Get the attributes from the remaining bytes
uattributes_data = bytes(attachment_bytes[1])
if not uattributes_data:
msg = "Unable to get the UAttributes"
logging.debug(msg)
raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg)

# Parse the UAttributes from the bytes
uattributes = UAttributes()
uattributes.ParseFromString(uattributes_data)

return uattributes

except Exception as e:
msg = f"Failed to convert Attachment to UAttributes: {e}"
msg = f"Failed to convert Attachment to UAttributes: {str(e)}"
logging.debug(msg)
raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg)

Expand Down

0 comments on commit c2bda41

Please sign in to comment.