From a37ca2f645b50755ffff3882dab9ba23047de369 Mon Sep 17 00:00:00 2001 From: Dinesh Israni Date: Mon, 15 Apr 2019 20:01:10 -0700 Subject: [PATCH] Move resource collection logic from migration controller Can be used by other modules to get unstructured objects from a namepsace and matching label selectors --- cmd/stork/stork.go | 13 +- pkg/migration/controllers/migration.go | 405 +++--------------- pkg/migration/migration.go | 7 +- pkg/resourcecollector/clusterrole.go | 194 +++++++++ pkg/resourcecollector/persistentvolume.go | 105 +++++ .../persistentvolumeclaim.go | 66 +++ pkg/resourcecollector/resourcecollector.go | 383 +++++++++++++++++ pkg/resourcecollector/secret.go | 24 ++ pkg/resourcecollector/service.go | 37 ++ pkg/resourcecollector/serviceaccount.go | 19 + 10 files changed, 894 insertions(+), 359 deletions(-) create mode 100644 pkg/resourcecollector/clusterrole.go create mode 100644 pkg/resourcecollector/persistentvolume.go create mode 100644 pkg/resourcecollector/persistentvolumeclaim.go create mode 100644 pkg/resourcecollector/resourcecollector.go create mode 100644 pkg/resourcecollector/secret.go create mode 100644 pkg/resourcecollector/service.go create mode 100644 pkg/resourcecollector/serviceaccount.go diff --git a/cmd/stork/stork.go b/cmd/stork/stork.go index 6d9f8b2cd8..387815c3da 100644 --- a/cmd/stork/stork.go +++ b/cmd/stork/stork.go @@ -17,6 +17,7 @@ import ( "github.com/libopenstorage/stork/pkg/migration" "github.com/libopenstorage/stork/pkg/monitor" "github.com/libopenstorage/stork/pkg/pvcwatcher" + "github.com/libopenstorage/stork/pkg/resourcecollector" "github.com/libopenstorage/stork/pkg/rule" "github.com/libopenstorage/stork/pkg/schedule" "github.com/libopenstorage/stork/pkg/snapshot" @@ -294,11 +295,19 @@ func runStork(d volume.Driver, recorder record.EventRecorder, c *cli.Context) { } } + resourceCollector := resourcecollector.ResourceCollector{ + Driver: d, + } + if err := resourceCollector.Init(); err != nil { + log.Fatalf("Error initializing ResourceCollector: %v", err) + } + if c.Bool("migration-controller") { migrationAdminNamespace := c.String("migration-admin-namespace") migration := migration.Migration{ - Driver: d, - Recorder: recorder, + Driver: d, + Recorder: recorder, + ResourceCollector: resourceCollector, } if err := migration.Init(migrationAdminNamespace); err != nil { log.Fatalf("Error initializing migration: %v", err) diff --git a/pkg/migration/controllers/migration.go b/pkg/migration/controllers/migration.go index d4e1b78b55..e8d97f2318 100644 --- a/pkg/migration/controllers/migration.go +++ b/pkg/migration/controllers/migration.go @@ -8,32 +8,28 @@ import ( "strings" "time" - "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" "github.com/libopenstorage/stork/drivers/volume" "github.com/libopenstorage/stork/pkg/apis/stork" stork_api "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1" "github.com/libopenstorage/stork/pkg/controller" "github.com/libopenstorage/stork/pkg/log" + "github.com/libopenstorage/stork/pkg/resourcecollector" "github.com/libopenstorage/stork/pkg/rule" "github.com/operator-framework/operator-sdk/pkg/sdk" "github.com/portworx/sched-ops/k8s" "github.com/sirupsen/logrus" "k8s.io/api/core/v1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" - apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" ) @@ -49,38 +45,13 @@ const ( type MigrationController struct { Driver volume.Driver Recorder record.EventRecorder - discoveryHelper discovery.Helper - dynamicInterface dynamic.Interface + ResourceCollector resourcecollector.ResourceCollector migrationAdminNamespace string } // Init Initialize the migration controller func (m *MigrationController) Init(migrationAdminNamespace string) error { - config, err := rest.InClusterConfig() - if err != nil { - return fmt.Errorf("error getting cluster config: %v", err) - } - - aeclient, err := apiextensionsclient.NewForConfig(config) - if err != nil { - return fmt.Errorf("error getting apiextention client, %v", err) - } - - err = m.createCRD() - if err != nil { - return err - } - - discoveryClient := aeclient.Discovery() - m.discoveryHelper, err = discovery.NewHelper(discoveryClient, logrus.New()) - if err != nil { - return err - } - err = m.discoveryHelper.Refresh() - if err != nil { - return err - } - m.dynamicInterface, err = dynamic.NewForConfig(config) + err := m.createCRD() if err != nil { return err } @@ -524,174 +495,53 @@ func (m *MigrationController) runPostExecRule(migration *stork_api.Migration) er return nil } -func resourceToBeMigrated(migration *stork_api.Migration, resource metav1.APIResource) bool { - // Deployment is present in "apps" and "extensions" group, so ignore - // "extensions" - if resource.Group == "extensions" && resource.Kind == "Deployment" { - return false - } - - switch resource.Kind { - case "PersistentVolumeClaim", - "PersistentVolume", - "Deployment", - "StatefulSet", - "ConfigMap", - "Service", - "Secret", - "DaemonSet", - "ServiceAccount", - "ClusterRole", - "ClusterRoleBinding": - return true - default: - return false - } -} - -func (m *MigrationController) objectToBeMigrated( - migration *stork_api.Migration, - resourceMap map[types.UID]bool, - object runtime.Unstructured, - namespace string, -) (bool, error) { - metadata, err := meta.Accessor(object) +func (m *MigrationController) migrateResources(migration *stork_api.Migration) error { + schedulerStatus, err := getClusterPairSchedulerStatus(migration.Spec.ClusterPair, migration.Namespace) if err != nil { - return false, err + return err } - // Skip if we've already processed this object - if _, ok := resourceMap[metadata.GetUID()]; ok { - return false, nil + if schedulerStatus != stork_api.ClusterPairStatusReady { + return fmt.Errorf("scheduler Cluster pair is not ready. Status: %v", schedulerStatus) } - objectType, err := meta.TypeAccessor(object) + allObjects, err := m.ResourceCollector.GetResources(migration.Spec.Namespaces, migration.Spec.Selectors) if err != nil { - return false, err + m.Recorder.Event(migration, + v1.EventTypeWarning, + string(stork_api.MigrationStatusFailed), + fmt.Sprintf("Error getting resource: %v", err)) + log.MigrationLog(migration).Errorf("Error getting resources: %v", err) + return err } - switch objectType.GetKind() { - case "Service": - // Don't migrate the kubernetes service - metadata, err := meta.Accessor(object) - if err != nil { - return false, err - } - if metadata.GetName() == "kubernetes" { - return false, nil - } - case "PersistentVolumeClaim": - metadata, err := meta.Accessor(object) - if err != nil { - return false, err - } - pvcName := metadata.GetName() - pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, namespace) - if err != nil { - return false, err - } - if pvc.Status.Phase != v1.ClaimBound { - return false, nil - } - - if !m.Driver.OwnsPVC(pvc) { - return false, nil - } - return true, nil - case "PersistentVolume": - phase, err := collections.GetString(object.UnstructuredContent(), "status.phase") - if err != nil { - return false, err - } - if phase != string(v1.ClaimBound) { - return false, nil - } - pvcName, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.name") - if err != nil { - return false, err - } - if pvcName == "" { - return false, nil - } - - pvcNamespace, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.namespace") - if err != nil { - return false, err - } - if pvcNamespace != namespace { - return false, nil - } - - pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, pvcNamespace) + // Save the collected resources infos in the status + resourceInfos := make([]*stork_api.ResourceInfo, 0) + for _, obj := range allObjects { + metadata, err := meta.Accessor(obj) if err != nil { - return false, err - } - if !m.Driver.OwnsPVC(pvc) { - return false, nil - } - - if len(pvc.Labels) == 0 && len(migration.Spec.Selectors) > 0 { - return false, nil + return err } - if !labels.AreLabelsInWhiteList(labels.Set(migration.Spec.Selectors), - labels.Set(pvc.Labels)) { - return false, nil + resourceInfo := &stork_api.ResourceInfo{ + Name: metadata.GetName(), + Namespace: metadata.GetNamespace(), + Status: stork_api.MigrationStatusInProgress, } - return true, nil - case "ClusterRoleBinding": - name := metadata.GetName() - crb, err := k8s.Instance().GetClusterRoleBinding(name) - if err != nil { - return false, err - } - for _, subject := range crb.Subjects { - if subject.Namespace == namespace { - return true, nil - } - } - return false, nil - case "ClusterRole": - name := metadata.GetName() - crbs, err := k8s.Instance().ListClusterRoleBindings() - if err != nil { - return false, err - } - for _, crb := range crbs.Items { - if crb.RoleRef.Name == name { - for _, subject := range crb.Subjects { - if subject.Namespace == namespace { - return true, nil - } - } - } - } - return false, nil - - case "ServiceAccount": - // Don't migrate the default service account - name := metadata.GetName() - if name == "default" { - return false, nil + gvk := obj.GetObjectKind().GroupVersionKind() + resourceInfo.Kind = gvk.Kind + resourceInfo.Group = gvk.Group + // core Group doesn't have a name, so override it + if resourceInfo.Group == "" { + resourceInfo.Group = "core" } - } - - return true, nil -} - -func (m *MigrationController) migrateResources(migration *stork_api.Migration) error { - schedulerStatus, err := getClusterPairSchedulerStatus(migration.Spec.ClusterPair, migration.Namespace) - if err != nil { - return err - } + resourceInfo.Version = gvk.Version - if schedulerStatus != stork_api.ClusterPairStatusReady { - return fmt.Errorf("scheduler Cluster pair is not ready. Status: %v", schedulerStatus) + resourceInfos = append(resourceInfos, resourceInfo) } - - allObjects, err := m.getResources(migration) + migration.Status.Resources = resourceInfos + err = sdk.Update(migration) if err != nil { - log.MigrationLog(migration).Errorf("Error getting resources: %v", err) return err } @@ -730,158 +580,26 @@ func (m *MigrationController) migrateResources(migration *stork_api.Migration) e return nil } -func (m *MigrationController) getResources( - migration *stork_api.Migration, -) ([]runtime.Unstructured, error) { - err := m.discoveryHelper.Refresh() - if err != nil { - return nil, err - } - allObjects := make([]runtime.Unstructured, 0) - resourceInfos := make([]*stork_api.ResourceInfo, 0) - - for _, group := range m.discoveryHelper.Resources() { - groupVersion, err := schema.ParseGroupVersion(group.GroupVersion) - if err != nil { - return nil, err - } - if groupVersion.Group == "extensions" { - continue - } - - resourceMap := make(map[types.UID]bool) - for _, resource := range group.APIResources { - if !resourceToBeMigrated(migration, resource) { - continue - } - - for _, ns := range migration.Spec.Namespaces { - var dynamicClient dynamic.ResourceInterface - if !resource.Namespaced { - dynamicClient = m.dynamicInterface.Resource(groupVersion.WithResource(resource.Name)) - } else { - dynamicClient = m.dynamicInterface.Resource(groupVersion.WithResource(resource.Name)).Namespace(ns) - } - - var selectors string - // PVs don't get the labels from their PVCs, so don't use - // the label selector - if resource.Kind != "PersistentVolume" { - selectors = labels.Set(migration.Spec.Selectors).String() - } - objectsList, err := dynamicClient.List(metav1.ListOptions{ - LabelSelector: selectors, - }) - if err != nil { - return nil, err - } - objects, err := meta.ExtractList(objectsList) - if err != nil { - return nil, err - } - for _, o := range objects { - runtimeObject, ok := o.(runtime.Unstructured) - if !ok { - return nil, fmt.Errorf("error casting object: %v", o) - } - - migrate, err := m.objectToBeMigrated(migration, resourceMap, runtimeObject, ns) - if err != nil { - return nil, fmt.Errorf("error processing object %v: %v", runtimeObject, err) - } - if !migrate { - continue - } - metadata, err := meta.Accessor(runtimeObject) - if err != nil { - return nil, err - } - resourceInfo := &stork_api.ResourceInfo{ - Name: metadata.GetName(), - Namespace: metadata.GetNamespace(), - Status: stork_api.MigrationStatusInProgress, - } - resourceInfo.Kind = resource.Kind - resourceInfo.Group = groupVersion.Group - // core Group doesn't have a name, so override it - if resourceInfo.Group == "" { - resourceInfo.Group = "core" - } - resourceInfo.Version = groupVersion.Version - resourceInfos = append(resourceInfos, resourceInfo) - allObjects = append(allObjects, runtimeObject) - resourceMap[metadata.GetUID()] = true - } - } - } - migration.Status.Resources = resourceInfos - err = sdk.Update(migration) - if err != nil { - return nil, err - } - } - - return allObjects, nil -} - func (m *MigrationController) prepareResources( migration *stork_api.Migration, objects []runtime.Unstructured, ) error { for _, o := range objects { - content := o.UnstructuredContent() - // Status shouldn't be migrated between clusters - delete(content, "status") + metadata, err := meta.Accessor(o) + if err != nil { + return err + } switch o.GetObjectKind().GroupVersionKind().Kind { case "PersistentVolume": - updatedObject, err := m.preparePVResource(migration, o) + err := m.preparePVResource(o) if err != nil { - m.updateResourceStatus( - migration, - o, - stork_api.MigrationStatusFailed, - fmt.Sprintf("Error preparing PV resource: %v", err)) - continue + return fmt.Errorf("error preparing PV resource %v: %v", metadata.GetName(), err) } - o = updatedObject case "Deployment", "StatefulSet": - updatedObject, err := m.prepareApplicationResource(migration, o) + err := m.prepareApplicationResource(migration, o) if err != nil { - m.updateResourceStatus( - migration, - o, - stork_api.MigrationStatusFailed, - fmt.Sprintf("Error preparing Application resource: %v", err)) - continue - } - o = updatedObject - case "Service": - updatedObject, err := m.prepareServiceResource(migration, o) - if err != nil { - m.updateResourceStatus( - migration, - o, - stork_api.MigrationStatusFailed, - fmt.Sprintf("Error preparing Service resource: %v", err)) - continue - } - o = updatedObject - } - metadata, err := collections.GetMap(content, "metadata") - if err != nil { - m.updateResourceStatus( - migration, - o, - stork_api.MigrationStatusFailed, - fmt.Sprintf("Error getting metadata for resource: %v", err)) - continue - } - for key := range metadata { - switch key { - case "name", "namespace", "labels", "annotations": - default: - delete(metadata, key) + return fmt.Errorf("error preparing %v resource %v: %v", o.GetObjectKind().GroupVersionKind().Kind, metadata.GetName(), err) } } } @@ -922,59 +640,36 @@ func (m *MigrationController) updateResourceStatus( } } -func (m *MigrationController) prepareServiceResource( - migration *stork_api.Migration, - object runtime.Unstructured, -) (runtime.Unstructured, error) { - spec, err := collections.GetMap(object.UnstructuredContent(), "spec") - if err != nil { - return nil, err - } - // Don't delete clusterIP for headless services - if ip, err := collections.GetString(spec, "clusterIP"); err == nil && ip != "None" { - delete(spec, "clusterIP") - } - - return object, nil -} - func (m *MigrationController) preparePVResource( - migration *stork_api.Migration, object runtime.Unstructured, -) (runtime.Unstructured, error) { - spec, err := collections.GetMap(object.UnstructuredContent(), "spec") - if err != nil { - return nil, err - } - delete(spec, "claimRef") - delete(spec, "storageClassName") - - return m.Driver.UpdateMigratedPersistentVolumeSpec(object) +) error { + _, err := m.Driver.UpdateMigratedPersistentVolumeSpec(object) + return err } func (m *MigrationController) prepareApplicationResource( migration *stork_api.Migration, object runtime.Unstructured, -) (runtime.Unstructured, error) { +) error { if *migration.Spec.StartApplications { - return object, nil + return nil } // Reset the replicas to 0 and store the current replicas in an annotation content := object.UnstructuredContent() spec, err := collections.GetMap(content, "spec") if err != nil { - return nil, err + return err } replicas := spec["replicas"].(int64) annotations, err := collections.GetMap(content, "metadata.annotations") if err != nil { - return nil, err + return err } annotations[StorkMigrationReplicasAnnotation] = strconv.FormatInt(replicas, 10) spec["replicas"] = 0 - return object, nil + return nil } func (m *MigrationController) applyResources( diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go index b0e696da3c..b738796c5e 100644 --- a/pkg/migration/migration.go +++ b/pkg/migration/migration.go @@ -5,6 +5,7 @@ import ( "github.com/libopenstorage/stork/drivers/volume" "github.com/libopenstorage/stork/pkg/migration/controllers" + "github.com/libopenstorage/stork/pkg/resourcecollector" "k8s.io/client-go/tools/record" ) @@ -12,6 +13,7 @@ import ( type Migration struct { Driver volume.Driver Recorder record.EventRecorder + ResourceCollector resourcecollector.ResourceCollector clusterPairController *controllers.ClusterPairController migrationController *controllers.MigrationController migrationScheduleController *controllers.MigrationScheduleController @@ -29,8 +31,9 @@ func (m *Migration) Init(migrationAdminNamespace string) error { } m.migrationController = &controllers.MigrationController{ - Driver: m.Driver, - Recorder: m.Recorder, + Driver: m.Driver, + Recorder: m.Recorder, + ResourceCollector: m.ResourceCollector, } err = m.migrationController.Init(migrationAdminNamespace) if err != nil { diff --git a/pkg/resourcecollector/clusterrole.go b/pkg/resourcecollector/clusterrole.go new file mode 100644 index 0000000000..23d71b06ed --- /dev/null +++ b/pkg/resourcecollector/clusterrole.go @@ -0,0 +1,194 @@ +package resourcecollector + +import ( + "strings" + + "github.com/portworx/sched-ops/k8s" + rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/authentication/serviceaccount" +) + +func (r *ResourceCollector) subjectInNamespace(subject *rbacv1.Subject, namespace string) (bool, error) { + switch subject.Kind { + case rbacv1.ServiceAccountKind: + if subject.Namespace == namespace { + return true, nil + } + case rbacv1.UserKind: + userNamespace, _, err := serviceaccount.SplitUsername(subject.Name) + if err != nil { + return false, nil + } + if userNamespace == namespace { + return true, nil + } + case rbacv1.GroupKind: + groupNamespace := strings.TrimPrefix(subject.Name, serviceaccount.ServiceAccountUsernamePrefix) + if groupNamespace == namespace { + return true, nil + } + } + return false, nil +} + +func (r *ResourceCollector) clusterRoleBindingToBeCollected( + labelSelectors map[string]string, + object runtime.Unstructured, + namespace string, +) (bool, error) { + var crb rbacv1.ClusterRoleBinding + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(object.UnstructuredContent(), &crb); err != nil { + return false, err + } + // Check if there is a subject for the namespace which is + // requested + for _, subject := range crb.Subjects { + collect, err := r.subjectInNamespace(&subject, namespace) + if err != nil || collect { + return collect, err + } + } + return false, nil +} + +func (r *ResourceCollector) clusterRoleToBeCollected( + labelSelectors map[string]string, + object runtime.Unstructured, + namespace string, +) (bool, error) { + metadata, err := meta.Accessor(object) + if err != nil { + return false, err + } + name := metadata.GetName() + crbs, err := k8s.Instance().ListClusterRoleBindings() + if err != nil { + return false, err + } + // Find the corresponding ClusterRoleBinding and see + // if it belongs to the requested namespace + for _, crb := range crbs.Items { + if crb.RoleRef.Name == name { + for _, subject := range crb.Subjects { + collect, err := r.subjectInNamespace(&subject, namespace) + if err != nil || collect { + return collect, err + } + } + } + } + return false, nil +} + +func (r *ResourceCollector) prepareClusterRoleBindingForCollection( + object runtime.Unstructured, + namespaces []string, +) error { + var crb rbacv1.ClusterRoleBinding + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(object.UnstructuredContent(), &crb); err != nil { + return err + } + subjectsToCollect := make([]rbacv1.Subject, 0) + // Check if there is a subject for the namespace which is requested + for _, subject := range crb.Subjects { + for _, ns := range namespaces { + collect, err := r.subjectInNamespace(&subject, ns) + if err != nil { + return err + } + + if collect { + subjectsToCollect = append(subjectsToCollect, subject) + } + } + } + crb.Subjects = subjectsToCollect + o, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&crb) + if err != nil { + return err + } + object.SetUnstructuredContent(o) + + return nil +} + +func (r *ResourceCollector) prepareClusterRoleBindingForApply( + object runtime.Unstructured, + namespaceMappings map[string]string, +) error { + var crb rbacv1.ClusterRoleBinding + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(object.UnstructuredContent(), &crb); err != nil { + return err + } + subjectsToApply := make([]rbacv1.Subject, 0) + for sourceNamespace, destNamespace := range namespaceMappings { + // Check if there is a subject for the namespace which is requested + for _, subject := range crb.Subjects { + collect, err := r.subjectInNamespace(&subject, sourceNamespace) + if err != nil { + return err + } + if !collect { + continue + } + + switch subject.Kind { + case rbacv1.UserKind: + _, username, err := serviceaccount.SplitUsername(subject.Name) + if err != nil { + return err + } + subject.Name = serviceaccount.MakeUsername(destNamespace, username) + case rbacv1.GroupKind: + subject.Name = serviceaccount.MakeNamespaceGroupName(destNamespace) + case rbacv1.ServiceAccountKind: + subject.Namespace = destNamespace + } + subjectsToApply = append(subjectsToApply, subject) + } + } + crb.Subjects = subjectsToApply + o, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&crb) + if err != nil { + return err + } + object.SetUnstructuredContent(o) + + return nil +} + +func (r *ResourceCollector) mergeAndUpdateClusterRoleBinding( + object runtime.Unstructured, +) error { + var newCRB rbacv1.ClusterRoleBinding + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(object.UnstructuredContent(), &newCRB); err != nil { + return err + } + + currentCRB, err := k8s.Instance().GetClusterRoleBinding(newCRB.Name) + if err != nil { + if apierrors.IsNotFound(err) { + _, err = k8s.Instance().CreateClusterRoleBinding(&newCRB) + } + return err + } + + // Map which will help eliminate duplicate subjects + updatedSubjects := make(map[string]rbacv1.Subject) + for _, subject := range currentCRB.Subjects { + updatedSubjects[subject.String()] = subject + } + for _, subject := range newCRB.Subjects { + updatedSubjects[subject.String()] = subject + } + currentCRB.Subjects = make([]rbacv1.Subject, 0) + for _, subject := range updatedSubjects { + currentCRB.Subjects = append(currentCRB.Subjects, subject) + } + + _, err = k8s.Instance().UpdateClusterRoleBinding(currentCRB) + return err +} diff --git a/pkg/resourcecollector/persistentvolume.go b/pkg/resourcecollector/persistentvolume.go new file mode 100644 index 0000000000..66042d1853 --- /dev/null +++ b/pkg/resourcecollector/persistentvolume.go @@ -0,0 +1,105 @@ +package resourcecollector + +import ( + "fmt" + + "github.com/heptio/ark/pkg/util/collections" + "github.com/portworx/sched-ops/k8s" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" +) + +func (r *ResourceCollector) pvToBeCollected( + labelSelectors map[string]string, + object runtime.Unstructured, + namespace string, +) (bool, error) { + phase, err := collections.GetString(object.UnstructuredContent(), "status.phase") + if err != nil { + return false, err + } + // Only collect Bound PVs + if phase != string(v1.ClaimBound) { + return false, nil + } + + // Collect only PVs which have a reference to a PVC in the namespace + // requested + pvcName, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.name") + if err != nil { + return false, err + } + if pvcName == "" { + return false, nil + } + + pvcNamespace, err := collections.GetString(object.UnstructuredContent(), "spec.claimRef.namespace") + if err != nil { + return false, err + } + if pvcNamespace != namespace { + return false, nil + } + + pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, pvcNamespace) + if err != nil { + return false, err + } + // Collect only if the PVC bound to the PV is owned by the driver + if !r.Driver.OwnsPVC(pvc) { + return false, nil + } + + // Also check the labels on the PVC since the PV doesn't inherit the + // labels + if len(pvc.Labels) == 0 && len(labelSelectors) > 0 { + return false, nil + } + + if !labels.AreLabelsInWhiteList(labels.Set(labelSelectors), + labels.Set(pvc.Labels)) { + return false, nil + } + return true, nil +} + +func (r *ResourceCollector) preparePVResourceForCollection( + object runtime.Unstructured, +) error { + spec, err := collections.GetMap(object.UnstructuredContent(), "spec") + if err != nil { + return err + } + + // Delete the claimRef so that the collected resource can be rebound + delete(spec, "claimRef") + + // Storage class needs to be removed so that it can rebind + // to an + // existing PV + delete(spec, "storageClassName") + + return nil +} + +func (r *ResourceCollector) preparePVResourceForApply( + object runtime.Unstructured, + pvNameMappings map[string]string, +) error { + var updatedName string + var present bool + + metadata, err := meta.Accessor(object) + if err != nil { + return err + } + + if updatedName, present = pvNameMappings[metadata.GetName()]; !present { + return fmt.Errorf("PV name mapping not found for %v", metadata.GetName()) + } + metadata.SetName(updatedName) + _, err = r.Driver.UpdateMigratedPersistentVolumeSpec(object) + return err +} diff --git a/pkg/resourcecollector/persistentvolumeclaim.go b/pkg/resourcecollector/persistentvolumeclaim.go new file mode 100644 index 0000000000..dea5cdb9a4 --- /dev/null +++ b/pkg/resourcecollector/persistentvolumeclaim.go @@ -0,0 +1,66 @@ +package resourcecollector + +import ( + "fmt" + + "github.com/portworx/sched-ops/k8s" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +func (r *ResourceCollector) pvcToBeCollected( + object runtime.Unstructured, + namespace string, +) (bool, error) { + metadata, err := meta.Accessor(object) + if err != nil { + return false, err + } + + pvcName := metadata.GetName() + pvc, err := k8s.Instance().GetPersistentVolumeClaim(pvcName, namespace) + if err != nil { + return false, err + } + // Only collect Bound PVCs + if pvc.Status.Phase != v1.ClaimBound { + return false, nil + } + + // Don't collect PVCs not owned by the + // driver + if !r.Driver.OwnsPVC(pvc) { + return false, nil + } + return true, nil +} + +func (r *ResourceCollector) preparePVCResourceForApply( + object runtime.Unstructured, + pvNameMappings map[string]string, +) error { + var pvc v1.PersistentVolumeClaim + var updatedName string + var present bool + + metadata, err := meta.Accessor(object) + if err != nil { + return err + } + + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(object.UnstructuredContent(), &pvc); err != nil { + return fmt.Errorf("error converting PVC object: %v: %v", object, err) + } + + if updatedName, present = pvNameMappings[pvc.Spec.VolumeName]; !present { + return fmt.Errorf("PV name mapping not found for %v", metadata.GetName()) + } + pvc.Spec.VolumeName = updatedName + o, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pvc) + if err != nil { + return err + } + object.SetUnstructuredContent(o) + return nil +} diff --git a/pkg/resourcecollector/resourcecollector.go b/pkg/resourcecollector/resourcecollector.go new file mode 100644 index 0000000000..96023dc0b5 --- /dev/null +++ b/pkg/resourcecollector/resourcecollector.go @@ -0,0 +1,383 @@ +package resourcecollector + +import ( + "fmt" + "strconv" + "strings" + + "github.com/heptio/ark/pkg/discovery" + "github.com/heptio/ark/pkg/util/collections" + "github.com/libopenstorage/stork/drivers/volume" + "github.com/sirupsen/logrus" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" +) + +const ( + // Annotation to use when the resource shouldn't be collected + skipResourceAnnotation = "stork.libopenstorage.ord/skipresource" +) + +// ResourceCollector is used to collect and process unstructured objects in namespaces and using label selectors +type ResourceCollector struct { + Driver volume.Driver + discoveryHelper discovery.Helper + dynamicInterface dynamic.Interface +} + +// Init initializes the resource collector +func (r *ResourceCollector) Init() error { + config, err := rest.InClusterConfig() + if err != nil { + return fmt.Errorf("error getting cluster config: %v", err) + } + + aeclient, err := apiextensionsclient.NewForConfig(config) + if err != nil { + return fmt.Errorf("error getting apiextension client, %v", err) + } + + discoveryClient := aeclient.Discovery() + r.discoveryHelper, err = discovery.NewHelper(discoveryClient, logrus.New()) + if err != nil { + return err + } + err = r.discoveryHelper.Refresh() + if err != nil { + return err + } + r.dynamicInterface, err = dynamic.NewForConfig(config) + if err != nil { + return err + } + return nil +} + +func resourceToBeCollected(resource metav1.APIResource) bool { + // Deployment is present in "apps" and "extensions" group, so ignore + // "extensions" + if resource.Group == "extensions" && resource.Kind == "Deployment" { + return false + } + + switch resource.Kind { + case "PersistentVolumeClaim", + "PersistentVolume", + "Deployment", + "StatefulSet", + "ConfigMap", + "Service", + "Secret", + "DaemonSet", + "ServiceAccount", + "ClusterRole", + "ClusterRoleBinding": + return true + default: + return false + } +} + +// GetResources gets all the resources in the given list of namespaces which match the labelSelectors +func (r *ResourceCollector) GetResources(namespaces []string, labelSelectors map[string]string) ([]runtime.Unstructured, error) { + err := r.discoveryHelper.Refresh() + if err != nil { + return nil, err + } + allObjects := make([]runtime.Unstructured, 0) + + for _, group := range r.discoveryHelper.Resources() { + groupVersion, err := schema.ParseGroupVersion(group.GroupVersion) + if err != nil { + return nil, err + } + if groupVersion.Group == "extensions" { + continue + } + + // Map to prevent collection of duplicate objects + resourceMap := make(map[types.UID]bool) + for _, resource := range group.APIResources { + if !resourceToBeCollected(resource) { + continue + } + + for _, ns := range namespaces { + var dynamicClient dynamic.ResourceInterface + if !resource.Namespaced { + dynamicClient = r.dynamicInterface.Resource(groupVersion.WithResource(resource.Name)) + } else { + dynamicClient = r.dynamicInterface.Resource(groupVersion.WithResource(resource.Name)).Namespace(ns) + } + + var selectors string + // PVs don't get the labels from their PVCs, so don't use the label selector + // Also skip for some other resources that aren't necessarily tied to an application + switch resource.Kind { + case "PersistentVolume", + "ClusterRoleBinding", + "ClusterRole", + "ServiceAccount": + default: + selectors = labels.Set(labelSelectors).String() + } + objectsList, err := dynamicClient.List(metav1.ListOptions{ + LabelSelector: selectors, + }) + if err != nil { + return nil, err + } + objects, err := meta.ExtractList(objectsList) + if err != nil { + return nil, err + } + for _, o := range objects { + runtimeObject, ok := o.(runtime.Unstructured) + if !ok { + return nil, fmt.Errorf("error casting object: %v", o) + } + + collect, err := r.objectToBeCollected(labelSelectors, resourceMap, runtimeObject, ns) + if err != nil { + return nil, fmt.Errorf("error processing object %v: %v", runtimeObject, err) + } + if !collect { + continue + } + metadata, err := meta.Accessor(runtimeObject) + if err != nil { + return nil, err + } + allObjects = append(allObjects, runtimeObject) + resourceMap[metadata.GetUID()] = true + } + } + } + } + + err = r.prepareResourcesForCollection(allObjects, namespaces) + if err != nil { + return nil, err + } + return allObjects, nil +} + +// Returns whether an object should be collected or not for the requested +// namespace +func (r *ResourceCollector) objectToBeCollected( + labelSelectors map[string]string, + resourceMap map[types.UID]bool, + object runtime.Unstructured, + namespace string, +) (bool, error) { + metadata, err := meta.Accessor(object) + if err != nil { + return false, err + } + + if value, present := metadata.GetAnnotations()[skipResourceAnnotation]; present { + if skip, err := strconv.ParseBool(value); err == nil && skip { + return false, err + } + } + + // Skip if we've already processed this object + if _, ok := resourceMap[metadata.GetUID()]; ok { + return false, nil + } + + objectType, err := meta.TypeAccessor(object) + if err != nil { + return false, err + } + + switch objectType.GetKind() { + case "Service": + return r.serviceToBeCollected(object) + case "PersistentVolumeClaim": + return r.pvcToBeCollected(object, namespace) + case "PersistentVolume": + return r.pvToBeCollected(labelSelectors, object, namespace) + case "ClusterRoleBinding": + return r.clusterRoleBindingToBeCollected(labelSelectors, object, namespace) + case "ClusterRole": + return r.clusterRoleToBeCollected(labelSelectors, object, namespace) + case "ServiceAccount": + return r.serviceAccountToBeCollected(object) + case "Secret": + return r.secretToBeCollected(object) + } + + return true, nil +} + +func (r *ResourceCollector) prepareResourcesForCollection( + objects []runtime.Unstructured, + namespaces []string, +) error { + for _, o := range objects { + metadata, err := meta.Accessor(o) + if err != nil { + return err + } + + switch o.GetObjectKind().GroupVersionKind().Kind { + case "PersistentVolume": + err := r.preparePVResourceForCollection(o) + if err != nil { + return fmt.Errorf("error preparing PV resource %v: %v", metadata.GetName(), err) + } + case "Service": + err := r.prepareServiceResourceForCollection(o) + if err != nil { + return fmt.Errorf("error preparing Service resource %v/%v: %v", metadata.GetNamespace(), metadata.GetName(), err) + } + case "ClusterRoleBinding": + err := r.prepareClusterRoleBindingForCollection(o, namespaces) + if err != nil { + return fmt.Errorf("error preparing ClusterRoleBindings resource %v: %v", metadata.GetName(), err) + } + } + + content := o.UnstructuredContent() + // Status shouldn't be retained when collecting resources + delete(content, "status") + metadataMap, err := collections.GetMap(content, "metadata") + if err != nil { + return fmt.Errorf("error getting metadata for resource %v: %v", metadata.GetName(), err) + } + // Remove all metadata except some well-known ones + for key := range metadataMap { + switch key { + case "name", "namespace", "labels", "annotations": + default: + delete(metadataMap, key) + } + } + } + return nil +} + +func (r *ResourceCollector) prepareResourceForApply( + object runtime.Unstructured, + namespaceMappings map[string]string, + pvNameMappings map[string]string, +) error { + objectType, err := meta.TypeAccessor(object) + if err != nil { + return err + } + + metadata, err := meta.Accessor(object) + if err != nil { + return err + } + if metadata.GetNamespace() != "" { + // Update the namepsace of the object, will be no-op for clustered resources + metadata.SetNamespace(namespaceMappings[metadata.GetNamespace()]) + } + + switch objectType.GetKind() { + case "PersistentVolume": + return r.preparePVResourceForApply(object, pvNameMappings) + case "PersistentVolumeClaim": + return r.preparePVCResourceForApply(object, pvNameMappings) + case "ClusterRoleBinding": + err := r.prepareClusterRoleBindingForApply(object, namespaceMappings) + if err != nil { + return err + } + + } + return nil +} + +func (r *ResourceCollector) mergeSupportedForResource( + resourceName string, +) bool { + switch resourceName { + case "ClusterRoleBindings": + return true + } + return false +} + +func (r *ResourceCollector) mergeAndUpdateResource( + object runtime.Unstructured, +) error { + objectType, err := meta.TypeAccessor(object) + if err != nil { + return err + } + + switch objectType.GetKind() { + case "ClusterRoleBinding": + return r.mergeAndUpdateClusterRoleBinding(object) + } + return nil +} + +// ApplyResource applies a given resource using the provided client interface +func (r *ResourceCollector) ApplyResource( + dynamicInterface dynamic.Interface, + object *unstructured.Unstructured, + pvNameMappings map[string]string, + namespaceMappings map[string]string, + deleteIfPresent bool, +) error { + metadata, err := meta.Accessor(object) + if err != nil { + return err + } + objectType, err := meta.TypeAccessor(object) + if err != nil { + return err + } + resource := &metav1.APIResource{ + Name: strings.ToLower(objectType.GetKind()) + "s", + Namespaced: len(metadata.GetNamespace()) > 0, + } + + destNamespace := "" + if resource.Namespaced { + destNamespace = namespaceMappings[metadata.GetNamespace()] + } + dynamicClient := dynamicInterface.Resource( + object.GetObjectKind().GroupVersionKind().GroupVersion().WithResource(resource.Name)).Namespace(destNamespace) + + err = r.prepareResourceForApply(object, namespaceMappings, pvNameMappings) + if err != nil { + return err + } + + _, err = dynamicClient.Create(object) + if err != nil && (apierrors.IsAlreadyExists(err) || strings.Contains(err.Error(), portallocator.ErrAllocated.Error())) { + if r.mergeSupportedForResource(resource.Name) { + err := r.mergeAndUpdateResource(object) + if err != nil { + return err + } + } else if deleteIfPresent { + // Delete the resource if it already exists on the destination + // cluster and try creating again + err = dynamicClient.Delete(metadata.GetName(), &metav1.DeleteOptions{}) + if err == nil { + _, err = dynamicClient.Create(object) + } else { + return err + } + } + } + + return err +} diff --git a/pkg/resourcecollector/secret.go b/pkg/resourcecollector/secret.go new file mode 100644 index 0000000000..40f949ba46 --- /dev/null +++ b/pkg/resourcecollector/secret.go @@ -0,0 +1,24 @@ +package resourcecollector + +import ( + "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func (r *ResourceCollector) secretToBeCollected( + object runtime.Unstructured, +) (bool, error) { + var secret v1.Secret + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(object.UnstructuredContent(), &secret); err != nil { + logrus.Errorf("Error converting Secret object %v: %v", object, err) + return false, err + } + if secret.Type == v1.SecretTypeServiceAccountToken { + if accountName, ok := secret.Annotations[v1.ServiceAccountNameKey]; ok && accountName == "default" { + return false, nil + } + } + return true, nil + +} diff --git a/pkg/resourcecollector/service.go b/pkg/resourcecollector/service.go new file mode 100644 index 0000000000..383922f566 --- /dev/null +++ b/pkg/resourcecollector/service.go @@ -0,0 +1,37 @@ +package resourcecollector + +import ( + "github.com/heptio/ark/pkg/util/collections" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +func (r *ResourceCollector) serviceToBeCollected( + object runtime.Unstructured, +) (bool, error) { + metadata, err := meta.Accessor(object) + if err != nil { + return false, err + } + + // Don't migrate the kubernetes service + if metadata.GetName() == "kubernetes" { + return false, nil + } + return true, nil +} + +func (r *ResourceCollector) prepareServiceResourceForCollection( + object runtime.Unstructured, +) error { + spec, err := collections.GetMap(object.UnstructuredContent(), "spec") + if err != nil { + return err + } + // Don't delete clusterIP for headless services + if ip, err := collections.GetString(spec, "clusterIP"); err == nil && ip != "None" { + delete(spec, "clusterIP") + } + + return nil +} diff --git a/pkg/resourcecollector/serviceaccount.go b/pkg/resourcecollector/serviceaccount.go new file mode 100644 index 0000000000..c9507daf26 --- /dev/null +++ b/pkg/resourcecollector/serviceaccount.go @@ -0,0 +1,19 @@ +package resourcecollector + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +func (r *ResourceCollector) serviceAccountToBeCollected( + object runtime.Unstructured, +) (bool, error) { + metadata, err := meta.Accessor(object) + if err != nil { + return false, err + } + + // Don't migrate the default service account + name := metadata.GetName() + return name != "default", nil +}