From 303d46eda40ab76d6774a822206d30beebddb127 Mon Sep 17 00:00:00 2001
From: Dinesh Israni
Date: Mon, 15 Apr 2019 20:01:10 -0700
Subject: [PATCH] Move resource collection logic from migration controller

Can be used by other modules to get unstructured objects from a namespace
matching the given label selectors
---
 cmd/stork/stork.go                         |  13 +-
 pkg/migration/controllers/migration.go     | 405 +++------------------
 pkg/migration/migration.go                 |   7 +-
 pkg/resourcecollector/resourcecollector.go | 378 +++++++++++++++++++
 4 files changed, 444 insertions(+), 359 deletions(-)
 create mode 100644 pkg/resourcecollector/resourcecollector.go

diff --git a/cmd/stork/stork.go b/cmd/stork/stork.go
index 6d9f8b2cd8..387815c3da 100644
--- a/cmd/stork/stork.go
+++ b/cmd/stork/stork.go
@@ -17,6 +17,7 @@ import (
 	"github.com/libopenstorage/stork/pkg/migration"
 	"github.com/libopenstorage/stork/pkg/monitor"
 	"github.com/libopenstorage/stork/pkg/pvcwatcher"
+	"github.com/libopenstorage/stork/pkg/resourcecollector"
 	"github.com/libopenstorage/stork/pkg/rule"
 	"github.com/libopenstorage/stork/pkg/schedule"
 	"github.com/libopenstorage/stork/pkg/snapshot"
@@ -294,11 +295,19 @@ func runStork(d volume.Driver, recorder record.EventRecorder, c *cli.Context) {
 		}
 	}
 
+	resourceCollector := resourcecollector.ResourceCollector{
+		Driver: d,
+	}
+	if err := resourceCollector.Init(); err != nil {
+		log.Fatalf("Error initializing ResourceCollector: %v", err)
+	}
+
 	if c.Bool("migration-controller") {
 		migrationAdminNamespace := c.String("migration-admin-namespace")
 		migration := migration.Migration{
-			Driver:   d,
-			Recorder: recorder,
+			Driver:            d,
+			Recorder:          recorder,
+			ResourceCollector: resourceCollector,
 		}
 		if err := migration.Init(migrationAdminNamespace); err != nil {
 			log.Fatalf("Error initializing migration: %v", err)
diff --git a/pkg/migration/controllers/migration.go b/pkg/migration/controllers/migration.go
index d4e1b78b55..e8d97f2318 100644
--- a/pkg/migration/controllers/migration.go
+++ b/pkg/migration/controllers/migration.go
@@ -8,32 +8,28 @@ import (
 	"strings"
 	"time"
 
-	"github.com/heptio/ark/pkg/discovery"
 	"github.com/heptio/ark/pkg/util/collections"
 	"github.com/libopenstorage/stork/drivers/volume"
 	"github.com/libopenstorage/stork/pkg/apis/stork"
 	stork_api "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1"
 	"github.com/libopenstorage/stork/pkg/controller"
 	"github.com/libopenstorage/stork/pkg/log"
+	"github.com/libopenstorage/stork/pkg/resourcecollector"
 	"github.com/libopenstorage/stork/pkg/rule"
 	"github.com/operator-framework/operator-sdk/pkg/sdk"
 	"github.com/portworx/sched-ops/k8s"
 	"github.com/sirupsen/logrus"
 	"k8s.io/api/core/v1"
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
-	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apimachinery/pkg/api/errors"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
 )
@@ -49,38 +45,13 @@ const (
 type MigrationController struct {
 	Driver                  volume.Driver
 	Recorder                record.EventRecorder
-	discoveryHelper         discovery.Helper
-	dynamicInterface        dynamic.Interface
+	ResourceCollector       resourcecollector.ResourceCollector
 	migrationAdminNamespace string
 }
 
 // Init Initialize the migration controller
 func (m *MigrationController) Init(migrationAdminNamespace string) error {
-	config, err := rest.InClusterConfig()
-	if err != nil {
-		return fmt.Errorf("error getting cluster config: %v", err)
-	}
-
-	aeclient, err := apiextensionsclient.NewForConfig(config)
-	if err != nil {
-		return fmt.Errorf("error getting apiextention client, %v", err)
-	}
-
-	err = m.createCRD()
-	if err != nil {
-		return err
-	}
-
-	discoveryClient := aeclient.Discovery()
-	m.discoveryHelper, err = discovery.NewHelper(discoveryClient, logrus.New())
-	if err != nil {
-		return err
-	}
-	err = m.discoveryHelper.Refresh()
-	if err != nil {
-		return err
-	}
-	m.dynamicInterface, err = dynamic.NewForConfig(config)
+	err := m.createCRD()
 	if err != nil {
 		return err
 	}
@@ -524,174 +495,53 @@ func (m *MigrationController) runPostExecRule(migration *stork_api.Migration) er
 	return nil
 }
 
-func resourceToBeMigrated(migration *stork_api.Migration, resource metav1.APIResource) bool {
-	// Deployment is present in "apps" and "extensions" group, so ignore
-	// "extensions"
-	if resource.Group == "extensions" && resource.Kind == "Deployment" {
-		return false
-	}
-
-	switch resource.Kind {
-	case "PersistentVolumeClaim",
-		"PersistentVolume",
-		"Deployment",
-		"StatefulSet",
-		"ConfigMap",
-		"Service",
-		"Secret",
-		"DaemonSet",
-		"ServiceAccount",
-		"ClusterRole",
-		"ClusterRoleBinding":
-		return true
-	default:
-		return false
-	}
-}
-
-func (m *MigrationController) objectToBeMigrated(
-	migration *stork_api.Migration,
-	resourceMap map[types.UID]bool,
-	object runtime.Unstructured,
-	namespace string,
-) (bool, error) {
-	metadata, err := meta.Accessor(object)
+func (m *MigrationController) migrateResources(migration *stork_api.Migration) error {
+	schedulerStatus, err := getClusterPairSchedulerStatus(migration.Spec.ClusterPair, migration.Namespace)
 	if err != nil {
-		return false, err
+		return err
 	}
 
-	// Skip if we've already processed this object
-	if _, ok := resourceMap[metadata.GetUID()]; ok {
-		return false, nil
+	if schedulerStatus != stork_api.ClusterPairStatusReady {
+		return fmt.Errorf("scheduler Cluster pair is not ready. Status: %v", schedulerStatus)
 	}
 
-	objectType, err := meta.TypeAccessor(object)
+	allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors)
 	if err != nil {
-		return false, err
+		m.Recorder.Event(migration,
+			v1.EventTypeWarning,
+			string(stork_api.MigrationStatusFailed),
+			fmt.Sprintf("Error getting resources: %v", err))
+		log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
+		return err
 	}
 
-	switch objectType.GetKind() {
-	case "Service":
-		// Don't migrate the kubernetes service
-		metadata, err := meta.Accessor(object)
-		if err != nil {
-			return false, err
-		}
-		if metadata.GetName() == "kubernetes" {
-			return false, nil
-		}
-	case "PersistentVolumeClaim":
-		metadata, err := meta.Accessor(object)
-		if err != nil {
-			return false, err
-		}
-		pvcName := metadata.GetName()
-		pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, namespace)
-		if err != nil {
-			return false, err
-		}
-		if pvc.Status.Phase != v1.ClaimBound {
-			return false, nil
-		}
-
-		if !m.Driver.OwnsPVC(pvc) {
-			return false, nil
-		}
-		return true, nil
-	case "PersistentVolume":
-		phase, err := collections.GetString(object.UnstructuredContent(), "status.phase")
-		if err != nil {
-			return false, err
-		}
-		if phase != string(v1.ClaimBound) {
-			return false, nil
-		}
-		pvcName, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.name")
-		if err != nil {
-			return false, err
-		}
-		if pvcName == "" {
-			return false, nil
-		}
-
-		pvcNamespace, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.namespace")
-		if err != nil {
-			return false, err
-		}
-		if pvcNamespace != namespace {
-			return false, nil
-		}
-
-		pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, pvcNamespace)
+	// Save the collected resource info in the status
+	resourceInfos := make([]*stork_api.ResourceInfo, 0)
+	for _, obj := range allObjects {
+		metadata, err := meta.Accessor(obj)
 		if err != nil {
-			return false, err
-		}
-		if !m.Driver.OwnsPVC(pvc) {
-			return false, nil
-		}
-
-		if len(pvc.Labels) == 0 && len(migration.Spec.Selectors) > 0 {
-			return false, nil
+			return err
 		}
 
-		if !labels.AreLabelsInWhiteList(labels.Set(migration.Spec.Selectors),
-			labels.Set(pvc.Labels)) {
-			return false, nil
+		resourceInfo := &stork_api.ResourceInfo{
+			Name:      metadata.GetName(),
+			Namespace: metadata.GetNamespace(),
+			Status:    stork_api.MigrationStatusInProgress,
 		}
-		return true, nil
-	case "ClusterRoleBinding":
-		name := metadata.GetName()
-		crb, err := k8s.Instance().GetClusterRoleBinding(name)
-		if err != nil {
-			return false, err
-		}
-		for _, subject := range crb.Subjects {
-			if subject.Namespace == namespace {
-				return true, nil
-			}
-		}
-		return false, nil
-	case "ClusterRole":
-		name := metadata.GetName()
-		crbs, err := k8s.Instance().ListClusterRoleBindings()
-		if err != nil {
-			return false, err
-		}
-		for _, crb := range crbs.Items {
-			if crb.RoleRef.Name == name {
-				for _, subject := range crb.Subjects {
-					if subject.Namespace == namespace {
-						return true, nil
-					}
-				}
-			}
-		}
-		return false, nil
-
-	case "ServiceAccount":
-		// Don't migrate the default service account
-		name := metadata.GetName()
-		if name == "default" {
-			return false, nil
+		gvk := obj.GetObjectKind().GroupVersionKind()
+		resourceInfo.Kind = gvk.Kind
+		resourceInfo.Group = gvk.Group
+		// core Group doesn't have a name, so override it
+		if resourceInfo.Group == "" {
+			resourceInfo.Group = "core"
 		}
-	}
-
-	return true, nil
-}
-
-func (m *MigrationController) migrateResources(migration *stork_api.Migration) error {
-	schedulerStatus, err := getClusterPairSchedulerStatus(migration.Spec.ClusterPair, migration.Namespace)
-	if err != nil {
-		return err
-	}
+		resourceInfo.Version = gvk.Version
 
-	if schedulerStatus != stork_api.ClusterPairStatusReady {
-		return fmt.Errorf("scheduler Cluster pair is not ready. Status: %v", schedulerStatus)
+		resourceInfos = append(resourceInfos, resourceInfo)
 	}
-
-	allObjects, err := m.getResources(migration)
+	migration.Status.Resources = resourceInfos
+	err = sdk.Update(migration)
 	if err != nil {
-		log.MigrationLog(migration).Errorf("Error getting resources: %v", err)
 		return err
 	}
@@ -730,158 +580,26 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) e
 	return nil
 }
 
-func (m *MigrationController) getResources(
-	migration *stork_api.Migration,
-) ([]runtime.Unstructured, error) {
-	err := m.discoveryHelper.Refresh()
-	if err != nil {
-		return nil, err
-	}
-	allObjects := make([]runtime.Unstructured, 0)
-	resourceInfos := make([]*stork_api.ResourceInfo, 0)
-
-	for _, group := range m.discoveryHelper.Resources() {
-		groupVersion, err := schema.ParseGroupVersion(group.GroupVersion)
-		if err != nil {
-			return nil, err
-		}
-		if groupVersion.Group == "extensions" {
-			continue
-		}
-
-		resourceMap := make(map[types.UID]bool)
-		for _, resource := range group.APIResources {
-			if !resourceToBeMigrated(migration, resource) {
-				continue
-			}
-
-			for _, ns := range migration.Spec.Namespaces {
-				var dynamicClient dynamic.ResourceInterface
-				if !resource.Namespaced {
-					dynamicClient = m.dynamicInterface.Resource(groupVersion.WithResource(resource.Name))
-				} else {
-					dynamicClient = m.dynamicInterface.Resource(groupVersion.WithResource(resource.Name)).Namespace(ns)
-				}
-
-				var selectors string
-				// PVs don't get the labels from their PVCs, so don't use
-				// the label selector
-				if resource.Kind != "PersistentVolume" {
-					selectors = labels.Set(migration.Spec.Selectors).String()
-				}
-				objectsList, err := dynamicClient.List(metav1.ListOptions{
-					LabelSelector: selectors,
-				})
-				if err != nil {
-					return nil, err
-				}
-				objects, err := meta.ExtractList(objectsList)
-				if err != nil {
-					return nil, err
-				}
-				for _, o := range objects {
-					runtimeObject, ok := o.(runtime.Unstructured)
-					if !ok {
-						return nil, fmt.Errorf("error casting object: %v", o)
-					}
-
-					migrate, err := m.objectToBeMigrated(migration, resourceMap, runtimeObject, ns)
-					if err != nil {
-						return nil, fmt.Errorf("error processing object %v: %v", runtimeObject, err)
-					}
-					if !migrate {
-						continue
-					}
-					metadata, err := meta.Accessor(runtimeObject)
-					if err != nil {
-						return nil, err
-					}
-					resourceInfo := &stork_api.ResourceInfo{
-						Name:      metadata.GetName(),
-						Namespace: metadata.GetNamespace(),
-						Status:    stork_api.MigrationStatusInProgress,
-					}
-					resourceInfo.Kind = resource.Kind
-					resourceInfo.Group = groupVersion.Group
-					// core Group doesn't have a name, so override it
-					if resourceInfo.Group == "" {
-						resourceInfo.Group = "core"
-					}
-					resourceInfo.Version = groupVersion.Version
-					resourceInfos = append(resourceInfos, resourceInfo)
-					allObjects = append(allObjects, runtimeObject)
-					resourceMap[metadata.GetUID()] = true
-				}
-			}
-		}
-		migration.Status.Resources = resourceInfos
-		err = sdk.Update(migration)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return allObjects, nil
-}
-
 func (m *MigrationController) prepareResources(
 	migration *stork_api.Migration,
 	objects []runtime.Unstructured,
 ) error {
 	for _, o := range objects {
-		content := o.UnstructuredContent()
-		// Status shouldn't be migrated between clusters
-		delete(content, "status")
+		metadata, err := meta.Accessor(o)
+		if err != nil {
+			return err
+		}
 
 		switch o.GetObjectKind().GroupVersionKind().Kind {
 		case "PersistentVolume":
-			updatedObject, err := m.preparePVResource(migration, o)
+			err := m.preparePVResource(o)
 			if err != nil {
-				m.updateResourceStatus(
-					migration,
-					o,
-					stork_api.MigrationStatusFailed,
-					fmt.Sprintf("Error preparing PV resource: %v", err))
-				continue
+				return fmt.Errorf("error preparing PV resource %v: %v", metadata.GetName(), err)
 			}
-			o = updatedObject
 		case "Deployment", "StatefulSet":
-			updatedObject, err := m.prepareApplicationResource(migration, o)
+			err := m.prepareApplicationResource(migration, o)
 			if err != nil {
-				m.updateResourceStatus(
-					migration,
-					o,
-					stork_api.MigrationStatusFailed,
-					fmt.Sprintf("Error preparing Application resource: %v", err))
-				continue
-			}
-			o = updatedObject
-		case "Service":
-			updatedObject, err := m.prepareServiceResource(migration, o)
-			if err != nil {
-				m.updateResourceStatus(
-					migration,
-					o,
-					stork_api.MigrationStatusFailed,
-					fmt.Sprintf("Error preparing Service resource: %v", err))
-				continue
-			}
-			o = updatedObject
-		}
-		metadata, err := collections.GetMap(content, "metadata")
-		if err != nil {
-			m.updateResourceStatus(
-				migration,
-				o,
-				stork_api.MigrationStatusFailed,
-				fmt.Sprintf("Error getting metadata for resource: %v", err))
-			continue
-		}
-		for key := range metadata {
-			switch key {
-			case "name", "namespace", "labels", "annotations":
-			default:
-				delete(metadata, key)
+				return fmt.Errorf("error preparing %v resource %v: %v", o.GetObjectKind().GroupVersionKind().Kind, metadata.GetName(), err)
 			}
 		}
 	}
@@ -922,59 +640,36 @@ func (m *MigrationController) updateResourceStatus(
 	}
 }
 
-func (m *MigrationController) prepareServiceResource(
-	migration *stork_api.Migration,
-	object runtime.Unstructured,
-) (runtime.Unstructured, error) {
-	spec, err := collections.GetMap(object.UnstructuredContent(), "spec")
-	if err != nil {
-		return nil, err
-	}
-	// Don't delete clusterIP for headless services
-	if ip, err := collections.GetString(spec, "clusterIP"); err == nil && ip != "None" {
-		delete(spec, "clusterIP")
-	}
-
-	return object, nil
-}
-
 func (m *MigrationController) preparePVResource(
-	migration *stork_api.Migration,
 	object runtime.Unstructured,
-) (runtime.Unstructured, error) {
-	spec, err := collections.GetMap(object.UnstructuredContent(), "spec")
-	if err != nil {
-		return nil, err
-	}
-	delete(spec, "claimRef")
-	delete(spec, "storageClassName")
-
-	return m.Driver.UpdateMigratedPersistentVolumeSpec(object)
+) error {
+	_, err := m.Driver.UpdateMigratedPersistentVolumeSpec(object)
+	return err
 }
 
 func (m *MigrationController) prepareApplicationResource(
 	migration *stork_api.Migration,
 	object runtime.Unstructured,
-) (runtime.Unstructured, error) {
+) error {
 	if *migration.Spec.StartApplications {
-		return object, nil
+		return nil
 	}
 
 	// Reset the replicas to 0 and store the current replicas in an annotation
 	content := object.UnstructuredContent()
 	spec, err := collections.GetMap(content, "spec")
 	if err != nil {
-		return nil, err
+		return err
 	}
 	replicas := spec["replicas"].(int64)
 	annotations, err := collections.GetMap(content, "metadata.annotations")
 	if err != nil {
-		return nil, err
+		return err
 	}
 	annotations[StorkMigrationReplicasAnnotation] = strconv.FormatInt(replicas, 10)
 	spec["replicas"] = 0
-	return object, nil
+	return nil
 }
 
 func (m *MigrationController) applyResources(
diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go
index b0e696da3c..b738796c5e 100644
--- a/pkg/migration/migration.go
+++ b/pkg/migration/migration.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/libopenstorage/stork/drivers/volume"
 	"github.com/libopenstorage/stork/pkg/migration/controllers"
+	"github.com/libopenstorage/stork/pkg/resourcecollector"
 	"k8s.io/client-go/tools/record"
 )
 
@@ -12,6 +13,7 @@ import (
 type Migration struct {
 	Driver                       volume.Driver
 	Recorder                     record.EventRecorder
+	ResourceCollector            resourcecollector.ResourceCollector
 	clusterPairController        *controllers.ClusterPairController
 	migrationController          *controllers.MigrationController
 	migrationScheduleController  *controllers.MigrationScheduleController
@@ -29,8 +31,9 @@ func (m *Migration) Init(migrationAdminNamespace string) error {
 	}
 
 	m.migrationController = &controllers.MigrationController{
-		Driver:   m.Driver,
-		Recorder: m.Recorder,
+		Driver:            m.Driver,
+		Recorder:          m.Recorder,
+		ResourceCollector: m.ResourceCollector,
 	}
 	err = m.migrationController.Init(migrationAdminNamespace)
 	if err != nil {
diff --git a/pkg/resourcecollector/resourcecollector.go b/pkg/resourcecollector/resourcecollector.go
new file mode 100644
index 0000000000..d67851b793
--- /dev/null
+++ b/pkg/resourcecollector/resourcecollector.go
@@ -0,0 +1,378 @@
+package resourcecollector
+
+import (
+	"fmt"
+
+	"github.com/heptio/ark/pkg/discovery"
+	"github.com/heptio/ark/pkg/util/collections"
+	"github.com/libopenstorage/stork/drivers/volume"
+	"github.com/portworx/sched-ops/k8s"
+	"github.com/sirupsen/logrus"
+	"k8s.io/api/core/v1"
+	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/rest"
+)
+
+// ResourceCollector is used to collect and process unstructured objects from namespaces that match the given label selectors
+type ResourceCollector struct {
+	Driver           volume.Driver
+	discoveryHelper  discovery.Helper
+	dynamicInterface dynamic.Interface
+}
+
+// Init initializes the resource collector
+func (r *ResourceCollector) Init() error {
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		return fmt.Errorf("error getting cluster config: %v", err)
+	}
+
+	aeclient, err := apiextensionsclient.NewForConfig(config)
+	if err != nil {
+		return fmt.Errorf("error getting apiextension client: %v", err)
+	}
+
+	discoveryClient := aeclient.Discovery()
+	r.discoveryHelper, err = discovery.NewHelper(discoveryClient, logrus.New())
+	if err != nil {
+		return err
+	}
+	err = r.discoveryHelper.Refresh()
+	if err != nil {
+		return err
+	}
+	r.dynamicInterface, err = dynamic.NewForConfig(config)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func resourceToBeCollected(resource metav1.APIResource) bool {
+	// Deployment is present in "apps" and "extensions" group, so ignore
+	// "extensions"
+	if resource.Group == "extensions" && resource.Kind == "Deployment" {
+		return false
+	}
+
+	switch resource.Kind {
+	case "PersistentVolumeClaim",
+		"PersistentVolume",
+		"Deployment",
+		"StatefulSet",
+		"ConfigMap",
+		"Service",
+		"Secret",
+		"DaemonSet",
+		"ServiceAccount",
+		"ClusterRole",
+		"ClusterRoleBinding":
+		return true
+	default:
+		return false
+	}
+}
+
+// GetResources gets all the resources in the given list of namespaces that match the labelSelectors
+func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map[string]string) ([]runtime.Unstructured, error) {
+	err := r.discoveryHelper.Refresh()
+	if err != nil {
+		return nil, err
+	}
+	allObjects := make([]runtime.Unstructured, 0)
+
+	for _, group := range r.discoveryHelper.Resources() {
+		groupVersion, err := schema.ParseGroupVersion(group.GroupVersion)
+		if err != nil {
+			return nil, err
+		}
+		if groupVersion.Group == "extensions" {
+			continue
+		}
+
+		// Map to prevent collection of duplicate objects
+		resourceMap := make(map[types.UID]bool)
+		for _, resource := range group.APIResources {
+			if !resourceToBeCollected(resource) {
+				continue
+			}
+
+			for _, ns := range namespaces {
+				var dynamicClient dynamic.ResourceInterface
+				if !resource.Namespaced {
+					dynamicClient = r.dynamicInterface.Resource(groupVersion.WithResource(resource.Name))
+				} else {
+					dynamicClient = r.dynamicInterface.Resource(groupVersion.WithResource(resource.Name)).Namespace(ns)
+				}
+
+				var selectors string
+				// PVs don't get the labels from their PVCs, so don't use
+				// the label selector
+				if resource.Kind != "PersistentVolume" {
+					selectors = labels.Set(labelSelectors).String()
+				}
+				objectsList, err := dynamicClient.List(metav1.ListOptions{
+					LabelSelector: selectors,
+				})
+				if err != nil {
+					return nil, err
+				}
+				objects, err := meta.ExtractList(objectsList)
+				if err != nil {
+					return nil, err
+				}
+				for _, o := range objects {
+					runtimeObject, ok := o.(runtime.Unstructured)
+					if !ok {
+						return nil, fmt.Errorf("error casting object: %v", o)
+					}
+
+					collect, err := r.objectToBeCollected(labelSelectors, resourceMap, runtimeObject, ns)
+					if err != nil {
+						return nil, fmt.Errorf("error processing object %v: %v", runtimeObject, err)
+					}
+					if !collect {
+						continue
+					}
+					metadata, err := meta.Accessor(runtimeObject)
+					if err != nil {
+						return nil, err
+					}
+					allObjects = append(allObjects, runtimeObject)
+					resourceMap[metadata.GetUID()] = true
+				}
+			}
+		}
+	}
+
+	err = r.prepareResources(allObjects)
+	if err != nil {
+		return nil, err
+	}
+	return allObjects, nil
+}
+
+// Returns whether an object should be collected or not for the requested
+// namespace
+func (r *ResourceCollector) objectToBeCollected(
+	labelSelectors map[string]string,
+	resourceMap map[types.UID]bool,
+	object runtime.Unstructured,
+	namespace string,
+) (bool, error) {
+	metadata, err := meta.Accessor(object)
+	if err != nil {
+		return false, err
+	}
+
+	// Skip if we've already processed this object
+	if _, ok := resourceMap[metadata.GetUID()]; ok {
+		return false, nil
+	}
+
+	objectType, err := meta.TypeAccessor(object)
+	if err != nil {
+		return false, err
+	}
+
+	switch objectType.GetKind() {
+	case "Service":
+		// Don't collect the kubernetes service
+		metadata, err := meta.Accessor(object)
+		if err != nil {
+			return false, err
+		}
+		if metadata.GetName() == "kubernetes" {
+			return false, nil
+		}
+	case "PersistentVolumeClaim":
+		metadata, err := meta.Accessor(object)
+		if err != nil {
+			return false, err
+		}
+		pvcName := metadata.GetName()
+		pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, namespace)
+		if err != nil {
+			return false, err
+		}
+		// Only collect Bound PVCs
+		if pvc.Status.Phase != v1.ClaimBound {
+			return false, nil
+		}
+
+		// Don't collect PVCs not owned by the driver
+		if !r.Driver.OwnsPVC(pvc) {
+			return false, nil
+		}
+		return true, nil
+	case "PersistentVolume":
+		phase, err := collections.GetString(object.UnstructuredContent(), "status.phase")
+		if err != nil {
+			return false, err
+		}
+		// Only collect Bound PVs
+		if phase != string(v1.ClaimBound) {
+			return false, nil
+		}
+
+		// Collect only PVs which have a reference to a PVC in the namespace
+		// requested
+		pvcName, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.name")
+		if err != nil {
+			return false, err
+		}
+		if pvcName == "" {
+			return false, nil
+		}
+
+		pvcNamespace, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.namespace")
+		if err != nil {
+			return false, err
+		}
+		if pvcNamespace != namespace {
+			return false, nil
+		}
+
+		pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, pvcNamespace)
+		if err != nil {
+			return false, err
+		}
+		// Collect only if the PVC bound to the PV is owned by the driver
+		if !r.Driver.OwnsPVC(pvc) {
+			return false, nil
+		}
+
+		// Also check the labels on the PVC since the PV doesn't inherit the
+		// labels
+		if len(pvc.Labels) == 0 && len(labelSelectors) > 0 {
+			return false, nil
+		}
+
+		if !labels.AreLabelsInWhiteList(labels.Set(labelSelectors),
+			labels.Set(pvc.Labels)) {
+			return false, nil
+		}
+		return true, nil
+	case "ClusterRoleBinding":
+		name := metadata.GetName()
+		crb, err := k8s.Instance().GetClusterRoleBinding(name)
+		if err != nil {
+			return false, err
+		}
+		// Check if there is a subject for the namespace which is requested
+		for _, subject := range crb.Subjects {
+			if subject.Namespace == namespace {
+				return true, nil
+			}
+		}
+		return false, nil
+	case "ClusterRole":
+		name := metadata.GetName()
+		crbs, err := k8s.Instance().ListClusterRoleBindings()
+		if err != nil {
+			return false, err
+		}
+		// Find the corresponding ClusterRoleBinding and see if it belongs to
+		// the requested namespace
+		for _, crb := range crbs.Items {
+			if crb.RoleRef.Name == name {
+				for _, subject := range crb.Subjects {
+					if subject.Namespace == namespace {
+						return true, nil
+					}
+				}
+			}
+		}
+		return false, nil
+
+	case "ServiceAccount":
+		// Don't collect the default service account
+		name := metadata.GetName()
+		if name == "default" {
+			return false, nil
+		}
+	}
+
+	return true, nil
+}
+
+func (r *ResourceCollector) preparePVResource(
+	object runtime.Unstructured,
+) error {
+	spec, err := collections.GetMap(object.UnstructuredContent(), "spec")
+	if err != nil {
+		return err
+	}
+
+	// Delete the claimRef so that the collected resource can be rebound
+	delete(spec, "claimRef")
+
+	// Storage class needs to be removed so that it can rebind to an
+	// existing PV
+	delete(spec, "storageClassName")
+
+	return nil
+}
+
+func (r *ResourceCollector) prepareServiceResource(
+	object runtime.Unstructured,
+) error {
+	spec, err := collections.GetMap(object.UnstructuredContent(), "spec")
+	if err != nil {
+		return err
+	}
+	// Don't delete clusterIP for headless services
+	if ip, err := collections.GetString(spec, "clusterIP"); err == nil && ip != "None" {
+		delete(spec, "clusterIP")
+	}
+
+	return nil
+}
+
+func (r *ResourceCollector) prepareResources(
+	objects []runtime.Unstructured,
+) error {
+	for _, o := range objects {
+		content := o.UnstructuredContent()
+		// Status shouldn't be retained when collecting resources
+		delete(content, "status")
+
+		metadata, err := meta.Accessor(o)
+		if err != nil {
+			return err
+		}
+
+		switch o.GetObjectKind().GroupVersionKind().Kind {
+		case "PersistentVolume":
+			err := r.preparePVResource(o)
+			if err != nil {
+				return fmt.Errorf("error preparing PV resource %v: %v", metadata.GetName(), err)
+			}
+		case "Service":
+			err := r.prepareServiceResource(o)
+			if err != nil {
+				return fmt.Errorf("error preparing Service resource %v/%v: %v", metadata.GetNamespace(), metadata.GetName(), err)
+			}
+		}
+		metadataMap, err := collections.GetMap(content, "metadata")
+		if err != nil {
+			return fmt.Errorf("error getting metadata for resource %v: %v", metadata.GetName(), err)
+		}
+		// Remove all metadata except some well-known ones
+		for key := range metadataMap {
+			switch key {
+			case "name", "namespace", "labels", "annotations":
+			default:
+				delete(metadataMap, key)
+			}
+		}
+	}
+	return nil
+}