Skip to content

Commit

Permalink
Patch inter-cluster tenant restore (#315)
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI authored Apr 17, 2024
1 parent 0ea8672 commit 164664e
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 10 deletions.
1 change: 1 addition & 0 deletions internal/const/oceanbase/oceanbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ const (
LabelTenantName = "oceanbase.oceanbase.com/tenant-name"
LabelSecondaryTenant = "oceanbase.oceanbase.com/secondary-tenant"
LabelBackupType = "oceanbase.oceanbase.com/backup-type"
LabelOBServerUID = "oceanbase.oceanbase.com/observer-uid"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/observer/observer_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func CreateOBServer() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fCreateOBServer,
Tasks: []tasktypes.TaskName{tCreateOBPVC, tCreateOBPod, tWaitOBServerReady, tAddServer, tWaitOBServerActiveInCluster},
Tasks: []tasktypes.TaskName{tCreateOBServerSvc, tCreateOBPVC, tCreateOBPod, tWaitOBServerReady, tAddServer, tWaitOBServerActiveInCluster},
TargetStatus: serverstatus.Running,
OnFailure: tasktypes.FailureRule{
NextTryStatus: "Failed",
Expand Down
8 changes: 6 additions & 2 deletions internal/resource/observer/observer_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,14 @@ func (m *OBServerManager) CreateOBPod() tasktypes.TaskError {
ownerReferenceList = append(ownerReferenceList, ownerReference)
observerPodSpec := m.createOBPodSpec(obcluster)
// create pod
originLabels := m.OBServer.Labels
originLabels[oceanbaseconst.LabelOBServerUID] = string(m.OBServer.UID)
observerPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: m.OBServer.Name,
Namespace: m.OBServer.Namespace,
OwnerReferences: ownerReferenceList,
Labels: m.OBServer.Labels,
Labels: originLabels,
Annotations: annotations,
},
Spec: observerPodSpec,
Expand Down Expand Up @@ -877,7 +879,9 @@ func (m *OBServerManager) CreateOBServerSvc() tasktypes.TaskError {
}},
},
Spec: corev1.ServiceSpec{
Selector: m.OBServer.Labels,
Selector: map[string]string{
oceanbaseconst.LabelOBServerUID: string(m.OBServer.UID),
},
Ports: []corev1.ServicePort{{
Name: "sql",
Port: oceanbaseconst.SqlPort,
Expand Down
4 changes: 3 additions & 1 deletion internal/resource/obtenant/obtenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,9 @@ func (m *OBTenantManager) CreateEmptyStandbyTenant() tasktypes.TaskError {
if err != nil {
return err
}
restoreSource, err := resourceutils.GetTenantRestoreSource(m.Ctx, m.Client, m.Logger, con, m.OBTenant.Namespace, *m.OBTenant.Spec.Source.Tenant)
ns := m.OBTenant.GetNamespace()
tenantCRName := *m.OBTenant.Spec.Source.Tenant
restoreSource, err := resourceutils.GetTenantRestoreSource(m.Ctx, m.Client, m.Logger, ns, tenantCRName)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (m *ObTenantOperationManager) SetTenantLogRestoreSource() tasktypes.TaskErr
if err != nil {
return err
}
restoreSource, err := resourceutils.GetTenantRestoreSource(m.Ctx, m.Client, m.Logger, con, m.Resource.Namespace, m.Resource.Spec.Switchover.StandbyTenant)
restoreSource, err := resourceutils.GetTenantRestoreSource(m.Ctx, m.Client, m.Logger, m.Resource.Namespace, m.Resource.Spec.Switchover.StandbyTenant)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion internal/resource/obtenantrestore/obtenantrestore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ func (m *ObTenantRestoreManager) checkRestoreProgress() error {
if restoreJob.Status == "SUCCESS" {
m.Recorder.Event(m.Resource, corev1.EventTypeNormal, "Restore job finished", "Restore job finished")
if m.Resource.Spec.RestoreRole == constants.TenantRoleStandby {
m.Resource.Status.Status = constants.RestoreJobStatusReplaying
if m.Resource.Spec.Source.ReplayEnabled {
// Only if replay is enabled start log replay
m.Resource.Status.Status = constants.RestoreJobStatusReplaying
} else {
m.Resource.Status.Status = constants.RestoreJobSuccessful
}
} else {
m.Resource.Status.Status = constants.RestoreJobStatusActivating
}
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/obtenantrestore/obtenantrestore_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (m *ObTenantRestoreManager) StartLogReplay() tasktypes.TaskError {
return err
}
if m.Resource.Spec.PrimaryTenant != nil {
restoreSource, err := resourceutils.GetTenantRestoreSource(m.Ctx, m.Client, m.Logger, con, m.Resource.Namespace, *m.Resource.Spec.PrimaryTenant)
restoreSource, err := resourceutils.GetTenantRestoreSource(m.Ctx, m.Client, m.Logger, m.Resource.Namespace, *m.Resource.Spec.PrimaryTenant)
if err != nil {
return err
}
Expand Down
26 changes: 23 additions & 3 deletions internal/resource/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,40 @@ func NeedAnnotation(pod *corev1.Pod, cni string) bool {
}
}

func GetTenantRestoreSource(ctx context.Context, clt client.Client, logger *logr.Logger, con *operation.OceanbaseOperationManager, ns, tenantCR string) (string, error) {
// GetTenantRestoreSource gets restore source from tenant CR. If tenantCR is in form of ns/name, the parameter ns is ignored.
func GetTenantRestoreSource(ctx context.Context, clt client.Client, logger *logr.Logger, ns, tenantCR string) (string, error) {
finalNs := ns
finalTenantCR := tenantCR
splits := strings.Split(tenantCR, "/")
if len(splits) == 2 {
finalNs = splits[0]
finalTenantCR = splits[1]
}
var restoreSource string
var err error

primary := &v1alpha1.OBTenant{}
err = clt.Get(ctx, types.NamespacedName{
Namespace: ns,
Name: tenantCR,
Namespace: finalNs,
Name: finalTenantCR,
}, primary)
if err != nil {
if client.IgnoreNotFound(err) != nil {
return "", err
}
} else {
obcluster := &v1alpha1.OBCluster{}
err := clt.Get(ctx, types.NamespacedName{
Namespace: finalNs,
Name: primary.Spec.ClusterName,
}, obcluster)
if err != nil {
return "", errors.Wrap(err, "get obcluster")
}
con, err := GetSysOperationClient(clt, logger, obcluster)
if err != nil {
return "", errors.Wrap(err, "get oceanbase operation manager")
}
// Get ip_list from primary tenant
aps, err := con.ListTenantAccessPoints(tenantCR)
if err != nil {
Expand Down

0 comments on commit 164664e

Please sign in to comment.