Skip to content

Commit

Permalink
Migrate obcluster (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-sun-star authored Mar 13, 2024
1 parent 1476327 commit 0cec638
Show file tree
Hide file tree
Showing 25 changed files with 372 additions and 92 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/obcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type OBClusterSpec struct {
// Important: Run "make" to regenerate code after modifying this file

ClusterName string `json:"clusterName"`
ClusterId int64 `json:"clusterId,omitempty"`
ClusterId int64 `json:"clusterId"`
OBServerTemplate *apitypes.OBServerTemplate `json:"observer"`
MonitorTemplate *apitypes.MonitorTemplate `json:"monitor,omitempty"`
BackupVolume *apitypes.BackupVolumeSpec `json:"backupVolume,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/obcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (r *OBCluster) validateMutation() error {
} else if r.Spec.Topology[0].Replica != 1 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("topology"), r.Spec.Topology, "standalone mode only support single replica"))
}
// validate migration
migrateAnnoVal, migrateAnnoExist := r.GetAnnotations()[oceanbaseconst.AnnotationsSourceClusterAddress]
if migrateAnnoExist {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("annotations").Child(oceanbaseconst.AnnotationsSourceClusterAddress), migrateAnnoVal, "migrate obcluster into standalone mode is not supported"))
}
}

// Validate userSecrets
Expand Down
1 change: 1 addition & 0 deletions config/crd/bases/oceanbase.oceanbase.com_obclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2703,6 +2703,7 @@ spec:
- root
type: object
required:
- clusterId
- clusterName
- observer
- topology
Expand Down
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
7 changes: 7 additions & 0 deletions deploy/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2726,6 +2726,7 @@ spec:
- root
type: object
required:
- clusterId
- clusterName
- observer
- topology
Expand Down Expand Up @@ -12038,6 +12039,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
11 changes: 11 additions & 0 deletions internal/const/oceanbase/oceanbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
GetConnectionMaxRetries = 10
CheckConnectionInterval = 3
CheckJobInterval = 3
CheckJobMaxRetries = 100
CommonCheckInterval = 5
)

Expand All @@ -76,6 +77,7 @@ const (
AnnotationsIndependentPVCLifecycle = "oceanbase.oceanbase.com/independent-pvc-lifecycle"
AnnotationsSinglePVC = "oceanbase.oceanbase.com/single-pvc"
AnnotationsMode = "oceanbase.oceanbase.com/mode"
AnnotationsSourceClusterAddress = "oceanbase.oceanbase.com/source-cluster-address"
)

const (
Expand Down Expand Up @@ -174,3 +176,12 @@ const (
const (
TolerateServerPodNotReadyMinutes = 5
)

// Names of the OceanBase parameters that the pre-migration check reads from
// the source cluster and compares with the OBCluster spec.
const (
// ClusterNameParam is the parameter whose value is compared with
// Spec.ClusterName.
ClusterNameParam = "cluster"
// ClusterIdParam is the parameter whose value is compared with the decimal
// rendering of Spec.ClusterId.
ClusterIdParam = "cluster_id"
)

const (
// CmdVersion is a shell command line that queries rpm for the installed
// oceanbase-ce package, printing it as VERSION-RELEASE, and then uses sed
// to strip the final dot-separated suffix from that output. The migration
// check passes it to a job together with the target observer image to
// obtain the target version string.
// NOTE(review): presumably the stripped suffix is the dist tag (e.g.
// ".el7") - confirm against the package's release naming.
CmdVersion = "rpm -q --queryformat '%{VERSION}-%{RELEASE}' oceanbase-ce | sed 's/\\.[^.]*$//'"
)
1 change: 1 addition & 0 deletions internal/const/status/obcluster/obcluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package obcluster

const (
New = "new"
MigrateFromExisting = "migrate from existing"
Running = "running"
AddOBZone = "add obzone"
DeleteOBZone = "delete obzone"
Expand Down
27 changes: 14 additions & 13 deletions internal/const/status/obzone/obzone_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package obcluster
package obzone

const (
New = "new"
Maintaining = "maintaining"
Running = "running"
AddOBServer = "add observer"
DeleteOBServer = "delete observer"
Deleting = "deleting"
Upgrade = "upgrade"
BootstrapReady = "bootstrap ready"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
MountBackupVolume = "mount backup volume"
New = "new"
MigrateFromExisting = "migrate from existing"
Maintaining = "maintaining"
Running = "running"
AddOBServer = "add observer"
DeleteOBServer = "delete observer"
Deleting = "deleting"
Upgrade = "upgrade"
BootstrapReady = "bootstrap ready"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
MountBackupVolume = "mount backup volume"
)
1 change: 1 addition & 0 deletions internal/controller/obcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type OBClusterReconciler struct {
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs/finalizers,verbs=update
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods/log,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down
1 change: 1 addition & 0 deletions internal/resource/obcluster/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
func init() {
// obcluster
task.GetRegistry().Register(fBootstrapOBCluster, BootstrapOBCluster)
task.GetRegistry().Register(fMigrateOBClusterFromExisting, MigrateOBClusterFromExisting)
task.GetRegistry().Register(fMaintainOBClusterAfterBootstrap, MaintainOBClusterAfterBootstrap)
task.GetRegistry().Register(fAddOBZone, AddOBZone)
task.GetRegistry().Register(fDeleteOBZone, DeleteOBZone)
Expand Down
2 changes: 2 additions & 0 deletions internal/resource/obcluster/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

// obcluster flows
const (
fMigrateOBClusterFromExisting ttypes.FlowName = "migrate obcluster from existing"
fBootstrapOBCluster ttypes.FlowName = "bootstrap obcluster"
fMaintainOBClusterAfterBootstrap ttypes.FlowName = "maintain obcluster after bootstrap"
fAddOBZone ttypes.FlowName = "add obzone"
Expand All @@ -33,6 +34,7 @@ const (

// obcluster tasks
const (
tCheckMigration ttypes.TaskName = "check before migration"
tCheckImageReady ttypes.TaskName = "check image ready"
tCheckClusterMode ttypes.TaskName = "check cluster mode"
tCheckAndCreateUserSecrets ttypes.TaskName = "check and create user secrets"
Expand Down
13 changes: 13 additions & 0 deletions internal/resource/obcluster/obcluster_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

// MigrateOBClusterFromExisting returns the task flow used when an obcluster is
// created by migrating from an existing cluster. The flow starts with the
// pre-migration check, then runs the image, cluster mode and user secret
// checks, creates the obzones and waits for them, and finally creates users,
// maintains parameters and creates the monitor and cluster services. On
// success the cluster moves to Running; on failure the next status is Failed.
func MigrateOBClusterFromExisting() *tasktypes.TaskFlow {
	steps := []tasktypes.TaskName{
		tCheckMigration,
		tCheckImageReady,
		tCheckClusterMode,
		tCheckAndCreateUserSecrets,
		tCreateOBZone,
		tWaitOBZoneRunning,
		tCreateUsers,
		tMaintainOBParameter,
		tCreateServiceForMonitor,
		tCreateOBClusterService,
	}
	opCtx := &tasktypes.OperationContext{
		Name:         fMigrateOBClusterFromExisting,
		Tasks:        steps,
		TargetStatus: clusterstatus.Running,
		OnFailure: tasktypes.FailureRule{
			NextTryStatus: clusterstatus.Failed,
		},
	}
	return &tasktypes.TaskFlow{OperationContext: opCtx}
}

func BootstrapOBCluster() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Expand Down
11 changes: 10 additions & 1 deletion internal/resource/obcluster/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ func (m *OBClusterManager) GetStatus() string {
func (m *OBClusterManager) InitStatus() {
m.Logger.Info("Newly created cluster, init status")
m.Recorder.Event(m.OBCluster, "Init", "", "newly created cluster, init status")
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
initialStatus := clusterstatus.New
if migrateAnnoExist {
initialStatus = clusterstatus.MigrateFromExisting
}
status := v1alpha1.OBClusterStatus{
Image: m.OBCluster.Spec.OBServerTemplate.Image,
Status: clusterstatus.New,
Status: initialStatus,
OBZoneStatus: make([]apitypes.OBZoneReplicaStatus, 0, len(m.OBCluster.Spec.Topology)),
}
m.OBCluster.Status = status
Expand All @@ -80,6 +85,8 @@ func (m *OBClusterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create task flow according to obcluster status")
switch m.OBCluster.Status.Status {
// create obcluster, return taskFlow to bootstrap obcluster
case clusterstatus.MigrateFromExisting:
taskFlow, err = task.GetRegistry().Get(fMigrateOBClusterFromExisting)
case clusterstatus.New:
taskFlow, err = task.GetRegistry().Get(fBootstrapOBCluster)
// after obcluster bootstrapped, return taskFlow to maintain obcluster after bootstrap
Expand Down Expand Up @@ -277,6 +284,8 @@ func (m *OBClusterManager) HandleFailure() {

func (m *OBClusterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
switch name {
case tCheckMigration:
return m.CheckMigration, nil
case tCheckImageReady:
return m.CheckImageReady, nil
case tCheckClusterMode:
Expand Down
86 changes: 79 additions & 7 deletions internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"strings"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -162,6 +161,7 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
independentVolumeAnnoVal, independentVolumeAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsIndependentPVCLifecycle)
singlePVCAnnoVal, singlePVCAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSinglePVC)
modeAnnoVal, modeAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsMode)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
for _, zone := range m.OBCluster.Spec.Topology {
zoneName := m.generateZoneName(zone.Zone)
zoneExists := false
Expand Down Expand Up @@ -208,6 +208,9 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
if modeAnnoExist {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsMode] = modeAnnoVal
}
if migrateAnnoExist {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterAddress] = migrateAnnoVal
}
m.Logger.Info("Create obzone", "zone", zoneName)
err := m.Client.Create(m.Ctx, obzone)
if err != nil {
Expand Down Expand Up @@ -430,9 +433,7 @@ func (m *OBClusterManager) ValidateUpgradeInfo() tasktypes.TaskError {
return errors.Wrapf(err, "Failed to get version of obcluster %s", m.OBCluster.Name)
}
// Get target version and patch
parts := strings.Split(uuid.New().String(), "-")
suffix := parts[len(parts)-1]
jobName := fmt.Sprintf("%s-%s", "oceanbase-upgrade", suffix)
jobName := fmt.Sprintf("%s-%s", "oceanbase-upgrade", rand.String(6))
var backoffLimit int32
var ttl int32 = 300
container := corev1.Container{
Expand Down Expand Up @@ -714,12 +715,10 @@ func (m *OBClusterManager) CreateServiceForMonitor() tasktypes.TaskError {
ownerReferenceList = append(ownerReferenceList, ownerReference)
selector := make(map[string]string)
selector[oceanbaseconst.LabelRefOBCluster] = m.OBCluster.Name
parts := strings.Split(uuid.New().String(), "-")
suffix := parts[len(parts)-1]
monitorService := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: m.OBCluster.Namespace,
Name: fmt.Sprintf("svc-monitor-%s-%s", m.OBCluster.Name, suffix),
Name: fmt.Sprintf("svc-monitor-%s-%s", m.OBCluster.Name, rand.String(6)),
OwnerReferences: ownerReferenceList,
},
Spec: corev1.ServiceSpec{
Expand Down Expand Up @@ -1011,3 +1010,76 @@ func (m *OBClusterManager) CheckClusterMode() tasktypes.TaskError {
}
return nil
}

// CheckMigration validates that the OBCluster spec is compatible with the
// cluster reached through the operation manager before a migration starts.
//
// It performs three checks, returning on the first failure:
//  1. the version reported by running CmdVersion in a job with the target
//     observer image equals the version reported by the operation manager;
//  2. the set of zones listed by the operation manager equals the set of zones
//     in Spec.Topology (no extra and no missing zones);
//  3. the "cluster" and "cluster_id" parameters equal Spec.ClusterName and
//     Spec.ClusterId.
//
// NOTE(review): the operation manager is presumably connected to the source
// cluster named by the source-cluster-address annotation - confirm in
// getOceanbaseOperationManager.
//
// It returns nil when every check passes, otherwise an error describing the
// first mismatch or the underlying failure.
func (m *OBClusterManager) CheckMigration() tasktypes.TaskError {
	m.Logger.Info("Check before migration")
	manager, err := m.getOceanbaseOperationManager()
	if err != nil {
		return errors.Wrap(err, "get operation manager")
	}

	// check version strictly matches
	targetVersionStr, err := resourceutils.RunJob(m.Client, m.Logger, m.OBCluster.Namespace, fmt.Sprintf("%s-version", m.OBCluster.Name), m.OBCluster.Spec.OBServerTemplate.Image, oceanbaseconst.CmdVersion)
	if err != nil {
		return errors.Wrap(err, "get target oceanbase version")
	}

	sourceVersion, err := manager.GetVersion()
	if err != nil {
		return errors.Wrap(err, "get source oceanbase version")
	}

	if sourceVersion.String() != targetVersionStr {
		return errors.Errorf("version mismatch source cluster: %s, target cluster: %s", sourceVersion.String(), targetVersionStr)
	}

	// check obzone matches topology
	obzoneList, err := manager.ListZones()
	if err != nil {
		return errors.Wrap(err, "list obzones")
	}
	zoneMap := make(map[string]struct{}, len(obzoneList))
	for _, zone := range obzoneList {
		zoneMap[zone.Name] = struct{}{}
	}

	// Zones present in the spec but unknown to the source cluster. Matched
	// zones are removed from zoneMap so that whatever remains afterwards is
	// exactly the set of source zones missing from the spec.
	extraZones := make([]string, 0)
	for _, obzone := range m.OBCluster.Spec.Topology {
		if _, found := zoneMap[obzone.Zone]; !found {
			extraZones = append(extraZones, obzone.Zone)
			continue
		}
		delete(zoneMap, obzone.Zone)
	}
	if len(extraZones) > 0 {
		return errors.Errorf("obzone %s defined but not in source cluster", strings.Join(extraZones, ","))
	}

	// Walk the listed zones instead of ranging over the map so the zones
	// appear in the error message in a stable order; deleting each reported
	// name keeps a duplicated list entry from being reported twice.
	undefinedZones := make([]string, 0, len(zoneMap))
	for _, zone := range obzoneList {
		if _, remaining := zoneMap[zone.Name]; remaining {
			undefinedZones = append(undefinedZones, zone.Name)
			delete(zoneMap, zone.Name)
		}
	}
	if len(undefinedZones) > 0 {
		return errors.Errorf("obzone %s not defined in obcluster's topology", strings.Join(undefinedZones, ","))
	}

	// check obcluster name and id
	obclusterNameParamList, err := manager.GetParameter(oceanbaseconst.ClusterNameParam, nil)
	if err != nil {
		return errors.Wrap(err, "get obcluster name failed")
	}
	// An empty result would otherwise panic on the index below.
	if len(obclusterNameParamList) == 0 {
		return errors.Errorf("parameter %s not found in source cluster", oceanbaseconst.ClusterNameParam)
	}
	obclusterName := obclusterNameParamList[0].Value
	obclusterIdParamList, err := manager.GetParameter(oceanbaseconst.ClusterIdParam, nil)
	if err != nil {
		return errors.Wrap(err, "get obcluster id failed")
	}
	if len(obclusterIdParamList) == 0 {
		return errors.Errorf("parameter %s not found in source cluster", oceanbaseconst.ClusterIdParam)
	}
	obclusterId := obclusterIdParamList[0].Value
	if obclusterName != m.OBCluster.Spec.ClusterName {
		return errors.Errorf("Cluster name mismatch, source cluster: %s, current: %s", obclusterName, m.OBCluster.Spec.ClusterName)
	}
	if obclusterId != fmt.Sprintf("%d", m.OBCluster.Spec.ClusterId) {
		return errors.Errorf("Cluster id mismatch, source cluster: %s, current: %d", obclusterId, m.OBCluster.Spec.ClusterId)
	}
	return nil
}
1 change: 1 addition & 0 deletions internal/resource/obzone/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
func init() {
// obzone
task.GetRegistry().Register(fCreateOBZone, CreateOBZone)
task.GetRegistry().Register(fMigrateOBZoneFromExisting, MigrateOBZoneFromExisting)
task.GetRegistry().Register(fAddOBServer, AddOBServer)
task.GetRegistry().Register(fDeleteOBServer, DeleteOBServer)
task.GetRegistry().Register(fPrepareOBZoneForBootstrap, PrepareOBZoneForBootstrap)
Expand Down
2 changes: 2 additions & 0 deletions internal/resource/obzone/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

// obzone flows
const (
fMigrateOBZoneFromExisting ttypes.FlowName = "migrate obzone from existing"
fPrepareOBZoneForBootstrap ttypes.FlowName = "prepare obzone for bootstrap"
fMaintainOBZoneAfterBootstrap ttypes.FlowName = "maintain obzone after bootstrap"
fAddOBServer ttypes.FlowName = "add observer"
Expand All @@ -36,6 +37,7 @@ const (
tCreateOBServer ttypes.TaskName = "create observer"
tUpgradeOBServer ttypes.TaskName = "upgrade observer"
tWaitOBServerUpgraded ttypes.TaskName = "wait observer upgraded"
tDeleteLegacyOBServers ttypes.TaskName = "delete legacy observers"
tDeleteOBServer ttypes.TaskName = "delete observer"
tDeleteAllOBServer ttypes.TaskName = "delete all observer"
tAddZone ttypes.TaskName = "add zone"
Expand Down
10 changes: 10 additions & 0 deletions internal/resource/obzone/obzone_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

// MigrateOBZoneFromExisting returns the task flow used for an obzone that is
// being migrated from an existing cluster: create the observers, wait until
// they are running, then delete the legacy observers. On success the zone
// moves to Running.
func MigrateOBZoneFromExisting() *tasktypes.TaskFlow {
	steps := []tasktypes.TaskName{
		tCreateOBServer,
		tWaitOBServerRunning,
		tDeleteLegacyOBServers,
	}
	opCtx := &tasktypes.OperationContext{
		Name:         fMigrateOBZoneFromExisting,
		Tasks:        steps,
		TargetStatus: zonestatus.Running,
	}
	return &tasktypes.TaskFlow{OperationContext: opCtx}
}

func PrepareOBZoneForBootstrap() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Expand Down
Loading

0 comments on commit 0cec638

Please sign in to comment.