From c6d1027ecbda0afb95145351bf6aa8a959e7f90f Mon Sep 17 00:00:00 2001 From: Matei David Date: Wed, 3 Apr 2024 20:18:29 +0000 Subject: [PATCH] Fix tests & clippy warns Signed-off-by: Matei David --- examples/shared_stream_controllers.rs | 8 ++----- kube-runtime/src/controller/mod.rs | 4 ++-- kube-runtime/src/reflector/dispatcher.rs | 30 ++++++++++++------------ kube-runtime/src/reflector/mod.rs | 2 +- kube-runtime/src/reflector/object_ref.rs | 2 +- kube-runtime/src/reflector/store.rs | 8 ++++--- 6 files changed, 26 insertions(+), 28 deletions(-) diff --git a/examples/shared_stream_controllers.rs b/examples/shared_stream_controllers.rs index 3294ca6d4..cbfe79e95 100644 --- a/examples/shared_stream_controllers.rs +++ b/examples/shared_stream_controllers.rs @@ -4,11 +4,7 @@ use futures::StreamExt; use k8s_openapi::api::core::v1::{Pod, PodCondition}; use kube::{ api::{Patch, PatchParams}, - runtime::{ - controller::Action, - reflector::{self, store::Writer}, - watcher, Config, Controller, WatchStreamExt, - }, + runtime::{controller::Action, reflector, watcher, Config, Controller, WatchStreamExt}, Api, Client, ResourceExt, }; use tokio::sync::mpsc; @@ -130,7 +126,7 @@ async fn main() -> anyhow::Result<()> { let ctx = Arc::new(Data { client }); // Create a shared store with a predefined buffer that will be shared between subscribers. - let (reader, writer) = reflector::shared_store(SUBSCRIBE_BUFFER_SIZE); + let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE); // Before threading an object watch through the store, create a subscriber. // Any number of subscribers can be created from one writer. let subscriber = writer diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 52de72544..c83aa0bb0 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -133,7 +133,7 @@ where { trigger_with(stream, move |obj| { Some(ReconcileRequest { - obj_ref: ObjectRef::from_shared_obj_with(obj.clone(), dyntype.clone()), + obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()), reason: ReconcileReason::ObjectUpdated, }) }) @@ -735,7 +735,7 @@ where dyntype: K::DynamicType, ) -> Self { let mut trigger_selector = stream::SelectAll::new(); - let self_watcher = trigger_self_shared(trigger.map(|obj| Ok(obj)), dyntype.clone()).boxed(); + let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed(); trigger_selector.push(self_watcher); Self { trigger_selector, diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 151b0e15c..fdc290d0b 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -77,9 +77,10 @@ where K::DynamicType: Eq + std::hash::Hash + Clone, { pub(super) fn new(reader: Store, rx: Receiver>) -> ReflectHandle { - Self { reader, rx } + Self { rx, reader } } + #[must_use] pub fn reader(&self) -> Store { self.reader.clone() } @@ -98,8 +99,7 @@ where Some(obj_ref) => this .reader .get(&obj_ref) - .map(|obj| Poll::Ready(Some(obj))) - .unwrap_or(Poll::Pending), + .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), None => Poll::Ready(None), } } @@ -133,8 +133,8 @@ pub(crate) mod test { Ok(Event::Restarted(vec![foo, bar])), ]); - let (reader, writer) = reflector::store_with_dispatch(10, Default::default()); - let reflect = st.reflect_dispatch(writer); + let (reader, writer) = reflector::store_shared(10); + let reflect = st.reflect_shared(writer); pin_mut!(reflect); // Prior to any polls, we should have an empty store. @@ -178,9 +178,9 @@ pub(crate) mod test { let foo = Arc::new(foo); let bar = Arc::new(bar); - let (_, writer) = reflector::store_with_dispatch(10, Default::default()); - let subscriber = writer.subscribe(); - let reflect = st.reflect_dispatch(writer); + let (_, writer) = reflector::store_shared(10); + let subscriber = writer.subscribe().unwrap(); + let reflect = st.reflect_shared(writer); pin_mut!(reflect); pin_mut!(subscriber); @@ -231,9 +231,9 @@ pub(crate) mod test { let foo = Arc::new(foo); let bar = Arc::new(bar); - let (_, writer) = reflector::store_with_dispatch(10, Default::default()); - let subscriber = writer.subscribe(); - let mut reflect = Box::pin(st.reflect_dispatch(writer)); + let (_, writer) = reflector::store_shared(10); + let subscriber = writer.subscribe().unwrap(); + let mut reflect = Box::pin(st.reflect_shared(writer)); pin_mut!(subscriber); assert!(matches!( @@ -275,10 +275,10 @@ pub(crate) mod test { let foo = Arc::new(foo); let bar = Arc::new(bar); - let (_, writer) = reflector::store_with_dispatch(1, Default::default()); - let subscriber = writer.subscribe(); - let subscriber_slow = writer.subscribe(); - let reflect = st.reflect_dispatch(writer); + let (_, writer) = reflector::store_shared(1); + let subscriber = writer.subscribe().unwrap(); + let subscriber_slow = writer.subscribe().unwrap(); + let reflect = st.reflect_shared(writer); pin_mut!(reflect); pin_mut!(subscriber); pin_mut!(subscriber_slow); diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 7950a9416..eb998b346 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -10,7 +10,7 @@ use crate::watcher; use async_stream::stream; use futures::{Stream, StreamExt}; use std::hash::Hash; -pub use store::{shared_store, store, Store}; +pub use store::{store, store_shared, Store}; /// Cache objects from a [`watcher()`] stream into a local [`Store`] /// diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index f2533cfde..6f2257b33 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -203,7 +203,7 @@ impl ObjectRef { obj.to_object_ref(dyntype) } - pub fn from_shared_obj_with(obj: Arc, dyntype: K::DynamicType) -> Self + pub fn from_shared_obj_with(obj: &Arc, dyntype: K::DynamicType) -> Self where K: Lookup, { diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index ef44f8baf..a3917b29e 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -72,7 +72,7 @@ where pub fn subscribe(&self) -> Option> { self.dispatcher .as_ref() - .and_then(|dispatcher| Some(dispatcher.subscribe(self.as_reader()))) + .map(|dispatcher| dispatcher.subscribe(self.as_reader())) } /// Applies a single watcher event to the store @@ -118,7 +118,7 @@ where dispatcher.broadcast(obj_ref).await; } } - _ => {} + watcher::Event::Deleted(_) => {} } } } @@ -244,7 +244,9 @@ where (r, w) } -pub fn shared_store(buf_size: usize) -> (Store, Writer) +#[must_use] +#[allow(clippy::module_name_repetitions)] +pub fn store_shared(buf_size: usize) -> (Store, Writer) where K: Lookup + Clone + 'static, K::DynamicType: Eq + Hash + Clone + Default,