Skip to content

Commit

Permalink
Implement watches_all method for defining any groups of objects recon…
Browse files Browse the repository at this point in the history
…ciled together

Signed-off-by: Danil Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev committed Jan 15, 2024
1 parent c7054b5 commit a46b7c1
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub enum ReconcileReason {
ReconcilerRequestedRetry,
ErrorPolicyRequestedRetry,
BulkReconcile,
IndirectlyRelatedObjectUpdated { obj_ref: String },
Custom { reason: String },
}

Expand All @@ -228,6 +229,9 @@ impl Display for ReconcileReason {
ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
ReconcileReason::IndirectlyRelatedObjectUpdated { obj_ref } => {
f.write_fmt(format_args!("indirectly related object updated: {obj_ref}"))
}
ReconcileReason::Custom { reason } => f.write_str(reason),
}
}
Expand Down Expand Up @@ -1085,6 +1089,86 @@ where
self
}

/// Trigger a reconciliation for all managed objects having a user defined relation to `Other`.
/// Mapper method accepts both managed and `Other` objects, allowing to determine if objects are related.
///
/// For example, this can be used to reconcile all managed objects which have an annotation with name of the `Other` object.
/// Usefull when there is a chain of ownership references, where the parent should trigger reconcile for all of the most distant children:
///
/// ```
/// # async {
/// use futures::stream::StreamExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use k8s_openapi::api::apps::v1::Deployment;
/// use kube::{
/// Client,
/// api::{Api, ResourceExt},
/// runtime::{
/// reflector::ObjectRef,
/// controller::{Controller, Action},
/// watcher,
/// },
/// };
/// use std::{convert::Infallible, io::BufRead, sync::Arc};
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
/// watcher::Config::default(),
/// )
/// .watches_all(
/// Api::<Deployment>::all(Client::try_default().await.unwrap()),
/// watcher::Config::default(),
/// |config_map, deployment| match deployment.annotations().get("config") {
/// Some(name) if config_map.name_any() == name.clone() => Some(ObjectRef::from_obj(config_map)),
/// _ => None,
/// },
/// )
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name_any());
/// Ok(Action::await_change())
/// },
/// |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
/// Arc::new(()),
/// );
/// # };
/// ```
#[must_use]
pub fn watches_all<Other, I>(
mut self,
api: Api<Other>,
wc: watcher::Config,
mapper: impl Fn(&K, &Other) -> I + Sync + Send + Copy + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Arc<Other>: Send,
I: IntoIterator<Item = ObjectRef<K>> + Sync + Send + 'static,
I::IntoIter: Sync + Send + 'static,
{
let store = self.store();
self.trigger_selector.push(
watcher(api, wc)
.touched_objects()
.filter_map(|o| future::ready(o.is_ok().then(|| o.unwrap())))
.flat_map(move |o| {
let other = Arc::new(o);
stream::iter(store.state().into_iter().map(move |obj| {
stream::iter(mapper(obj.as_ref(), other.as_ref()).into_iter().map(|r| {
Ok(ReconcileRequest {
obj_ref: r.clone(),
reason: ReconcileReason::IndirectlyRelatedObjectUpdated {
obj_ref: r.to_string(),
},
})
}))
}))
})
.flatten()
.boxed(),
);
self
}

/// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
///
/// This can be used to inject reconciliations for specific objects from an external resource.
Expand Down

0 comments on commit a46b7c1

Please sign in to comment.