Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add data engine option to restore backing image api #3392

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions api/backupbackingimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (

"github.com/gorilla/mux"
"github.com/pkg/errors"

"github.com/rancher/go-rancher/api"
"github.com/rancher/go-rancher/client"

"github.com/longhorn/longhorn-manager/types"
)

func (s *Server) backupBackingImageList(apiContext *api.ApiContext) (*client.GenericCollection, error) {
Expand Down Expand Up @@ -56,7 +57,7 @@ func (s *Server) BackupBackingImageRestore(w http.ResponseWriter, req *http.Requ
}

backupBackingImageName := mux.Vars(req)["name"]
if err := s.m.RestoreBackupBackingImage(backupBackingImageName, input.Secret, input.SecretNamespace); err != nil {
if err := s.m.RestoreBackupBackingImage(backupBackingImageName, input.Secret, input.SecretNamespace, input.DataEngine); err != nil {
return errors.Wrapf(err, "failed to restore backup backing image '%s'", backupBackingImageName)
}
return nil
Expand All @@ -71,8 +72,9 @@ func (s *Server) BackupBackingImageCreate(w http.ResponseWriter, req *http.Reque
}

backingImageName := mux.Vars(req)["name"]
if err := s.m.CreateBackupBackingImage(input.Name, backingImageName, input.BackupTargetName); err != nil {
return errors.Wrapf(err, "failed to create backup backing image '%s'", input.Name)
backupBackingImageName := types.GetBackupBackingImageNameFromBIName(backingImageName)
mantissahz marked this conversation as resolved.
Show resolved Hide resolved
if err := s.m.CreateBackupBackingImage(backupBackingImageName, backingImageName, input.BackupTargetName); err != nil {
return errors.Wrapf(err, "failed to create backup backing image '%s'", backupBackingImageName)
}
return nil
}
1 change: 1 addition & 0 deletions api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ type UpdateMinNumberOfCopiesInput struct {
type BackingImageRestoreInput struct {
Secret string `json:"secret"`
SecretNamespace string `json:"secretNamespace"`
DataEngine string `json:"dataEngine"`
}

type AttachInput struct {
Expand Down
2 changes: 2 additions & 0 deletions client/generated_backing_image_restore_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const (
type BackingImageRestoreInput struct {
Resource `yaml:"-"`

DataEngine string `json:"dataEngine,omitempty" yaml:"data_engine,omitempty"`

derekbit marked this conversation as resolved.
Show resolved Hide resolved
Secret string `json:"secret,omitempty" yaml:"secret,omitempty"`

SecretNamespace string `json:"secretNamespace,omitempty" yaml:"secret_namespace,omitempty"`
Expand Down
4 changes: 4 additions & 0 deletions controller/system_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,158 +320,158 @@
record.message = message
}

func (c *SystemBackupController) reconcile(name string, backupTargetClient engineapi.SystemBackupOperationInterface, backupTarget *longhorn.BackupTarget) (err error) {
systemBackup, err := c.ds.GetSystemBackup(name)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
return nil
}

if !c.isResponsibleFor(systemBackup) {
return nil
}

log := getLoggerForSystemBackup(c.logger, systemBackup)

if systemBackup.Status.OwnerID != c.controllerID {
systemBackup.Status.OwnerID = c.controllerID
systemBackup, err = c.ds.UpdateSystemBackupStatus(systemBackup)
if err != nil {
// we don't mind others coming first
if apierrors.IsConflict(errors.Cause(err)) {
return nil
}
return err
}

log.Infof("System backup got new owner %v", c.controllerID)
}

record := &systemBackupRecord{}
existingSystemBackup := systemBackup.DeepCopy()
defer c.handleStatusUpdate(record, systemBackup, existingSystemBackup, err, log)

if !systemBackup.DeletionTimestamp.IsZero() &&
systemBackup.Status.State != longhorn.SystemBackupStateDeleting {
c.updateSystemBackupRecord(record,
systemBackupRecordTypeNormal, longhorn.SystemBackupStateDeleting,
constant.EventReasonDeleting, SystemBackupMsgDeletingRemote,
)
return
}

tempBackupArchivePath := filepath.Join(SystemBackupTempDir, systemBackup.Name+types.SystemBackupExtension)
tempBackupDir := filepath.Join(SystemBackupTempDir, systemBackup.Name)

switch systemBackup.Status.State {
case longhorn.SystemBackupStateSyncing:
err = syncSystemBackupFromBackupTarget(systemBackup, backupTargetClient)
if err != nil {
c.updateSystemBackupRecord(record,
systemBackupRecordTypeError, longhorn.SystemBackupStateError,
longhorn.SystemBackupConditionReasonSync, SystemBackupErrSync,
)
return err
}

c.updateSystemBackupRecord(record,
systemBackupRecordTypeNormal, longhorn.SystemBackupStateReady,
constant.EventReasonSynced, SystemBackupMsgSyncedBackupTarget,
)

case longhorn.SystemBackupStateNone:
// Sync the ready SystemBackups that are not in the current cluster.
if isSystemBackupFromRemoteBackupTarget(systemBackup) {
c.updateSystemBackupRecord(record,
systemBackupRecordTypeNormal, longhorn.SystemBackupStateSyncing,
constant.EventReasonSyncing, SystemBackupMsgSyncingBackupTarget,
)

return
}

var longhornVersion string
longhornVersion, err = getSystemBackupVersionExistInRemoteBackupTarget(systemBackup, backupTargetClient)
if err != nil {
c.updateSystemBackupRecord(record,
systemBackupRecordTypeError, longhorn.SystemBackupStateError,
constant.EventReasonStart, err.Error(),
)
return nil
}

if longhornVersion != "" {
if err := datastore.LabelSystemBackupVersion(longhornVersion, systemBackup); err != nil {
return err
}

_, err = c.ds.UpdateSystemBackup(systemBackup)
if err != nil {
return err
}
return
}

err = c.InitSystemBackup(systemBackup, log)
if err != nil {
return err
}

c.updateSystemBackupRecord(record,
systemBackupRecordTypeNormal, longhorn.SystemBackupStateVolumeBackup,
constant.EventReasonStart, SystemBackupMsgStarting,
)

case longhorn.SystemBackupStateVolumeBackup:
backups, err := c.BackupVolumes(systemBackup)
if err != nil {
c.updateSystemBackupRecord(record,
systemBackupRecordTypeError, longhorn.SystemBackupStateError,
constant.EventReasonStart, err.Error(),
)
return nil
}

// TODO: handle error check
go c.WaitForVolumeBackupToComplete(backups, systemBackup) // nolint: errcheck

case longhorn.SystemBackupStateBackingImageBackup:
backupBackingImages, err := c.BackupBackingImage()
if err != nil {
c.updateSystemBackupRecord(record,
systemBackupRecordTypeError, longhorn.SystemBackupStateError,
constant.EventReasonStart, err.Error(),
)
return nil
}

go c.WaitForBackingImageBackupToComplete(backupBackingImages, systemBackup)

case longhorn.SystemBackupStateGenerating:
go c.GenerateSystemBackup(systemBackup, tempBackupArchivePath, tempBackupDir)

case longhorn.SystemBackupStateUploading:
go c.UploadSystemBackup(systemBackup, tempBackupArchivePath, tempBackupDir, backupTargetClient)

case longhorn.SystemBackupStateReady, longhorn.SystemBackupStateError:
cleanupLocalSystemBackupFiles(tempBackupArchivePath, tempBackupDir, log)

case longhorn.SystemBackupStateDeleting:
cleanupRemoteSystemBackupFiles(systemBackup, backupTargetClient, backupTarget, log)

cleanupLocalSystemBackupFiles(tempBackupArchivePath, tempBackupDir, log)

err = c.ds.RemoveFinalizerForSystemBackup(systemBackup)
if err != nil {
return err
}
}

return nil
}

Check notice on line 474 in controller/system_backup_controller.go

View check run for this annotation

codefactor.io / CodeFactor

controller/system_backup_controller.go#L323-L474

Complex Method
func getSystemBackupVersionExistInRemoteBackupTarget(systemBackup *longhorn.SystemBackup, backupTargetClient engineapi.SystemBackupOperationInterface) (string, error) {
systemBackupsInBackupstore, err := backupTargetClient.ListSystemBackup()
if err != nil {
Expand Down Expand Up @@ -987,6 +987,10 @@

backingImageBackups := make(map[string]*longhorn.BackupBackingImage, len(backingImages))
for _, backingImage := range backingImages {
// TODO: support backup backing image v2
if types.IsDataEngineV2(backingImage.Spec.DataEngine) {
continue
}
backupBackingImage, err := c.createBackingImageBackup(backingImage)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions manager/backupbackingimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func (m *VolumeManager) DeleteBackupBackingImage(name string) error {
return m.ds.DeleteBackupBackingImage(name)
}

func (m *VolumeManager) RestoreBackupBackingImage(name string, secret, secretNamespace string) error {
if name == "" {
return fmt.Errorf("restore backing image name is not given")
func (m *VolumeManager) RestoreBackupBackingImage(name, secret, secretNamespace, dataEngine string) error {
if name == "" || dataEngine == "" {
return fmt.Errorf("missing parameters for restoring backing image, name=%v dataEngine=%v", name, dataEngine)
}

bbi, err := m.ds.GetBackupBackingImageRO(name)
Expand All @@ -62,7 +62,7 @@ func (m *VolumeManager) RestoreBackupBackingImage(name string, secret, secretNam
return fmt.Errorf("backing image %v already exists", biName)
}

return m.restoreBackingImage(bbi.Spec.BackupTargetName, biName, secret, secretNamespace)
return m.restoreBackingImage(bbi.Spec.BackupTargetName, biName, secret, secretNamespace, dataEngine)
}

func (m *VolumeManager) CreateBackupBackingImage(name, backingImageName, backupTargetName string) error {
Expand Down
18 changes: 15 additions & 3 deletions manager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
// TODO: We should record the secret and secret namespace in the backing image
// so we can auto fill in the secret and namespace when auto restore the backing image in volume creation api.
// Currently, if the backing image is encrypted, users need to restore it first so they can specifically assign the secret and namespace
if err := m.restoreBackingImage(backupTargetName, spec.BackingImage, "", ""); err != nil {
if err := m.restoreBackingImage(backupTargetName, spec.BackingImage, "", "", string(spec.DataEngine)); err != nil {
return nil, errors.Wrapf(err, "failed to restore backing image %v when create volume %v", spec.BackingImage, name)
}

Expand Down Expand Up @@ -322,72 +322,72 @@
return v, nil
}

func (m *VolumeManager) Salvage(volumeName string, replicaNames []string) (v *longhorn.Volume, err error) {
defer func() {
err = errors.Wrapf(err, "unable to salvage volume %v", volumeName)
}()

v, err = m.ds.GetVolume(volumeName)
if err != nil {
return nil, err
}
if v.Status.State != longhorn.VolumeStateDetached {
return nil, fmt.Errorf("invalid volume state to salvage: %v", v.Status.State)
}
if v.Status.Robustness != longhorn.VolumeRobustnessFaulted {
return nil, fmt.Errorf("invalid robustness state to salvage: %v", v.Status.Robustness)
}
v.Spec.NodeID = ""
v, err = m.ds.UpdateVolume(v)
if err != nil {
return nil, err
}

for _, name := range replicaNames {
r, err := m.ds.GetReplica(name)
if err != nil {
return nil, err
}
if r.Spec.VolumeName != v.Name {
return nil, fmt.Errorf("replica %v doesn't belong to volume %v", r.Name, v.Name)
}
isDownOrDeleted, err := m.ds.IsNodeDownOrDeletedOrDelinquent(r.Spec.NodeID, v.Name)
if err != nil {
return nil, fmt.Errorf("failed to check if the related node %v is still running for replica %v", r.Spec.NodeID, name)
}
if isDownOrDeleted {
return nil, fmt.Errorf("unable to check if the related node %v is down or deleted for replica %v", r.Spec.NodeID, name)
}
node, err := m.ds.GetNode(r.Spec.NodeID)
if err != nil {
return nil, fmt.Errorf("failed to get the related node %v for replica %v", r.Spec.NodeID, name)
}
diskSchedulable := false
for _, diskStatus := range node.Status.DiskStatus {
if diskStatus.DiskUUID == r.Spec.DiskID {
if types.GetCondition(diskStatus.Conditions, longhorn.DiskConditionTypeSchedulable).Status == longhorn.ConditionStatusTrue {
diskSchedulable = true
break
}
}
}
if !diskSchedulable {
return nil, fmt.Errorf("disk with UUID %v on node %v is unschedulable for replica %v", r.Spec.DiskID, r.Spec.NodeID, name)
}
if r.Spec.FailedAt == "" {
// already updated, ignore it for idempotency
continue
}
r.Spec.FailedAt = ""
if _, err := m.ds.UpdateReplica(r); err != nil {
return nil, err
}
}

logrus.Infof("Salvaged replica %+v for volume %v", replicaNames, v.Name)
return v, nil
}

Check notice on line 390 in manager/volume.go

View check run for this annotation

codefactor.io / CodeFactor

manager/volume.go#L325-L390

Complex Method
func (m *VolumeManager) Activate(volumeName string, frontend string) (v *longhorn.Volume, err error) {
defer func() {
err = errors.Wrapf(err, "unable to activate volume %v", volumeName)
Expand Down Expand Up @@ -760,92 +760,92 @@
return nodeIPMap, nil
}

func (m *VolumeManager) EngineUpgrade(volumeName, image string) (v *longhorn.Volume, err error) {
defer func() {
err = errors.Wrapf(err, "cannot upgrade engine for volume %v using image %v", volumeName, image)
}()

// Only allow to upgrade to the default engine image if the setting `Automatically upgrade volumes' engine to the default engine image` is enabled
concurrentAutomaticEngineUpgradePerNodeLimit, err := m.ds.GetSettingAsInt(types.SettingNameConcurrentAutomaticEngineUpgradePerNodeLimit)
if err != nil {
return nil, err
}
if concurrentAutomaticEngineUpgradePerNodeLimit > 0 {
defaultEngineImage, err := m.ds.GetSettingValueExisted(types.SettingNameDefaultEngineImage)
if err != nil {
return nil, err
}
if image != defaultEngineImage {
return nil, fmt.Errorf("upgrading to %v is not allowed. "+
"Only allow to upgrade to the default engine image %v because the setting "+
"`Concurrent Automatic Engine Upgrade Per Node Limit` is greater than 0",
image, defaultEngineImage)
}
}

v, err = m.ds.GetVolume(volumeName)
if err != nil {
return nil, err
}

if types.IsDataEngineV2(v.Spec.DataEngine) {
return nil, fmt.Errorf("cannot upgrade engine for volume %v using image %v because the volume is using data engine v2", volumeName, image)
}

if v.Spec.Image == image {
return nil, fmt.Errorf("upgrading in process for volume %v engine image from %v to %v already",
v.Name, v.Status.CurrentImage, v.Spec.Image)
}

if v.Spec.Image != v.Status.CurrentImage && image != v.Status.CurrentImage {
return nil, fmt.Errorf("upgrading in process for volume %v engine image from %v to %v, cannot upgrade to another engine image",
v.Name, v.Status.CurrentImage, v.Spec.Image)
}

if isReady, err := m.ds.CheckImageReadyOnAllVolumeReplicas(image, v.Name, v.Status.CurrentNodeID, v.Spec.DataEngine); !isReady {
if err != nil {
return nil, fmt.Errorf("cannot upgrade engine image for volume %v from image %v to image %v: %v", v.Name, v.Spec.Image, image, err)
}
return nil, fmt.Errorf("cannot upgrade engine image for volume %v from image %v to image %v because the engine image %v is not deployed on the replicas' nodes or the node that the volume is attached to", v.Name, v.Spec.Image, image, image)
}

if isReady, err := m.ds.CheckImageReadyOnAllVolumeReplicas(v.Status.CurrentImage, v.Name, v.Status.CurrentNodeID, v.Spec.DataEngine); !isReady {
if err != nil {
return nil, fmt.Errorf("cannot upgrade engine image for volume %v from image %v to image %v: %v", v.Name, v.Spec.Image, image, err)
}
return nil, fmt.Errorf("cannot upgrade engine image for volume %v from image %v to image %v because the volume's current engine image %v is not deployed on the replicas' nodes or the node that the volume is attached to", v.Name, v.Spec.Image, image, v.Status.CurrentImage)
}

if util.IsVolumeMigrating(v) {
return nil, fmt.Errorf("cannot upgrade during migration")
}

// Note: Rebuild is not supported for old DR volumes and the handling of a degraded DR volume live upgrade will get stuck.
// Hence if you modify this part, the live upgrade should be prevented in API level for all old DR volumes.
if v.Status.State == longhorn.VolumeStateAttached && v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
return nil, fmt.Errorf("cannot do live upgrade for a unhealthy volume %v", v.Name)
}

if v.Status.State == longhorn.VolumeStateAttached && v.Spec.DataLocality == longhorn.DataLocalityStrictLocal {
return nil, fmt.Errorf("cannot do live upgrade for an attached strict-local volume %v", v.Name)
}

oldImage := v.Spec.Image
v.Spec.Image = image

v, err = m.ds.UpdateVolume(v)
if err != nil {
return nil, err
}
if image != v.Status.CurrentImage {
logrus.Infof("Upgrading volume %v engine image from %v to %v", v.Name, oldImage, v.Spec.Image)
} else {
logrus.Infof("Rolling back volume %v engine image to %v", v.Name, v.Status.CurrentImage)
}

return v, nil
}

Check notice on line 848 in manager/volume.go

View check run for this annotation

codefactor.io / CodeFactor

manager/volume.go#L763-L848

Complex Method
func (m *VolumeManager) UpdateReplicaCount(name string, count int) (v *longhorn.Volume, err error) {
defer func() {
err = errors.Wrapf(err, "unable to update replica count for volume %v", name)
Expand Down Expand Up @@ -1189,63 +1189,75 @@
return v, nil
}

func (m *VolumeManager) restoreBackingImage(backupTargetName, biName, secret, secretNamespace string) error {
func (m *VolumeManager) restoreBackingImage(backupTargetName, biName, secret, secretNamespace, dataEngine string) error {
if secret != "" || secretNamespace != "" {
_, err := m.ds.GetSecretRO(secretNamespace, secret)
if err != nil {
return errors.Wrapf(err, "failed to get secret %v in namespace %v for the backing image %v", secret, secretNamespace, biName)
}
}

if dataEngine == "" {
dataEngine = string(longhorn.DataEngineTypeV1)
}

if longhorn.DataEngineType(dataEngine) != longhorn.DataEngineTypeV1 && longhorn.DataEngineType(dataEngine) != longhorn.DataEngineTypeV2 {
return fmt.Errorf("invalid data engine type %v", dataEngine)
}

if biName == "" {
return nil
}
bi, err := m.ds.GetBackingImageRO(biName)
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get backing image %v", biName)
}
}
// backing image already exists
if bi != nil {
if bi.Spec.DataEngine != longhorn.DataEngineType(dataEngine) {
return fmt.Errorf("backing image %v already exists with different data engine type %v", biName, bi.Spec.DataEngine)
}
return nil
}

// try find the backup backing image
// try to find the backup backing image
bbi, err := m.ds.GetBackupBackingImagesWithBackupTargetNameRO(backupTargetName, biName)
if err != nil {
return errors.Wrapf(err, "failed to get backup backing image %v", biName)
}

// restore by creating backing image with type restore
concurrentLimit, err := m.ds.GetSettingAsInt(types.SettingNameBackupConcurrentLimit)
if err != nil {
return errors.Wrapf(err, "failed to get %v value", types.SettingNameBackupConcurrentLimit)
}

restoreBi := &longhorn.BackingImage{
ObjectMeta: metav1.ObjectMeta{
Name: biName,
},
Spec: longhorn.BackingImageSpec{
DataEngine: longhorn.DataEngineType(dataEngine),
Checksum: bbi.Status.Checksum,
SourceType: longhorn.BackingImageDataSourceTypeRestore,
SourceParameters: map[string]string{
longhorn.DataSourceTypeRestoreParameterBackupTargetName: backupTargetName,
longhorn.DataSourceTypeRestoreParameterBackupURL: bbi.Status.URL,
longhorn.DataSourceTypeRestoreParameterConcurrentLimit: strconv.FormatInt(concurrentLimit, 10),
},
Secret: secret,
SecretNamespace: secretNamespace,
},
}
if _, err = m.ds.CreateBackingImage(restoreBi); err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed to create backing image %v", biName)
}

return nil
}

Check notice on line 1260 in manager/volume.go

View check run for this annotation

codefactor.io / CodeFactor

manager/volume.go#L1192-L1260

Complex Method
func (m *VolumeManager) UpdateFreezeFilesystemForSnapshot(name string,
freezeFilesystemForSnapshot longhorn.FreezeFilesystemForSnapshot) (v *longhorn.Volume, err error) {
defer func() {
Expand Down
Loading