From 5f05863a3b9f083293e5a270ea776e2816ec5902 Mon Sep 17 00:00:00 2001
From: Pawan Prakash Sharma
Date: Thu, 8 Oct 2020 13:30:16 +0530
Subject: [PATCH] feat(zfspv): adding support to do incremental backup/restore
 (#121)

We can create the snapshot location as below:

```yaml
apiVersion: velero.io/v1
kind: VolumeSnapshotLocation
metadata:
  name: incr
  namespace: velero
spec:
  provider: openebs.io/zfspv-blockstore
  config:
    bucket: velero
    prefix: zfs
    incrBackupCount: "3" # number of incremental backups to take
    namespace: openebs # the namespace where ZFS-LocalPV creates all the CRs, passed as OPENEBS_NAMESPACE env in the ZFS-LocalPV deployment
    provider: aws
    region: minio
    s3ForcePathStyle: "true"
    s3Url: http://minio.velero.svc:9000
```

Here we specify how many incremental backups we want. The plugin creates a
full backup first and, if a non-zero value is provided for the
`incrBackupCount` option, it then creates that many incremental backups.
Note that `incrBackupCount` defines only the number of incremental backups;
it does not include the initial full backup. For example, with
`incrBackupCount: "3"` a schedule produces groups of four backups: one full
backup followed by three incremental backups, after which a new group starts
with the next full backup. Incremental backups are created only for
scheduled backups; for a non-scheduled backup, the ZFS-LocalPV plugin creates
a full backup for every request.

While restoring, we only need to give the name of the backup we want to
restore. The plugin identifies the full backup of that incremental snapshot
group, restores from the full backup, and then keeps restoring the
incremental backups up to the backup name provided in the restore command.

Note: We should not modify the VolumeSnapshotLocation once it has been
created.

Signed-off-by: Pawan
---
 changelogs/unreleased/121-pawanpraka1 |   1 +
 pkg/clouduploader/operation.go        | 110 +++++++++++-
 pkg/zfs/plugin/backup.go              |  20 ++-
 pkg/zfs/plugin/restore.go             | 248 +++++++++++++++-----------
 pkg/zfs/plugin/zfs.go                 |  19 +-
 pkg/zfs/utils/utils.go                |   4 +-
 6 files changed, 277 insertions(+), 125 deletions(-)
 create mode 100644 changelogs/unreleased/121-pawanpraka1

diff --git a/changelogs/unreleased/121-pawanpraka1 b/changelogs/unreleased/121-pawanpraka1
new file mode 100644
index 00000000..9a84a173
--- /dev/null
+++ b/changelogs/unreleased/121-pawanpraka1
@@ -0,0 +1 @@
+adding support to do incremental backup/restore for ZFS-LocalPV
diff --git a/pkg/clouduploader/operation.go b/pkg/clouduploader/operation.go
index 47d62c13..b858f03e 100644
--- a/pkg/clouduploader/operation.go
+++ b/pkg/clouduploader/operation.go
@@ -16,13 +16,34 @@ limitations under the License.
 
 package clouduploader
 
-import "github.com/aws/aws-sdk-go/service/s3/s3manager"
+import (
+	"io"
+	"strings"
+
+	"github.com/pkg/errors"
+
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"gocloud.dev/blob"
+)
 
 const (
 	// backupDir is remote storage-bucket directory
 	backupDir = "backups"
 )
 
+const (
+	// Type of Key, used while listing keys
+
+	// ListKeyFile - if key is a file
+	ListKeyFile int = 1 << iota
+
+	// ListKeyDir - if key is a directory
+	ListKeyDir
+
+	// ListKeyBoth - if key is a file or directory
+	ListKeyBoth
+)
+
 // Upload will perform upload operation for given file.
 // It will create a TCP server through which client can
 // connect and upload data to cloud blob storage file
@@ -146,3 +167,90 @@ func (c *Conn) ConnReadyWait() bool {
 	ok := <-*c.ConnReady
 	return ok
 }
+
+// listKeys returns a list of keys -- files/directories
+// Note:
+//     - the list may be incomplete, check for error before using it
+//     - listKeys uses '/' as delimiter.
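+//     - keyType selects what gets listed: ListKeyFile returns only files,
+//       ListKeyDir returns only directories and ListKeyBoth returns both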
+func (c *Conn) listKeys(prefix string, keyType int) ([]string, error) { + keys := []string{} + + lister := c.bucket.List(&blob.ListOptions{ + Delimiter: "/", + Prefix: prefix, + }) + for { + obj, err := lister.Next(c.ctx) + if err == io.EOF { + break + } + + if err != nil { + c.Log.Errorf("Failed to get next blob err=%v", err) + return keys, err + } + + switch keyType { + case ListKeyBoth: + case ListKeyFile: + if obj.IsDir { + continue + } + case ListKeyDir: + if !obj.IsDir { + continue + } + default: + c.Log.Warningf("Invalid keyType=%d, Ignored", keyType) + continue + } + + keys = append(keys, obj.Key) + } + return keys, nil +} + +// bkpPathPrefix return 'prefix path' for the given 'backup name prefix' +func (c *Conn) bkpPathPrefix(backupPrefix string) string { + if c.backupPathPrefix == "" { + return backupDir + "/" + backupPrefix + } + return c.backupPathPrefix + "/" + backupDir + "/" + backupPrefix +} + +// filePathPrefix generate prefix for the given file name prefix using 'configured file prefix' +func (c *Conn) filePathPrefix(filePrefix string) string { + return c.prefix + "-" + filePrefix +} + +// GetSnapListFromCloud gets the list of a snapshot for the given backup name +// the argument should be same as that of GenerateRemoteFilename(file, backup) call +// used while doing the backup of the volume +func (c *Conn) GetSnapListFromCloud(file, backup string) ([]string, error) { + var snapList []string + + // list directory having schedule/backup name as prefix + dirs, err := c.listKeys(c.bkpPathPrefix(backup), ListKeyDir) + if err != nil { + return snapList, errors.Wrapf(err, "failed to get list of directory") + } + + for _, dir := range dirs { + // list files for dir having volume name as prefix + files, err := c.listKeys(dir+c.filePathPrefix(file), ListKeyFile) + if err != nil { + return snapList, errors.Wrapf(err, "failed to get list of snapshot file at path=%v", dir) + } + + if len(files) != 0 { + // snapshot exist in the backup directory + + // add backup name from dir path to snapList + s := strings.Split(dir, "/") + + // dir will contain path with trailing '/', example: 'backups/b-0/' + snapList = append(snapList, s[len(s)-2]) + } + } + return snapList, nil +} diff --git a/pkg/zfs/plugin/backup.go b/pkg/zfs/plugin/backup.go index d74af213..6841d497 100644 --- a/pkg/zfs/plugin/backup.go +++ b/pkg/zfs/plugin/backup.go @@ -83,6 +83,14 @@ func (p *Plugin) getPrevSnap(volname, schdname string) (string, error) { var backups []string + size := len(bkpList.Items) + count := p.incremental + 1 + + if uint64(size)%count == 0 { + // have to start the next snapshot incremental group, take the full backup + return "", nil + } + /* * Backup names are in the form of - * to get the last snapshot, sort the list of successful backups, @@ -115,15 +123,15 @@ func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string, po // add schdeule name as label labels[VeleroSchdKey] = schdname labels[VeleroVolKey] = vol.Name - if p.incremental { - prevSnap, err = p.getPrevSnap(vol.Name, schdname) - if err != nil { - p.Log.Errorf("zfs: Failed to get prev snapshot bkp %s err: {%v}", snapname, err) - return "", err - } + prevSnap, err = p.getPrevSnap(vol.Name, schdname) + if err != nil { + p.Log.Errorf("zfs: Failed to get prev snapshot bkp %s err: {%v}", snapname, err) + return "", err } } + p.Log.Debugf("zfs: backup incr(%d) schd=%s snap=%s prevsnap=%s vol=%s", p.incremental, schdname, snapname, prevSnap, vol.Name) + serverAddr := p.remoteAddr + ":" + strconv.Itoa(port) bkp, err := 
bkpbuilder.NewBuilder(). diff --git a/pkg/zfs/plugin/restore.go b/pkg/zfs/plugin/restore.go index 3db4d657..0c6583cb 100644 --- a/pkg/zfs/plugin/restore.go +++ b/pkg/zfs/plugin/restore.go @@ -18,6 +18,7 @@ package plugin import ( "encoding/json" + "sort" "strconv" "sync" "time" @@ -55,78 +56,57 @@ func (p *Plugin) createVolume(pvname string, bkpname string, bkpZV *apis.ZFSVolu return nil, err } - var vol *apis.ZFSVolume = nil - - if len(volList.Items) > 1 { - return nil, errors.Errorf("zfs: error can not have more than one source volume %s bkpname %s", pvname, bkpname) - } else if len(volList.Items) == 1 { - vol = &volList.Items[0] - if !p.incremental || - bkpname == vol.Annotations[VeleroBkpKey] { - // volume has already been restored - return vol, errors.Errorf("zfs: pv %s is already restored bkpname %s", pvname, bkpname) - } - - p.Log.Debugf("zfs: got existing volume %s for restore vol %s snap %s", vol.Name, pvname, bkpname) + if len(volList.Items) > 0 { + return nil, errors.Errorf("zfs: err pv %s has already been restored bkpname %s", pvname, bkpname) } - if vol == nil { - // this is first full restore, go ahead and create the volume - rZV := &apis.ZFSVolume{} - // hack(https://github.com/vmware-tanzu/velero/pull/2835): generate a new uuid only if PV exist - pv, err := p.getPV(pvname) - - if err == nil && pv != nil { - rvol, err := utils.GetRestorePVName() - if err != nil { - return nil, errors.Errorf("zfs: failed to get restore vol name for %s", pvname) - } - rZV.Name = rvol - } else { - rZV.Name = pvname + // this is first full restore, go ahead and create the volume + rZV := &apis.ZFSVolume{} + // hack(https://github.com/vmware-tanzu/velero/pull/2835): generate a new uuid only if PV exist + pv, err := p.getPV(pvname) + + if err == nil && pv != nil { + rvol, err := utils.GetRestorePVName() + if err != nil { + return nil, errors.Errorf("zfs: failed to get restore vol name for %s", pvname) } + rZV.Name = rvol + } else { + rZV.Name = pvname + } - rZV.Spec = bkpZV.Spec + rZV.Spec = bkpZV.Spec - // if restored volume was a clone, create a new volume instead of cloning it from a snaphsot - rZV.Spec.SnapName = "" + // if restored volume was a clone, create a new volume instead of cloning it from a snaphsot + rZV.Spec.SnapName = "" - // get the target node - tnode, err := velero.GetTargetNode(p.K8sClient, rZV.Spec.OwnerNodeID) - if err != nil { - return nil, err - } + // get the target node + tnode, err := velero.GetTargetNode(p.K8sClient, rZV.Spec.OwnerNodeID) + if err != nil { + return nil, err + } - // update the target node name - p.Log.Debugf("zfs: GetTargetNode node %s=>%s", rZV.Spec.OwnerNodeID, tnode) - rZV.Spec.OwnerNodeID = tnode + // update the target node name + p.Log.Debugf("zfs: GetTargetNode node %s=>%s", rZV.Spec.OwnerNodeID, tnode) + rZV.Spec.OwnerNodeID = tnode - // set the volume status as pending - rZV.Status.State = zfs.ZFSStatusPending + // set the volume status as pending + rZV.Status.State = zfs.ZFSStatusPending - // add original volume and schedule name in the label - rZV.Labels = map[string]string{VeleroVolKey: pvname, VeleroNsKey: ns} - rZV.Annotations = map[string]string{VeleroBkpKey: bkpname} + // add original volume and schedule name in the label + rZV.Labels = map[string]string{VeleroVolKey: pvname, VeleroNsKey: ns} + rZV.Annotations = map[string]string{VeleroBkpKey: bkpname} - vol, err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Create(rZV) - if err != nil { - p.Log.Errorf("zfs: create ZFSVolume failed vol %v err: %v", rZV, err) - return nil, err - 
} + vol, err := volbuilder.NewKubeclient().WithNamespace(p.namespace).Create(rZV) + if err != nil { + p.Log.Errorf("zfs: create ZFSVolume failed vol %v err: %v", rZV, err) + return nil, err + } - err = p.checkVolCreation(rZV.Name) - if err != nil { - p.Log.Errorf("zfs: checkVolCreation failed %s err: %v", rZV.Name, err) - return nil, err - } - } else { - // this is incremental restore, update the ZFS volume - vol.Spec = bkpZV.Spec - vol, err := volbuilder.NewKubeclient().WithNamespace(p.namespace).Update(vol) - if err != nil { - p.Log.Errorf("zfs: update ZFSVolume failed vol %v err: %v", vol, err) - return nil, err - } + err = p.checkVolCreation(rZV.Name) + if err != nil { + p.Log.Errorf("zfs: checkVolCreation failed %s err: %v", rZV.Name, err) + return nil, err } return vol, nil @@ -149,6 +129,14 @@ func (p *Plugin) restoreZFSVolume(pvname, bkpname string) (*apis.ZFSVolume, erro return p.createVolume(pvname, bkpname, bkpZV) } +func (p *Plugin) destroyZFSVolume(volname string) error { + err := volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(volname) + if err != nil { + p.Log.Errorf("zfs: delete vol failed vol %s err: %v", volname, err) + } + return err +} + func (p *Plugin) isVolumeReady(volumeID string) (ready bool, err error) { getOptions := metav1.GetOptions{} vol, err := volbuilder.NewKubeclient(). @@ -161,7 +149,7 @@ func (p *Plugin) isVolumeReady(volumeID string) (ready bool, err error) { return vol.Status.State == zfs.ZFSStatusReady, nil } -func (p *Plugin) checkRestoreStatus(rname, volname string) error { +func (p *Plugin) checkRestoreStatus(rname string) error { defer func() { err := restorebuilder.NewKubeclient().WithNamespace(p.namespace).Delete(rname) if err != nil { @@ -184,13 +172,6 @@ func (p *Plugin) checkRestoreStatus(rname, volname string) error { case apis.RSTZFSStatusDone: return nil case apis.RSTZFSStatusFailed, apis.RSTZFSStatusInvalid: - // delete the volume - err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(volname) - if err != nil { - // ignore error - p.Log.Errorf("zfs: delete vol failed vol %s restore %s err: %v", volname, rname, err) - } - return errors.Errorf("zfs: error in restoring %s, status:{%v}", rname, rstr.Status) } @@ -222,15 +203,8 @@ func (p *Plugin) checkVolCreation(volname string) error { return nil } -// restoreVolume returns restored vol name and a boolean value indication if we need -// to restore the volume. If Volume is already restored, we don't need to restore it. 
-func (p *Plugin) restoreVolume(volname, bkpname string, port int) (string, string, error) { - zv, err := p.restoreZFSVolume(volname, bkpname) - if err != nil { - p.Log.Errorf("zfs: restore ZFSVolume failed vol %s bkp %s err %v", volname, bkpname, err) - return "", "", err - } - +// startRestore creates the ZFSRestore CR to start downloading the data and returns ZFSRestore CR name +func (p *Plugin) startRestore(zv *apis.ZFSVolume, bkpname string, port int) (string, error) { node := zv.Spec.OwnerNodeID serverAddr := p.remoteAddr + ":" + strconv.Itoa(port) zfsvol := zv.Name @@ -245,27 +219,15 @@ func (p *Plugin) restoreVolume(volname, bkpname string, port int) (string, strin Build() if err != nil { - // delete the volume - verr := volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(zfsvol) - if verr != nil { - // ignore error - p.Log.Errorf("zfs: delete vol failed vol %s rname %s err: %v", zfsvol, rname, verr) - } - return "", "", err + return "", err } _, err = restorebuilder.NewKubeclient().WithNamespace(p.namespace).Create(rstr) if err != nil { - // delete the volume - verr := volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(zfsvol) - if verr != nil { - // ignore error - p.Log.Errorf("zfs: delete vol failed vol %s rname %s err: %v", zfsvol, rname, err) - } - return "", "", err + return "", err } - return zfsvol, rname, nil + return rname, nil } func (p *Plugin) doDownload(wg *sync.WaitGroup, filename string, port int) { @@ -280,16 +242,10 @@ func (p *Plugin) doDownload(wg *sync.WaitGroup, filename string, port int) { close(*p.cl.ConnReady) } -func (p *Plugin) doRestore(snapshotID string, port int) (string, error) { - - volname, bkpname, err := utils.GetInfoFromSnapshotID(snapshotID) - if err != nil { - return "", err - } - - filename := p.cl.GenerateRemoteFilename(volname, bkpname) +func (p *Plugin) dataRestore(zv *apis.ZFSVolume, pvname, bkpname string, port int) error { + filename := p.cl.GenerateRemoteFilename(pvname, bkpname) if filename == "" { - return "", errors.Errorf("zfs: Error creating remote file name for restore") + return errors.Errorf("zfs: Error creating remote file name for restore") } // reset the connection state @@ -310,21 +266,95 @@ func (p *Plugin) doRestore(snapshotID string, port int) (string, error) { // wait for the connection to be ready ok := p.cl.ConnReadyWait() if !ok { - return "", errors.Errorf("zfs: restore server is not ready") + return errors.Errorf("zfs: restore server is not ready") + } + + rname, err := p.startRestore(zv, bkpname, port) + if err != nil { + p.Log.Errorf("zfs: restoreVolume failed vol %s snap %s err: %v", pvname, bkpname, err) + return err + } + + err = p.checkRestoreStatus(rname) + if err != nil { + p.Log.Errorf("zfs: restore failed vol %s snap %s err: %v", pvname, bkpname, err) + return err + } + + p.Log.Debugf("zfs: restore done vol %s => %s bkp %s", pvname, zv.Name, bkpname) + return nil +} + +func (p *Plugin) getSnapList(pvname, bkpname string) ([]string, error) { + var list []string + + schdname := utils.GetScheduleName(bkpname) + + snapList, err := p.cl.GetSnapListFromCloud(pvname, schdname) + if err != nil { + return list, err + } + + sort.Strings(snapList) + + var size uint64 = 0 + + // get the index of the backup + for idx, snap := range snapList { + if snap == bkpname { + size = uint64(idx) + 1 + break + } + } + + if size == 0 { + return list, errors.Errorf("zfs: error backup not found in snap list %s", bkpname) } - newvol, rname, err := p.restoreVolume(volname, bkpname, port) + // add the full backup 
count to get the closest full snapshot index for the backup + count := p.incremental + 1 + + // get the index of full backup + fullBkpIdx := (uint64(size-1) / count) * count + list = snapList[fullBkpIdx:size] + + return list, nil +} + +func (p *Plugin) doRestore(snapshotID string, port int) (string, error) { + pvname, bkpname, err := utils.GetInfoFromSnapshotID(snapshotID) if err != nil { - p.Log.Errorf("zfs: restoreVolume failed vol %s snap %s err: %v", volname, bkpname, err) return "", err } - err = p.checkRestoreStatus(rname, newvol) + bkpList, err := p.getSnapList(pvname, bkpname) if err != nil { - p.Log.Errorf("zfs: restore failed vol %s snap %s err: %v", volname, bkpname, err) return "", err } - p.Log.Debugf("zfs: restore done vol %s => %s snap %s", volname, newvol, snapshotID) - return newvol, nil + if len(bkpList) == 0 { + return "", errors.Errorf("zfs: error empty restore list %s", bkpname) + } + + p.Log.Debugf("zfs: backup list for restore %v", bkpList) + + zv, err := p.restoreZFSVolume(pvname, bkpname) + if err != nil { + p.Log.Errorf("zfs: restore ZFSVolume failed vol %s bkp %s err %v", pvname, bkpname, err) + return "", err + } + + // attempt the incremental restore, will resote single backup if it is not a incremental backup + for _, bkp := range bkpList { + err = p.dataRestore(zv, pvname, bkp, port) + + if err != nil { + // delete the volume + p.destroyZFSVolume(zv.Name) + p.Log.Errorf("zfs: error doRestore returning snap %s err %v", snapshotID, err) + return "", err + } + } + + return zv.Name, nil } diff --git a/pkg/zfs/plugin/zfs.go b/pkg/zfs/plugin/zfs.go index 42f87e87..19ae1d38 100644 --- a/pkg/zfs/plugin/zfs.go +++ b/pkg/zfs/plugin/zfs.go @@ -17,6 +17,8 @@ limitations under the License. package plugin import ( + "strconv" + cloud "github.com/openebs/velero-plugin/pkg/clouduploader" "github.com/openebs/velero-plugin/pkg/velero" "github.com/openebs/velero-plugin/pkg/zfs/utils" @@ -36,8 +38,8 @@ const ( // ZfsPvNamespace config key for OpenEBS namespace ZfsPvNamespace = "namespace" - // ZfsPvBackup config key for backup type full or incremental - ZfsPvBackup = "backup" + // ZfsPvIncr config key for providing count of incremental backups + ZfsPvIncr = "incrBackupCount" // zfs csi driver name ZfsDriverName = "zfs.csi.openebs.io" @@ -67,8 +69,8 @@ type Plugin struct { // as env OPENEBS_NAMESPACE while deploying it. 
namespace string - // This specify whether we have to take incremental backup or full backup - incremental bool + // This specifies how many incremental backup we have to keep + incremental uint64 // cl stores cloud connection information cl *cloud.Conn @@ -92,8 +94,12 @@ func (p *Plugin) Init(config map[string]string) error { return errors.New("zfs: namespace not provided for ZFS-LocalPV") } - if bkptype, ok := config[ZfsPvBackup]; ok && bkptype == "incremental" { - p.incremental = true + if count, ok := config[ZfsPvIncr]; ok { + incr, err := strconv.ParseUint(count, 10, 64) + if err != nil { + return errors.Wrapf(err, "zfs: invalid incrBackupCount value=%s", count) + } + p.incremental = incr } conf, err := rest.InClusterConfig() @@ -123,7 +129,6 @@ func (p *Plugin) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ strin p.Log.Debugf("zfs: CreateVolumeFromSnapshot called snap %s", snapshotID) volumeID, err := p.doRestore(snapshotID, ZFSRestorePort) - if err != nil { p.Log.Errorf("zfs: error CreateVolumeFromSnapshot returning snap %s err %v", snapshotID, err) return "", err diff --git a/pkg/zfs/utils/utils.go b/pkg/zfs/utils/utils.go index 553f07a9..538f7fc0 100644 --- a/pkg/zfs/utils/utils.go +++ b/pkg/zfs/utils/utils.go @@ -84,7 +84,7 @@ func GetRestorePVName() (string, error) { // It will check if backup name have 'bkp-20060102150405' format func GetScheduleName(backupName string) string { // for non-scheduled backup, we are considering backup name as schedule name only - schdName := "" + schdName := backupName // If it is scheduled backup then we need to get the schedule name splitName := strings.Split(backupName, "-") @@ -92,7 +92,7 @@ func GetScheduleName(backupName string) string { _, err := time.Parse("20060102150405", splitName[len(splitName)-1]) if err != nil { // last substring is not timestamp, so it is not generated from schedule - return "" + return schdName } schdName = strings.Join(splitName[0:len(splitName)-1], "-") }
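
For reviewers, here is a small standalone sketch (not part of the patch) of the grouping arithmetic that `getSnapList` uses above: backups of a schedule are grouped as one full backup followed by `incrBackupCount` incremental backups, and a restore replays the group from its full backup up to the requested backup. The helper name `restoreChain` and the sample backup names below are illustrative only.

```go
package main

import (
	"fmt"
	"sort"
)

// restoreChain returns the backups that must be replayed, oldest first, to
// restore the backup named target. It mirrors the index arithmetic used by
// getSnapList in this patch: backups are grouped into one full backup
// followed by incrBackupCount incremental backups.
func restoreChain(backups []string, target string, incrBackupCount uint64) ([]string, error) {
	sort.Strings(backups)

	// find the 1-based position of the requested backup
	var size uint64
	for idx, name := range backups {
		if name == target {
			size = uint64(idx) + 1
			break
		}
	}
	if size == 0 {
		return nil, fmt.Errorf("backup %q not found", target)
	}

	// each group holds one full backup plus incrBackupCount incremental backups
	count := incrBackupCount + 1

	// index of the full backup that starts the group containing target
	fullBkpIdx := ((size - 1) / count) * count

	return backups[fullBkpIdx:size], nil
}

func main() {
	// hypothetical scheduled backups, incrBackupCount: "3" => groups of 4
	backups := []string{
		"sched-20201008100000", // full
		"sched-20201008110000", // incremental
		"sched-20201008120000", // incremental
		"sched-20201008130000", // incremental
		"sched-20201008140000", // full (new group)
		"sched-20201008150000", // incremental
	}

	chain, err := restoreChain(backups, "sched-20201008150000", 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// prints the full backup of the group followed by the incrementals up to
	// the requested one: [sched-20201008140000 sched-20201008150000]
	fmt.Println(chain)
}
```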