Skip to content

Commit

Permalink
Add comments and feature flags
Browse files Browse the repository at this point in the history
Signed-off-by: Matei David <[email protected]>
  • Loading branch information
mateiidavid committed Apr 15, 2024
1 parent 8347103 commit 9f7edd1
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
26 changes: 26 additions & 0 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use super::Lookup;

#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)]
// A helper type that holds a broadcast transmitter and a broadcast receiver,
// used to fan-out events from a root stream to multiple listeners.
pub(crate) struct Dispatcher<K>
where
K: Lookup + Clone + 'static,
Expand All @@ -32,23 +34,47 @@ where
K::DynamicType: Eq + std::hash::Hash + Clone,
{
pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
// Create a broadcast (tx, rx) pair
let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
// The tx half will not wait for any receivers to be active before
// broadcasting events. If no receivers are active, events will be
// buffered.
dispatch_tx.set_await_active(false);
Self {
dispatch_tx,
_dispatch_rx: dispatch_rx.deactivate(),
}
}

// Calls broadcast on the channel. Will return when the channel has enough
// space to send an event.
pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
}

// Creates a `ReflectHandle` by creating a receiver from the tx half.
// N.B: the new receiver will be fast-forwarded to the _latest_ event.
// The receiver won't have access to any events that are currently waiting
// to be acked by listeners.
pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
}
}

/// A handle to a shared stream reader
///
/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`],
/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each
/// shared stream reader should be polled independently and driven to readiness
/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure
/// will be applied on the root stream side.
///
/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s
/// subscribed to the stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`ReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
///
/// [`Writer`]: crate::reflector::Writer
#[pin_project]
pub struct ReflectHandle<K>
where
Expand Down
21 changes: 21 additions & 0 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ where
}
}

/// Creates a new Writer with the specified dynamic type and buffer size.
///
/// When the Writer is created through `new_shared`, it will be able to
/// be subscribed. Stored objects will be propagated to all subscribers. The
/// buffer size is used for the underlying channel. An object is cleared
/// from the buffer only when all subscribers have seen it.
///
/// If the dynamic type is default-able (for example when writer is used with
/// `k8s_openapi` types) you can use `Default` instead.
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn new_shared(dyntype: K::DynamicType, buf_size: usize) -> Self {
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
Expand All @@ -69,6 +79,10 @@ where
}
}

/// Return a handle to a subscriber
///
/// Multiple subscribe handles may be obtained, by either calling
/// `subscribe` multiple times, or by calling `clone()`
pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
self.dispatcher
.as_ref()
Expand Down Expand Up @@ -102,6 +116,7 @@ where
}
}

/// Broadcast an event to any downstream listeners subscribed on the store
pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
if let Some(ref mut dispatcher) = self.dispatcher {
match event {
Expand Down Expand Up @@ -244,8 +259,14 @@ where
(r, w)
}

/// Create a (Reader, Writer) for a `Store<K>` for a typed resource `K`
///
/// The resulting `Writer` can be subscribed on in order to fan out events from
/// a watcher. The `Writer` should be passed to a [`reflector`](crate::reflector()),
/// and the [`Store`] is a read-only handle.
#[must_use]
#[allow(clippy::module_name_repetitions)]
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
K: Lookup + Clone + 'static,
Expand Down
78 changes: 78 additions & 0 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,84 @@ pub trait WatchStreamExt: Stream {
Reflect::new(self, writer)
}

/// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`]
///
/// Returns the stream unmodified, but passes every [`watcher::Event`]
/// through a [`Writer`]. This populates a [`Store`] as the stream is
/// polled. When the [`watcher::Event`] is not an error or a
/// [`watcher::Event::Deleted`] then its inner object will also be
/// propagated to subscribers.
///
/// Subscribers can be created by calling [`subscribe()`] on a [`Writer`].
/// This will return a [`ReflectHandle`] stream that should be polled
/// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s
/// subscribed to the stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`ReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
///
/// **NB**: This adapter requires an
/// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
/// feature
///
/// ## Warning
///
/// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will
/// never receive any events. This will cause the streams to deadlock since
/// the root stream will apply backpressure when downstream readers are not
/// consuming events.
///
///
/// [`Store`]: crate::reflector::Store
/// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle
/// ## Usage
/// ```no_run
/// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
/// # use std::time::Duration;
/// # use tracing::{info, warn};
/// use kube::{Api, Client, ResourceExt};
/// use kube_runtime::{watcher, WatchStreamExt, reflector};
/// use k8s_openapi::api::apps::v1::Deployment;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
///
/// let deploys: Api<Deployment> = Api::default_namespaced(client);
/// let subscriber_buf_sz = 100;
/// let (reader, writer) = reflector::store_shared(subscriber_buf_sz)::<Deployment>();
/// let subscriber = &writer.subscribe().unwrap();
///
/// tokio::spawn(async move {
/// // start polling the store once the reader is ready
/// reader.wait_until_ready().await.unwrap();
/// loop {
/// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
/// info!("Current {} deploys: {:?}", names.len(), names);
/// tokio::time::sleep(Duration::from_secs(10)).await;
/// }
/// });
///
/// // configure the watcher stream and populate the store while polling
/// watcher(deploys, watcher::Config::default())
/// .reflect_shared(writer)
/// .applied_objects()
/// .for_each(|res| async move {
/// match res {
/// Ok(o) => info!("saw in root stream {}", o.name_any()),
/// Err(e) => warn!("watcher error in root stream: {}", e),
/// }
/// })
/// .await;
///
/// // subscriber can be used to receive applied_objects
/// subscriber
/// .for_each(|obj| async move {
/// info!("saw in subscriber {}", &obj.name_any())
/// })
/// await;
///
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
fn reflect_shared<K>(self, writer: Writer<K>) -> impl Stream<Item = Self::Item>
where
Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
Expand Down

0 comments on commit 9f7edd1

Please sign in to comment.