Feature: Modify configurations of observers w/o stopping running cluster (#406)
powerfooI authored May 24, 2024
1 parent 16a82dc commit 875e41f
Showing 15 changed files with 212 additions and 100 deletions.
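
This commit lets the OBCluster and OBZone managers notice a storage-class change in the observer template and replace observers one at a time while the cluster keeps serving, instead of requiring a stop-and-recreate. As a rough, hedged sketch of how the new path is exercised (not part of the commit), the Go snippet below updates the storage classes of a live OBCluster through a controller-runtime client. The namespace and cluster name are placeholders and the AddToScheme helper is assumed; the Spec field paths mirror checkIfStorageClassChange in internal/resource/obcluster/utils.go shown further down.

// Hedged sketch (not part of this commit): change the storage class on a
// running OBCluster to trigger the new "rolling update observers" flow.
package main

import (
	"context"

	v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	scheme := runtime.NewScheme()
	_ = v1alpha1.AddToScheme(scheme) // assumed scheme registration helper

	cfg, err := ctrl.GetConfig()
	if err != nil {
		panic(err)
	}
	c, err := client.New(cfg, client.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	cluster := &v1alpha1.OBCluster{}
	// "oceanbase" and "test" are placeholder namespace and cluster names.
	if err := c.Get(ctx, types.NamespacedName{Namespace: "oceanbase", Name: "test"}, cluster); err != nil {
		panic(err)
	}

	// Any of the three storage classes differing from the OBZone spec makes
	// checkIfStorageClassChange return true, which moves the cluster into the
	// RollingUpdateOBServers status instead of requiring a manual restart.
	newClass := "new-storage-class"
	cluster.Spec.OBServerTemplate.Storage.DataStorage.StorageClass = newClass
	cluster.Spec.OBServerTemplate.Storage.LogStorage.StorageClass = newClass
	cluster.Spec.OBServerTemplate.Storage.RedoLogStorage.StorageClass = newClass

	if err := c.Update(ctx, cluster); err != nil {
		panic(err)
	}
}
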
29 changes: 15 additions & 14 deletions internal/const/status/obcluster/obcluster_status.go
@@ -13,18 +13,19 @@ See the Mulan PSL v2 for more details.
package obcluster

const (
New = "new"
MigrateFromExisting = "migrate from existing"
Running = "running"
AddOBZone = "add obzone"
DeleteOBZone = "delete obzone"
ModifyOBZoneReplica = "modify obzone replica"
Upgrade = "upgrade"
ModifyOBParameter = "modify parameter"
Bootstrapped = "bootstrapped"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
Failed = "failed"
MountBackupVolume = "mount backup volume"
New = "new"
MigrateFromExisting = "migrate from existing"
Running = "running"
AddOBZone = "add obzone"
DeleteOBZone = "delete obzone"
ModifyOBZoneReplica = "modify obzone replica"
Upgrade = "upgrade"
ModifyOBParameter = "modify parameter"
Bootstrapped = "bootstrapped"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
Failed = "failed"
MountBackupVolume = "mount backup volume"
RollingUpdateOBServers = "rolling update observers"
)
27 changes: 14 additions & 13 deletions internal/const/status/obzone/obzone_status.go
@@ -13,17 +13,18 @@ See the Mulan PSL v2 for more details.
package obzone

const (
New = "new"
MigrateFromExisting = "migrate from existing"
Maintaining = "maintaining"
Running = "running"
AddOBServer = "add observer"
DeleteOBServer = "delete observer"
Deleting = "deleting"
Upgrade = "upgrade"
BootstrapReady = "bootstrap ready"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
MountBackupVolume = "mount backup volume"
New = "new"
MigrateFromExisting = "migrate from existing"
Maintaining = "maintaining"
Running = "running"
AddOBServer = "add observer"
DeleteOBServer = "delete observer"
Deleting = "deleting"
Upgrade = "upgrade"
BootstrapReady = "bootstrap ready"
FinalizerFinished = "finalizer finished"
ScaleUp = "scale up"
ExpandPVC = "expand pvc"
MountBackupVolume = "mount backup volume"
RollingUpdateServers = "rolling update servers"
)
2 changes: 2 additions & 0 deletions internal/resource/obcluster/names.go
@@ -30,6 +30,7 @@ const (
fScaleUpOBZones ttypes.FlowName = "scale up obzones"
fExpandPVC ttypes.FlowName = "expand pvc for obcluster"
fMountBackupVolume ttypes.FlowName = "mount backup volume for obcluster"
fRollingUpdateOBServers ttypes.FlowName = "rolling update observers"
)

// obcluster tasks
@@ -65,4 +66,5 @@ const (
tMountBackupVolume ttypes.TaskName = "mount backup volume"
tCheckEnvironment ttypes.TaskName = "check environment"
tAnnotateOBCluster ttypes.TaskName = "annotate obcluster"
tRollingUpdateOBZones ttypes.TaskName = "rolling update observers"
)
12 changes: 12 additions & 0 deletions internal/resource/obcluster/obcluster_flow.go
@@ -195,3 +195,15 @@ func genMountBackupVolumeFlow(_ *OBClusterManager) *tasktypes.TaskFlow {
},
}
}

func genRollingUpdateOBZonesFlow(_ *OBClusterManager) *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fRollingUpdateOBServers,
Tasks: []tasktypes.TaskName{
tRollingUpdateOBZones,
},
TargetStatus: clusterstatus.Running,
},
}
}
6 changes: 6 additions & 0 deletions internal/resource/obcluster/obcluster_manager.go
@@ -109,6 +109,8 @@ func (m *OBClusterManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
taskFlow = genExpandPVCFlow(m)
case clusterstatus.MountBackupVolume:
taskFlow = genMountBackupVolumeFlow(m)
case clusterstatus.RollingUpdateOBServers:
taskFlow = genRollingUpdateOBZonesFlow(m)
default:
m.Logger.V(oceanbaseconst.LogLevelTrace).Info("No need to run anything for obcluster", "obcluster", m.OBCluster.Name)
return nil, nil
@@ -186,6 +188,10 @@ func (m *OBClusterManager) UpdateStatus() error {
m.OBCluster.Status.Status = clusterstatus.ScaleUp
break outer
}
if m.checkIfStorageClassChange(&obzone) {
m.OBCluster.Status.Status = clusterstatus.RollingUpdateOBServers
break outer
}
if m.checkIfStorageSizeExpand(&obzone) {
m.OBCluster.Status.Status = clusterstatus.ExpandPVC
break outer
4 changes: 4 additions & 0 deletions internal/resource/obcluster/obcluster_task.go
@@ -993,6 +993,10 @@ func WaitOBZoneRunning(m *OBClusterManager) tasktypes.TaskError {
return m.generateWaitOBZoneStatusFunc(zonestatus.Running, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
}

func RollingUpdateOBZones(m *OBClusterManager) tasktypes.TaskError {
return m.rollingUpdateZones(m.changeZonesWhenUpdatingOBServers, zonestatus.RollingUpdateServers, zonestatus.Running, obcfg.GetConfig().Time.ServerDeleteTimeoutSeconds)()
}

func CheckEnvironment(m *OBClusterManager) tasktypes.TaskError {
volumeName := m.OBCluster.Name + "check-clog-volume-" + rand.String(6)
claimName := m.OBCluster.Name + "check-clog-claim-" + rand.String(6)
1 change: 1 addition & 0 deletions internal/resource/obcluster/obcluster_task_gen.go

Some generated files are not rendered by default.

20 changes: 17 additions & 3 deletions internal/resource/obcluster/utils.go
@@ -34,9 +34,19 @@ import (
)

func (m *OBClusterManager) checkIfStorageSizeExpand(obzone *v1alpha1.OBZone) bool {
return obzone.Spec.OBServerTemplate.Storage.DataStorage.Size.Cmp(m.OBCluster.Spec.OBServerTemplate.Storage.DataStorage.Size) < 0 ||
obzone.Spec.OBServerTemplate.Storage.LogStorage.Size.Cmp(m.OBCluster.Spec.OBServerTemplate.Storage.LogStorage.Size) < 0 ||
obzone.Spec.OBServerTemplate.Storage.RedoLogStorage.Size.Cmp(m.OBCluster.Spec.OBServerTemplate.Storage.RedoLogStorage.Size) < 0
newStorage := m.OBCluster.Spec.OBServerTemplate.Storage
oldStorage := obzone.Spec.OBServerTemplate.Storage
return oldStorage.DataStorage.Size.Cmp(newStorage.DataStorage.Size) < 0 ||
oldStorage.LogStorage.Size.Cmp(newStorage.LogStorage.Size) < 0 ||
oldStorage.RedoLogStorage.Size.Cmp(newStorage.RedoLogStorage.Size) < 0
}

func (m *OBClusterManager) checkIfStorageClassChange(obzone *v1alpha1.OBZone) bool {
newStorage := m.OBCluster.Spec.OBServerTemplate.Storage
oldStorage := obzone.Spec.OBServerTemplate.Storage
return oldStorage.DataStorage.StorageClass != newStorage.DataStorage.StorageClass ||
oldStorage.LogStorage.StorageClass != newStorage.LogStorage.StorageClass ||
oldStorage.RedoLogStorage.StorageClass != newStorage.RedoLogStorage.StorageClass
}

func (m *OBClusterManager) checkIfCalcResourceChange(obzone *v1alpha1.OBZone) bool {
@@ -172,6 +182,10 @@ func (m *OBClusterManager) changeZonesWhenExpandingPVC(obzone *v1alpha1.OBZone) {
obzone.Spec.OBServerTemplate.Storage.RedoLogStorage.Size = m.OBCluster.Spec.OBServerTemplate.Storage.RedoLogStorage.Size
}

func (m *OBClusterManager) changeZonesWhenUpdatingOBServers(obzone *v1alpha1.OBZone) {
obzone.Spec.OBServerTemplate = m.OBCluster.Spec.OBServerTemplate
}

func (m *OBClusterManager) changeZonesWhenMountingBackupVolume(obzone *v1alpha1.OBZone) {
obzone.Spec.BackupVolume = m.OBCluster.Spec.BackupVolume
}
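
A small standalone illustration (not from the commit) of the Quantity.Cmp convention that the refactored checkIfStorageSizeExpand above relies on: Cmp returns a negative value when the receiver is smaller, so old.Cmp(new) < 0 means the cluster spec asks for a strictly larger volume, and only then is the ExpandPVC path chosen; a storage-class difference is routed to the new checkIfStorageClassChange instead.

// Standalone sketch of the resource.Quantity comparison semantics used above.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	oldSize := resource.MustParse("50Gi")
	newSize := resource.MustParse("100Gi")

	fmt.Println(oldSize.Cmp(newSize) < 0) // true  -> spec is larger, expand PVC
	fmt.Println(newSize.Cmp(oldSize) < 0) // false -> shrink or equal, no expansion
}
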
6 changes: 0 additions & 6 deletions internal/resource/observer/observer_task.go
@@ -312,9 +312,6 @@ func WaitOBServerPodReady(m *OBServerManager) tasktypes.TaskError {
}

func WaitOBServerActiveInCluster(m *OBServerManager) tasktypes.TaskError {
if m.OBServer.SupportStaticIP() {
return nil
}
m.Logger.Info("Wait for observer to be active in cluster")
observerInfo := &model.ServerInfo{
Ip: m.OBServer.Status.GetConnectAddr(),
@@ -347,9 +344,6 @@ }
}

func WaitOBServerDeletedInCluster(m *OBServerManager) tasktypes.TaskError {
if m.OBServer.SupportStaticIP() {
return nil
}
m.Logger.Info("Wait for observer to be deleted in cluster")
observerInfo := &model.ServerInfo{
Ip: m.OBServer.Status.GetConnectAddr(),
2 changes: 2 additions & 0 deletions internal/resource/obzone/names.go
@@ -30,6 +30,7 @@ const (
fScaleUpOBServers ttypes.FlowName = "scale up observers"
fExpandPVC ttypes.FlowName = "expand pvc for obzone"
fMountBackupVolume ttypes.FlowName = "mount backup volume for obzone"
fRollingUpdateOBServers ttypes.FlowName = "rolling update observers"
)

// obzone tasks
@@ -56,4 +57,5 @@ const (
tWaitForOBServerExpandingPVC ttypes.TaskName = "wait for observer to expand pvc"
tMountBackupVolume ttypes.TaskName = "mount backup volume"
tWaitForOBServerMounting ttypes.TaskName = "wait for observer to mount backup volume"
tRollingReplaceOBServers ttypes.TaskName = "rolling replace observers"
)
12 changes: 11 additions & 1 deletion internal/resource/obzone/obzone_flow.go
@@ -121,7 +121,7 @@ func genScaleUpOBServersFlow(_ *OBZoneManager) *tasktypes.TaskFlow {
}
}

func FlowExpandPVC(_ *OBZoneManager) *tasktypes.TaskFlow {
func genFlowExpandPVC(_ *OBZoneManager) *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fExpandPVC,
@@ -140,3 +140,13 @@ func genMountBackupVolumeFlow(_ *OBZoneManager) *tasktypes.TaskFlow {
},
}
}

func genRollingReplaceServersFlow(_ *OBZoneManager) *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fRollingUpdateOBServers,
Tasks: []tasktypes.TaskName{tRollingReplaceOBServers},
TargetStatus: zonestatus.Running,
},
}
}
9 changes: 7 additions & 2 deletions internal/resource/obzone/obzone_manager.go
@@ -112,9 +112,11 @@ func (m *OBZoneManager) GetTaskFlow() (*tasktypes.TaskFlow, error) {
case zonestatus.ScaleUp:
taskFlow = genScaleUpOBServersFlow(m)
case zonestatus.ExpandPVC:
taskFlow = FlowExpandPVC(m)
taskFlow = genFlowExpandPVC(m)
case zonestatus.MountBackupVolume:
taskFlow = genMountBackupVolumeFlow(m)
case zonestatus.RollingUpdateServers:
taskFlow = genRollingReplaceServersFlow(m)
case zonestatus.Upgrade:
obcluster, err = m.getOBCluster()
if err != nil {
@@ -224,6 +226,10 @@ func (m *OBZoneManager) UpdateStatus() error {
m.OBZone.Status.Status = zonestatus.ScaleUp
break
}
if m.checkIfStorageClassChanged(&observer) {
m.OBZone.Status.Status = zonestatus.RollingUpdateServers
break
}
if m.checkIfStorageSizeExpand(&observer) {
m.OBZone.Status.Status = zonestatus.ExpandPVC
break
@@ -235,7 +241,6 @@ }
}
}

// TODO resource change require pod restart, and since oceanbase is a distributed system, resource can be scaled by add more servers
if m.OBZone.Status.Status == zonestatus.Running {
if m.OBZone.Status.Image != m.OBZone.Spec.OBServerTemplate.Image {
m.Logger.Info("Found image changed, need upgrade")
108 changes: 47 additions & 61 deletions internal/resource/obzone/obzone_task.go
@@ -16,8 +16,9 @@ import (
"fmt"
"time"

kubeerrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"

v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
@@ -54,76 +55,18 @@ func StartOBZone(m *OBZoneManager) tasktypes.TaskError {

func CreateOBServer(m *OBZoneManager) tasktypes.TaskError {
m.Logger.Info("Create observers")
blockOwnerDeletion := true
ownerReferenceList := make([]metav1.OwnerReference, 0)
ownerReference := metav1.OwnerReference{
APIVersion: m.OBZone.APIVersion,
Kind: m.OBZone.Kind,
Name: m.OBZone.Name,
UID: m.OBZone.GetUID(),
BlockOwnerDeletion: &blockOwnerDeletion,
}
ownerReferenceList = append(ownerReferenceList, ownerReference)
currentReplica := 0
for _, observerStatus := range m.OBZone.Status.OBServerStatus {
if observerStatus.Status != serverstatus.Unrecoverable {
currentReplica++
}
}
independentVolumeAnnoVal, independentVolumeAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsIndependentPVCLifecycle)
singlePVCAnnoVal, singlePVCAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSinglePVC)
modeAnnoVal, modeAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsMode)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSourceClusterAddress)
for i := currentReplica; i < m.OBZone.Spec.Topology.Replica; i++ {
serverName := m.generateServerName()
finalizerName := oceanbaseconst.FinalizerDeleteOBServer
finalizers := []string{finalizerName}
labels := make(map[string]string)
cluster, _ := m.OBZone.Labels[oceanbaseconst.LabelRefOBCluster]
labels[oceanbaseconst.LabelRefUID] = string(m.OBZone.GetUID())
labels[oceanbaseconst.LabelRefOBZone] = m.OBZone.Name
labels[oceanbaseconst.LabelRefOBCluster] = cluster
observer := &v1alpha1.OBServer{
ObjectMeta: metav1.ObjectMeta{
Name: serverName,
Namespace: m.OBZone.Namespace,
OwnerReferences: ownerReferenceList,
Finalizers: finalizers,
Labels: labels,
},
Spec: v1alpha1.OBServerSpec{
ClusterName: m.OBZone.Spec.ClusterName,
ClusterId: m.OBZone.Spec.ClusterId,
Zone: m.OBZone.Spec.Topology.Zone,
NodeSelector: m.OBZone.Spec.Topology.NodeSelector,
Affinity: m.OBZone.Spec.Topology.Affinity,
Tolerations: m.OBZone.Spec.Topology.Tolerations,
OBServerTemplate: m.OBZone.Spec.OBServerTemplate,
MonitorTemplate: m.OBZone.Spec.MonitorTemplate,
BackupVolume: m.OBZone.Spec.BackupVolume,
ServiceAccount: m.OBZone.Spec.ServiceAccount,
},
}
observer.ObjectMeta.Annotations = make(map[string]string)
if independentVolumeAnnoExist {
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsIndependentPVCLifecycle] = independentVolumeAnnoVal
}
if singlePVCAnnoExist {
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSinglePVC] = singlePVCAnnoVal
}
if modeAnnoExist {
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsMode] = modeAnnoVal
}
if migrateAnnoExist {
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterAddress] = migrateAnnoVal
}
m.Logger.Info("Create observer", "server", serverName)
err := m.Client.Create(m.Ctx, observer)
_, err := m.createOneOBServer(serverName)
if err != nil {
m.Logger.Error(err, "Create observer failed", "server", serverName)
return errors.Wrap(err, "create observer")
return errors.Wrapf(err, "Create observer %s", serverName)
}
m.Recorder.Event(m.OBZone, "CreateObServer", "CreateObserver", fmt.Sprintf("Create observer %s", serverName))
}
return nil
}
@@ -426,3 +369,46 @@ func WaitForOBServerExpandingPVC(m *OBZoneManager) tasktypes.TaskError {
func WaitForOBServerMounting(m *OBZoneManager) tasktypes.TaskError {
return m.generateWaitOBServerStatusFunc(serverstatus.MountBackupVolume, obcfg.GetConfig().Time.DefaultStateWaitTimeout)()
}

func RollingReplaceOBServers(m *OBZoneManager) tasktypes.TaskError {
servers, err := m.listOBServers()
if err != nil {
return errors.Wrap(err, "List observers")
}
for _, server := range servers.Items {
newServerName := m.generateServerName()
newServer, err := m.createOneOBServer(newServerName)
if err != nil {
return errors.Wrap(err, "Create new observer to replace old one")
}
for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ {
time.Sleep(time.Second)
err = m.Client.Get(m.Ctx, m.generateNamespacedName(newServerName), newServer)
if err != nil {
return errors.Wrap(err, "Get new observer")
}
if newServer.Status.Status == serverstatus.Running {
break
}
}
if newServer.Status.Status != serverstatus.Running {
return errors.New("Wait for new observer get running status, timeout")
}
err = m.Client.Delete(m.Ctx, &server)
if err != nil {
return errors.Wrap(err, "Delete old observer")
}
for i := 0; i < obcfg.GetConfig().Time.DefaultStateWaitTimeout; i++ {
time.Sleep(time.Second)
oldServer := &v1alpha1.OBServer{}
err = m.Client.Get(m.Ctx, m.generateNamespacedName(server.Name), oldServer)
if err != nil {
if kubeerrors.IsNotFound(err) {
break
}
return errors.Wrap(err, "Get old observer")
}
}
}
return nil
}
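
Both the slimmed-down CreateOBServer and the new RollingReplaceOBServers delegate to m.createOneOBServer, a helper that is not visible in the rendered portion of this diff. Pieced together from the inlined construction removed from CreateOBServer above, it presumably looks roughly like the following reconstruction (a hedged sketch for orientation, not the verbatim helper; imports and helper signatures follow the removed code):

// Reconstruction (assumed, not the committed helper): build and create a
// single OBServer owned by this OBZone, as the removed inline code did.
func (m *OBZoneManager) createOneOBServer(serverName string) (*v1alpha1.OBServer, error) {
	blockOwnerDeletion := true
	labels := map[string]string{
		oceanbaseconst.LabelRefUID:       string(m.OBZone.GetUID()),
		oceanbaseconst.LabelRefOBZone:    m.OBZone.Name,
		oceanbaseconst.LabelRefOBCluster: m.OBZone.Labels[oceanbaseconst.LabelRefOBCluster],
	}
	observer := &v1alpha1.OBServer{
		ObjectMeta: metav1.ObjectMeta{
			Name:       serverName,
			Namespace:  m.OBZone.Namespace,
			Finalizers: []string{oceanbaseconst.FinalizerDeleteOBServer},
			Labels:     labels,
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion:         m.OBZone.APIVersion,
				Kind:               m.OBZone.Kind,
				Name:               m.OBZone.Name,
				UID:                m.OBZone.GetUID(),
				BlockOwnerDeletion: &blockOwnerDeletion,
			}},
			Annotations: make(map[string]string),
		},
		Spec: v1alpha1.OBServerSpec{
			ClusterName:      m.OBZone.Spec.ClusterName,
			ClusterId:        m.OBZone.Spec.ClusterId,
			Zone:             m.OBZone.Spec.Topology.Zone,
			NodeSelector:     m.OBZone.Spec.Topology.NodeSelector,
			Affinity:         m.OBZone.Spec.Topology.Affinity,
			Tolerations:      m.OBZone.Spec.Topology.Tolerations,
			OBServerTemplate: m.OBZone.Spec.OBServerTemplate,
			MonitorTemplate:  m.OBZone.Spec.MonitorTemplate,
			BackupVolume:     m.OBZone.Spec.BackupVolume,
			ServiceAccount:   m.OBZone.Spec.ServiceAccount,
		},
	}
	// Pass-through annotations (independent PVC lifecycle, single PVC, mode,
	// source cluster address) are copied from the OBZone, as in the old code.
	for _, anno := range []string{
		oceanbaseconst.AnnotationsIndependentPVCLifecycle,
		oceanbaseconst.AnnotationsSinglePVC,
		oceanbaseconst.AnnotationsMode,
		oceanbaseconst.AnnotationsSourceClusterAddress,
	} {
		if val, ok := resourceutils.GetAnnotationField(m.OBZone, anno); ok {
			observer.ObjectMeta.Annotations[anno] = val
		}
	}
	if err := m.Client.Create(m.Ctx, observer); err != nil {
		return nil, err
	}
	m.Recorder.Event(m.OBZone, "CreateObServer", "CreateObserver", fmt.Sprintf("Create observer %s", serverName))
	return observer, nil
}
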