diff --git a/pkg/cstor/cstor.go b/pkg/cstor/cstor.go index 7b9401e..21f2394 100644 --- a/pkg/cstor/cstor.go +++ b/pkg/cstor/cstor.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "os" "strings" "time" @@ -111,6 +112,9 @@ type Plugin struct { // namespace in which openebs is installed, default is openebs namespace string + // nodeID is used to identify the node on which the program is running + nodeID string + // cl stores cloud connection information cl *cloud.Conn @@ -218,6 +222,13 @@ func (p *Plugin) Init(config map[string]string) error { p.namespace = ns } + nodeID := os.Getenv("VELERO_NODE_ID") + if nodeID == "" { + return errors.New("env VELERO_NODE_ID not set") + } + p.Log.Infof("env VELERO_NODE_ID: ", nodeID) + p.nodeID = nodeID + conf, err := rest.InClusterConfig() if err != nil { p.Log.Errorf("Failed to get cluster config : %s", err.Error()) diff --git a/pkg/cstor/cvc_operation.go b/pkg/cstor/cvc_operation.go index d859570..c5f585d 100644 --- a/pkg/cstor/cvc_operation.go +++ b/pkg/cstor/cvc_operation.go @@ -17,6 +17,7 @@ limitations under the License. package cstor import ( + "context" "encoding/json" "fmt" @@ -24,6 +25,7 @@ import ( maya "github.com/openebs/cstor-csi/pkg/utils" "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // (Kasakaze)todo: Determine whether it is csiVolume, if so, cvc must be backed up @@ -83,10 +85,13 @@ func (p *Plugin) restoreCVC(volumeID, pvcName, pvcNamespace, snapName string) er rCount = fmt.Sprint(rcvc.Spec.Provision.ReplicaCount) cspcName = rcvc.ObjectMeta.Labels["openebs.io/cstor-pool-cluster"] snapshotID = "" - // (Kasakaze)todo: If the data is migrated to another cluster, the nodeID may not be the same nodeID = rcvc.Publish.NodeID policyName = rcvc.ObjectMeta.Labels["openebs.io/volume-policy"] ) + nodeID, err = p.getValidNodeID(nodeID) + if err != nil { + return errors.Cause(err) + } err = maya.ProvisionVolume(size, volumeID, rCount, cspcName, snapshotID, @@ -114,3 +119,22 @@ func (p *Plugin) downloadCVC(volumeID, snapName string) (*cstorv1.CStorVolumeCon return cvc, nil } + +// If the backup cvc nodeID does not belong to the current cluster, use the environment variable VELERO_NODE_ID +func (p *Plugin) getValidNodeID(nodeID string) (string, error) { + if nodeID == "" { + return p.nodeID, nil + } + + _, err := p.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return "", errors.Cause(err) + } + + p.Log.Warnf("invalid nodeID(%s), use env VELERO_NODE_ID(%s)", nodeID, p.nodeID) + nodeID = p.nodeID + } + + return nodeID, nil +}