From 9f7edd12e1194daacb87bda01b525ef33f83d41e Mon Sep 17 00:00:00 2001 From: Matei David <matei.david.35@gmail.com> Date: Mon, 15 Apr 2024 19:28:03 +0000 Subject: [PATCH] Add comments and feature flags Signed-off-by: Matei David <matei.david.35@gmail.com> --- kube-runtime/src/reflector/dispatcher.rs | 26 ++++++++ kube-runtime/src/reflector/store.rs | 21 +++++++ kube-runtime/src/utils/watch_ext.rs | 78 ++++++++++++++++++++++++ 3 files changed, 125 insertions(+) diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 71b6716a1..22084214c 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -15,6 +15,8 @@ use super::Lookup; #[derive(Derivative)] #[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)] +// A helper type that holds a broadcast transmitter and a broadcast receiver, +// used to fan-out events from a root stream to multiple listeners. pub(crate) struct Dispatcher<K> where K: Lookup + Clone + 'static, @@ -32,7 +34,11 @@ where K::DynamicType: Eq + std::hash::Hash + Clone, { pub(crate) fn new(buf_size: usize) -> Dispatcher<K> { + // Create a broadcast (tx, rx) pair let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size); + // The tx half will not wait for any receivers to be active before + // broadcasting events. If no receivers are active, events will be + // buffered. dispatch_tx.set_await_active(false); Self { dispatch_tx, @@ -40,15 +46,35 @@ where } } + // Calls broadcast on the channel. Will return when the channel has enough + // space to send an event. pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) { let _ = self.dispatch_tx.broadcast_direct(obj_ref).await; } + // Creates a `ReflectHandle` by creating a receiver from the tx half. + // N.B: the new receiver will be fast-forwarded to the _latest_ event. + // The receiver won't have access to any events that are currently waiting + // to be acked by listeners. pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> { ReflectHandle::new(reader, self.dispatch_tx.new_receiver()) } } +/// A handle to a shared stream reader +/// +/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`], +/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each +/// shared stream reader should be polled independently and driven to readiness +/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure +/// will be applied on the root stream side. +/// +/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s +/// subscribed to the stream will also terminate after all events yielded by +/// the root stream have been observed. This means [`ReflectHandle`] streams +/// can still be polled after the root stream has been dropped. +/// +/// [`Writer`]: crate::reflector::Writer #[pin_project] pub struct ReflectHandle<K> where diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index a3917b29e..b6645e46b 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -46,6 +46,16 @@ where } } + /// Creates a new Writer with the specified dynamic type and buffer size. + /// + /// When the Writer is created through `new_shared`, it will be able to + /// be subscribed. Stored objects will be propagated to all subscribers. The + /// buffer size is used for the underlying channel. An object is cleared + /// from the buffer only when all subscribers have seen it. + /// + /// If the dynamic type is default-able (for example when writer is used with + /// `k8s_openapi` types) you can use `Default` instead. + #[cfg(feature = "unstable-runtime-subscribe")] pub fn new_shared(dyntype: K::DynamicType, buf_size: usize) -> Self { let (ready_tx, ready_rx) = DelayedInit::new(); Writer { @@ -69,6 +79,10 @@ where } } + /// Return a handle to a subscriber + /// + /// Multiple subscribe handles may be obtained, by either calling + /// `subscribe` multiple times, or by calling `clone()` pub fn subscribe(&self) -> Option<ReflectHandle<K>> { self.dispatcher .as_ref() @@ -102,6 +116,7 @@ where } } + /// Broadcast an event to any downstream listeners subscribed on the store pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) { if let Some(ref mut dispatcher) = self.dispatcher { match event { @@ -244,8 +259,14 @@ where (r, w) } +/// Create a (Reader, Writer) for a `Store<K>` for a typed resource `K` +/// +/// The resulting `Writer` can be subscribed on in order to fan out events from +/// a watcher. The `Writer` should be passed to a [`reflector`](crate::reflector()), +/// and the [`Store`] is a read-only handle. #[must_use] #[allow(clippy::module_name_repetitions)] +#[cfg(feature = "unstable-runtime-subscribe")] pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>) where K: Lookup + Clone + 'static, diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index d5d2b4a92..d01a389bf 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -249,6 +249,84 @@ pub trait WatchStreamExt: Stream { Reflect::new(self, writer) } + /// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`] + /// + /// Returns the stream unmodified, but passes every [`watcher::Event`] + /// through a [`Writer`]. This populates a [`Store`] as the stream is + /// polled. When the [`watcher::Event`] is not an error or a + /// [`watcher::Event::Deleted`] then its inner object will also be + /// propagated to subscribers. + /// + /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`]. + /// This will return a [`ReflectHandle`] stream that should be polled + /// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s + /// subscribed to the stream will also terminate after all events yielded by + /// the root stream have been observed. This means [`ReflectHandle`] streams + /// can still be polled after the root stream has been dropped. + /// + /// **NB**: This adapter requires an + /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) + /// feature + /// + /// ## Warning + /// + /// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will + /// never receive any events. This will cause the streams to deadlock since + /// the root stream will apply backpressure when downstream readers are not + /// consuming events. + /// + /// + /// [`Store`]: crate::reflector::Store + /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle + /// ## Usage + /// ```no_run + /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// # use std::time::Duration; + /// # use tracing::{info, warn}; + /// use kube::{Api, Client, ResourceExt}; + /// use kube_runtime::{watcher, WatchStreamExt, reflector}; + /// use k8s_openapi::api::apps::v1::Deployment; + /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> { + /// # let client: kube::Client = todo!(); + /// + /// let deploys: Api<Deployment> = Api::default_namespaced(client); + /// let subscriber_buf_sz = 100; + /// let (reader, writer) = reflector::store_shared(subscriber_buf_sz)::<Deployment>(); + /// let subscriber = &writer.subscribe().unwrap(); + /// + /// tokio::spawn(async move { + /// // start polling the store once the reader is ready + /// reader.wait_until_ready().await.unwrap(); + /// loop { + /// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>(); + /// info!("Current {} deploys: {:?}", names.len(), names); + /// tokio::time::sleep(Duration::from_secs(10)).await; + /// } + /// }); + /// + /// // configure the watcher stream and populate the store while polling + /// watcher(deploys, watcher::Config::default()) + /// .reflect_shared(writer) + /// .applied_objects() + /// .for_each(|res| async move { + /// match res { + /// Ok(o) => info!("saw in root stream {}", o.name_any()), + /// Err(e) => warn!("watcher error in root stream: {}", e), + /// } + /// }) + /// .await; + /// + /// // subscriber can be used to receive applied_objects + /// subscriber + /// .for_each(|obj| async move { + /// info!("saw in subscriber {}", &obj.name_any()) + /// }) + /// await; + /// + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "unstable-runtime-subscribe")] fn reflect_shared<K>(self, writer: Writer<K>) -> impl Stream<Item = Self::Item> where Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,