From a46b7c187c096f13facb2c88105e34d924fd916a Mon Sep 17 00:00:00 2001 From: Danil Grigorev Date: Mon, 15 Jan 2024 09:50:48 +0100 Subject: [PATCH] Implement watches_all method for defining any groups of objects reconciled together Signed-off-by: Danil Grigorev --- kube-runtime/src/controller/mod.rs | 84 ++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index ef4c662a7..c21fd3253 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -214,6 +214,7 @@ pub enum ReconcileReason { ReconcilerRequestedRetry, ErrorPolicyRequestedRetry, BulkReconcile, + IndirectlyRelatedObjectUpdated { obj_ref: String }, Custom { reason: String }, } @@ -228,6 +229,9 @@ impl Display for ReconcileReason { ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"), ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"), ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"), + ReconcileReason::IndirectlyRelatedObjectUpdated { obj_ref } => { + f.write_fmt(format_args!("indirectly related object updated: {obj_ref}")) + } ReconcileReason::Custom { reason } => f.write_str(reason), } } @@ -1085,6 +1089,86 @@ where self } + /// Trigger a reconciliation for all managed objects having a user defined relation to `Other`. + /// Mapper method accepts both managed and `Other` objects, allowing to determine if objects are related. + /// + /// For example, this can be used to reconcile all managed objects which have an annotation with name of the `Other` object. + /// Usefull when there is a chain of ownership references, where the parent should trigger reconcile for all of the most distant children: + /// + /// ``` + /// # async { + /// use futures::stream::StreamExt; + /// use k8s_openapi::api::core::v1::ConfigMap; + /// use k8s_openapi::api::apps::v1::Deployment; + /// use kube::{ + /// Client, + /// api::{Api, ResourceExt}, + /// runtime::{ + /// reflector::ObjectRef, + /// controller::{Controller, Action}, + /// watcher, + /// }, + /// }; + /// use std::{convert::Infallible, io::BufRead, sync::Arc}; + /// Controller::new( + /// Api::::all(Client::try_default().await.unwrap()), + /// watcher::Config::default(), + /// ) + /// .watches_all( + /// Api::::all(Client::try_default().await.unwrap()), + /// watcher::Config::default(), + /// |config_map, deployment| match deployment.annotations().get("config") { + /// Some(name) if config_map.name_any() == name.clone() => Some(ObjectRef::from_obj(config_map)), + /// _ => None, + /// }, + /// ) + /// .run( + /// |o, _| async move { + /// println!("Reconciling {}", o.name_any()); + /// Ok(Action::await_change()) + /// }, + /// |_object: Arc, err: &Infallible, _| Err(err).unwrap(), + /// Arc::new(()), + /// ); + /// # }; + /// ``` + #[must_use] + pub fn watches_all( + mut self, + api: Api, + wc: watcher::Config, + mapper: impl Fn(&K, &Other) -> I + Sync + Send + Copy + 'static, + ) -> Self + where + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + Arc: Send, + I: IntoIterator> + Sync + Send + 'static, + I::IntoIter: Sync + Send + 'static, + { + let store = self.store(); + self.trigger_selector.push( + watcher(api, wc) + .touched_objects() + .filter_map(|o| future::ready(o.is_ok().then(|| o.unwrap()))) + .flat_map(move |o| { + let other = Arc::new(o); + stream::iter(store.state().into_iter().map(move |obj| { + stream::iter(mapper(obj.as_ref(), other.as_ref()).into_iter().map(|r| { + Ok(ReconcileRequest { + obj_ref: r.clone(), + reason: ReconcileReason::IndirectlyRelatedObjectUpdated { + obj_ref: r.to_string(), + }, + }) + })) + })) + }) + .flatten() + .boxed(), + ); + self + } + /// Trigger the reconciliation process for a managed object `ObjectRef` whenever `trigger` emits a value /// /// This can be used to inject reconciliations for specific objects from an external resource.