Skip to content

Commit

Permalink
refactor(coordinator): speed up reconciling when status change
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI committed Nov 15, 2023
1 parent 7409300 commit c202af1
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pkg/controller/observer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -103,16 +104,21 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}
coordinator := resource.NewCoordinator(observerManager, &logger)
_, err = coordinator.Coordinate()
return ctrl.Result{
RequeueAfter: 5 * time.Second,
}, err
result, err := coordinator.Coordinate()
if err != nil {
return result, err
}
if result.RequeueAfter > 5*time.Second {
result.RequeueAfter = 5 * time.Second
}
return result, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *OBServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.OBServer{}).
Owns(&corev1.Pod{}).
WithEventFilter(preds).
Complete(r)
}
6 changes: 6 additions & 0 deletions pkg/resource/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) {
}
var f *task.TaskFlow
var err error
beforeStatus := c.Manager.GetStatus()
if c.Manager.IsNewResource() {
c.Manager.InitStatus()
} else {
Expand Down Expand Up @@ -96,6 +97,11 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) {
if err != nil {
c.Logger.Error(err, "Failed to update status")
}
// When status changes(e.g. from running to other status), set a shorter `requeue after` to speed up processing.
if c.Manager.GetStatus() != beforeStatus {
result.RequeueAfter = ExecutionRequeueDuration
}
c.Logger.V(obconst.LogLevelTrace).Info("Requeue after", "duration", result.RequeueAfter)
return result, err
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (m *OBClusterManager) IsNewResource() bool {
return m.OBCluster.Status.Status == ""
}

func (m *OBClusterManager) GetStatus() string {
return m.OBCluster.Status.Status
}

func (m *OBClusterManager) InitStatus() {
m.Logger.Info("newly created cluster, init status")
m.Recorder.Event(m.OBCluster, "Init", "", "newly created cluster, init status")
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obparameter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *OBParameterManager) IsNewResource() bool {
return m.OBParameter.Status.Status == ""
}

func (m *OBParameterManager) GetStatus() string {
return m.OBParameter.Status.Status
}

func (m *OBParameterManager) InitStatus() {
m.Logger.Info("newly created obparameter, init status")
status := v1alpha1.OBParameterStatus{
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/observer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (m *OBServerManager) IsNewResource() bool {
return m.OBServer.Status.Status == ""
}

func (m *OBServerManager) GetStatus() string {
return m.OBServer.Status.Status
}

func (m *OBServerManager) InitStatus() {
m.Logger.Info("newly created server, init status")
status := v1alpha1.OBServerStatus{
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obtenant_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (m *OBTenantManager) IsDeleting() bool {
return !m.OBTenant.ObjectMeta.DeletionTimestamp.IsZero()
}

func (m *OBTenantManager) GetStatus() string {
return m.OBTenant.Status.Status
}

func (m *OBTenantManager) InitStatus() {
m.OBTenant.Status = v1alpha1.OBTenantStatus{
Pools: make([]v1alpha1.ResourcePoolStatus, 0, len(m.OBTenant.Spec.Pools)),
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obtenantbackup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *OBTenantBackupManager) IsNewResource() bool {
return m.Resource.Status.Status == ""
}

func (m *OBTenantBackupManager) GetStatus() string {
return string(m.Resource.Status.Status)
}

func (m *OBTenantBackupManager) IsDeleting() bool {
return m.Resource.GetDeletionTimestamp() != nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obtenantbackuppolicy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (m *ObTenantBackupPolicyManager) IsDeleting() bool {
return !m.BackupPolicy.ObjectMeta.DeletionTimestamp.IsZero()
}

func (m *ObTenantBackupPolicyManager) GetStatus() string {
return string(m.BackupPolicy.Status.Status)
}

func (m *ObTenantBackupPolicyManager) CheckAndUpdateFinalizers() error {
policy := m.BackupPolicy
finalizerName := "obtenantbackuppolicy.finalizers.oceanbase.com"
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obtenantoperation_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (m *ObTenantOperationManager) IsNewResource() bool {
return m.Resource.Status.Status == ""
}

func (m *ObTenantOperationManager) GetStatus() string {
return string(m.Resource.Status.Status)
}

func (m *ObTenantOperationManager) IsDeleting() bool {
return m.Resource.GetDeletionTimestamp() != nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obtenantrestore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (m ObTenantRestoreManager) IsNewResource() bool {
return m.Resource.Status.Status == ""
}

func (m *ObTenantRestoreManager) GetStatus() string {
return string(m.Resource.Status.Status)
}

func (m ObTenantRestoreManager) IsDeleting() bool {
return m.Resource.GetDeletionTimestamp() != nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/obzone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *OBZoneManager) IsNewResource() bool {
return m.OBZone.Status.Status == ""
}

func (m *OBZoneManager) GetStatus() string {
return m.OBZone.Status.Status
}

func (m *OBZoneManager) InitStatus() {
m.Logger.Info("newly created zone, init status")
status := v1alpha1.OBZoneStatus{
Expand Down
1 change: 1 addition & 0 deletions pkg/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ResourceManager interface {
HandleFailure()
FinishTask()
UpdateStatus() error
GetStatus() string
GetTaskFunc(string) (func() error, error)
GetTaskFlow() (*task.TaskFlow, error)
PrintErrEvent(error)
Expand Down
4 changes: 4 additions & 0 deletions pkg/resource/template_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *ObResourceManager[T]) IsNewResource() bool {
return false
}

func (m *ObResourceManager[T]) GetStatus() string {
return ""
}

func (m *ObResourceManager[T]) IsDeleting() bool {
return false
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/task/obcluster_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func BootstrapOBCluster() *TaskFlow {
Name: flowname.BootstrapOBCluster,
Tasks: []string{taskname.CreateOBZone, taskname.WaitOBZoneBootstrapReady, taskname.Bootstrap},
TargetStatus: clusterstatus.Bootstrapped,
OnFailure: strategy.FailureRule{
NextTryStatus: "new",
},
},
}
}
Expand Down

0 comments on commit c202af1

Please sign in to comment.