Skip to content

Commit

Permalink
refactor(task): refactor all packages in internal/resource
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI committed Mar 21, 2024
1 parent 7cf2db4 commit 6f8449a
Show file tree
Hide file tree
Showing 55 changed files with 2,592 additions and 2,685 deletions.
12 changes: 3 additions & 9 deletions cmd/generator/task/task-register.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ package {{.PackageName}}
func init() {
{{- range .Tasks }}
taskMap[t{{.Name}}] = {{.TaskFuncName}}
taskMap.Register(t{{.}}, {{.}})
{{- end }}
}
`

type Task struct {
Name string
TaskFuncName string
}
type Task string

func main() {
if len(os.Args) != 2 {
Expand All @@ -59,10 +56,7 @@ func main() {
// Get return type of function and check whether it is a func(resource T) TaskError
if len(fn.Type.Params.List) == 1 && len(fn.Type.Results.List) == 1 {
if strings.HasSuffix(exprToString(fn.Type.Results.List[0].Type), "TaskError") {
taskFuncs = append(taskFuncs, Task{
Name: fn.Name.Name,
TaskFuncName: fn.Name.Name,
})
taskFuncs = append(taskFuncs, Task(fn.Name.Name))
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/observer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}
if needExecuteFinalizer {
err = observerManager.DeleteOBServerInCluster()
err = resobserver.DeleteOBServerInCluster(observerManager)
if err != nil {
logger.Error(err, "delete observer failed")
return ctrl.Result{}, errors.Wrapf(err, "delete observer %s failed", observer.Name)
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

var taskMap = builder.NewTaskMap[*OBClusterManager]()
var taskMap = builder.NewTaskHub[*OBClusterManager]()

Check failure on line 48 in internal/resource/obcluster/obcluster_task.go

View workflow job for this annotation

GitHub Actions / build-images

undefined: builder.NewTaskHub

func WaitOBZoneTopologyMatch(_ *OBClusterManager) tasktypes.TaskError {
// TODO
Expand Down
54 changes: 27 additions & 27 deletions internal/resource/obcluster/obcluster_task_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 0 additions & 22 deletions internal/resource/obparameter/init.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/resource/obparameter/obparameter_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

func SetOBParameter() *tasktypes.TaskFlow {
func FlowSetOBParameter() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fSetOBParameter,
Expand Down
75 changes: 3 additions & 72 deletions internal/resource/obparameter/obparameter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

apitypes "github.com/oceanbase/ob-operator/api/types"
Expand All @@ -32,8 +30,6 @@ import (
resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
"github.com/oceanbase/ob-operator/internal/telemetry"
opresource "github.com/oceanbase/ob-operator/pkg/coordinator"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
"github.com/oceanbase/ob-operator/pkg/task"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
"github.com/oceanbase/ob-operator/pkg/task/const/strategy"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
Expand Down Expand Up @@ -79,28 +75,23 @@ func (m *OBParameterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
// return task flow depends on status

var taskFlow *tasktypes.TaskFlow
var err error
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create task flow according to obparameter status")
switch m.OBParameter.Status.Status {
// only need to handle parameter not match
case parameterstatus.NotMatch:
taskFlow, err = task.GetRegistry().Get(fSetOBParameter)
taskFlow = FlowSetOBParameter()
default:
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("No need to run anything for obparameter")
return nil, nil
}

if err != nil {
return nil, err
}

if taskFlow.OperationContext.OnFailure.Strategy == "" {
taskFlow.OperationContext.OnFailure.Strategy = strategy.StartOver
if taskFlow.OperationContext.OnFailure.NextTryStatus == "" {
taskFlow.OperationContext.OnFailure.NextTryStatus = parameterstatus.Matched
}
}
return taskFlow, err
return taskFlow, nil
}

func (m *OBParameterManager) IsDeleting() bool {
Expand All @@ -112,21 +103,6 @@ func (m *OBParameterManager) CheckAndUpdateFinalizers() error {
return nil
}

func (m *OBParameterManager) retryUpdateStatus() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
parameter := &v1alpha1.OBParameter{}
err := m.Client.Get(m.Ctx, types.NamespacedName{
Namespace: m.OBParameter.GetNamespace(),
Name: m.OBParameter.GetName(),
}, parameter)
if err != nil {
return client.IgnoreNotFound(err)
}
parameter.Status = *m.OBParameter.Status.DeepCopy()
return m.Client.Status().Update(m.Ctx, parameter)
})
}

func (m *OBParameterManager) UpdateStatus() error {
obcluster, err := m.getOBCluster()
if err != nil {
Expand Down Expand Up @@ -207,58 +183,13 @@ func (m *OBParameterManager) HandleFailure() {
}

func (m *OBParameterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
switch name {
case tSetOBParameter:
return m.SetOBParameter, nil
default:
return nil, errors.New("Can not find a function for task")
}
return taskMap.GetTask(name, m)
}

func (m *OBParameterManager) PrintErrEvent(err error) {
m.Recorder.Event(m.OBParameter, corev1.EventTypeWarning, "Task failed", err.Error())
}

func (m *OBParameterManager) SetOBParameter() tasktypes.TaskError {
operationManager, err := m.getOceanbaseOperationManager()
if err != nil {
m.Logger.Error(err, "Get operation manager failed")
return errors.Wrapf(err, "Get operation manager")
}
err = operationManager.SetParameter(m.OBParameter.Spec.Parameter.Name, m.OBParameter.Spec.Parameter.Value, nil)
if err != nil {
m.Logger.Error(err, "Set parameter failed")
return errors.Wrapf(err, "Set parameter")
}
return nil
}

func (m *OBParameterManager) generateNamespacedName(name string) types.NamespacedName {
var namespacedName types.NamespacedName
namespacedName.Namespace = m.OBParameter.Namespace
namespacedName.Name = name
return namespacedName
}

func (m *OBParameterManager) getOBCluster() (*v1alpha1.OBCluster, error) {
// this label always exists
clusterName, _ := m.OBParameter.Labels[oceanbaseconst.LabelRefOBCluster]
obcluster := &v1alpha1.OBCluster{}
err := m.Client.Get(m.Ctx, m.generateNamespacedName(clusterName), obcluster)
if err != nil {
return nil, errors.Wrap(err, "get obcluster")
}
return obcluster, nil
}

func (m *OBParameterManager) getOceanbaseOperationManager() (*operation.OceanbaseOperationManager, error) {
obcluster, err := m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster from K8s")
}
return resourceutils.GetSysOperationClient(m.Client, m.Logger, obcluster)
}

func (m *OBParameterManager) ArchiveResource() {
m.Logger.Info("Archive obparameter", "obparameter", m.OBParameter.Name)
m.Recorder.Event(m.OBParameter, "Archive", "", "archive obparameter")
Expand Down
38 changes: 38 additions & 0 deletions internal/resource/obparameter/obparameter_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package obparameter

import (
"github.com/pkg/errors"

"github.com/oceanbase/ob-operator/pkg/task/builder"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

//go:generate task-register $GOFILE

var taskMap = builder.NewTaskHub[*OBParameterManager]()

Check failure on line 24 in internal/resource/obparameter/obparameter_task.go

View workflow job for this annotation

GitHub Actions / build-images

undefined: builder.NewTaskHub

func SetOBParameter(m *OBParameterManager) tasktypes.TaskError {
operationManager, err := m.getOceanbaseOperationManager()
if err != nil {
m.Logger.Error(err, "Get operation manager failed")
return errors.Wrapf(err, "Get operation manager")
}
err = operationManager.SetParameter(m.OBParameter.Spec.Parameter.Name, m.OBParameter.Spec.Parameter.Value, nil)
if err != nil {
m.Logger.Error(err, "Set parameter failed")
return errors.Wrapf(err, "Set parameter")
}
return nil
}
6 changes: 6 additions & 0 deletions internal/resource/obparameter/obparameter_task_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 66 additions & 0 deletions internal/resource/obparameter/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package obparameter

import (
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
)

func (m *OBParameterManager) generateNamespacedName(name string) types.NamespacedName {
var namespacedName types.NamespacedName
namespacedName.Namespace = m.OBParameter.Namespace
namespacedName.Name = name
return namespacedName
}

func (m *OBParameterManager) getOBCluster() (*v1alpha1.OBCluster, error) {
// this label always exists
clusterName, _ := m.OBParameter.Labels[oceanbaseconst.LabelRefOBCluster]
obcluster := &v1alpha1.OBCluster{}
err := m.Client.Get(m.Ctx, m.generateNamespacedName(clusterName), obcluster)
if err != nil {
return nil, errors.Wrap(err, "get obcluster")
}
return obcluster, nil
}

func (m *OBParameterManager) getOceanbaseOperationManager() (*operation.OceanbaseOperationManager, error) {
obcluster, err := m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster from K8s")
}
return resourceutils.GetSysOperationClient(m.Client, m.Logger, obcluster)
}

func (m *OBParameterManager) retryUpdateStatus() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
parameter := &v1alpha1.OBParameter{}
err := m.Client.Get(m.Ctx, types.NamespacedName{
Namespace: m.OBParameter.GetNamespace(),
Name: m.OBParameter.GetName(),
}, parameter)
if err != nil {
return client.IgnoreNotFound(err)
}
parameter.Status = *m.OBParameter.Status.DeepCopy()
return m.Client.Status().Update(m.Ctx, parameter)
})
}
Loading

0 comments on commit 6f8449a

Please sign in to comment.