From c202af1c52e7057d401044a32a4aa92fd1cc8b79 Mon Sep 17 00:00:00 2001 From: yuyi Date: Wed, 15 Nov 2023 10:37:27 +0800 Subject: [PATCH] refactor(coordinator): speed up reconciling when status change --- pkg/controller/observer_controller.go | 14 ++++++++++---- pkg/resource/coordinator.go | 6 ++++++ pkg/resource/obcluster_manager.go | 4 ++++ pkg/resource/obparameter_manager.go | 4 ++++ pkg/resource/observer_manager.go | 4 ++++ pkg/resource/obtenant_manager.go | 4 ++++ pkg/resource/obtenantbackup_manager.go | 4 ++++ pkg/resource/obtenantbackuppolicy_manager.go | 4 ++++ pkg/resource/obtenantoperation_manager.go | 4 ++++ pkg/resource/obtenantrestore_manager.go | 4 ++++ pkg/resource/obzone_manager.go | 4 ++++ pkg/resource/resource_manager.go | 1 + pkg/resource/template_manager.go | 4 ++++ pkg/task/obcluster_flow.go | 3 +++ 14 files changed, 60 insertions(+), 4 deletions(-) diff --git a/pkg/controller/observer_controller.go b/pkg/controller/observer_controller.go index 5e8480648..da6f5c596 100644 --- a/pkg/controller/observer_controller.go +++ b/pkg/controller/observer_controller.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" kubeerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" @@ -103,16 +104,21 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } } coordinator := resource.NewCoordinator(observerManager, &logger) - _, err = coordinator.Coordinate() - return ctrl.Result{ - RequeueAfter: 5 * time.Second, - }, err + result, err := coordinator.Coordinate() + if err != nil { + return result, err + } + if result.RequeueAfter > 5*time.Second { + result.RequeueAfter = 5 * time.Second + } + return result, nil } // SetupWithManager sets up the controller with the Manager. func (r *OBServerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.OBServer{}). + Owns(&corev1.Pod{}). WithEventFilter(preds). Complete(r) } diff --git a/pkg/resource/coordinator.go b/pkg/resource/coordinator.go index b52f1a4c3..0ccecc631 100644 --- a/pkg/resource/coordinator.go +++ b/pkg/resource/coordinator.go @@ -59,6 +59,7 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) { } var f *task.TaskFlow var err error + beforeStatus := c.Manager.GetStatus() if c.Manager.IsNewResource() { c.Manager.InitStatus() } else { @@ -96,6 +97,11 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) { if err != nil { c.Logger.Error(err, "Failed to update status") } + // When status changes(e.g. from running to other status), set a shorter `requeue after` to speed up processing. + if c.Manager.GetStatus() != beforeStatus { + result.RequeueAfter = ExecutionRequeueDuration + } + c.Logger.V(obconst.LogLevelTrace).Info("Requeue after", "duration", result.RequeueAfter) return result, err } diff --git a/pkg/resource/obcluster_manager.go b/pkg/resource/obcluster_manager.go index 2bf9e1195..fe540b00a 100644 --- a/pkg/resource/obcluster_manager.go +++ b/pkg/resource/obcluster_manager.go @@ -46,6 +46,10 @@ func (m *OBClusterManager) IsNewResource() bool { return m.OBCluster.Status.Status == "" } +func (m *OBClusterManager) GetStatus() string { + return m.OBCluster.Status.Status +} + func (m *OBClusterManager) InitStatus() { m.Logger.Info("newly created cluster, init status") m.Recorder.Event(m.OBCluster, "Init", "", "newly created cluster, init status") diff --git a/pkg/resource/obparameter_manager.go b/pkg/resource/obparameter_manager.go index eeb86e513..e4e2eb05d 100644 --- a/pkg/resource/obparameter_manager.go +++ b/pkg/resource/obparameter_manager.go @@ -50,6 +50,10 @@ func (m *OBParameterManager) IsNewResource() bool { return m.OBParameter.Status.Status == "" } +func (m *OBParameterManager) GetStatus() string { + return m.OBParameter.Status.Status +} + func (m *OBParameterManager) InitStatus() { m.Logger.Info("newly created obparameter, init status") status := v1alpha1.OBParameterStatus{ diff --git a/pkg/resource/observer_manager.go b/pkg/resource/observer_manager.go index 803001bbc..d1cbff1ae 100644 --- a/pkg/resource/observer_manager.go +++ b/pkg/resource/observer_manager.go @@ -82,6 +82,10 @@ func (m *OBServerManager) IsNewResource() bool { return m.OBServer.Status.Status == "" } +func (m *OBServerManager) GetStatus() string { + return m.OBServer.Status.Status +} + func (m *OBServerManager) InitStatus() { m.Logger.Info("newly created server, init status") status := v1alpha1.OBServerStatus{ diff --git a/pkg/resource/obtenant_manager.go b/pkg/resource/obtenant_manager.go index e453a441c..1a3d96ff0 100644 --- a/pkg/resource/obtenant_manager.go +++ b/pkg/resource/obtenant_manager.go @@ -77,6 +77,10 @@ func (m *OBTenantManager) IsDeleting() bool { return !m.OBTenant.ObjectMeta.DeletionTimestamp.IsZero() } +func (m *OBTenantManager) GetStatus() string { + return m.OBTenant.Status.Status +} + func (m *OBTenantManager) InitStatus() { m.OBTenant.Status = v1alpha1.OBTenantStatus{ Pools: make([]v1alpha1.ResourcePoolStatus, 0, len(m.OBTenant.Spec.Pools)), diff --git a/pkg/resource/obtenantbackup_manager.go b/pkg/resource/obtenantbackup_manager.go index dd34f0bdb..49db03d29 100644 --- a/pkg/resource/obtenantbackup_manager.go +++ b/pkg/resource/obtenantbackup_manager.go @@ -50,6 +50,10 @@ func (m *OBTenantBackupManager) IsNewResource() bool { return m.Resource.Status.Status == "" } +func (m *OBTenantBackupManager) GetStatus() string { + return string(m.Resource.Status.Status) +} + func (m *OBTenantBackupManager) IsDeleting() bool { return m.Resource.GetDeletionTimestamp() != nil } diff --git a/pkg/resource/obtenantbackuppolicy_manager.go b/pkg/resource/obtenantbackuppolicy_manager.go index c14ae0d8c..aef531d18 100644 --- a/pkg/resource/obtenantbackuppolicy_manager.go +++ b/pkg/resource/obtenantbackuppolicy_manager.go @@ -59,6 +59,10 @@ func (m *ObTenantBackupPolicyManager) IsDeleting() bool { return !m.BackupPolicy.ObjectMeta.DeletionTimestamp.IsZero() } +func (m *ObTenantBackupPolicyManager) GetStatus() string { + return string(m.BackupPolicy.Status.Status) +} + func (m *ObTenantBackupPolicyManager) CheckAndUpdateFinalizers() error { policy := m.BackupPolicy finalizerName := "obtenantbackuppolicy.finalizers.oceanbase.com" diff --git a/pkg/resource/obtenantoperation_manager.go b/pkg/resource/obtenantoperation_manager.go index f1606cf8c..eea817d97 100644 --- a/pkg/resource/obtenantoperation_manager.go +++ b/pkg/resource/obtenantoperation_manager.go @@ -52,6 +52,10 @@ func (m *ObTenantOperationManager) IsNewResource() bool { return m.Resource.Status.Status == "" } +func (m *ObTenantOperationManager) GetStatus() string { + return string(m.Resource.Status.Status) +} + func (m *ObTenantOperationManager) IsDeleting() bool { return m.Resource.GetDeletionTimestamp() != nil } diff --git a/pkg/resource/obtenantrestore_manager.go b/pkg/resource/obtenantrestore_manager.go index 47044aa8e..acbdc297a 100644 --- a/pkg/resource/obtenantrestore_manager.go +++ b/pkg/resource/obtenantrestore_manager.go @@ -51,6 +51,10 @@ func (m ObTenantRestoreManager) IsNewResource() bool { return m.Resource.Status.Status == "" } +func (m *ObTenantRestoreManager) GetStatus() string { + return string(m.Resource.Status.Status) +} + func (m ObTenantRestoreManager) IsDeleting() bool { return m.Resource.GetDeletionTimestamp() != nil } diff --git a/pkg/resource/obzone_manager.go b/pkg/resource/obzone_manager.go index 242489db5..8205b8916 100644 --- a/pkg/resource/obzone_manager.go +++ b/pkg/resource/obzone_manager.go @@ -50,6 +50,10 @@ func (m *OBZoneManager) IsNewResource() bool { return m.OBZone.Status.Status == "" } +func (m *OBZoneManager) GetStatus() string { + return m.OBZone.Status.Status +} + func (m *OBZoneManager) InitStatus() { m.Logger.Info("newly created zone, init status") status := v1alpha1.OBZoneStatus{ diff --git a/pkg/resource/resource_manager.go b/pkg/resource/resource_manager.go index f04a61b77..1dbbb9b70 100644 --- a/pkg/resource/resource_manager.go +++ b/pkg/resource/resource_manager.go @@ -28,6 +28,7 @@ type ResourceManager interface { HandleFailure() FinishTask() UpdateStatus() error + GetStatus() string GetTaskFunc(string) (func() error, error) GetTaskFlow() (*task.TaskFlow, error) PrintErrEvent(error) diff --git a/pkg/resource/template_manager.go b/pkg/resource/template_manager.go index c042b0c81..eb91ce007 100644 --- a/pkg/resource/template_manager.go +++ b/pkg/resource/template_manager.go @@ -41,6 +41,10 @@ func (m *ObResourceManager[T]) IsNewResource() bool { return false } +func (m *ObResourceManager[T]) GetStatus() string { + return "" +} + func (m *ObResourceManager[T]) IsDeleting() bool { return false } diff --git a/pkg/task/obcluster_flow.go b/pkg/task/obcluster_flow.go index e2c3afe33..9dfa6ca67 100644 --- a/pkg/task/obcluster_flow.go +++ b/pkg/task/obcluster_flow.go @@ -26,6 +26,9 @@ func BootstrapOBCluster() *TaskFlow { Name: flowname.BootstrapOBCluster, Tasks: []string{taskname.CreateOBZone, taskname.WaitOBZoneBootstrapReady, taskname.Bootstrap}, TargetStatus: clusterstatus.Bootstrapped, + OnFailure: strategy.FailureRule{ + NextTryStatus: "new", + }, }, } }