Skip to content

Commit

Permalink
Fix tests & clippy warns
Browse files Browse the repository at this point in the history
Signed-off-by: Matei David <[email protected]>
  • Loading branch information
mateiidavid committed Apr 3, 2024
1 parent 1b81f4c commit c6d1027
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 28 deletions.
8 changes: 2 additions & 6 deletions examples/shared_stream_controllers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use futures::StreamExt;
use k8s_openapi::api::core::v1::{Pod, PodCondition};
use kube::{
api::{Patch, PatchParams},
runtime::{
controller::Action,
reflector::{self, store::Writer},
watcher, Config, Controller, WatchStreamExt,
},
runtime::{controller::Action, reflector, watcher, Config, Controller, WatchStreamExt},
Api, Client, ResourceExt,
};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -130,7 +126,7 @@ async fn main() -> anyhow::Result<()> {
let ctx = Arc::new(Data { client });

// Create a shared store with a predefined buffer that will be shared between subscribers.
let (reader, writer) = reflector::shared_store(SUBSCRIBE_BUFFER_SIZE);
let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
// Before threading an object watch through the store, create a subscriber.
// Any number of subscribers can be created from one writer.
let subscriber = writer
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ where
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_shared_obj_with(obj.clone(), dyntype.clone()),
obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,

Check warning on line 137 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L134-L137

Added lines #L134 - L137 were not covered by tests
})
})
Expand Down Expand Up @@ -735,7 +735,7 @@ where
dyntype: K::DynamicType,
) -> Self {
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self_shared(trigger.map(|obj| Ok(obj)), dyntype.clone()).boxed();
let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
trigger_selector.push(self_watcher);

Check warning on line 739 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L737-L739

Added lines #L737 - L739 were not covered by tests
Self {
trigger_selector,
Expand Down
30 changes: 15 additions & 15 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ where
K::DynamicType: Eq + std::hash::Hash + Clone,
{
pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
Self { reader, rx }
Self { rx, reader }
}

#[must_use]
pub fn reader(&self) -> Store<K> {
self.reader.clone()

Check warning on line 85 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L84-L85

Added lines #L84 - L85 were not covered by tests
}
Expand All @@ -98,8 +99,7 @@ where
Some(obj_ref) => this
.reader
.get(&obj_ref)

Check warning on line 101 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L100-L101

Added lines #L100 - L101 were not covered by tests
.map(|obj| Poll::Ready(Some(obj)))
.unwrap_or(Poll::Pending),
.map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
None => Poll::Ready(None),
}
}
Expand Down Expand Up @@ -133,8 +133,8 @@ pub(crate) mod test {
Ok(Event::Restarted(vec![foo, bar])),
]);

let (reader, writer) = reflector::store_with_dispatch(10, Default::default());
let reflect = st.reflect_dispatch(writer);
let (reader, writer) = reflector::store_shared(10);
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);

// Prior to any polls, we should have an empty store.
Expand Down Expand Up @@ -178,9 +178,9 @@ pub(crate) mod test {
let foo = Arc::new(foo);
let bar = Arc::new(bar);

let (_, writer) = reflector::store_with_dispatch(10, Default::default());
let subscriber = writer.subscribe();
let reflect = st.reflect_dispatch(writer);
let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
pin_mut!(subscriber);

Expand Down Expand Up @@ -231,9 +231,9 @@ pub(crate) mod test {
let foo = Arc::new(foo);
let bar = Arc::new(bar);

let (_, writer) = reflector::store_with_dispatch(10, Default::default());
let subscriber = writer.subscribe();
let mut reflect = Box::pin(st.reflect_dispatch(writer));
let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
let mut reflect = Box::pin(st.reflect_shared(writer));
pin_mut!(subscriber);

assert!(matches!(
Expand Down Expand Up @@ -275,10 +275,10 @@ pub(crate) mod test {
let foo = Arc::new(foo);
let bar = Arc::new(bar);

let (_, writer) = reflector::store_with_dispatch(1, Default::default());
let subscriber = writer.subscribe();
let subscriber_slow = writer.subscribe();
let reflect = st.reflect_dispatch(writer);
let (_, writer) = reflector::store_shared(1);
let subscriber = writer.subscribe().unwrap();
let subscriber_slow = writer.subscribe().unwrap();
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
pin_mut!(subscriber);
pin_mut!(subscriber_slow);
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::watcher;
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::hash::Hash;
pub use store::{shared_store, store, Store};
pub use store::{store, store_shared, Store};

/// Cache objects from a [`watcher()`] stream into a local [`Store`]
///
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl<K: Lookup> ObjectRef<K> {
obj.to_object_ref(dyntype)
}

pub fn from_shared_obj_with(obj: Arc<K>, dyntype: K::DynamicType) -> Self
pub fn from_shared_obj_with(obj: &Arc<K>, dyntype: K::DynamicType) -> Self
where
K: Lookup,
{
Expand Down
8 changes: 5 additions & 3 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
self.dispatcher
.as_ref()
.and_then(|dispatcher| Some(dispatcher.subscribe(self.as_reader())))
.map(|dispatcher| dispatcher.subscribe(self.as_reader()))
}

/// Applies a single watcher event to the store
Expand Down Expand Up @@ -118,7 +118,7 @@ where
dispatcher.broadcast(obj_ref).await;
}
}
_ => {}
watcher::Event::Deleted(_) => {}
}
}
}
Expand Down Expand Up @@ -244,7 +244,9 @@ where
(r, w)
}

pub fn shared_store<K>(buf_size: usize) -> (Store<K>, Writer<K>)
#[must_use]
#[allow(clippy::module_name_repetitions)]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
Expand Down

0 comments on commit c6d1027

Please sign in to comment.