Skip to content

Commit

Permalink
support for cstor csi volume backup (#91)
Browse files Browse the repository at this point in the history
Signed-off-by: mynktl <[email protected]>
  • Loading branch information
mynktl authored and kmova committed Jun 11, 2020
1 parent eda5f4e commit 722587f
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 23 deletions.
84 changes: 76 additions & 8 deletions pkg/cstor/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func (p *Plugin) httpRestCall(url, reqtype string, data []byte) ([]byte, error)
}

// getMapiAddr return maya API server's ip address
func (p *Plugin) getMapiAddr() string {
func (p *Plugin) getMapiAddr() (string, error) {
var openebsNs string

// check if user has provided openebs namespace
Expand All @@ -85,8 +86,11 @@ func (p *Plugin) getMapiAddr() string {
)

if err != nil {
if k8serrors.IsNotFound(err) {
return "", nil
}
p.Log.Errorf("Error getting maya-apiservice : %v", err.Error())
return ""
return "", err
}

if len(svclist.Items) != 0 {
Expand All @@ -103,23 +107,71 @@ func (p *Plugin) getMapiAddr() string {
FieldSelector: "metadata.name=" + mayaAPIServiceName,
})
if err != nil {
if k8serrors.IsNotFound(err) {
return "", nil
}
p.Log.Errorf("Error getting IP Address for service{%s} : %v", mayaAPIServiceName, err.Error())
return ""
return "", err
}

fetchip:
for _, s := range svclist.Items {
if s.Spec.ClusterIP != "" {
// update the namespace
p.namespace = s.Namespace
return "http://" + s.Spec.ClusterIP + ":" + strconv.FormatInt(int64(s.Spec.Ports[0].Port), 10)
return "http://" + s.Spec.ClusterIP + ":" + strconv.FormatInt(int64(s.Spec.Ports[0].Port), 10), nil
}
}

return ""
return "", nil
}

// getCVCAddr return CVC server's ip address
func (p *Plugin) getCVCAddr() (string, error) {
var openebsNs string

// check if user has provided openebs namespace
if p.namespace != "" {
openebsNs = p.namespace
} else {
openebsNs = metav1.NamespaceAll
}

svclist, err := p.K8sClient.
CoreV1().
Services(openebsNs).
List(
metav1.ListOptions{
LabelSelector: cvcAPIServiceLabel,
},
)

if err != nil {
if k8serrors.IsNotFound(err) {
return "", nil
}
p.Log.Errorf("Error getting cvc service: %v", err.Error())
return "", err
}

if len(svclist.Items) == 0 {
return "", nil
}

for _, s := range svclist.Items {
if s.Spec.ClusterIP != "" {
// update the namespace
p.namespace = s.Namespace
return "http://" + s.Spec.ClusterIP + ":" + strconv.FormatInt(int64(s.Spec.Ports[0].Port), 10), nil
}
}

return "", nil
}

func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {
var url string

scheduleName := p.getScheduleName(vol.backupName) // This will be backup/schedule name

bkpSpec := &v1alpha1.CStorBackupSpec{
Expand All @@ -137,7 +189,11 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {
Spec: *bkpSpec,
}

url := p.mayaAddr + backupEndpoint
if p.isCSIVolume {
url = p.cvcAddr + backupEndpoint
} else {
url = p.mayaAddr + backupEndpoint
}

bkpData, err := json.Marshal(bkp)
if err != nil {
Expand All @@ -153,6 +209,8 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {
}

func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error) {
var url string

restoreSrc := p.cstorServerAddr
if p.local {
restoreSrc = vol.srcVolname
Expand All @@ -172,7 +230,11 @@ func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error)
},
}

url := p.mayaAddr + restorePath
if p.isCSIVolume {
url = p.cvcAddr + restorePath
} else {
url = p.mayaAddr + restorePath
}

restoreData, err := json.Marshal(restore)
if err != nil {
Expand Down Expand Up @@ -214,7 +276,13 @@ func isEmptyRestResponse(data []byte) (bool, error) {
}

func (p *Plugin) sendDeleteRequest(backup, volume, namespace, schedule string) error {
url := p.mayaAddr + backupEndpoint + backup
var url string

if p.isCSIVolume {
url = p.cvcAddr + backupEndpoint + backup
} else {
url = p.mayaAddr + backupEndpoint + backup
}

req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
Expand Down
47 changes: 36 additions & 11 deletions pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ import (
const (
mayaAPIServiceName = "maya-apiserver-service"
mayaAPIServiceLabel = "openebs.io/component-name=maya-apiserver-svc"
cvcAPIServiceLabel = "openebs.io/component-name=cvc-operator-svc"
backupEndpoint = "/latest/backups/"
restorePath = "/latest/restore/"
casTypeCStor = "cstor"
backupStatusInterval = 5
restoreStatusInterval = 5
openebsVolumeLabel = "openebs.io/cas-type"
openebsCSIName = "cstor.csi.openebs.io"
)

const (
Expand Down Expand Up @@ -86,6 +88,9 @@ type Plugin struct {
// mayaAddr is maya API server address
mayaAddr string

// cvcAddr is cvc API server address
cvcAddr string

// cstorServerAddr is network address used for CStor volume operation
// on this address cloud server will perform data operation(backup/restore)
cstorServerAddr string
Expand All @@ -98,6 +103,9 @@ type Plugin struct {

// if only local snapshot enabled
local bool

// if cstor-csi is enabled
isCSIVolume bool
}

// Snapshot describes snapshot object information
Expand Down Expand Up @@ -190,9 +198,22 @@ func (p *Plugin) Init(config map[string]string) error {
}
p.OpenEBSClient = openEBSClient

p.mayaAddr = p.getMapiAddr()
if p.mayaAddr == "" {
return errors.New("error fetching OpenEBS rest client address")
p.mayaAddr, err = p.getMapiAddr()
if err != nil {
return errors.Wrapf(err, "error fetching Maya-ApiServer rest client address")
}

p.cvcAddr, err = p.getCVCAddr()
if err != nil {
return errors.Wrapf(err, "error fetching CVC rest client address")
}

if p.mayaAddr == "" && p.cvcAddr == "" {
return errors.New("faile to get address for maya-apiserver/cvc-server service")
}

if p.cvcAddr != "" {
p.isCSIVolume = true
}

p.cstorServerAddr = p.getServerAddress()
Expand Down Expand Up @@ -233,18 +254,22 @@ func (p *Plugin) GetVolumeID(unstructuredPV runtime.Unstructured) (string, error
// then we will return empty volumeId and error as nil.
if pv.Name == "" ||
pv.Spec.StorageClassName == "" ||
(pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace == "") ||
len(pv.Labels) == 0 {
(pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace == "") {
return "", nil
}

volType, ok := pv.Labels[openebsVolumeLabel]
if !ok {
return "", nil
}

if volType != casTypeCStor {
return "", nil
if ok {
if volType != casTypeCStor {
return "", nil
}
} else {
// check if PV is created by CSI driver
if !(pv.Spec.CSI != nil &&
pv.Spec.CSI.Driver == openebsCSIName) {
return "", nil
}
p.isCSIVolume = true
}

if pv.Status.Phase == v1.VolumeReleased ||
Expand Down
22 changes: 18 additions & 4 deletions pkg/cstor/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,16 @@ import (
// checkBackupStatus queries MayaAPI server for given backup status
// and wait until backup completes
func (p *Plugin) checkBackupStatus(bkp *v1alpha1.CStorBackup) {
var bkpDone bool
var (
bkpDone bool
url string
)

url := p.mayaAddr + backupEndpoint
if p.isCSIVolume {
url = p.cvcAddr + backupEndpoint
} else {
url = p.mayaAddr + backupEndpoint
}

bkpvolume, exists := p.volumes[bkp.Spec.VolumeName]
if !exists {
Expand Down Expand Up @@ -78,9 +85,16 @@ func (p *Plugin) checkBackupStatus(bkp *v1alpha1.CStorBackup) {
// checkRestoreStatus queries MayaAPI server for given restore status
// and wait until restore completes
func (p *Plugin) checkRestoreStatus(rst *v1alpha1.CStorRestore, vol *Volume) {
var rstDone bool
var (
rstDone bool
url string
)

url := p.mayaAddr + restorePath
if p.isCSIVolume {
url = p.cvcAddr + restorePath
} else {
url = p.mayaAddr + restorePath
}

rstData, err := json.Marshal(rst)
if err != nil {
Expand Down

0 comments on commit 722587f

Please sign in to comment.