Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Reply protocol definition and codec #717

Merged
merged 9 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ where
fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output {
match x {
ResponseBody::Reply(b) => self.write(&mut *writer, b),
ResponseBody::Err(b) => self.write(&mut *writer, b),
ResponseBody::Ack(b) => self.write(&mut *writer, b),
ResponseBody::Err(b) => self.write(&mut *writer, b),
ResponseBody::Put(b) => self.write(&mut *writer, b),
}
}
Expand All @@ -140,8 +140,8 @@ where
let codec = Zenoh080Header::new(header);
let body = match imsg::mid(codec.header) {
id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
id::ACK => ResponseBody::Ack(codec.read(&mut *reader)?),
id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
id::PUT => ResponseBody::Put(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};
Expand Down
70 changes: 34 additions & 36 deletions commons/zenoh-codec/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,46 @@ use zenoh_protocol::{
common::{iext, imsg},
zenoh::{
id,
query::{ext, flag, Query},
query::{ext, flag, Consolidation, Query},
},
};

// Extension Consolidation
impl<W> WCodec<(ext::ConsolidationType, bool), &mut W> for Zenoh080
// Consolidation
impl<W> WCodec<Consolidation, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (ext::ConsolidationType, bool)) -> Self::Output {
let (x, more) = x;
fn write(self, writer: &mut W, x: Consolidation) -> Self::Output {
let v: u64 = match x {
ext::ConsolidationType::Auto => 0,
ext::ConsolidationType::None => 1,
ext::ConsolidationType::Monotonic => 2,
ext::ConsolidationType::Latest => 3,
ext::ConsolidationType::Unique => 4,
Consolidation::Auto => 0,
Consolidation::None => 1,
Consolidation::Monotonic => 2,
Consolidation::Latest => 3,
Consolidation::Unique => 4,
};
let v = ext::Consolidation::new(v);
self.write(&mut *writer, (&v, more))
self.write(&mut *writer, v)
}
}

impl<R> RCodec<(ext::ConsolidationType, bool), &mut R> for Zenoh080Header
impl<R> RCodec<Consolidation, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::ConsolidationType, bool), Self::Error> {
let (ext, more): (ext::Consolidation, bool) = self.read(&mut *reader)?;
let c = match ext.value {
0 => ext::ConsolidationType::Auto,
1 => ext::ConsolidationType::None,
2 => ext::ConsolidationType::Monotonic,
3 => ext::ConsolidationType::Latest,
4 => ext::ConsolidationType::Unique,
_ => return Err(DidntRead),
fn read(self, reader: &mut R) -> Result<Consolidation, Self::Error> {
let v: u64 = self.read(&mut *reader)?;
let c = match v {
0 => Consolidation::Auto,
1 => Consolidation::None,
2 => Consolidation::Monotonic,
3 => Consolidation::Latest,
4 => Consolidation::Unique,
_ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown
};
Ok((c, more))
Ok(c)
}
}

Expand All @@ -75,21 +73,23 @@ where

fn write(self, writer: &mut W, x: &Query) -> Self::Output {
let Query {
consolidation,
parameters,
ext_sinfo,
ext_consolidation,
ext_body,
ext_attachment,
ext_unknown,
} = x;

// Header
let mut header = id::QUERY;
if consolidation != &Consolidation::default() {
header |= flag::C;
}
if !parameters.is_empty() {
header |= flag::P;
}
let mut n_exts = (ext_sinfo.is_some() as u8)
+ ((ext_consolidation != &ext::ConsolidationType::default()) as u8)
+ (ext_body.is_some() as u8)
+ (ext_attachment.is_some() as u8)
+ (ext_unknown.len() as u8);
Expand All @@ -99,6 +99,9 @@ where
self.write(&mut *writer, header)?;

// Body
if consolidation != &Consolidation::default() {
self.write(&mut *writer, *consolidation)?;
}
if !parameters.is_empty() {
self.write(&mut *writer, parameters)?;
}
Expand All @@ -108,10 +111,6 @@ where
n_exts -= 1;
self.write(&mut *writer, (sinfo, n_exts != 0))?;
}
if ext_consolidation != &ext::ConsolidationType::default() {
n_exts -= 1;
self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?;
}
if let Some(body) = ext_body.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (body, n_exts != 0))?;
Expand Down Expand Up @@ -154,14 +153,18 @@ where
}

// Body
let mut consolidation = Consolidation::default();
if imsg::has_flag(self.header, flag::C) {
consolidation = self.codec.read(&mut *reader)?;
}

let mut parameters = String::new();
if imsg::has_flag(self.header, flag::P) {
parameters = self.codec.read(&mut *reader)?;
}

// Extensions
let mut ext_sinfo: Option<ext::SourceInfoType> = None;
let mut ext_consolidation = ext::ConsolidationType::default();
let mut ext_body: Option<ext::QueryBodyType> = None;
let mut ext_attachment: Option<ext::AttachmentType> = None;
let mut ext_unknown = Vec::new();
Expand All @@ -176,11 +179,6 @@ where
ext_sinfo = Some(s);
has_ext = ext;
}
ext::Consolidation::ID => {
let (c, ext): (ext::ConsolidationType, bool) = eodec.read(&mut *reader)?;
ext_consolidation = c;
has_ext = ext;
}
ext::QueryBodyType::SID | ext::QueryBodyType::VID => {
let (s, ext): (ext::QueryBodyType, bool) = eodec.read(&mut *reader)?;
ext_body = Some(s);
Expand All @@ -200,9 +198,9 @@ where
}

Ok(Query {
consolidation,
parameters,
ext_sinfo,
ext_consolidation,
ext_body,
ext_attachment,
ext_unknown,
Expand Down
141 changes: 18 additions & 123 deletions commons/zenoh-codec/src/zenoh/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,18 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(not(feature = "shared-memory"))]
use crate::Zenoh080Bounded;
#[cfg(feature = "shared-memory")]
use crate::Zenoh080Sliced;
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
use alloc::vec::Vec;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg},
core::Encoding,
common::imsg,
zenoh::{
id,
reply::{ext, flag, Reply},
query::Consolidation,
reply::{flag, Reply, ReplyBody},
},
};

Expand All @@ -39,81 +34,35 @@ where

fn write(self, writer: &mut W, x: &Reply) -> Self::Output {
let Reply {
timestamp,
encoding,
ext_sinfo,
ext_consolidation,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_attachment,
consolidation,
ext_unknown,
payload,
} = x;

// Header
let mut header = id::REPLY;
if timestamp.is_some() {
header |= flag::T;
}
if encoding != &Encoding::default() {
header |= flag::E;
}
let mut n_exts = (ext_sinfo.is_some()) as u8
+ ((ext_consolidation != &ext::ConsolidationType::default()) as u8)
+ (ext_attachment.is_some()) as u8
+ (ext_unknown.len() as u8);
#[cfg(feature = "shared-memory")]
{
n_exts += ext_shm.is_some() as u8;
if consolidation != &Consolidation::default() {
header |= flag::C;
}
let mut n_exts = ext_unknown.len() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;

// Body
if let Some(ts) = timestamp.as_ref() {
self.write(&mut *writer, ts)?;
}
if encoding != &Encoding::default() {
self.write(&mut *writer, encoding)?;
if consolidation != &Consolidation::default() {
self.write(&mut *writer, *consolidation)?;
}

// Extensions
if let Some(sinfo) = ext_sinfo.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (sinfo, n_exts != 0))?;
}
if ext_consolidation != &ext::ConsolidationType::default() {
n_exts -= 1;
self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(eshm) = ext_shm.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (eshm, n_exts != 0))?;
}
if let Some(att) = ext_attachment.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (att, n_exts != 0))?;
}
for u in ext_unknown.iter() {
n_exts -= 1;
self.write(&mut *writer, (u, n_exts != 0))?;
}

// Payload
#[cfg(feature = "shared-memory")]
{
let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
codec.write(&mut *writer, payload)?;
}

#[cfg(not(feature = "shared-memory"))]
{
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, payload)?;
}
self.write(&mut *writer, payload)?;

Ok(())
}
Expand Down Expand Up @@ -144,81 +93,27 @@ where
}

// Body
let mut timestamp: Option<uhlc::Timestamp> = None;
if imsg::has_flag(self.header, flag::T) {
timestamp = Some(self.codec.read(&mut *reader)?);
}

let mut encoding = Encoding::default();
if imsg::has_flag(self.header, flag::E) {
encoding = self.codec.read(&mut *reader)?;
let mut consolidation = Consolidation::default();
if imsg::has_flag(self.header, flag::C) {
consolidation = self.codec.read(&mut *reader)?;
}

// Extensions
let mut ext_sinfo: Option<ext::SourceInfoType> = None;
let mut ext_consolidation = ext::ConsolidationType::default();
#[cfg(feature = "shared-memory")]
let mut ext_shm: Option<ext::ShmType> = None;
let mut ext_attachment: Option<ext::AttachmentType> = None;
let mut ext_unknown = Vec::new();

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
match iext::eid(ext) {
ext::SourceInfo::ID => {
let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
ext_sinfo = Some(s);
has_ext = ext;
}
ext::Consolidation::ID => {
let (c, ext): (ext::ConsolidationType, bool) = eodec.read(&mut *reader)?;
ext_consolidation = c;
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
ext_shm = Some(s);
has_ext = ext;
}
ext::Attachment::ID => {
let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
ext_attachment = Some(a);
has_ext = ext;
}
_ => {
let (u, ext) = extension::read(reader, "Reply", ext)?;
ext_unknown.push(u);
has_ext = ext;
}
}
let (u, ext) = extension::read(reader, "Reply", ext)?;
ext_unknown.push(u);
has_ext = ext;
}

// Payload
let payload: ZBuf = {
#[cfg(feature = "shared-memory")]
{
let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
codec.read(&mut *reader)?
}

#[cfg(not(feature = "shared-memory"))]
{
let bodec = Zenoh080Bounded::<u32>::new();
bodec.read(&mut *reader)?
}
};
let payload: ReplyBody = self.codec.read(&mut *reader)?;

Ok(Reply {
timestamp,
encoding,
ext_sinfo,
ext_consolidation,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_attachment,
consolidation,
ext_unknown,
payload,
})
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ fn codec_network() {
run!(NetworkMessage, NetworkMessage::rand());
}

// Zenoh new
// Zenoh
#[test]
fn codec_put() {
run!(zenoh::Put, zenoh::Put::rand());
Expand Down
Loading