Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate obcluster #234

Merged
merged 10 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1alpha1/obcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type OBClusterSpec struct {
// Important: Run "make" to regenerate code after modifying this file

ClusterName string `json:"clusterName"`
ClusterId int64 `json:"clusterId,omitempty"`
ClusterId int64 `json:"clusterId"`
OBServerTemplate *apitypes.OBServerTemplate `json:"observer"`
MonitorTemplate *apitypes.MonitorTemplate `json:"monitor,omitempty"`
BackupVolume *apitypes.BackupVolumeSpec `json:"backupVolume,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/obcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (r *OBCluster) validateMutation() error {
} else if r.Spec.Topology[0].Replica != 1 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("topology"), r.Spec.Topology, "standalone mode only support single replica"))
}
// validate migration
migrateAnnoVal, migrateAnnoExist := r.GetAnnotations()[oceanbaseconst.AnnotationsSourceClusterAddress]
if migrateAnnoExist {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("annotations").Child(oceanbaseconst.AnnotationsSourceClusterAddress), migrateAnnoVal, "migrate obcluster into standalone mode is not supported"))
}
}

// Validate userSecrets
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/oceanbase.oceanbase.com_obclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,8 @@ spec:
serviceAccount:
default: default
type: string
sourceOBClusterConnection:
chris-sun-star marked this conversation as resolved.
Show resolved Hide resolved
type: string
topology:
items:
properties:
Expand Down Expand Up @@ -2703,6 +2705,7 @@ spec:
- root
type: object
required:
- clusterId
- clusterName
- observer
- topology
Expand Down
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
9 changes: 9 additions & 0 deletions deploy/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,8 @@ spec:
serviceAccount:
default: default
type: string
sourceOBClusterConnection:
type: string
topology:
items:
properties:
Expand Down Expand Up @@ -2726,6 +2728,7 @@ spec:
- root
type: object
required:
- clusterId
- clusterName
- observer
- topology
Expand Down Expand Up @@ -12038,6 +12041,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
11 changes: 11 additions & 0 deletions internal/const/oceanbase/oceanbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
GetConnectionMaxRetries = 10
CheckConnectionInterval = 3
CheckJobInterval = 3
CheckJobMaxRetries = 100
CommonCheckInterval = 5
)

Expand All @@ -76,6 +77,7 @@ const (
AnnotationsIndependentPVCLifecycle = "oceanbase.oceanbase.com/independent-pvc-lifecycle"
AnnotationsSinglePVC = "oceanbase.oceanbase.com/single-pvc"
AnnotationsMode = "oceanbase.oceanbase.com/mode"
AnnotationsSourceClusterAddress = "oceanbase.oceanbase.com/source-cluster-address"
)

const (
Expand Down Expand Up @@ -174,3 +176,12 @@ const (
const (
// TolerateServerPodNotReadyMinutes is a duration expressed in minutes.
// NOTE(review): presumably how long an observer pod may stay not-ready
// before the operator reacts — confirm against the code that reads it.
TolerateServerPodNotReadyMinutes = 5
)

// Parameter names passed to the operation manager's GetParameter when the
// migration check reads the cluster identity.
const (
// ClusterNameParam is the parameter name whose value is compared with
// OBCluster.Spec.ClusterName.
ClusterNameParam = "cluster"
// ClusterIdParam is the parameter name whose value is compared with
// OBCluster.Spec.ClusterId.
ClusterIdParam = "cluster_id"
)

const (
// CmdVersion is the shell command run as a job in the target observer image
// to obtain its version string. It prints "<VERSION>-<RELEASE>" of the
// oceanbase-ce rpm package and then strips the final dot-separated suffix
// with sed.
CmdVersion = "rpm -q --queryformat '%{VERSION}-%{RELEASE}' oceanbase-ce | sed 's/\\.[^.]*$//'"
)
1 change: 1 addition & 0 deletions internal/const/status/obcluster/obcluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package obcluster

const (
New = "new"
MigrateFromExisting = "migrate from existing"
Running = "running"
AddOBZone = "add obzone"
DeleteOBZone = "delete obzone"
Expand Down
27 changes: 14 additions & 13 deletions internal/const/status/obzone/obzone_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package obcluster
package obzone

const (
New = "new"
Maintaining = "maintaining"
Running = "running"
AddOBServer = "add observer"
DeleteOBServer = "delete observer"
Deleting = "deleting"
Upgrade = "upgrade"
BootstrapReady = "bootstrap ready"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
MountBackupVolume = "mount backup volume"
New = "new"
MigrateFromExisting = "migrate from existing"
Maintaining = "maintaining"
Running = "running"
AddOBServer = "add observer"
DeleteOBServer = "delete observer"
Deleting = "deleting"
Upgrade = "upgrade"
BootstrapReady = "bootstrap ready"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
MountBackupVolume = "mount backup volume"
)
1 change: 1 addition & 0 deletions internal/controller/obcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type OBClusterReconciler struct {
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs/finalizers,verbs=update
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods/log,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down
1 change: 1 addition & 0 deletions internal/resource/obcluster/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
func init() {
// obcluster
task.GetRegistry().Register(fBootstrapOBCluster, BootstrapOBCluster)
task.GetRegistry().Register(fMigrateOBClusterFromExisting, MigrateOBClusterFromExisting)
task.GetRegistry().Register(fMaintainOBClusterAfterBootstrap, MaintainOBClusterAfterBootstrap)
task.GetRegistry().Register(fAddOBZone, AddOBZone)
task.GetRegistry().Register(fDeleteOBZone, DeleteOBZone)
Expand Down
2 changes: 2 additions & 0 deletions internal/resource/obcluster/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

// obcluster flows
const (
fMigrateOBClusterFromExisting ttypes.FlowName = "migrate obcluster from existing"
fBootstrapOBCluster ttypes.FlowName = "bootstrap obcluster"
fMaintainOBClusterAfterBootstrap ttypes.FlowName = "maintain obcluster after bootstrap"
fAddOBZone ttypes.FlowName = "add obzone"
Expand All @@ -33,6 +34,7 @@ const (

// obcluster tasks
const (
tCheckMigration ttypes.TaskName = "check before migration"
tCheckImageReady ttypes.TaskName = "check image ready"
tCheckClusterMode ttypes.TaskName = "check cluster mode"
tCheckAndCreateUserSecrets ttypes.TaskName = "check and create user secrets"
Expand Down
13 changes: 13 additions & 0 deletions internal/resource/obcluster/obcluster_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

// MigrateOBClusterFromExisting returns the task flow used when an obcluster is
// created by migrating from an existing cluster. The flow starts with the
// pre-migration check, targets the Running status on success and moves the
// cluster to Failed when a task fails.
func MigrateOBClusterFromExisting() *tasktypes.TaskFlow {
	steps := []tasktypes.TaskName{
		tCheckMigration,
		tCheckImageReady,
		tCheckClusterMode,
		tCheckAndCreateUserSecrets,
		tCreateOBZone,
		tWaitOBZoneRunning,
		tCreateUsers,
		tMaintainOBParameter,
		tCreateServiceForMonitor,
		tCreateOBClusterService,
	}
	opCtx := &tasktypes.OperationContext{
		Name:         fMigrateOBClusterFromExisting,
		Tasks:        steps,
		TargetStatus: clusterstatus.Running,
		OnFailure: tasktypes.FailureRule{
			NextTryStatus: clusterstatus.Failed,
		},
	}
	return &tasktypes.TaskFlow{OperationContext: opCtx}
}

func BootstrapOBCluster() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Expand Down
11 changes: 10 additions & 1 deletion internal/resource/obcluster/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ func (m *OBClusterManager) GetStatus() string {
func (m *OBClusterManager) InitStatus() {
m.Logger.Info("Newly created cluster, init status")
m.Recorder.Event(m.OBCluster, "Init", "", "newly created cluster, init status")
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
initialStatus := clusterstatus.New
if migrateAnnoExist {
initialStatus = clusterstatus.MigrateFromExisting
}
status := v1alpha1.OBClusterStatus{
Image: m.OBCluster.Spec.OBServerTemplate.Image,
Status: clusterstatus.New,
Status: initialStatus,
OBZoneStatus: make([]apitypes.OBZoneReplicaStatus, 0, len(m.OBCluster.Spec.Topology)),
}
m.OBCluster.Status = status
Expand All @@ -80,6 +85,8 @@ func (m *OBClusterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("Create task flow according to obcluster status")
switch m.OBCluster.Status.Status {
// create obcluster: return taskFlow to migrate from an existing cluster or to bootstrap a new one
case clusterstatus.MigrateFromExisting:
taskFlow, err = task.GetRegistry().Get(fMigrateOBClusterFromExisting)
case clusterstatus.New:
taskFlow, err = task.GetRegistry().Get(fBootstrapOBCluster)
// after obcluster bootstrapped, return taskFlow to maintain obcluster after bootstrap
Expand Down Expand Up @@ -277,6 +284,8 @@ func (m *OBClusterManager) HandleFailure() {

func (m *OBClusterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
switch name {
case tCheckMigration:
return m.CheckMigration, nil
case tCheckImageReady:
return m.CheckImageReady, nil
case tCheckClusterMode:
Expand Down
77 changes: 77 additions & 0 deletions internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
independentVolumeAnnoVal, independentVolumeAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsIndependentPVCLifecycle)
singlePVCAnnoVal, singlePVCAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSinglePVC)
modeAnnoVal, modeAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsMode)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
for _, zone := range m.OBCluster.Spec.Topology {
zoneName := m.generateZoneName(zone.Zone)
zoneExists := false
Expand Down Expand Up @@ -208,6 +209,9 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
if modeAnnoExist {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsMode] = modeAnnoVal
}
if migrateAnnoExist {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterAddress] = migrateAnnoVal
}
m.Logger.Info("Create obzone", "zone", zoneName)
err := m.Client.Create(m.Ctx, obzone)
if err != nil {
Expand Down Expand Up @@ -1011,3 +1015,76 @@ func (m *OBClusterManager) CheckClusterMode() tasktypes.TaskError {
}
return nil
}

// CheckMigration validates that the OBCluster spec is compatible with the
// source cluster before migration. It fails when any of the following does
// not hold:
//   - the version reported by the target image (obtained by running
//     oceanbaseconst.CmdVersion in a job) equals the version reported by the
//     operation manager;
//   - the zones listed by the operation manager are exactly the zones in
//     Spec.Topology;
//   - the cluster name and cluster id parameters equal Spec.ClusterName and
//     Spec.ClusterId.
//
// It returns nil when every check passes, otherwise an error describing the
// first failed check.
func (m *OBClusterManager) CheckMigration() tasktypes.TaskError {
	m.Logger.Info("Check before migration")
	manager, err := m.getOceanbaseOperationManager()
	if err != nil {
		return errors.Wrap(err, "get operation manager")
	}

	// check version strictly matches
	targetVersionStr, err := resourceutils.RunJob(m.Client, m.Logger, m.OBCluster.Namespace, fmt.Sprintf("%s-version", m.OBCluster.Name), m.OBCluster.Spec.OBServerTemplate.Image, oceanbaseconst.CmdVersion)
	if err != nil {
		return errors.Wrap(err, "get target oceanbase version")
	}

	sourceVersion, err := manager.GetVersion()
	if err != nil {
		return errors.Wrap(err, "get source oceanbase version")
	}

	if sourceVersion.String() != targetVersionStr {
		return errors.Errorf("version mismatch source cluster: %s, target cluster: %s", sourceVersion.String(), targetVersionStr)
	}

	// check obzone matches topology
	obzoneList, err := manager.ListZones()
	if err != nil {
		return errors.Wrap(err, "list obzones")
	}
	zoneMap := make(map[string]struct{})
	for _, zone := range obzoneList {
		zoneMap[zone.Name] = struct{}{}
	}

	// Zones matched by the topology are removed from zoneMap, so whatever is
	// left afterwards exists in the source cluster but is not in the spec.
	extraZones := make([]string, 0)
	for _, obzone := range m.OBCluster.Spec.Topology {
		_, found := zoneMap[obzone.Zone]
		if !found {
			extraZones = append(extraZones, obzone.Zone)
		} else {
			delete(zoneMap, obzone.Zone)
		}
	}
	if len(extraZones) > 0 {
		return errors.Errorf("obzone %s defined but not in source cluster", strings.Join(extraZones, ","))
	}

	undefinedZones := make([]string, 0)
	for zone := range zoneMap {
		undefinedZones = append(undefinedZones, zone)
	}
	if len(undefinedZones) > 0 {
		return errors.Errorf("obzone %s not defined in obcluster's topology", strings.Join(undefinedZones, ","))
	}

	// check obcluster name and id
	obclusterNameParamList, err := manager.GetParameter(oceanbaseconst.ClusterNameParam, nil)
	if err != nil {
		return errors.Wrap(err, "get obcluster name failed")
	}
	// An empty result must become a task error; indexing [0] would panic.
	if len(obclusterNameParamList) == 0 {
		return errors.Errorf("parameter %s not found in source cluster", oceanbaseconst.ClusterNameParam)
	}
	obclusterName := obclusterNameParamList[0].Value
	obclusterIdParamList, err := manager.GetParameter(oceanbaseconst.ClusterIdParam, nil)
	if err != nil {
		return errors.Wrap(err, "get obcluster id failed")
	}
	if len(obclusterIdParamList) == 0 {
		return errors.Errorf("parameter %s not found in source cluster", oceanbaseconst.ClusterIdParam)
	}
	obclusterId := obclusterIdParamList[0].Value
	if obclusterName != m.OBCluster.Spec.ClusterName {
		return errors.Errorf("Cluster name mismatch, source cluster: %s, current: %s", obclusterName, m.OBCluster.Spec.ClusterName)
	}
	if obclusterId != fmt.Sprintf("%d", m.OBCluster.Spec.ClusterId) {
		return errors.Errorf("Cluster id mismatch, source cluster: %s, current: %d", obclusterId, m.OBCluster.Spec.ClusterId)
	}
	return nil
}
1 change: 1 addition & 0 deletions internal/resource/obzone/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
func init() {
// obzone
task.GetRegistry().Register(fCreateOBZone, CreateOBZone)
task.GetRegistry().Register(fMigrateOBZoneFromExisting, MigrateOBZoneFromExisting)
task.GetRegistry().Register(fAddOBServer, AddOBServer)
task.GetRegistry().Register(fDeleteOBServer, DeleteOBServer)
task.GetRegistry().Register(fPrepareOBZoneForBootstrap, PrepareOBZoneForBootstrap)
Expand Down
2 changes: 2 additions & 0 deletions internal/resource/obzone/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

// obzone flows
const (
fMigrateOBZoneFromExisting ttypes.FlowName = "migrate obzone from existing"
fPrepareOBZoneForBootstrap ttypes.FlowName = "prepare obzone for bootstrap"
fMaintainOBZoneAfterBootstrap ttypes.FlowName = "maintain obzone after bootstrap"
fAddOBServer ttypes.FlowName = "add observer"
Expand All @@ -36,6 +37,7 @@ const (
tCreateOBServer ttypes.TaskName = "create observer"
tUpgradeOBServer ttypes.TaskName = "upgrade observer"
tWaitOBServerUpgraded ttypes.TaskName = "wait observer upgraded"
tDeleteLegacyOBServers ttypes.TaskName = "delete legacy observers"
tDeleteOBServer ttypes.TaskName = "delete observer"
tDeleteAllOBServer ttypes.TaskName = "delete all observer"
tAddZone ttypes.TaskName = "add zone"
Expand Down
10 changes: 10 additions & 0 deletions internal/resource/obzone/obzone_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ import (
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

// MigrateOBZoneFromExisting returns the task flow used for an obzone that is
// migrated from an existing cluster: create the observers, wait until they are
// running, then delete the legacy observers. The flow targets the Running
// status.
func MigrateOBZoneFromExisting() *tasktypes.TaskFlow {
	steps := []tasktypes.TaskName{
		tCreateOBServer,
		tWaitOBServerRunning,
		tDeleteLegacyOBServers,
	}
	opCtx := &tasktypes.OperationContext{
		Name:         fMigrateOBZoneFromExisting,
		Tasks:        steps,
		TargetStatus: zonestatus.Running,
	}
	return &tasktypes.TaskFlow{OperationContext: opCtx}
}

func PrepareOBZoneForBootstrap() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Expand Down
Loading
Loading