Skip to content

Commit

Permalink
merge from master
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-sun-star committed Mar 12, 2024
2 parents ee72b49 + 0f6abb5 commit 674442a
Show file tree
Hide file tree
Showing 42 changed files with 882 additions and 543 deletions.
2 changes: 1 addition & 1 deletion internal/resource/_template/template_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (m *ObResourceManager[T]) GetTaskFlow() (*tasktypes.TaskFlow, error) {
}

func (m *ObResourceManager[T]) PrintErrEvent(err error) {
m.Recorder.Event(m.Resource, corev1.EventTypeWarning, "task exec failed", err.Error())
m.Recorder.Event(m.Resource, corev1.EventTypeWarning, "Task failed", err.Error())
}

func (m *ObResourceManager[T]) ArchiveResource() {
Expand Down
17 changes: 7 additions & 10 deletions internal/resource/obcluster/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *OBClusterManager) GetStatus() string {
}

func (m *OBClusterManager) InitStatus() {
m.Logger.Info("newly created cluster, init status")
m.Logger.Info("Newly created cluster, init status")
m.Recorder.Event(m.OBCluster, "Init", "", "newly created cluster, init status")
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
initialStatus := clusterstatus.New
Expand All @@ -74,15 +74,15 @@ func (m *OBClusterManager) SetOperationContext(c *tasktypes.OperationContext) {
func (m *OBClusterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
// exists unfinished task flow, return the last task flow
if m.OBCluster.Status.OperationContext != nil {
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("get task flow from obcluster status")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Get task flow from obcluster status")
return tasktypes.NewTaskFlow(m.OBCluster.Status.OperationContext), nil
}
// return task flow depends on status

// newly created cluster
var taskFlow *tasktypes.TaskFlow
var err error
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("create task flow according to obcluster status")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create task flow according to obcluster status")
switch m.OBCluster.Status.Status {
// create obcluster, return taskFlow to bootstrap obcluster
case clusterstatus.MigrateFromExisting:
Expand All @@ -109,7 +109,7 @@ func (m *OBClusterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
case clusterstatus.MountBackupVolume:
taskFlow, err = task.GetRegistry().Get(fMountBackupVolume)
default:
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("no need to run anything for obcluster", "obcluster", m.OBCluster.Name)
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("No need to run anything for obcluster", "obcluster", m.OBCluster.Name)
return nil, nil
}

Expand Down Expand Up @@ -140,9 +140,6 @@ func (m *OBClusterManager) CheckAndUpdateFinalizers() error {
}

func (m *OBClusterManager) UpdateStatus() error {
if m.OBCluster.Status.Status == "Failed" {
return nil
}
// update obzone status
obzoneList, err := m.listOBZones()
if err != nil {
Expand All @@ -157,7 +154,7 @@ func (m *OBClusterManager) UpdateStatus() error {
Status: obzone.Status.Status,
})
if obzone.Status.Image != m.OBCluster.Spec.OBServerTemplate.Image {
m.Logger.Info("obzone still not sync")
m.Logger.Info("OBZone still not sync")
allZoneVersionSync = false
}
}
Expand Down Expand Up @@ -247,7 +244,7 @@ func (m *OBClusterManager) UpdateStatus() error {
}
}
}
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("update obcluster status", "status", m.OBCluster.Status)
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Update obcluster status", "status", m.OBCluster.Status)
err = m.retryUpdateStatus()
if err != nil {
m.Logger.Error(err, "Got error when update obcluster status")
Expand Down Expand Up @@ -347,7 +344,7 @@ func (m *OBClusterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskF
}

func (m *OBClusterManager) PrintErrEvent(err error) {
m.Recorder.Event(m.OBCluster, corev1.EventTypeWarning, "task exec failed", err.Error())
m.Recorder.Event(m.OBCluster, corev1.EventTypeWarning, "Task failed", err.Error())
}

func (m *OBClusterManager) ArchiveResource() {
Expand Down
36 changes: 18 additions & 18 deletions internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (m *OBClusterManager) generateWaitOBZoneStatusFunc(status string, timeoutSe
allMatched := true
for _, obzoneStatus := range obcluster.Status.OBZoneStatus {
if obzoneStatus.Status != status {
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("zone status still not matched", "zone", obzoneStatus.Zone, "status", status)
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Zone status still not matched", "zone", obzoneStatus.Zone, "status", status)
allMatched = false
break
}
Expand All @@ -104,7 +104,7 @@ func (m *OBClusterManager) generateWaitOBZoneStatusFunc(status string, timeoutSe
}
time.Sleep(time.Second)
}
return errors.New("zone status still not matched when timeout")
return errors.New("Zone status still not matched when timeout")
}
return f
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (m *OBClusterManager) DeleteOBZone() tasktypes.TaskError {
}

func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("create obzones")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create obzones")
blockOwnerDeletion := true
ownerReferenceList := make([]metav1.OwnerReference, 0)
ownerReference := metav1.OwnerReference{
Expand Down Expand Up @@ -212,7 +212,7 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
if migrateAnnoExist {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterAddress] = migrateAnnoVal
}
m.Logger.Info("create obzone", "zone", zoneName)
m.Logger.Info("Create obzone", "zone", zoneName)
err := m.Client.Create(m.Ctx, obzone)
if err != nil {
m.Logger.Error(err, "create obzone failed", "zone", zone.Zone)
Expand All @@ -229,7 +229,7 @@ func (m *OBClusterManager) Bootstrap() tasktypes.TaskError {
m.Logger.Error(err, "list obzones failed")
return errors.Wrap(err, "list obzones")
}
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("successfully get obzone list", "obzone list", obzoneList)
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Successfully get obzone list", "obzone list", obzoneList)
if len(obzoneList.Items) == 0 {
return errors.Wrap(err, "no obzone belongs to this cluster")
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (m *OBClusterManager) MaintainOBParameter() tasktypes.TaskError {
}

func (m *OBClusterManager) CreateOBParameter(parameter *apitypes.Parameter) error {
m.Logger.Info("create ob parameters")
m.Logger.Info("Create ob parameters")
ownerReferenceList := make([]metav1.OwnerReference, 0)
ownerReference := metav1.OwnerReference{
APIVersion: m.OBCluster.APIVersion,
Expand All @@ -377,7 +377,7 @@ func (m *OBClusterManager) CreateOBParameter(parameter *apitypes.Parameter) erro
Parameter: parameter,
},
}
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("create obparameter", "parameter", parameterName)
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Create obparameter", "parameter", parameterName)
err := m.Client.Create(m.Ctx, obparameter)
if err != nil {
m.Logger.Error(err, "create obparameter failed")
Expand Down Expand Up @@ -475,17 +475,17 @@ func (m *OBClusterManager) ValidateUpgradeInfo() tasktypes.TaskError {
m.Logger.Error(err, "Failed to get job")
}
if jobObject.Status.Succeeded == 0 && jobObject.Status.Failed == 0 {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("job is still running")
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Job is still running")
} else {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("job finished")
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Job finished")
break
}
}

if jobObject.Status.Succeeded == 1 {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("job succeeded")
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Job succeeded")
} else {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("job is failed", "job", jobName)
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("Job is failed", "job", jobName)
return errors.Wrap(err, "Failed to run validate job")
}
return nil
Expand Down Expand Up @@ -554,7 +554,7 @@ func (m *OBClusterManager) WaitOBZoneUpgradeFinished(zoneName string) error {
}
}
if upgradeFinished {
m.Logger.Info("Obzone upgrade finished", "obzone", zoneName)
m.Logger.Info("OBZone upgrade finished", "obzone", zoneName)
break
}
time.Sleep(time.Second * oceanbaseconst.CommonCheckInterval)
Expand Down Expand Up @@ -616,7 +616,7 @@ func (m *OBClusterManager) ModifySysTenantReplica() tasktypes.TaskError {
zoneList = append(zoneList, zone)
}
}
m.Logger.Info("modify sys pool's zone list when add zone", "zone list", zoneList)
m.Logger.Info("Modify sys pool's zone list when add zone", "zone list", zoneList)
err = oceanbaseOperationManager.AlterPool(&model.PoolParam{
PoolName: oceanbaseconst.SysTenantPool,
ZoneList: zoneList,
Expand Down Expand Up @@ -646,7 +646,7 @@ func (m *OBClusterManager) ModifySysTenantReplica() tasktypes.TaskError {
Zone: zone,
})
locality = obutil.ConvertToLocalityStr(replicas)
m.Logger.Info("modify sys tenant's locality when add zone", "locality", locality)
m.Logger.Info("Modify sys tenant's locality when add zone", "locality", locality)
err = oceanbaseOperationManager.SetTenant(model.TenantSQLParam{
TenantName: oceanbaseconst.SysTenant,
Locality: locality,
Expand All @@ -672,7 +672,7 @@ func (m *OBClusterManager) ModifySysTenantReplica() tasktypes.TaskError {
if !found {
newReplicas := obutil.OmitZoneFromReplicas(replicas, r.Zone)
locality = obutil.ConvertToLocalityStr(newReplicas)
m.Logger.Info("modify sys tenant's locality when delete zone", "locality", locality)
m.Logger.Info("Modify sys tenant's locality when delete zone", "locality", locality)
err = oceanbaseOperationManager.SetTenant(model.TenantSQLParam{
TenantName: oceanbaseconst.SysTenant,
Locality: locality,
Expand Down Expand Up @@ -700,7 +700,7 @@ func (m *OBClusterManager) ModifySysTenantReplica() tasktypes.TaskError {
newZoneList = append(newZoneList, zone)
}
}
m.Logger.Info("modify sys pool's zone list when delete zone", "zone list", newZoneList)
m.Logger.Info("Modify sys pool's zone list when delete zone", "zone list", newZoneList)
return oceanbaseOperationManager.AlterPool(&model.PoolParam{
PoolName: oceanbaseconst.SysTenantPool,
ZoneList: newZoneList,
Expand Down Expand Up @@ -1000,9 +1000,9 @@ func (m *OBClusterManager) CheckClusterMode() tasktypes.TaskError {
return err
}
if jobObject.Status.Succeeded == 0 && jobObject.Status.Failed == 0 {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("ob version check job is still running")
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("OBServer version check job is still running")
} else {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("ob version check job finished")
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("OBServer version check job finished")
break
}
}
Expand Down
24 changes: 10 additions & 14 deletions internal/resource/obparameter/obparameter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@ import (
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/oceanbase/ob-operator/internal/telemetry"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
"github.com/oceanbase/ob-operator/pkg/task/const/strategy"

apitypes "github.com/oceanbase/ob-operator/api/types"
v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
parameterstatus "github.com/oceanbase/ob-operator/internal/const/status/obparameter"
resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
"github.com/oceanbase/ob-operator/internal/telemetry"
opresource "github.com/oceanbase/ob-operator/pkg/coordinator"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
"github.com/oceanbase/ob-operator/pkg/task"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
"github.com/oceanbase/ob-operator/pkg/task/const/strategy"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

Expand All @@ -58,7 +57,7 @@ func (m *OBParameterManager) GetStatus() string {
}

func (m *OBParameterManager) InitStatus() {
m.Logger.Info("newly created obparameter, init status")
m.Logger.Info("Newly created obparameter, init status")
status := v1alpha1.OBParameterStatus{
Status: parameterstatus.New,
Parameter: make([]apitypes.ParameterValue, 0),
Expand All @@ -73,21 +72,21 @@ func (m *OBParameterManager) SetOperationContext(c *tasktypes.OperationContext)
func (m *OBParameterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
// exists unfinished task flow, return the last task flow
if m.OBParameter.Status.OperationContext != nil {
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("get task flow from obparameter status")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Get task flow from obparameter status")
return tasktypes.NewTaskFlow(m.OBParameter.Status.OperationContext), nil
}

// return task flow depends on status

var taskFlow *tasktypes.TaskFlow
var err error
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("create task flow according to obparameter status")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create task flow according to obparameter status")
switch m.OBParameter.Status.Status {
// only need to handle parameter not match
case parameterstatus.NotMatch:
taskFlow, err = task.GetRegistry().Get(fSetOBParameter)
default:
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("no need to run anything for obparameter")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("No need to run anything for obparameter")
return nil, nil
}

Expand Down Expand Up @@ -128,9 +127,6 @@ func (m *OBParameterManager) retryUpdateStatus() error {
}

func (m *OBParameterManager) UpdateStatus() error {
if m.OBParameter.Status.Status == "Failed" {
return nil
}
obcluster, err := m.getOBCluster()
if err != nil {
return errors.Wrap(err, "Get obcluster from K8s")
Expand All @@ -142,7 +138,7 @@ func (m *OBParameterManager) UpdateStatus() error {
}
if obcluster.Status.Status != clusterstatus.Running {
m.OBParameter.Status.Status = parameterstatus.PendingOB
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("obcluster not in running status, skip compare parameters")
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("OBCluster not in running status, skip compare parameters")
} else {
parameterInfoList, err := operationManager.GetParameter(m.OBParameter.Spec.Parameter.Name, nil)
if err != nil {
Expand Down Expand Up @@ -219,7 +215,7 @@ func (m *OBParameterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.Tas
}

func (m *OBParameterManager) PrintErrEvent(err error) {
m.Recorder.Event(m.OBParameter, corev1.EventTypeWarning, "task exec failed", err.Error())
m.Recorder.Event(m.OBParameter, corev1.EventTypeWarning, "Task failed", err.Error())
}

func (m *OBParameterManager) SetOBParameter() tasktypes.TaskError {
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/observer/observer_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func PrepareOBServerForBootstrap() *tasktypes.TaskFlow {
func MaintainOBServerAfterBootstrap() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fPrepareOBServerForBootstrap,
Name: fMaintainOBServerAfterBootstrap,
Tasks: []tasktypes.TaskName{tWaitOBClusterBootstrapped, tAddServer, tWaitOBServerActiveInCluster},
TargetStatus: serverstatus.Running,
},
Expand Down
Loading

0 comments on commit 674442a

Please sign in to comment.