Skip to content

Commit

Permalink
ConsolidationMode can be Auto (#738)
Browse files Browse the repository at this point in the history
* ConsolidationMode rework

* Fix QueryConsolidation::DEFAULT
  • Loading branch information
Mallets authored Feb 28, 2024
1 parent cc8d4a1 commit 24e5ef5
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 55 deletions.
2 changes: 0 additions & 2 deletions commons/zenoh-codec/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ where
Consolidation::None => 1,
Consolidation::Monotonic => 2,
Consolidation::Latest => 3,
Consolidation::Unique => 4,
};
self.write(&mut *writer, v)
}
Expand All @@ -58,7 +57,6 @@ where
1 => Consolidation::None,
2 => Consolidation::Monotonic,
3 => Consolidation::Latest,
4 => Consolidation::Unique,
_ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown
};
Ok(c)
Expand Down
16 changes: 5 additions & 11 deletions commons/zenoh-protocol/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub enum Consolidation {
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
/// Remove the duplicates of any samples based on the their timestamp.
Unique,
// Remove the duplicates of any samples based on the their timestamp.
// Unique,
}

impl Consolidation {
Expand All @@ -45,15 +45,9 @@ impl Consolidation {
use rand::prelude::SliceRandom;
let mut rng = rand::thread_rng();

*[
Self::None,
Self::Monotonic,
Self::Latest,
Self::Unique,
Self::Auto,
]
.choose(&mut rng)
.unwrap()
*[Self::None, Self::Monotonic, Self::Latest, Self::Auto]
.choose(&mut rng)
.unwrap()
}
}

Expand Down
45 changes: 8 additions & 37 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,13 @@ use std::collections::HashMap;
use std::future::Ready;
use std::time::Duration;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::zenoh::query::Consolidation;
use zenoh_result::ZResult;

/// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get).
pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType;

/// The kind of consolidation.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum ConsolidationMode {
/// No consolidation applied: multiple samples may be received for the same key-timestamp.
None,
/// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp
/// has already been sent with the same key.
///
/// This optimizes latency while potentially reducing bandwidth.
///
/// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already
/// been observed with the same key.
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
}

impl From<ConsolidationMode> for Consolidation {
fn from(val: ConsolidationMode) -> Self {
match val {
ConsolidationMode::None => Consolidation::None,
ConsolidationMode::Monotonic => Consolidation::Monotonic,
ConsolidationMode::Latest => Consolidation::Latest,
}
}
}
pub type ConsolidationMode = zenoh_protocol::zenoh::query::Consolidation;

/// The operation: either manual or automatic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -65,30 +40,26 @@ pub enum Mode<T> {
/// The replies consolidation strategy to apply on replies to a [`get`](Session::get).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueryConsolidation {
pub(crate) mode: Mode<ConsolidationMode>,
pub(crate) mode: ConsolidationMode,
}

impl QueryConsolidation {
pub const DEFAULT: Self = Self::AUTO;
/// Automatic query consolidation strategy selection.
pub const AUTO: Self = Self { mode: Mode::Auto };
pub const AUTO: Self = Self {
mode: ConsolidationMode::Auto,
};

pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self {
Self {
mode: Mode::Manual(mode),
}
Self { mode }
}

/// Returns the requested [`ConsolidationMode`].
pub fn mode(&self) -> Mode<ConsolidationMode> {
pub fn mode(&self) -> ConsolidationMode {
self.mode
}
}
impl From<Mode<ConsolidationMode>> for QueryConsolidation {
fn from(mode: Mode<ConsolidationMode>) -> Self {
Self { mode }
}
}

impl From<ConsolidationMode> for QueryConsolidation {
fn from(mode: ConsolidationMode) -> Self {
Self::from_mode(mode)
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1728,14 +1728,14 @@ impl Session {
log::trace!("get({}, {:?}, {:?})", selector, target, consolidation);
let mut state = zwrite!(self.state);
let consolidation = match consolidation.mode {
Mode::Auto => {
ConsolidationMode::Auto => {
if selector.decode().any(|(k, _)| k.as_ref() == TIME_RANGE_KEY) {
ConsolidationMode::None
} else {
ConsolidationMode::Latest
}
}
Mode::Manual(mode) => mode,
mode => mode,
};
let qid = state.qid_counter.fetch_add(1, Ordering::SeqCst);
let nb_final = match destination {
Expand Down Expand Up @@ -1808,7 +1808,7 @@ impl Session {
ext_budget: None,
ext_timeout: Some(timeout),
payload: RequestBody::Query(zenoh_protocol::zenoh::Query {
consolidation: consolidation.into(),
consolidation,
parameters: selector.parameters().to_string(),
ext_sinfo: None,
ext_body: value.as_ref().map(|v| query::ext::QueryBodyType {
Expand All @@ -1829,7 +1829,7 @@ impl Session {
selector.parameters(),
qid,
target,
consolidation.into(),
consolidation,
value.as_ref().map(|v| query::ext::QueryBodyType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -2441,7 +2441,7 @@ impl Primitives for Session {
}
}
}
ConsolidationMode::Latest => {
Consolidation::Auto | ConsolidationMode::Latest => {
match query.replies.as_ref().unwrap().get(
new_reply.sample.as_ref().unwrap().key_expr.as_keyexpr(),
) {
Expand Down

0 comments on commit 24e5ef5

Please sign in to comment.