diff --git a/pkg/cstor/api_service.go b/pkg/cstor/api_service.go index d2a820ea..3757f382 100644 --- a/pkg/cstor/api_service.go +++ b/pkg/cstor/api_service.go @@ -26,6 +26,7 @@ import ( v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1" "github.com/pkg/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -65,7 +66,7 @@ func (p *Plugin) httpRestCall(url, reqtype string, data []byte) ([]byte, error) } // getMapiAddr return maya API server's ip address -func (p *Plugin) getMapiAddr() string { +func (p *Plugin) getMapiAddr() (string, error) { var openebsNs string // check if user has provided openebs namespace @@ -85,8 +86,11 @@ func (p *Plugin) getMapiAddr() string { ) if err != nil { + if k8serrors.IsNotFound(err) { + return "", nil + } p.Log.Errorf("Error getting maya-apiservice : %v", err.Error()) - return "" + return "", err } if len(svclist.Items) != 0 { @@ -103,8 +107,11 @@ func (p *Plugin) getMapiAddr() string { FieldSelector: "metadata.name=" + mayaAPIServiceName, }) if err != nil { + if k8serrors.IsNotFound(err) { + return "", nil + } p.Log.Errorf("Error getting IP Address for service{%s} : %v", mayaAPIServiceName, err.Error()) - return "" + return "", err } fetchip: @@ -112,14 +119,59 @@ fetchip: if s.Spec.ClusterIP != "" { // update the namespace p.namespace = s.Namespace - return "http://" + s.Spec.ClusterIP + ":" + strconv.FormatInt(int64(s.Spec.Ports[0].Port), 10) + return "http://" + s.Spec.ClusterIP + ":" + strconv.FormatInt(int64(s.Spec.Ports[0].Port), 10), nil } } - return "" + return "", nil +} + +// getCVCAddr return CVC server's ip address +func (p *Plugin) getCVCAddr() (string, error) { + var openebsNs string + + // check if user has provided openebs namespace + if p.namespace != "" { + openebsNs = p.namespace + } else { + openebsNs = metav1.NamespaceAll + } + + svclist, err := p.K8sClient. + CoreV1(). + Services(openebsNs). + List( + metav1.ListOptions{ + LabelSelector: cvcAPIServiceLabel, + }, + ) + + if err != nil { + if k8serrors.IsNotFound(err) { + return "", nil + } + p.Log.Errorf("Error getting cvc service: %v", err.Error()) + return "", err + } + + if len(svclist.Items) == 0 { + return "", nil + } + + for _, s := range svclist.Items { + if s.Spec.ClusterIP != "" { + // update the namespace + p.namespace = s.Namespace + return "http://" + s.Spec.ClusterIP + ":" + strconv.FormatInt(int64(s.Spec.Ports[0].Port), 10), nil + } + } + + return "", nil } func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) { + var url string + scheduleName := p.getScheduleName(vol.backupName) // This will be backup/schedule name bkpSpec := &v1alpha1.CStorBackupSpec{ @@ -137,7 +189,11 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) { Spec: *bkpSpec, } - url := p.mayaAddr + backupEndpoint + if p.isCSIVolume { + url = p.cvcAddr + backupEndpoint + } else { + url = p.mayaAddr + backupEndpoint + } bkpData, err := json.Marshal(bkp) if err != nil { @@ -153,6 +209,8 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) { } func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error) { + var url string + restoreSrc := p.cstorServerAddr if p.local { restoreSrc = vol.srcVolname @@ -172,7 +230,11 @@ func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error) }, } - url := p.mayaAddr + restorePath + if p.isCSIVolume { + url = p.cvcAddr + restorePath + } else { + url = p.mayaAddr + restorePath + } restoreData, err := json.Marshal(restore) if err != nil { @@ -214,7 +276,13 @@ func isEmptyRestResponse(data []byte) (bool, error) { } func (p *Plugin) sendDeleteRequest(backup, volume, namespace, schedule string) error { - url := p.mayaAddr + backupEndpoint + backup + var url string + + if p.isCSIVolume { + url = p.cvcAddr + backupEndpoint + backup + } else { + url = p.mayaAddr + backupEndpoint + backup + } req, err := http.NewRequest("DELETE", url, nil) if err != nil { diff --git a/pkg/cstor/cstor.go b/pkg/cstor/cstor.go index 6ca64d3a..00c445c6 100644 --- a/pkg/cstor/cstor.go +++ b/pkg/cstor/cstor.go @@ -44,12 +44,14 @@ import ( const ( mayaAPIServiceName = "maya-apiserver-service" mayaAPIServiceLabel = "openebs.io/component-name=maya-apiserver-svc" + cvcAPIServiceLabel = "openebs.io/component-name=cvc-operator-svc" backupEndpoint = "/latest/backups/" restorePath = "/latest/restore/" casTypeCStor = "cstor" backupStatusInterval = 5 restoreStatusInterval = 5 openebsVolumeLabel = "openebs.io/cas-type" + openebsCSIName = "cstor.csi.openebs.io" ) const ( @@ -86,6 +88,9 @@ type Plugin struct { // mayaAddr is maya API server address mayaAddr string + // cvcAddr is cvc API server address + cvcAddr string + // cstorServerAddr is network address used for CStor volume operation // on this address cloud server will perform data operation(backup/restore) cstorServerAddr string @@ -98,6 +103,9 @@ type Plugin struct { // if only local snapshot enabled local bool + + // if cstor-csi is enabled + isCSIVolume bool } // Snapshot describes snapshot object information @@ -190,9 +198,22 @@ func (p *Plugin) Init(config map[string]string) error { } p.OpenEBSClient = openEBSClient - p.mayaAddr = p.getMapiAddr() - if p.mayaAddr == "" { - return errors.New("error fetching OpenEBS rest client address") + p.mayaAddr, err = p.getMapiAddr() + if err != nil { + return errors.Wrapf(err, "error fetching Maya-ApiServer rest client address") + } + + p.cvcAddr, err = p.getCVCAddr() + if err != nil { + return errors.Wrapf(err, "error fetching CVC rest client address") + } + + if p.mayaAddr == "" && p.cvcAddr == "" { + return errors.New("faile to get address for maya-apiserver/cvc-server service") + } + + if p.cvcAddr != "" { + p.isCSIVolume = true } p.cstorServerAddr = p.getServerAddress() @@ -233,18 +254,22 @@ func (p *Plugin) GetVolumeID(unstructuredPV runtime.Unstructured) (string, error // then we will return empty volumeId and error as nil. if pv.Name == "" || pv.Spec.StorageClassName == "" || - (pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace == "") || - len(pv.Labels) == 0 { + (pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace == "") { return "", nil } volType, ok := pv.Labels[openebsVolumeLabel] - if !ok { - return "", nil - } - - if volType != casTypeCStor { - return "", nil + if ok { + if volType != casTypeCStor { + return "", nil + } + } else { + // check if PV is created by CSI driver + if !(pv.Spec.CSI != nil && + pv.Spec.CSI.Driver == openebsCSIName) { + return "", nil + } + p.isCSIVolume = true } if pv.Status.Phase == v1.VolumeReleased || diff --git a/pkg/cstor/status.go b/pkg/cstor/status.go index f92cb61f..a2989a39 100644 --- a/pkg/cstor/status.go +++ b/pkg/cstor/status.go @@ -26,9 +26,16 @@ import ( // checkBackupStatus queries MayaAPI server for given backup status // and wait until backup completes func (p *Plugin) checkBackupStatus(bkp *v1alpha1.CStorBackup) { - var bkpDone bool + var ( + bkpDone bool + url string + ) - url := p.mayaAddr + backupEndpoint + if p.isCSIVolume { + url = p.cvcAddr + backupEndpoint + } else { + url = p.mayaAddr + backupEndpoint + } bkpvolume, exists := p.volumes[bkp.Spec.VolumeName] if !exists { @@ -78,9 +85,16 @@ func (p *Plugin) checkBackupStatus(bkp *v1alpha1.CStorBackup) { // checkRestoreStatus queries MayaAPI server for given restore status // and wait until restore completes func (p *Plugin) checkRestoreStatus(rst *v1alpha1.CStorRestore, vol *Volume) { - var rstDone bool + var ( + rstDone bool + url string + ) - url := p.mayaAddr + restorePath + if p.isCSIVolume { + url = p.cvcAddr + restorePath + } else { + url = p.mayaAddr + restorePath + } rstData, err := json.Marshal(rst) if err != nil {