Skip to content

Commit

Permalink
feat(restore): add support for restore of csi volumes (#93)
Browse files Browse the repository at this point in the history
This PR helps support restore of OpenEBS cStor CSI based volumes. As cStor CSI volumes
are based on v1 APIs of OpenEBS cStor, the PR integrates v1 APIs client.

To know more about OpenEBS V1 apis, please refer the following links:
https://github.com/openebs/api/tree/master/design/cstor/v1
https://github.com/openebs/api/tree/master/pkg/apis/cstor


Signed-off-by: Ashutosh Kumar <[email protected]>
  • Loading branch information
Ashutosh Kumar authored and kmova committed Jun 11, 2020
1 parent 722587f commit aae7aa7
Show file tree
Hide file tree
Showing 9 changed files with 493 additions and 102 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/93-sonasingh46
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add restore support for cStor CSI based volumes
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ module github.com/openebs/velero-plugin
go 1.13

require (
github.com/aws/aws-sdk-go v1.30.7
github.com/aws/aws-sdk-go v1.31.13
github.com/ghodss/yaml v1.0.0
github.com/gofrs/uuid v3.2.0+incompatible
github.com/hashicorp/go-plugin v1.0.1-0.20190610192547-a1bc61569a26 // indirect
github.com/onsi/ginkgo v1.10.1
github.com/onsi/gomega v1.7.0
github.com/openebs/api v1.10.0
github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/sirupsen/logrus v1.5.0
github.com/spf13/cobra v1.0.0 // indirect
github.com/vmware-tanzu/velero v1.3.2
gocloud.dev v0.19.0
gocloud.dev v0.20.0
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 // indirect
google.golang.org/api v0.21.0
google.golang.org/api v0.26.0
k8s.io/api v0.17.4
k8s.io/apimachinery v0.17.4
k8s.io/client-go v0.15.12-beta.0
k8s.io/client-go v0.17.3
k8s.io/utils v0.0.0-20191218082557-f07c713de883 // indirect
)
385 changes: 329 additions & 56 deletions go.sum

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions pkg/cstor/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {
Spec: *bkpSpec,
}

if p.isCSIVolume {
if vol.isCSIVolume {
url = p.cvcAddr + backupEndpoint
} else {
url = p.mayaAddr + backupEndpoint
Expand Down Expand Up @@ -230,7 +230,7 @@ func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error)
},
}

if p.isCSIVolume {
if vol.isCSIVolume {
url = p.cvcAddr + restorePath
} else {
url = p.mayaAddr + restorePath
Expand All @@ -249,6 +249,7 @@ func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error)
// if apiserver is having version <=1.8 then it will return empty response
ok, err := isEmptyRestResponse(data)
if !ok && err == nil {
// TODO: for CSI base volume response type may be different
err = p.updateVolCASInfo(data, vol.volname)
if err != nil {
err = errors.Wrapf(err, "Error parsing restore API response")
Expand All @@ -275,10 +276,10 @@ func isEmptyRestResponse(data []byte) (bool, error) {
return false, nil
}

func (p *Plugin) sendDeleteRequest(backup, volume, namespace, schedule string) error {
func (p *Plugin) sendDeleteRequest(backup, volume, namespace, schedule string, isCSIVolume bool) error {
var url string

if p.isCSIVolume {
if isCSIVolume {
url = p.cvcAddr + backupEndpoint + backup
} else {
url = p.mayaAddr + backupEndpoint + backup
Expand Down
65 changes: 48 additions & 17 deletions pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
/* Due to dependency conflict, please ensure openebs
* dependency manually instead of using dep
*/
v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
openebsapis "github.com/openebs/api/pkg/client/clientset/versioned"
"github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
openebs "github.com/openebs/maya/pkg/client/generated/clientset/versioned"
velero "github.com/openebs/velero-plugin/pkg/velero"
"github.com/openebs/velero-plugin/pkg/velero"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -76,6 +77,17 @@ type Plugin struct {
// OpenEBSClient is used for openEBS CR operation
OpenEBSClient *openebs.Clientset

// OpenEBSAPIsClient clientset for OpenEBS CR operations
/*
Note: This client comes from openebs/api ( github repo )
and this client has the latest cstor v1 APIs.
For compatibility this client has also some (not all) v1alpha1 APIs
that is present in above OpenEBSClient(this client comes
from openebs/maya github repo)
Finally, we will migrate to client based on openebs/api.
*/
OpenEBSAPIsClient openebsapis.Interface

// config to store parameters from velero server
config map[string]string

Expand Down Expand Up @@ -103,9 +115,6 @@ type Plugin struct {

// if only local snapshot enabled
local bool

// if cstor-csi is enabled
isCSIVolume bool
}

// Snapshot describes snapshot object information
Expand All @@ -118,6 +127,9 @@ type Snapshot struct {

// namespace is volume's namespace
namespace string

// isCSIVolume is true for cStor based CSI volume
isCSIVolume bool
}

// Volume describes volume object information
Expand Down Expand Up @@ -150,6 +162,9 @@ type Volume struct {
storageClass string

iscsi v1.ISCSIPersistentVolumeSource

// isCSIVolume is true for cStor based CSI volume
isCSIVolume bool
}

func (p *Plugin) getServerAddress() string {
Expand Down Expand Up @@ -198,6 +213,12 @@ func (p *Plugin) Init(config map[string]string) error {
}
p.OpenEBSClient = openEBSClient

// Set client from openebs apis
err = p.SetOpenEBSAPIClient(conf)
if err != nil {
return err
}

p.mayaAddr, err = p.getMapiAddr()
if err != nil {
return errors.Wrapf(err, "error fetching Maya-ApiServer rest client address")
Expand All @@ -212,10 +233,6 @@ func (p *Plugin) Init(config map[string]string) error {
return errors.New("faile to get address for maya-apiserver/cvc-server service")
}

if p.cvcAddr != "" {
p.isCSIVolume = true
}

p.cstorServerAddr = p.getServerAddress()
if p.cstorServerAddr == "" {
return errors.New("error fetching cstorVeleroServer address")
Expand All @@ -242,8 +259,21 @@ func (p *Plugin) Init(config map[string]string) error {
return p.cl.Init(config)
}

// SetOpenEBSAPIClient sets openebs client from openebs/apis
// Ref: https://github.com/openebs/api/tree/master/pkg/apis
func (p *Plugin) SetOpenEBSAPIClient(c *rest.Config) error {
OpenEBSAPIClient, err := openebsapis.NewForConfig(c)
if err != nil {
p.Log.Errorf("Failed to create OpenEBS client from openebs apis. %s", err)
return err
}
p.OpenEBSAPIsClient = OpenEBSAPIClient
return nil
}

// GetVolumeID return volume name for given PV
func (p *Plugin) GetVolumeID(unstructuredPV runtime.Unstructured) (string, error) {
var isCSIVolume bool
pv := new(v1.PersistentVolume)

if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.UnstructuredContent(), pv); err != nil {
Expand All @@ -265,11 +295,9 @@ func (p *Plugin) GetVolumeID(unstructuredPV runtime.Unstructured) (string, error
}
} else {
// check if PV is created by CSI driver
if !(pv.Spec.CSI != nil &&
pv.Spec.CSI.Driver == openebsCSIName) {
if isCSIVolume = isCSIPv(*pv); !isCSIVolume {
return "", nil
}
p.isCSIVolume = true
}

if pv.Status.Phase == v1.VolumeReleased ||
Expand All @@ -284,6 +312,7 @@ func (p *Plugin) GetVolumeID(unstructuredPV runtime.Unstructured) (string, error
storageClass: pv.Spec.StorageClassName,
namespace: pv.Spec.ClaimRef.Namespace,
size: pv.Spec.Capacity[v1.ResourceStorage],
isCSIVolume: isCSIVolume,
}
}

Expand Down Expand Up @@ -324,7 +353,7 @@ func (p *Plugin) DeleteSnapshot(snapshotID string) error {
err = p.sendDeleteRequest(snapInfo.backupName,
snapInfo.volID,
snapInfo.namespace,
scheduleName)
scheduleName, snapInfo.isCSIVolume)
if err != nil {
return errors.Wrapf(err, "failed to execute maya-apiserver DELETE API")
}
Expand Down Expand Up @@ -395,7 +424,7 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin
return "", errors.Errorf("Error creating remote file name for backup")
}

go p.checkBackupStatus(bkp)
go p.checkBackupStatus(bkp, vol.isCSIVolume)

ok = p.cl.Upload(filename, size)
if !ok {
Expand Down Expand Up @@ -427,10 +456,12 @@ func (p *Plugin) getSnapInfo(snapshotID string) (*Snapshot, error) {
if pv.Spec.ClaimRef.Namespace == "" {
return nil, errors.Errorf("No namespace in pv.spec.claimref for PV{%s}", snapshotID)
}
isCSIVolume := isCSIPv(*pv)
return &Snapshot{
volID: volumeID,
backupName: bkpName,
namespace: pv.Spec.ClaimRef.Namespace,
volID: volumeID,
backupName: bkpName,
namespace: pv.Spec.ClaimRef.Namespace,
isCSIVolume: isCSIVolume,
}, nil
}

Expand Down
74 changes: 65 additions & 9 deletions pkg/cstor/cvr_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cstor
import (
"time"

cstorv1 "github.com/openebs/api/pkg/apis/cstor/v1"
"github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,14 +31,23 @@ var CVRWaitCount = 100
// CVRCheckInterval defines amount of delay for CVR check
var CVRCheckInterval = 5 * time.Second

// waitForAllCVR will ensure that all CVR related to
// given volumes are created
func (p *Plugin) waitForAllCVR(vol *Volume) error {
replicaCount := p.getCVRCount(vol.volname)
// waitForAllCVRs will ensure that all CVR related to
// the given volume is created
func (p *Plugin) waitForAllCVRs(vol *Volume) error {
replicaCount := p.getCVRCount(vol.volname, vol.isCSIVolume)
if replicaCount == -1 {
return errors.Errorf("Failed to fetch replicaCount for volume{%s}", vol.volname)
}

if vol.isCSIVolume {
return p.waitForCSIBasedCVRs(vol, replicaCount)
}
return p.waitFoNonCSIBasedCVRs(vol, replicaCount)
}

// waitFoNonCSIBasedCVRs will ensure that all CVRs related to
// given non CSI based volume is created
func (p *Plugin) waitFoNonCSIBasedCVRs(vol *Volume, replicaCount int) error {
for cnt := 0; cnt < CVRWaitCount; cnt++ {
cvrList, err := p.OpenEBSClient.
OpenebsV1alpha1().
Expand All @@ -48,12 +58,10 @@ func (p *Plugin) waitForAllCVR(vol *Volume) error {
if err != nil {
return errors.Errorf("Failed to fetch CVR for volume=%s %s", vol.volname, err)
}

if len(cvrList.Items) != replicaCount {
time.Sleep(CVRCheckInterval)
continue
}

cvrCount := 0
for _, cvr := range cvrList.Items {
if cvr.Status.Phase == v1alpha1.CVRStatusOnline ||
Expand All @@ -67,12 +75,61 @@ func (p *Plugin) waitForAllCVR(vol *Volume) error {
}
time.Sleep(CVRCheckInterval)
}
return errors.Errorf("CVR for volume{%s} are not ready!", vol.volname)
}

// waitForCSIBasedCVRs will ensure that all CVRs related to
// the given CSI volume is created.
func (p *Plugin) waitForCSIBasedCVRs(vol *Volume, replicaCount int) error {
for cnt := 0; cnt < CVRWaitCount; cnt++ {
cvrList, err := p.OpenEBSAPIsClient.
CstorV1().
CStorVolumeReplicas(p.namespace).
List(metav1.ListOptions{
LabelSelector: "openebs.io/persistent-volume=" + vol.volname,
})
if err != nil {
return errors.Errorf("Failed to fetch CVR for volume=%s %s", vol.volname, err)
}

if len(cvrList.Items) != replicaCount {
time.Sleep(CVRCheckInterval)
continue
}

cvrCount := 0
for _, cvr := range cvrList.Items {
if cvr.Status.Phase == cstorv1.CVRStatusOnline ||
cvr.Status.Phase == cstorv1.CVRStatusError ||
cvr.Status.Phase == cstorv1.CVRStatusDegraded {
cvrCount++
}
}
if cvrCount == replicaCount {
return nil
}
time.Sleep(CVRCheckInterval)
}
return errors.Errorf("CVR for volume{%s} are not ready!", vol.volname)
}

// getCVRCount returns the number of CVR for given volume
func (p *Plugin) getCVRCount(volname string) int {
// getCVRCount returns the number of CVR for a given volume
func (p *Plugin) getCVRCount(volname string, isCSIVolume bool) int {
// For CSI based volume, CVR of v1 is used.
if isCSIVolume {
// If the volume is CSI based, then CVR V1 is used.
obj, err := p.OpenEBSAPIsClient.
CstorV1().
CStorVolumes(p.namespace).
Get(volname, metav1.GetOptions{})
if err != nil {
p.Log.Errorf("Failed to fetch cstorVolume.. %s", err)
return -1
}

return obj.Spec.ReplicationFactor
}
// For non CSI based volume, CVR of v1alpha1 is used.
obj, err := p.OpenEBSClient.
OpenebsV1alpha1().
CStorVolumes(p.namespace).
Expand All @@ -81,6 +138,5 @@ func (p *Plugin) getCVRCount(volname string) int {
p.Log.Errorf("Failed to fetch cstorVolume.. %s", err)
return -1
}

return obj.Spec.ReplicationFactor
}
14 changes: 12 additions & 2 deletions pkg/cstor/pv_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,15 @@ func (p *Plugin) getVolumeForLocalRestore(volumeID, snapName string) (*Volume, e
}
p.Log.Infof("Renaming PV %s to %s", pv.Name, clonePvName)

isCSIVolume := isCSIPv(*pv)

vol := &Volume{
volname: clonePvName,
srcVolname: pv.Name,
backupName: snapName,
storageClass: pv.Spec.StorageClassName,
size: pv.Spec.Capacity[v1.ResourceStorage],
isCSIVolume: isCSIVolume,
}
p.volumes[vol.volname] = vol
return vol, nil
Expand All @@ -132,8 +135,6 @@ func (p *Plugin) getVolumeForRemoteRestore(volumeID, snapName string) (*Volume,
return nil, err
}

p.volumes[vol.volname] = vol

p.Log.Infof("Generated PV name is %s", vol.volname)

return vol, nil
Expand All @@ -148,3 +149,12 @@ func generateClonePVName() (string, error) {

return PvClonePrefix + nuuid.String(), nil
}

// isCSIPv returns true if given PV is created by cstor CSI driver
func isCSIPv(pv v1.PersistentVolume) bool {
if pv.Spec.CSI != nil &&
pv.Spec.CSI.Driver == openebsCSIName {
return true
}
return false
}
Loading

0 comments on commit aae7aa7

Please sign in to comment.