Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: concurrent map writes #95

Merged
merged 7 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion api/v1alpha1/obtenant_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"context"
"strings"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -32,6 +33,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/oceanbase/ob-operator/api/constants"
apitypes "github.com/oceanbase/ob-operator/api/types"
"github.com/oceanbase/ob-operator/pkg/const/status/tenantstatus"
)

// log is for logging in this package.
Expand Down Expand Up @@ -69,6 +72,8 @@ func (r *OBTenant) Default() {

if r.Spec.TenantRole == "" {
r.Spec.TenantRole = constants.TenantRolePrimary
} else {
r.Spec.TenantRole = apitypes.TenantRole(strings.ToUpper(string(r.Spec.TenantRole)))
}
}

Expand All @@ -86,7 +91,14 @@ func (r *OBTenant) ValidateCreate() (admission.Warnings, error) {
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
//
// While the tenant status is Running, spec.clusterName and spec.tenantName
// must stay equal to the values on the old object; a change to either is
// rejected with a BadRequest error. Every update is then checked by
// validateMutation, whose error (if any) is returned.
func (r *OBTenant) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
	if r.Status.Status == tenantstatus.Running {
		// Assert once and check the result, so that an unexpected or nil old
		// object becomes an admission error instead of a panic in the webhook.
		oldTenant, ok := old.(*OBTenant)
		if !ok || oldTenant == nil {
			return nil, apierrors.NewBadRequest("Expected an OBTenant as the old object")
		}
		switch {
		case r.Spec.ClusterName != oldTenant.Spec.ClusterName:
			return nil, apierrors.NewBadRequest("Cannot change clusterName when tenant is running")
		case r.Spec.TenantName != oldTenant.Spec.TenantName:
			return nil, apierrors.NewBadRequest("Cannot change tenantName when tenant is running")
		}
	}
	return nil, r.validateMutation()
}

Expand All @@ -97,6 +109,11 @@ func (r *OBTenant) validateMutation() error {
}
var allErrs field.ErrorList

// 0. TenantRole must be one of PRIMARY and STANDBY
if r.Spec.TenantRole != constants.TenantRolePrimary && r.Spec.TenantRole != constants.TenantRoleStandby {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("tenantRole"), r.Spec.TenantRole, "TenantRole must be primary or standby"))
}

// 0. OBCluster must exist
cluster := &OBCluster{}
err := tenantClt.Get(context.Background(), types.NamespacedName{
Expand Down
19 changes: 19 additions & 0 deletions api/v1alpha1/obtenantbackuppolicy_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/oceanbase/ob-operator/api/constants"
apitypes "github.com/oceanbase/ob-operator/api/types"
oceanbaseconst "github.com/oceanbase/ob-operator/pkg/const/oceanbase"
"github.com/oceanbase/ob-operator/pkg/const/status/tenantstatus"
)
Expand All @@ -60,11 +61,16 @@ var _ webhook.Defaulter = &OBTenantBackupPolicy{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *OBTenantBackupPolicy) Default() {
// Set default values for backup policy destination types
if r.Spec.DataBackup.Destination.Type == "" {
r.Spec.DataBackup.Destination.Type = constants.BackupDestTypeNFS
} else {
r.Spec.DataBackup.Destination.Type = apitypes.BackupDestType(strings.ToUpper(string(r.Spec.DataBackup.Destination.Type)))
}
if r.Spec.LogArchive.Destination.Type == "" {
r.Spec.LogArchive.Destination.Type = constants.BackupDestTypeNFS
} else {
r.Spec.LogArchive.Destination.Type = apitypes.BackupDestType(strings.ToUpper(string(r.Spec.LogArchive.Destination.Type)))
}
if r.Spec.LogArchive.SwitchPieceInterval == "" {
r.Spec.LogArchive.SwitchPieceInterval = "1d"
Expand Down Expand Up @@ -220,6 +226,19 @@ func (r *OBTenantBackupPolicy) validateBackupPolicy() error {
}
}

if r.Spec.LogArchive.Binding != constants.ArchiveBindingOptional && r.Spec.LogArchive.Binding != constants.ArchiveBindingMandatory {
return field.Invalid(field.NewPath("spec").Child("logArchive").Child("binding"), r.Spec.LogArchive.Binding, "invalid binding, only optional and mandatory are supported")
}

// Check types of destinations are legal
if r.Spec.LogArchive.Destination.Type != constants.BackupDestTypeNFS && r.Spec.LogArchive.Destination.Type != constants.BackupDestTypeOSS {
return field.Invalid(field.NewPath("spec").Child("logArchive").Child("destination").Child("type"), r.Spec.LogArchive.Destination.Type, "invalid destination type, only NFS and OSS are supported")
}
if r.Spec.DataBackup.Destination.Type != constants.BackupDestTypeNFS && r.Spec.DataBackup.Destination.Type != constants.BackupDestTypeOSS {
return field.Invalid(field.NewPath("spec").Child("dataBackup").Child("destination").Child("type"), r.Spec.DataBackup.Destination.Type, "invalid destination type, only NFS and OSS are supported")
}

// Check oss access of destinations
if r.Spec.DataBackup.Destination.Type == constants.BackupDestTypeOSS && r.Spec.DataBackup.Destination.OSSAccessSecret != "" {
if !ossPathPattern.MatchString(r.Spec.DataBackup.Destination.Path) {
return field.Invalid(field.NewPath("spec").Child("dataBackup").Child("destination").Child("path"), r.Spec.DataBackup.Destination.Path, "invalid path, pattern: ^oss://[^/]+/[^/].*\\?host=.+$")
Expand Down
10 changes: 8 additions & 2 deletions api/v1alpha1/obtenantoperation_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"context"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -30,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/oceanbase/ob-operator/api/constants"
apitypes "github.com/oceanbase/ob-operator/api/types"
)

// log is for logging in this package.
Expand All @@ -51,7 +53,8 @@ var _ webhook.Defaulter = &OBTenantOperation{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *OBTenantOperation) Default() {
obtenantoperationlog.Info("default", "name", r.Name)
r.Spec.Type = apitypes.TenantOperationType(strings.ToUpper(string(r.Spec.Type)))

tenant := &OBTenant{}
var targetTenantName string
var secondaryTenantName string
Expand Down Expand Up @@ -113,7 +116,8 @@ func (r *OBTenantOperation) ValidateCreate() (admission.Warnings, error) {
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
//
// It always returns a warning telling the user that updating an operation
// resource does not trigger any action, together with the result of
// validateMutation on the updated object. The old object is not inspected.
func (r *OBTenantOperation) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
	_ = old
	warnings := []string{"Updating operation resource can not trigger any action, please create a new one if you want to do that"}
	return warnings, r.validateMutation()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
Expand All @@ -140,6 +144,8 @@ func (r *OBTenantOperation) validateMutation() error {
if r.Spec.Switchover == nil || r.Spec.Switchover.PrimaryTenant == "" || r.Spec.Switchover.StandbyTenant == "" {
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("switchover").Child("primaryTenant", "standbyTenant"), "name of primary tenant and standby tenant are both required"))
}
default:
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("type"), string(r.Spec.Type)+" type of operation is not supported"))
}
if len(allErrs) == 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion charts/oceanbase-cluster/templates/NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Welcome to OceanBase Cluster!

After installing OBCluster chart, you need to wait for the cluster bootstrapped. Bootstrap progress will cost approximately 2~3 minutes which may varies depends on the machine.
After installing the OBCluster chart, you need to wait for the cluster to be bootstrapped. The bootstrap process takes approximately 2~3 minutes, which may vary depending on the machine.

You can use the following command to wait for the OBCluster to be ready.

Expand Down
4 changes: 3 additions & 1 deletion charts/oceanbase-cluster/templates/secret.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{- if $.Values.generateUserSecrets }}
{{- range $secretName := $.Values.userSecrets }}
{{- if empty (lookup "v1" "Secret" $.Release.Namespace $secretName) }}
--- # the lookup function will return an empty list when dry-running or local rendering
apiVersion: v1
kind: Secret
Expand All @@ -10,7 +9,10 @@ metadata:
labels:
{{- include "oceanbase-cluster.labels" $ | nindent 4 }}
data:
{{- if empty (lookup "v1" "Secret" $.Release.Namespace $secretName) }}
password: {{ randAlphaNum 16 | b64enc }}
{{- else }}
password: {{ (lookup "v1" "Secret" $.Release.Namespace $secretName).data.password }}
{{- end }}
{{- end }}
{{- end }}
2 changes: 0 additions & 2 deletions pkg/controller/observer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
Expand Down Expand Up @@ -114,7 +113,6 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// SetupWithManager builds a controller for OBServer resources on mgr and
// completes it with r as the reconciler. It returns the error from Complete.
func (r *OBServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.OBServer{}).
// Allow up to 9 reconciles to run at the same time.
// NOTE(review): with concurrent reconciles, any state shared between them
// must be safe for concurrent use — confirm before keeping this option.
WithOptions(controller.Options{MaxConcurrentReconciles: 9}).
// preds is declared outside this block.
// NOTE(review): presumably the package's event predicates — confirm.
WithEventFilter(preds).
Complete(r)
}
13 changes: 5 additions & 8 deletions pkg/resource/obtenant_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type OBTenantManager struct {
Logger *logr.Logger
}

// TODO add lock to be thread safe, and read/write whitelist from/to DB
var GlobalWhiteListMap = make(map[string]string, 0)

func (m *OBTenantManager) getClusterSysClient() (*operation.OceanbaseOperationManager, error) {
obcluster, err := m.getOBCluster()
if err != nil {
Expand Down Expand Up @@ -567,14 +564,14 @@ func (m *OBTenantManager) buildTenantStatus() (*v1alpha1.OBTenantStatus, error)
tenantCurrentStatus.TenantRecordInfo = v1alpha1.TenantRecordInfo{}
tenantCurrentStatus.TenantRecordInfo.TenantID = int(obtenant.TenantID)

// TODO get whitelist from tenant account
whitelist, exists := GlobalWhiteListMap[obtenant.TenantName]
// TODO: get whitelist from tenant account
whitelist, exists := tenantWhiteListMap.Load(obtenant.TenantName)
if exists {
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist.(string)
} else {
// try update whitelist after the manager restart
GlobalWhiteListMap[obtenant.TenantName] = tenant.DefaultOBTcpInvitedNodes
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = GlobalWhiteListMap[obtenant.TenantName]
tenantWhiteListMap.Store(obtenant.TenantName, tenant.DefaultOBTcpInvitedNodes)
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = tenant.DefaultOBTcpInvitedNodes
}

tenantCurrentStatus.TenantRecordInfo.UnitNumber = poolStatusList[0].UnitNumber
Expand Down
6 changes: 3 additions & 3 deletions pkg/resource/obtenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (m *OBTenantManager) CheckAndApplyWhiteList() error {
if err != nil {
return err
}
// TODO get whitelist variable by tenant account
// TODO: get whitelist variable by tenant account
// Because getting a whitelist requires specifying a tenant, temporarily use .Status.TenantRecordInfo.ConnectWhiteList as the value in db
GlobalWhiteListMap[tenantName] = specWhiteList
tenantWhiteListMap.Store(tenantName, specWhiteList)
}
return nil
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func (m *OBTenantManager) createTenant() error {
m.Recorder.Event(m.OBTenant, corev1.EventTypeWarning, "failed to create OBTenant", err.Error())
return err
}
GlobalWhiteListMap[tenantName] = m.OBTenant.Spec.ConnectWhiteList
tenantWhiteListMap.Store(tenantName, m.OBTenant.Spec.ConnectWhiteList)
// Create user or change password of root, do not return error
m.Recorder.Event(m.OBTenant, "Create", "", "create OBTenant successfully")
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/obtenantrestore_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (m *OBTenantManager) WatchRestoreJobToFinish() error {
}
time.Sleep(5 * time.Second)
}
GlobalWhiteListMap[m.OBTenant.Spec.TenantName] = m.OBTenant.Spec.ConnectWhiteList
tenantWhiteListMap.Store(m.OBTenant.Spec.TenantName, m.OBTenant.Spec.ConnectWhiteList)
m.Recorder.Event(m.OBTenant, "RestoreJobFinished", "", "restore job finished successfully")
return nil
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/resource/tenantWhiteList.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package resource

import "sync"

// tenantWhiteListMap is a package-level, in-memory record of the connection
// whitelist associated with each tenant. Keys are tenant names and values are
// whitelist strings; readers type-assert loaded values to string.
var tenantWhiteListMap sync.Map
37 changes: 21 additions & 16 deletions pkg/task/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ func GetTaskManager() *TaskManager {
taskManagerOnce.Do(func() {
logger := log.FromContext(context.TODO())
taskManager = &TaskManager{
ResultMap: make(map[string]chan *TaskResult),
Logger: &logger,
TaskResultCache: make(map[string]*TaskResult, 0),
Logger: &logger,
}
})
return taskManager
Expand All @@ -46,17 +44,16 @@ type TaskResult struct {
}

type TaskManager struct {
ResultMap map[string]chan *TaskResult
ResultMap sync.Map
Logger *logr.Logger
TaskResultCache map[string]*TaskResult
TaskResultCache sync.Map
}

func (m *TaskManager) Submit(f func() error) string {
retCh := make(chan *TaskResult, 1)
taskId := uuid.New().String()
// TODO add lock to keep ResultMap safe
m.ResultMap[taskId] = retCh
m.TaskResultCache[taskId] = nil
m.ResultMap.Store(taskId, retCh)
m.TaskResultCache.Delete(taskId)
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -83,30 +80,38 @@ func (m *TaskManager) Submit(f func() error) string {
}

func (m *TaskManager) GetTaskResult(taskId string) (*TaskResult, error) {
retCh, exists := m.ResultMap[taskId]
retChAny, exists := m.ResultMap.Load(taskId)
if !exists {
return nil, errors.Errorf("Task %s not exists", taskId)
}
if m.TaskResultCache[taskId] == nil {
retCh, ok := retChAny.(chan *TaskResult)
if !ok {
return nil, errors.Errorf("Task %s not exists", taskId)
}
result, exists := m.TaskResultCache.Load(taskId)
if !exists {
select {
case result := <-retCh:
m.TaskResultCache[taskId] = result
m.TaskResultCache.Store(taskId, result)
return result, nil
default:
return nil, nil
}
} else {
return m.TaskResultCache[taskId], nil
}
return result.(*TaskResult), nil
}

// CleanTaskResult forgets the task identified by taskId: it removes the
// task's channel from ResultMap, closes that channel, and drops any cached
// result. Cleaning an unknown task ID is a no-op. It always returns nil.
func (m *TaskManager) CleanTaskResult(taskId string) error {
	// LoadAndDelete removes the entry atomically, so when several goroutines
	// clean the same task only one of them obtains the channel and closes it;
	// closing a channel twice would panic.
	retChAny, loaded := m.ResultMap.LoadAndDelete(taskId)
	if !loaded {
		return nil
	}
	m.TaskResultCache.Delete(taskId)

	retCh, ok := retChAny.(chan *TaskResult)
	if !ok {
		return nil
	}
	// NOTE(review): a send on a closed channel panics, so this presumably
	// relies on callers cleaning up only after the task has delivered its
	// result — confirm against the callers.
	close(retCh)
	return nil
}
Loading