diff --git a/api/v1alpha1/obcluster_webhook.go b/api/v1alpha1/obcluster_webhook.go
index ff5d9885f..3053b4d5e 100644
--- a/api/v1alpha1/obcluster_webhook.go
+++ b/api/v1alpha1/obcluster_webhook.go
@@ -36,6 +36,7 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
 
     apitypes "github.com/oceanbase/ob-operator/api/types"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
 )
 
@@ -61,7 +62,7 @@ func (r *OBCluster) Default() {
     parameterMap := make(map[string]apitypes.Parameter, 0)
     memorySize, ok := r.Spec.OBServerTemplate.Resource.Memory.AsInt64()
     if ok {
-        memoryLimit := fmt.Sprintf("%dM", memorySize*oceanbaseconst.DefaultMemoryLimitPercent/100/oceanbaseconst.MegaConverter)
+        memoryLimit := fmt.Sprintf("%dM", memorySize*int64(obcfg.GetConfig().Resource.DefaultMemoryLimitPercent)/100/oceanbaseconst.MegaConverter)
         parameterMap["memory_limit"] = apitypes.Parameter{
             Name:  "memory_limit",
             Value: memoryLimit,
@@ -71,12 +72,12 @@ func (r *OBCluster) Default() {
     }
     datafileDiskSize, ok := r.Spec.OBServerTemplate.Storage.DataStorage.Size.AsInt64()
     if ok {
-        datafileMaxSize := fmt.Sprintf("%dG", datafileDiskSize*oceanbaseconst.DefaultDiskUsePercent/oceanbaseconst.GigaConverter/100)
+        datafileMaxSize := fmt.Sprintf("%dG", datafileDiskSize*int64(obcfg.GetConfig().Resource.DefaultDiskUsePercent)/oceanbaseconst.GigaConverter/100)
         parameterMap["datafile_maxsize"] = apitypes.Parameter{
             Name:  "datafile_maxsize",
             Value: datafileMaxSize,
         }
-        datafileNextSize := fmt.Sprintf("%dG", datafileDiskSize*oceanbaseconst.DefaultDiskExpandPercent/oceanbaseconst.GigaConverter/100)
+        datafileNextSize := fmt.Sprintf("%dG", datafileDiskSize*int64(obcfg.GetConfig().Resource.DefaultDiskExpandPercent)/oceanbaseconst.GigaConverter/100)
         parameterMap["datafile_next"] = apitypes.Parameter{
             Name:  "datafile_next",
             Value: datafileNextSize,
@@ -92,7 +93,7 @@ func (r *OBCluster) Default() {
     logSize, ok := r.Spec.OBServerTemplate.Storage.LogStorage.Size.AsInt64()
     if ok {
         // observer has 4 types of log and one logfile limits at 256M considering about wf, maximum of 2G will be occupied for 1 syslog count
-        maxSysLogFileCount = logSize * oceanbaseconst.DefaultLogPercent / oceanbaseconst.GigaConverter / 100 / 2
+        maxSysLogFileCount = logSize * int64(obcfg.GetConfig().Resource.DefaultLogPercent) / oceanbaseconst.GigaConverter / 100 / 2
     }
     parameterMap["max_syslog_file_count"] = apitypes.Parameter{
         Name:  "max_syslog_file_count",
@@ -275,18 +276,17 @@ func (r *OBCluster) validateMutation() error {
     }
 
     // Validate disk size
-    if r.Spec.OBServerTemplate.Storage.DataStorage.Size.AsApproximateFloat64() < oceanbaseconst.MinDataDiskSize.AsApproximateFloat64() {
+    if r.Spec.OBServerTemplate.Storage.DataStorage.Size.Cmp(resource.MustParse(obcfg.GetConfig().Resource.MinDataDiskSize)) < 0 {
         allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("dataStorage").Child("size"), r.Spec.OBServerTemplate.Storage.DataStorage.Size.String(), "The minimum data storage size of OBCluster is "+oceanbaseconst.MinDataDiskSize.String()))
     }
-    if r.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.AsApproximateFloat64() < oceanbaseconst.MinRedoLogDiskSize.AsApproximateFloat64() {
+    if r.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.Cmp(resource.MustParse(obcfg.GetConfig().Resource.MinRedoLogDiskSize)) < 0 {
         allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("redoLogStorage").Child("size"), r.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.String(), "The minimum redo log storage size of OBCluster is "+oceanbaseconst.MinRedoLogDiskSize.String()))
     }
-    if r.Spec.OBServerTemplate.Storage.LogStorage.Size.AsApproximateFloat64() < oceanbaseconst.MinLogDiskSize.AsApproximateFloat64() {
+    if r.Spec.OBServerTemplate.Storage.LogStorage.Size.Cmp(resource.MustParse(obcfg.GetConfig().Resource.MinLogDiskSize)) < 0 {
         allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("observer").Child("storage").Child("logStorage").Child("size"), r.Spec.OBServerTemplate.Storage.LogStorage.Size.String(), "The minimum log storage size of OBCluster is "+oceanbaseconst.MinLogDiskSize.String()))
     }
-    // Validate memory size
-    if r.Spec.OBServerTemplate.Resource.Memory.AsApproximateFloat64() < oceanbaseconst.MinMemorySize.AsApproximateFloat64() {
+    if r.Spec.OBServerTemplate.Resource.Memory.Cmp(resource.MustParse(obcfg.GetConfig().Resource.MinMemorySize)) < 0 {
         allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("observer").Child("resource").Child("memory"), r.Spec.OBServerTemplate.Resource.Memory.String(), "The minimum memory size of OBCluster is "+oceanbaseconst.MinMemorySize.String()))
     }
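Switching these checks from `AsApproximateFloat64()` to `Quantity.Cmp` makes the comparison exact rather than float-approximate, and it lets the minimums be parsed at runtime from config strings. A minimal sketch of the comparison pattern, with illustrative quantities rather than the operator's actual defaults:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// A declared size from a spec, and a configurable minimum such as "30Gi".
	declared := resource.MustParse("20Gi")
	minimum := resource.MustParse("30Gi") // hypothetical config value

	// Cmp returns -1, 0 or 1; no conversion through float64 is involved.
	if declared.Cmp(minimum) < 0 {
		fmt.Printf("size %s is below the minimum %s\n", declared.String(), minimum.String())
	}
}
```

One caveat worth keeping in mind: `resource.MustParse` panics on malformed input, so the configured minimum strings must be valid Kubernetes quantities.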
diff --git a/cmd/operator/main.go b/cmd/operator/main.go
index 8088b6039..30cae0957 100644
--- a/cmd/operator/main.go
+++ b/cmd/operator/main.go
@@ -33,9 +33,11 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/log/zap"
 
     v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     "github.com/oceanbase/ob-operator/internal/controller"
     "github.com/oceanbase/ob-operator/internal/controller/config"
     "github.com/oceanbase/ob-operator/internal/telemetry"
+    "github.com/oceanbase/ob-operator/pkg/coordinator"
 )
 
 var (
@@ -86,6 +88,10 @@ func main() {
         },
     }
 
+    cfg := obcfg.GetConfig()
+    coordinator.SetMaxRetryTimes(cfg.Time.TaskMaxRetryTimes)
+    coordinator.SetRetryBackoffThreshold(cfg.Time.TaskRetryBackoffThreshold)
+
     ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
 
     mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
diff --git a/internal/config/operator/types.go b/internal/config/operator/types.go
index 28f2d307a..5174b1fda 100644
--- a/internal/config/operator/types.go
+++ b/internal/config/operator/types.go
@@ -15,7 +15,8 @@ package operator
 
 import "github.com/spf13/viper"
 
 type Config struct {
-    v *viper.Viper
+    v *viper.Viper
+    Manager  Manager  `mapstructure:",squash" yaml:"manager"`
     Database Database `mapstructure:"database" yaml:"database"`
     Task     Task     `mapstructure:"task" yaml:"task"`
diff --git a/internal/const/oceanbase/resource.go b/internal/const/oceanbase/resource.go
index fc7d43807..d6400dd2a 100644
--- a/internal/const/oceanbase/resource.go
+++ b/internal/const/oceanbase/resource.go
@@ -20,8 +20,11 @@ const (
     InitialDataDiskUsePercent = 20
     DefaultDiskUsePercent     = 95
     DefaultMemoryLimitPercent = 90
-    GigaConverter             = 1 << 30
-    MegaConverter             = 1 << 20
+)
+
+const (
+    GigaConverter = 1 << 30
+    MegaConverter = 1 << 20
 )
 
 const (
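The new `Manager` field carries mapstructure's `,squash` option, which viper uses when unmarshalling: the manager options are read from the top level of the config source instead of under a `manager:` key. A standalone sketch of how squash changes key lookup, using the mapstructure library directly and a hypothetical field name:

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type Manager struct {
	LogVerbosity int `mapstructure:"log-verbosity"` // hypothetical option
}

type Config struct {
	// ",squash" lifts Manager's keys to the top level of the source map,
	// so "log-verbosity" is resolved without a "manager" prefix.
	Manager  Manager        `mapstructure:",squash"`
	Database map[string]any `mapstructure:"database"`
}

func main() {
	src := map[string]any{
		"log-verbosity": 2,
		"database":      map[string]any{"host": "127.0.0.1"},
	}
	var c Config
	_ = mapstructure.Decode(src, &c)
	fmt.Println(c.Manager.LogVerbosity) // 2
}
```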
"github.com/oceanbase/ob-operator/internal/config/operator" obagentconst "github.com/oceanbase/ob-operator/internal/const/obagent" oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase" zonestatus "github.com/oceanbase/ob-operator/internal/const/status/obzone" @@ -54,7 +55,7 @@ func WaitOBZoneTopologyMatch(_ *OBClusterManager) tasktypes.TaskError { func WaitOBZoneDeleted(m *OBClusterManager) tasktypes.TaskError { waitSuccess := false - for i := 1; i < oceanbaseconst.ServerDeleteTimeoutSeconds; i++ { + for i := 1; i < obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds; i++ { obcluster, err := m.getOBCluster() if err != nil { return errors.Wrap(err, "get obcluster failed") @@ -212,11 +213,11 @@ func Bootstrap(m *OBClusterManager) tasktypes.TaskError { return errors.Wrap(err, "no obzone belongs to this cluster") } var manager *operation.OceanbaseOperationManager - for i := 0; i < oceanbaseconst.GetConnectionMaxRetries; i++ { + for i := 0; i < obcfg.GetConfig().Time.GetConnectionMaxRetries; i++ { manager, err = m.getOceanbaseOperationManager() if err != nil || manager == nil { m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Get oceanbase operation manager failed") - time.Sleep(time.Second * oceanbaseconst.CheckConnectionInterval) + time.Sleep(time.Second * time.Duration(obcfg.GetConfig().Time.CheckConnectionInterval)) } else { m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Successfully got oceanbase operation manager") break @@ -393,7 +394,7 @@ func ValidateUpgradeInfo(m *OBClusterManager) tasktypes.TaskError { return false, errors.Wrap(err, "Failed to run validate job") } } - err = resourceutils.CheckJobWithTimeout(check, time.Second*oceanbaseconst.WaitForJobTimeoutSeconds) + err = resourceutils.CheckJobWithTimeout(check, time.Second*time.Duration(obcfg.GetConfig().Time.WaitForJobTimeoutSeconds)) if err != nil { return errors.Wrap(err, "Failed to run validate job") } @@ -535,7 +536,7 @@ func ModifySysTenantReplica(m *OBClusterManager) tasktypes.TaskError { if err != nil { return errors.Wrapf(err, "Failed to set sys locality to %s", locality) } - err = oceanbaseOperationManager.WaitTenantLocalityChangeFinished(oceanbaseconst.SysTenant, oceanbaseconst.LocalityChangeTimeoutSeconds) + err = oceanbaseOperationManager.WaitTenantLocalityChangeFinished(oceanbaseconst.SysTenant, obcfg.GetConfig().Time.LocalityChangeTimeoutSeconds) if err != nil { return errors.Wrapf(err, "Locality change to %s not finished after timeout", locality) } @@ -561,7 +562,7 @@ func ModifySysTenantReplica(m *OBClusterManager) tasktypes.TaskError { if err != nil { return errors.Wrapf(err, "Failed to set sys locality to %s", locality) } - err = oceanbaseOperationManager.WaitTenantLocalityChangeFinished(oceanbaseconst.SysTenant, oceanbaseconst.LocalityChangeTimeoutSeconds) + err = oceanbaseOperationManager.WaitTenantLocalityChangeFinished(oceanbaseconst.SysTenant, obcfg.GetConfig().Time.LocalityChangeTimeoutSeconds) if err != nil { return errors.Wrapf(err, "Locality change to %s not finished after timeout", locality) } @@ -791,7 +792,7 @@ outerLoop: } if len(podList.Items) == 0 { m.Logger.V(oceanbaseconst.LogLevelDebug).Info("No pod found for check image pull job") - time.Sleep(time.Second * oceanbaseconst.CheckJobInterval) + time.Sleep(time.Second * time.Duration(obcfg.GetConfig().Time.CheckJobInterval)) continue } pod := podList.Items[0] @@ -813,7 +814,7 @@ outerLoop: default: m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Container is waiting", "reason", containerStatus.State.Waiting.Reason, "message", 
diff --git a/internal/resource/obcluster/utils.go b/internal/resource/obcluster/utils.go
index 9af1dd3c1..1a437fb9e 100644
--- a/internal/resource/obcluster/utils.go
+++ b/internal/resource/obcluster/utils.go
@@ -25,6 +25,7 @@ import (
 
     apitypes "github.com/oceanbase/ob-operator/api/types"
     "github.com/oceanbase/ob-operator/api/v1alpha1"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
     zonestatus "github.com/oceanbase/ob-operator/internal/const/status/obzone"
     resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
@@ -390,7 +391,7 @@ func (m *OBClusterManager) WaitOBZoneUpgradeFinished(zoneName string) error {
         }
         return false, nil
     }
-    err := resourceutils.CheckJobWithTimeout(check, time.Second*oceanbaseconst.WaitForJobTimeoutSeconds)
+    err := resourceutils.CheckJobWithTimeout(check, time.Second*time.Duration(obcfg.GetConfig().Time.WaitForJobTimeoutSeconds))
     if err != nil {
         return errors.Wrap(err, "Timeout to wait obzone upgrade finished")
     }
diff --git a/internal/resource/observer/observer_task.go b/internal/resource/observer/observer_task.go
index a6542b85a..8f4ffb174 100644
--- a/internal/resource/observer/observer_task.go
+++ b/internal/resource/observer/observer_task.go
@@ -24,6 +24,7 @@ import (
     "k8s.io/apimachinery/pkg/util/intstr"
 
     apitypes "github.com/oceanbase/ob-operator/api/types"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
     podconst "github.com/oceanbase/ob-operator/internal/const/pod"
     clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
@@ -82,7 +83,7 @@ func AddServer(m *OBServerManager) tasktypes.TaskError {
 }
 
 func WaitOBClusterBootstrapped(m *OBServerManager) tasktypes.TaskError {
-    for i := 0; i < oceanbaseconst.BootstrapTimeoutSeconds; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.BootstrapTimeoutSeconds; i++ {
         obcluster, err := m.getOBCluster()
         if err != nil {
             return errors.Wrap(err, "Get obcluster from K8s")
@@ -285,7 +286,7 @@ func UpgradeOBServerImage(m *OBServerManager) tasktypes.TaskError {
 
 func WaitOBServerPodReady(m *OBServerManager) tasktypes.TaskError {
     observerPodRestarted := false
-    for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ {
         observerPod, err := m.getPod()
         if err != nil {
             return errors.Wrapf(err, "Failed to get pod of observer %s", m.OBServer.Name)
@@ -320,7 +321,7 @@ func WaitOBServerActiveInCluster(m *OBServerManager) tasktypes.TaskError {
         Port: oceanbaseconst.RpcPort,
     }
     active := false
-    for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ {
         operationManager, err := m.getOceanbaseOperationManager()
         if err != nil {
             return errors.Wrapf(err, "Get oceanbase operation manager failed")
@@ -355,7 +356,7 @@ func WaitOBServerDeletedInCluster(m *OBServerManager) tasktypes.TaskError {
         Port: oceanbaseconst.RpcPort,
     }
     deleted := false
-    for i := 0; i < oceanbaseconst.ServerDeleteTimeoutSeconds; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds; i++ {
         operationManager, err := m.getOceanbaseOperationManager()
         if err != nil {
             return errors.Wrapf(err, "Get oceanbase operation manager failed")
@@ -395,7 +396,7 @@ func DeletePod(m *OBServerManager) tasktypes.TaskError {
 
 func WaitForPodDeleted(m *OBServerManager) tasktypes.TaskError {
     m.Logger.Info("Wait for observer pod being deleted")
-    for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ {
         time.Sleep(time.Second)
         err := m.Client.Get(m.Ctx, m.generateNamespacedName(m.OBServer.Name), &corev1.Pod{})
         if err != nil && kubeerrors.IsNotFound(err) {
@@ -448,7 +449,7 @@ func ExpandPVC(m *OBServerManager) tasktypes.TaskError {
 
 func WaitForPVCResized(m *OBServerManager) tasktypes.TaskError {
 outer:
-    for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ {
         time.Sleep(time.Second)
 
         observerPVC, err := m.getPVCs()
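All of these `Wait*` tasks share the same shape: poll roughly once per second until a condition holds or a now-configurable seconds budget runs out. A condensed, runnable sketch of that loop (the helper name is illustrative, not from the operator):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond once per second for up to timeoutSeconds iterations,
// mirroring the Wait* task loops in the diff above.
func waitFor(cond func() (bool, error), timeoutSeconds int) error {
	for i := 0; i < timeoutSeconds; i++ {
		ok, err := cond()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		time.Sleep(time.Second)
	}
	return errors.New("wait timeout")
}

func main() {
	start := time.Now()
	err := waitFor(func() (bool, error) {
		return time.Since(start) > 2*time.Second, nil
	}, 5)
	fmt.Println(err) // <nil>
}
```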
"github.com/oceanbase/ob-operator/api/types" + obcfg "github.com/oceanbase/ob-operator/internal/config/operator" oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase" podconst "github.com/oceanbase/ob-operator/internal/const/pod" clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster" @@ -82,7 +83,7 @@ func AddServer(m *OBServerManager) tasktypes.TaskError { } func WaitOBClusterBootstrapped(m *OBServerManager) tasktypes.TaskError { - for i := 0; i < oceanbaseconst.BootstrapTimeoutSeconds; i++ { + for i := 0; i < obcfg.GetConfig().Time.BootstrapTimeoutSeconds; i++ { obcluster, err := m.getOBCluster() if err != nil { return errors.Wrap(err, "Get obcluster from K8s") @@ -285,7 +286,7 @@ func UpgradeOBServerImage(m *OBServerManager) tasktypes.TaskError { func WaitOBServerPodReady(m *OBServerManager) tasktypes.TaskError { observerPodRestarted := false - for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ { + for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ { observerPod, err := m.getPod() if err != nil { return errors.Wrapf(err, "Failed to get pod of observer %s", m.OBServer.Name) @@ -320,7 +321,7 @@ func WaitOBServerActiveInCluster(m *OBServerManager) tasktypes.TaskError { Port: oceanbaseconst.RpcPort, } active := false - for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ { + for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ { operationManager, err := m.getOceanbaseOperationManager() if err != nil { return errors.Wrapf(err, "Get oceanbase operation manager failed") @@ -355,7 +356,7 @@ func WaitOBServerDeletedInCluster(m *OBServerManager) tasktypes.TaskError { Port: oceanbaseconst.RpcPort, } deleted := false - for i := 0; i < oceanbaseconst.ServerDeleteTimeoutSeconds; i++ { + for i := 0; i < obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds; i++ { operationManager, err := m.getOceanbaseOperationManager() if err != nil { return errors.Wrapf(err, "Get oceanbase operation manager failed") @@ -395,7 +396,7 @@ func DeletePod(m *OBServerManager) tasktypes.TaskError { func WaitForPodDeleted(m *OBServerManager) tasktypes.TaskError { m.Logger.Info("Wait for observer pod being deleted") - for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ { + for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ { time.Sleep(time.Second) err := m.Client.Get(m.Ctx, m.generateNamespacedName(m.OBServer.Name), &corev1.Pod{}) if err != nil && kubeerrors.IsNotFound(err) { @@ -448,7 +449,7 @@ func ExpandPVC(m *OBServerManager) tasktypes.TaskError { func WaitForPVCResized(m *OBServerManager) tasktypes.TaskError { outer: - for i := 0; i < oceanbaseconst.DefaultStateWaitTimeout; i++ { + for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ { time.Sleep(time.Second) observerPVC, err := m.getPVCs() diff --git a/internal/resource/observer/utils.go b/internal/resource/observer/utils.go index 15f741141..9b4fe0c8e 100644 --- a/internal/resource/observer/utils.go +++ b/internal/resource/observer/utils.go @@ -27,6 +27,7 @@ import ( apitypes "github.com/oceanbase/ob-operator/api/types" v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" + obcfg "github.com/oceanbase/ob-operator/internal/config/operator" obagentconst "github.com/oceanbase/ob-operator/internal/const/obagent" oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase" secretconst "github.com/oceanbase/ob-operator/internal/const/secret" @@ -413,8 +414,8 @@ func (m *OBServerManager) createOBServerContainer(obcluster 
diff --git a/internal/resource/obtenant/obtenant_task.go b/internal/resource/obtenant/obtenant_task.go
index e060c1ae6..bca636047 100644
--- a/internal/resource/obtenant/obtenant_task.go
+++ b/internal/resource/obtenant/obtenant_task.go
@@ -27,6 +27,7 @@ import (
 
     "github.com/oceanbase/ob-operator/api/constants"
     "github.com/oceanbase/ob-operator/api/v1alpha1"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
     resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
     "github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/const/status/tenant"
@@ -352,7 +353,7 @@ func WatchRestoreJobToFinish(m *OBTenantManager) tasktypes.TaskError {
         return false, nil
     }
     // Tenant restoring is in common quite a slow process, so we need to wait for a longer time
-    err = resourceutils.CheckJobWithTimeout(check, time.Second*oceanbaseconst.LocalityChangeTimeoutSeconds)
+    err = resourceutils.CheckJobWithTimeout(check, time.Second*time.Duration(obcfg.GetConfig().Time.LocalityChangeTimeoutSeconds))
     if err != nil {
         return errors.Wrap(err, "Failed to wait for restore job to finish")
     }
@@ -420,7 +421,7 @@ func UpgradeTenantIfNeeded(m *OBTenantManager) tasktypes.TaskError {
     if err != nil {
         return err
     }
-    maxWait5secTimes := oceanbaseconst.DefaultStateWaitTimeout/5 + 1
+    maxWait5secTimes := obcfg.GetConfig().Time.DefaultStateWaitTimeout/5 + 1
 outer:
     for i := 0; i < maxWait5secTimes; i++ {
         time.Sleep(5 * time.Second)
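The `maxWait5secTimes` expression converts the seconds budget into 5-second polling rounds; the `+ 1` compensates for integer truncation so the full budget is always covered. For example, assuming a hypothetical 300-second `DefaultStateWaitTimeout`:

```go
package main

import "fmt"

func main() {
	timeoutSeconds := 300 // hypothetical Time.DefaultStateWaitTimeout
	// Integer division truncates, so the +1 adds a final round:
	// 300/5+1 = 61 polls of 5s cover the whole 300s budget.
	maxWait5secTimes := timeoutSeconds/5 + 1
	fmt.Println(maxWait5secTimes) // 61
}
```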
"github.com/oceanbase/ob-operator/internal/config/operator" oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase" obtenantresource "github.com/oceanbase/ob-operator/internal/resource/obtenant" resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils" @@ -93,7 +94,7 @@ func CreateUsersForActivatedStandby(m *ObTenantOperationManager) tasktypes.TaskE } // Wait for the tenant to be ready - maxRetry := oceanbaseconst.TenantOpRetryTimes + maxRetry := obcfg.GetConfig().Time.TenantOpRetryTimes counter := 0 for counter < maxRetry { tenants, err := con.ListTenantWithName(m.Resource.Status.PrimaryTenant.Spec.TenantName) @@ -107,7 +108,7 @@ func CreateUsersForActivatedStandby(m *ObTenantOperationManager) tasktypes.TaskE if t.TenantType == "USER" && t.TenantRole == "PRIMARY" && t.SwitchoverStatus == "NORMAL" { break } - time.Sleep(oceanbaseconst.TenantOpRetryGapSeconds * time.Second) + time.Sleep(time.Duration(obcfg.GetConfig().Time.TenantOpRetryGapSeconds) * time.Second) counter++ } if counter >= maxRetry { @@ -145,7 +146,7 @@ func SwitchTenantsRole(m *ObTenantOperationManager) tasktypes.TaskError { if err != nil { return err } - maxRetry := oceanbaseconst.TenantOpRetryTimes + maxRetry := obcfg.GetConfig().Time.TenantOpRetryTimes counter := 0 for counter < maxRetry { primary, err := con.ListTenantWithName(m.Resource.Status.PrimaryTenant.Spec.TenantName) @@ -157,7 +158,7 @@ func SwitchTenantsRole(m *ObTenantOperationManager) tasktypes.TaskError { } p := primary[0] if p.TenantRole != "STANDBY" || p.SwitchoverStatus != "NORMAL" { - time.Sleep(oceanbaseconst.TenantOpRetryGapSeconds * time.Second) + time.Sleep(time.Second * time.Duration(obcfg.GetConfig().Time.TenantOpRetryGapSeconds)) counter++ } else { break @@ -185,7 +186,7 @@ func SwitchTenantsRole(m *ObTenantOperationManager) tasktypes.TaskError { } s := standby[0] if s.TenantRole != "PRIMARY" || s.SwitchoverStatus != "NORMAL" { - time.Sleep(oceanbaseconst.TenantOpRetryGapSeconds * time.Second) + time.Sleep(time.Second * time.Duration(obcfg.GetConfig().Time.TenantOpRetryGapSeconds)) counter++ } else { break @@ -290,7 +291,7 @@ func UpgradeTenant(m *ObTenantOperationManager) tasktypes.TaskError { if err != nil { return err } - maxWait5secTimes := oceanbaseconst.DefaultStateWaitTimeout/5 + 1 + maxWait5secTimes := obcfg.GetConfig().Time.DefaultStateWaitTimeout/5 + 1 outer: for i := 0; i < maxWait5secTimes; i++ { time.Sleep(5 * time.Second) diff --git a/internal/resource/obzone/obzone_task.go b/internal/resource/obzone/obzone_task.go index 5834a5d6e..d3f709fbe 100644 --- a/internal/resource/obzone/obzone_task.go +++ b/internal/resource/obzone/obzone_task.go @@ -21,6 +21,7 @@ import ( "k8s.io/client-go/util/retry" v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" + obcfg "github.com/oceanbase/ob-operator/internal/config/operator" oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase" serverstatus "github.com/oceanbase/ob-operator/internal/const/status/observer" resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils" @@ -174,7 +175,7 @@ func DeleteAllOBServer(m *OBZoneManager) tasktypes.TaskError { func WaitReplicaMatch(m *OBZoneManager) tasktypes.TaskError { matched := false - for i := 0; i < oceanbaseconst.ServerDeleteTimeoutSeconds; i++ { + for i := 0; i < obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds; i++ { obzone, err := m.getOBZone() if err != nil { m.Logger.Error(err, "Get obzone from K8s failed") @@ -196,7 +197,7 @@ func WaitReplicaMatch(m *OBZoneManager) 
diff --git a/internal/resource/obzone/obzone_task.go b/internal/resource/obzone/obzone_task.go
index 5834a5d6e..d3f709fbe 100644
--- a/internal/resource/obzone/obzone_task.go
+++ b/internal/resource/obzone/obzone_task.go
@@ -21,6 +21,7 @@ import (
     "k8s.io/client-go/util/retry"
 
     v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
     serverstatus "github.com/oceanbase/ob-operator/internal/const/status/observer"
     resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
@@ -174,7 +175,7 @@ func DeleteAllOBServer(m *OBZoneManager) tasktypes.TaskError {
 
 func WaitReplicaMatch(m *OBZoneManager) tasktypes.TaskError {
     matched := false
-    for i := 0; i < oceanbaseconst.ServerDeleteTimeoutSeconds; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds; i++ {
         obzone, err := m.getOBZone()
         if err != nil {
             m.Logger.Error(err, "Get obzone from K8s failed")
@@ -196,7 +197,7 @@ func WaitReplicaMatch(m *OBZoneManager) tasktypes.TaskError {
 
 func WaitOBServerDeleted(m *OBZoneManager) tasktypes.TaskError {
     matched := false
-    for i := 0; i < oceanbaseconst.ServerDeleteTimeoutSeconds; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds; i++ {
         obzone, err := m.getOBZone()
         if err != nil {
             m.Logger.Error(err, "Get obzone from K8s failed")
@@ -265,7 +266,7 @@ func UpgradeOBServer(m *OBZoneManager) tasktypes.TaskError {
 }
 
 func WaitOBServerUpgraded(m *OBZoneManager) tasktypes.TaskError {
-    for i := 0; i < oceanbaseconst.TimeConsumingStateWaitTimeout; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.TimeConsumingStateWaitTimeout; i++ {
         observerList, err := m.listOBServers()
         if err != nil {
             m.Logger.Error(err, "List observers failed")
@@ -283,7 +284,7 @@ func WaitOBServerUpgraded(m *OBZoneManager) tasktypes.TaskError {
             m.Logger.Info("All server upgraded")
             return nil
         }
-        time.Sleep(oceanbaseconst.CommonCheckInterval * time.Second)
+        time.Sleep(time.Duration(obcfg.GetConfig().Time.CommonCheckInterval) * time.Second)
     }
     return errors.New("Wait all server upgraded timeout")
 }
@@ -404,21 +405,21 @@ func DeleteLegacyOBServers(m *OBZoneManager) tasktypes.TaskError {
 }
 
 func WaitOBServerBootstrapReady(m *OBZoneManager) tasktypes.TaskError {
-    return m.generateWaitOBServerStatusFunc(serverstatus.BootstrapReady, oceanbaseconst.DefaultStateWaitTimeout)()
+    return m.generateWaitOBServerStatusFunc(serverstatus.BootstrapReady, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
 }
 
 func WaitOBServerRunning(m *OBZoneManager) tasktypes.TaskError {
-    return m.generateWaitOBServerStatusFunc(serverstatus.Running, oceanbaseconst.DefaultStateWaitTimeout)()
+    return m.generateWaitOBServerStatusFunc(serverstatus.Running, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
 }
 
 func WaitForOBServerScalingUp(m *OBZoneManager) tasktypes.TaskError {
-    return m.generateWaitOBServerStatusFunc(serverstatus.ScaleUp, oceanbaseconst.DefaultStateWaitTimeout)()
+    return m.generateWaitOBServerStatusFunc(serverstatus.ScaleUp, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
 }
 
 func WaitForOBServerExpandingPVC(m *OBZoneManager) tasktypes.TaskError {
-    return m.generateWaitOBServerStatusFunc(serverstatus.ExpandPVC, oceanbaseconst.DefaultStateWaitTimeout)()
+    return m.generateWaitOBServerStatusFunc(serverstatus.ExpandPVC, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
 }
 
 func WaitForOBServerMounting(m *OBZoneManager) tasktypes.TaskError {
-    return m.generateWaitOBServerStatusFunc(serverstatus.MountBackupVolume, oceanbaseconst.DefaultStateWaitTimeout)()
+    return m.generateWaitOBServerStatusFunc(serverstatus.MountBackupVolume, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
 }
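The `generateWaitOBServerStatusFunc(...)()` call sites build a task closure with the target status and timeout baked in, then execute it immediately; that indirection is what lets the timeout become a runtime config lookup rather than a compile-time constant. A simplified sketch of that higher-order shape (signature and logic reduced for illustration):

```go
package main

import (
	"errors"
	"fmt"
)

// generateWait returns a task closure with the target status and timeout
// captured, loosely mirroring generateWaitOBServerStatusFunc.
func generateWait(status string, timeoutSeconds int) func() error {
	return func() error {
		if timeoutSeconds <= 0 {
			return errors.New("wait for status " + status + ": timeout")
		}
		return nil // pretend the status matched within the budget
	}
}

func main() {
	// The double call `generateWait(...)()` builds the closure and runs
	// it in one expression, as each Wait* task in the diff does.
	fmt.Println(generateWait("running", 300)())
}
```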
diff --git a/internal/resource/utils/jobs.go b/internal/resource/utils/jobs.go
index 7a38520f8..0b1ce6414 100644
--- a/internal/resource/utils/jobs.go
+++ b/internal/resource/utils/jobs.go
@@ -29,6 +29,7 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"
 
     "github.com/oceanbase/ob-operator/api/v1alpha1"
+    obcfg "github.com/oceanbase/ob-operator/internal/config/operator"
     cmdconst "github.com/oceanbase/ob-operator/internal/const/cmd"
     oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
     k8sclient "github.com/oceanbase/ob-operator/pkg/k8s/client"
@@ -90,7 +91,7 @@ func RunJob(ctx context.Context, c client.Client, logger *logr.Logger, namespace
     }
     var jobObject *batchv1.Job
-    for i := 0; i < oceanbaseconst.CheckJobMaxRetries; i++ {
+    for i := 0; i < obcfg.GetConfig().Time.CheckJobMaxRetries; i++ {
         jobObject, err = GetJob(ctx, c, namespace, fullJobName)
         if err != nil {
             logger.Error(err, "Failed to get job")
         }
@@ -102,7 +103,7 @@ func RunJob(ctx context.Context, c client.Client, logger *logr.Logger, namespace
             logger.V(oceanbaseconst.LogLevelDebug).Info("Job finished")
             break
         }
-        time.Sleep(time.Second * oceanbaseconst.CheckJobInterval)
+        time.Sleep(time.Second * time.Duration(obcfg.GetConfig().Time.CheckJobInterval))
     }
     clientSet := k8sclient.GetClient()
     podList, err := clientSet.ClientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
@@ -215,7 +216,7 @@ func ExecuteUpgradeScript(ctx context.Context, c client.Client, logger *logr.Log
             return false, errors.Wrap(err, "Failed to run upgrade script job")
         }
     }
-    err = CheckJobWithTimeout(check, time.Second*oceanbaseconst.WaitForJobTimeoutSeconds)
+    err = CheckJobWithTimeout(check, time.Second*time.Duration(obcfg.GetConfig().Time.WaitForJobTimeoutSeconds))
     if err != nil {
         return errors.Wrap(err, "Failed to wait for job to finish")
     }
@@ -229,8 +230,8 @@ type CheckJobFunc func() (bool, error)
 // Second parameter is the timeout duration, default to 1800s.
 // Third parameter is the interval to check job status, default to 3s.
 func CheckJobWithTimeout(f CheckJobFunc, ds ...time.Duration) error {
-    timeout := time.Second * oceanbaseconst.DefaultStateWaitTimeout
-    interval := time.Second * oceanbaseconst.CheckJobInterval
+    timeout := time.Second * time.Duration(obcfg.GetConfig().Time.DefaultStateWaitTimeout)
+    interval := time.Second * time.Duration(obcfg.GetConfig().Time.CheckJobInterval)
     if len(ds) > 0 {
         timeout = ds[0]
     }
diff --git a/pkg/coordinator/config.go b/pkg/coordinator/config.go
new file mode 100644
index 000000000..0c46282df
--- /dev/null
+++ b/pkg/coordinator/config.go
@@ -0,0 +1,39 @@
+/*
+Copyright (c) 2023 OceanBase
+ob-operator is licensed under Mulan PSL v2.
+You can use this software according to the terms and conditions of the Mulan PSL v2.
+You may obtain a copy of Mulan PSL v2 at:
+         http://license.coscl.org.cn/MulanPSL2
+THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+See the Mulan PSL v2 for more details.
+*/
+
+package coordinator
+
+import "time"
+
+type config struct {
+    NormalRequeueDuration    time.Duration
+    ExecutionRequeueDuration time.Duration
+
+    TaskMaxRetryTimes         int
+    TaskRetryBackoffThreshold int
+}
+
+var cfg = &config{
+    NormalRequeueDuration:    30 * time.Second,
+    ExecutionRequeueDuration: 1 * time.Second,
+
+    TaskMaxRetryTimes:         99,
+    TaskRetryBackoffThreshold: 16,
+}
+
+func SetMaxRetryTimes(maxRetryTimes int) {
+    cfg.TaskMaxRetryTimes = maxRetryTimes
+}
+
+func SetRetryBackoffThreshold(retryBackoffThreshold int) {
+    cfg.TaskRetryBackoffThreshold = retryBackoffThreshold
+}
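`pkg/coordinator` gets its knobs through exported setters rather than importing `internal/config/operator` itself, which keeps the reusable `pkg/` tree free of the operator's internal config dependency; the struct literal supplies defaults (99 retries, backoff threshold 16) when the setters are never called. The setters are plain assignments with no locking, so they are presumably intended to be called once at startup, before any reconciler runs, which is exactly what the `cmd/operator/main.go` hunk above does:

```go
package main

import "github.com/oceanbase/ob-operator/pkg/coordinator"

// Startup wiring as in cmd/operator/main.go; the literal values here are
// hypothetical stand-ins for cfg.Time.TaskMaxRetryTimes and
// cfg.Time.TaskRetryBackoffThreshold loaded from the operator config.
func main() {
	coordinator.SetMaxRetryTimes(50)
	coordinator.SetRetryBackoffThreshold(14)
}
```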
diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go
index 945a8c755..707ba5b62 100644
--- a/pkg/coordinator/coordinator.go
+++ b/pkg/coordinator/coordinator.go
@@ -13,26 +13,16 @@
 package coordinator
 
 import (
-    "time"
-
     "github.com/go-logr/logr"
     "github.com/pkg/errors"
     ctrl "sigs.k8s.io/controller-runtime"
 
-    obconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
     "github.com/oceanbase/ob-operator/pkg/task"
     taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
     "github.com/oceanbase/ob-operator/pkg/task/const/strategy"
     tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
 )
 
-const (
-    // If no task flow, requeue after 30 sec.
-    NormalRequeueDuration = 30 * time.Second
-    // In task flow, requeue after 1 sec.
-    ExecutionRequeueDuration = 1 * time.Second
-)
-
 type Coordinator struct {
     Manager ResourceManager
     Logger  *logr.Logger
@@ -56,7 +46,7 @@ func NewCoordinator(m ResourceManager, logger *logr.Logger) *Coordinator {
 // will be requeued using exponential backoff.
 func (c *Coordinator) Coordinate() (ctrl.Result, error) {
     result := ctrl.Result{
-        RequeueAfter: ExecutionRequeueDuration,
+        RequeueAfter: cfg.ExecutionRequeueDuration,
     }
     var f *tasktypes.TaskFlow
     var err error
@@ -69,16 +59,16 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) {
             return result, errors.Wrap(err, "Get task flow")
         } else if f == nil {
             // No need to execute task flow
-            result.RequeueAfter = NormalRequeueDuration
+            result.RequeueAfter = cfg.NormalRequeueDuration
         } else {
-            c.Logger.V(obconst.LogLevelDebug).Info("Set operation context", "operation context", f.OperationContext)
+            c.Logger.V(1).Info("Set operation context", "operation context", f.OperationContext)
             c.Manager.SetOperationContext(f.OperationContext)
             // execution errors reflects by task status
             c.executeTaskFlow(f)
             // if task status is `failed`, requeue after 2 ^ min(retryCount, threshold) * 500ms.
             // maximum backoff time is about 2 hrs with 14 as threshold.
             if f.OperationContext.OnFailure.RetryCount > 0 && f.OperationContext.TaskStatus == taskstatus.Failed {
-                result.RequeueAfter = ExecutionRequeueDuration * (1 << Min(f.OperationContext.OnFailure.RetryCount, obconst.TaskRetryBackoffThreshold))
+                result.RequeueAfter = cfg.ExecutionRequeueDuration * (1 << Min(f.OperationContext.OnFailure.RetryCount, cfg.TaskRetryBackoffThreshold))
             }
         }
     }
@@ -88,7 +78,7 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) {
         if err != nil {
             return result, errors.Wrapf(err, "Check and update finalizer failed")
         }
-        result.RequeueAfter = ExecutionRequeueDuration
+        result.RequeueAfter = cfg.ExecutionRequeueDuration
     }
     err = c.cleanTaskResultMap(f)
     if err != nil {
@@ -100,9 +90,9 @@ func (c *Coordinator) Coordinate() (ctrl.Result, error) {
     }
     // When status changes(e.g. from running to other status), set a shorter `requeue after` to speed up processing.
     if c.Manager.GetStatus() != beforeStatus {
-        result.RequeueAfter = ExecutionRequeueDuration
+        result.RequeueAfter = cfg.ExecutionRequeueDuration
     }
-    c.Logger.V(obconst.LogLevelTrace).Info("Requeue after", "duration", result.RequeueAfter)
+    c.Logger.V(2).Info("Requeue after", "duration", result.RequeueAfter)
     return result, err
 }
 
@@ -122,9 +112,9 @@ func (c *Coordinator) executeTaskFlow(f *tasktypes.TaskFlow) {
             c.Logger.Error(err, "No executable function found for task")
             c.Manager.PrintErrEvent(err)
         } else {
-            c.Logger.V(obconst.LogLevelDebug).Info("Successfully get task func " + f.OperationContext.Task.Display())
+            c.Logger.V(1).Info("Successfully get task func " + f.OperationContext.Task.Display())
             taskId := task.GetTaskManager().Submit(taskFunc)
-            c.Logger.V(obconst.LogLevelDebug).Info("Successfully submit task", "taskId", taskId)
+            c.Logger.V(1).Info("Successfully submit task", "taskId", taskId)
             f.OperationContext.TaskId = taskId
             f.OperationContext.TaskStatus = taskstatus.Running
         }
@@ -137,7 +127,7 @@ func (c *Coordinator) executeTaskFlow(f *tasktypes.TaskFlow) {
             c.Manager.PrintErrEvent(err)
             f.OperationContext.TaskStatus = taskstatus.Failed
         } else if taskResult != nil {
-            c.Logger.V(obconst.LogLevelDebug).Info("Task finished", "task id", f.OperationContext.TaskId, "task result", taskResult)
+            c.Logger.V(1).Info("Task finished", "task id", f.OperationContext.TaskId, "task result", taskResult)
             f.OperationContext.TaskStatus = taskResult.Status
             if taskResult.Error != nil {
                 c.Manager.PrintErrEvent(taskResult.Error)
@@ -155,7 +145,7 @@ func (c *Coordinator) executeTaskFlow(f *tasktypes.TaskFlow) {
         switch f.OperationContext.OnFailure.Strategy {
         case strategy.RetryFromCurrent, strategy.StartOver:
             // if strategy is retry or start over, limit the maximum retry times
-            maxRetry := obconst.TaskMaxRetryTimes
+            maxRetry := cfg.TaskMaxRetryTimes
             if f.OperationContext.OnFailure.MaxRetry != 0 {
                 maxRetry = f.OperationContext.OnFailure.MaxRetry
             }
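One detail worth flagging for review: the retained comment still describes "2 ^ min(retryCount, threshold) * 500ms" with a roughly 2-hour cap at threshold 14, but with the defaults now in `config.go` (1 s base, threshold 16) the cap works out to about 18 hours, so the comment appears to predate the configurable values. The backoff series under those defaults, as a small worked example:

```go
package main

import (
	"fmt"
	"time"
)

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	base := 1 * time.Second // cfg.ExecutionRequeueDuration default
	threshold := 16         // cfg.TaskRetryBackoffThreshold default
	for _, retry := range []int{1, 4, 8, 16, 20} {
		// Same shape as the coordinator's requeue computation: the
		// shift count is capped at the threshold, so retry 20 backs
		// off no longer than retry 16 (about 18h12m).
		backoff := base * time.Duration(1<<minInt(retry, threshold))
		fmt.Printf("retry %2d -> requeue after %v\n", retry, backoff)
	}
}
```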