Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor tasks and flows in OBCluster package #249

Merged
merged 13 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions cmd/generator/task/task-register.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ package {{.PackageName}}

func init() {
{{- range .Tasks }}
taskMap[t{{.Name}}] = {{.TaskFuncName}}
taskMap.Register(t{{.}}, {{.}})
{{- end }}
}
`

type Task struct {
Name string
TaskFuncName string
}
type Task string

func main() {
if len(os.Args) != 2 {
Expand All @@ -59,10 +56,7 @@ func main() {
// Get return type of function and check whether it is a func(resource T) TaskError
if len(fn.Type.Params.List) == 1 && len(fn.Type.Results.List) == 1 {
if strings.HasSuffix(exprToString(fn.Type.Results.List[0].Type), "TaskError") {
taskFuncs = append(taskFuncs, Task{
Name: strings.ReplaceAll(fn.Name.Name, "Task", ""),
TaskFuncName: fn.Name.Name,
})
taskFuncs = append(taskFuncs, Task(fn.Name.Name))
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/observer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}
if needExecuteFinalizer {
err = observerManager.DeleteOBServerInCluster()
err = resobserver.DeleteOBServerInCluster(observerManager)
if err != nil {
logger.Error(err, "delete observer failed")
return ctrl.Result{}, errors.Wrapf(err, "delete observer %s failed", observer.Name)
Expand Down
5 changes: 3 additions & 2 deletions internal/resource/obcluster/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

var _ opresource.ResourceManager = &OBClusterManager{}

type OBClusterManager struct {
opresource.ResourceManager
Ctx context.Context
OBCluster *v1alpha1.OBCluster
Client client.Client
Expand Down Expand Up @@ -277,7 +278,7 @@ func (m *OBClusterManager) HandleFailure() {
}

func (m *OBClusterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
return taskMap.GetLegacyTask(name, m)
return taskMap.GetTask(name, m)
}

func (m *OBClusterManager) PrintErrEvent(err error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

var taskMap = builder.NewTaskMap[*OBClusterManager]()
var taskMap = builder.NewTaskHub[*OBClusterManager]()

func WaitOBZoneTopologyMatch(_ *OBClusterManager) tasktypes.TaskError {
// TODO
Expand Down
54 changes: 27 additions & 27 deletions internal/resource/obcluster/obcluster_task_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 0 additions & 22 deletions internal/resource/obparameter/init.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/resource/obparameter/obparameter_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

func SetOBParameter() *tasktypes.TaskFlow {
func FlowSetOBParameter() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fSetOBParameter,
Expand Down
78 changes: 5 additions & 73 deletions internal/resource/obparameter/obparameter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

apitypes "github.com/oceanbase/ob-operator/api/types"
Expand All @@ -32,15 +30,14 @@ import (
resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
"github.com/oceanbase/ob-operator/internal/telemetry"
opresource "github.com/oceanbase/ob-operator/pkg/coordinator"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
"github.com/oceanbase/ob-operator/pkg/task"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
"github.com/oceanbase/ob-operator/pkg/task/const/strategy"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

var _ opresource.ResourceManager = &OBParameterManager{}

type OBParameterManager struct {
opresource.ResourceManager
Ctx context.Context
OBParameter *v1alpha1.OBParameter
Client client.Client
Expand Down Expand Up @@ -79,28 +76,23 @@ func (m *OBParameterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
// return task flow depends on status

var taskFlow *tasktypes.TaskFlow
var err error
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create task flow according to obparameter status")
switch m.OBParameter.Status.Status {
// only need to handle parameter not match
case parameterstatus.NotMatch:
taskFlow, err = task.GetRegistry().Get(fSetOBParameter)
taskFlow = FlowSetOBParameter()
default:
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("No need to run anything for obparameter")
return nil, nil
}

if err != nil {
return nil, err
}

if taskFlow.OperationContext.OnFailure.Strategy == "" {
taskFlow.OperationContext.OnFailure.Strategy = strategy.StartOver
if taskFlow.OperationContext.OnFailure.NextTryStatus == "" {
taskFlow.OperationContext.OnFailure.NextTryStatus = parameterstatus.Matched
}
}
return taskFlow, err
return taskFlow, nil
}

func (m *OBParameterManager) IsDeleting() bool {
Expand All @@ -112,21 +104,6 @@ func (m *OBParameterManager) CheckAndUpdateFinalizers() error {
return nil
}

func (m *OBParameterManager) retryUpdateStatus() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
parameter := &v1alpha1.OBParameter{}
err := m.Client.Get(m.Ctx, types.NamespacedName{
Namespace: m.OBParameter.GetNamespace(),
Name: m.OBParameter.GetName(),
}, parameter)
if err != nil {
return client.IgnoreNotFound(err)
}
parameter.Status = *m.OBParameter.Status.DeepCopy()
return m.Client.Status().Update(m.Ctx, parameter)
})
}

func (m *OBParameterManager) UpdateStatus() error {
obcluster, err := m.getOBCluster()
if err != nil {
Expand Down Expand Up @@ -207,58 +184,13 @@ func (m *OBParameterManager) HandleFailure() {
}

func (m *OBParameterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
switch name {
case tSetOBParameter:
return m.SetOBParameter, nil
default:
return nil, errors.New("Can not find a function for task")
}
return taskMap.GetTask(name, m)
}

func (m *OBParameterManager) PrintErrEvent(err error) {
m.Recorder.Event(m.OBParameter, corev1.EventTypeWarning, "Task failed", err.Error())
}

func (m *OBParameterManager) SetOBParameter() tasktypes.TaskError {
operationManager, err := m.getOceanbaseOperationManager()
if err != nil {
m.Logger.Error(err, "Get operation manager failed")
return errors.Wrapf(err, "Get operation manager")
}
err = operationManager.SetParameter(m.OBParameter.Spec.Parameter.Name, m.OBParameter.Spec.Parameter.Value, nil)
if err != nil {
m.Logger.Error(err, "Set parameter failed")
return errors.Wrapf(err, "Set parameter")
}
return nil
}

func (m *OBParameterManager) generateNamespacedName(name string) types.NamespacedName {
var namespacedName types.NamespacedName
namespacedName.Namespace = m.OBParameter.Namespace
namespacedName.Name = name
return namespacedName
}

func (m *OBParameterManager) getOBCluster() (*v1alpha1.OBCluster, error) {
// this label always exists
clusterName, _ := m.OBParameter.Labels[oceanbaseconst.LabelRefOBCluster]
obcluster := &v1alpha1.OBCluster{}
err := m.Client.Get(m.Ctx, m.generateNamespacedName(clusterName), obcluster)
if err != nil {
return nil, errors.Wrap(err, "get obcluster")
}
return obcluster, nil
}

func (m *OBParameterManager) getOceanbaseOperationManager() (*operation.OceanbaseOperationManager, error) {
obcluster, err := m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster from K8s")
}
return resourceutils.GetSysOperationClient(m.Client, m.Logger, obcluster)
}

func (m *OBParameterManager) ArchiveResource() {
m.Logger.Info("Archive obparameter", "obparameter", m.OBParameter.Name)
m.Recorder.Event(m.OBParameter, "Archive", "", "archive obparameter")
Expand Down
38 changes: 38 additions & 0 deletions internal/resource/obparameter/obparameter_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package obparameter

import (
"github.com/pkg/errors"

"github.com/oceanbase/ob-operator/pkg/task/builder"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

//go:generate task-register $GOFILE

var taskMap = builder.NewTaskHub[*OBParameterManager]()

func SetOBParameter(m *OBParameterManager) tasktypes.TaskError {
operationManager, err := m.getOceanbaseOperationManager()
if err != nil {
m.Logger.Error(err, "Get operation manager failed")
return errors.Wrapf(err, "Get operation manager")
}
err = operationManager.SetParameter(m.OBParameter.Spec.Parameter.Name, m.OBParameter.Spec.Parameter.Value, nil)
if err != nil {
m.Logger.Error(err, "Set parameter failed")
return errors.Wrapf(err, "Set parameter")
}
return nil
}
6 changes: 6 additions & 0 deletions internal/resource/obparameter/obparameter_task_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading