From 3a1e6e0db61eed457bee347856838c5528a5aef4 Mon Sep 17 00:00:00 2001 From: morvencao Date: Tue, 11 Jun 2024 11:20:36 +0000 Subject: [PATCH] add e2e testing. Signed-off-by: morvencao --- test/e2e/pkg/grpc_test.go | 547 +++++++++++++++++++++++++++++ test/e2e/pkg/resources_test.go | 65 ++-- test/e2e/pkg/spec_resync_test.go | 375 ++++++++++++++++++++ test/e2e/pkg/status_resync_test.go | 165 +++++++++ test/e2e/pkg/suite_test.go | 15 + test/e2e/setup/e2e_setup.sh | 2 + test/factories.go | 123 +++++++ 7 files changed, 1268 insertions(+), 24 deletions(-) create mode 100644 test/e2e/pkg/grpc_test.go create mode 100644 test/e2e/pkg/spec_resync_test.go create mode 100644 test/e2e/pkg/status_resync_test.go diff --git a/test/e2e/pkg/grpc_test.go b/test/e2e/pkg/grpc_test.go new file mode 100644 index 00000000..a5a7bb57 --- /dev/null +++ b/test/e2e/pkg/grpc_test.go @@ -0,0 +1,547 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + cetypes "github.com/cloudevents/sdk-go/v2/types" + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1" + grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" +) + +var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { + + Context("GRPC Manifest Tests", func() { + + source := "grpc-e2e" + resourceID := uuid.NewString() + resourceStatus := &api.ResourceStatus{ + ReconcileStatus: &api.ReconcileStatus{}, + } + + It("subscribe to resource status with grpc client", func() { + + go func() { + subClient, err := grpcClient.Subscribe(context.Background(), &pbv1.SubscriptionRequest{Source: source}) + if err != nil { + return + } + + for { + pvEvt, err := subClient.Recv() + if err == io.EOF { + return + } + if err != nil { + return + } + evt, err := binding.ToEvent(context.Background(), grpcprotocol.NewMessage(pvEvt)) + if err != nil { + continue + } + + evtExtensions := evt.Context.GetExtensions() + resID, err := cetypes.ToString(evtExtensions[types.ExtensionResourceID]) + if err != nil { + continue + } + + if resID != resourceID { + continue + } + + resourceVersion, err := cetypes.ToInteger(evtExtensions[types.ExtensionResourceVersion]) + if err != nil { + continue + } + resourceStatus.ReconcileStatus.ObservedVersion = resourceVersion + + manifestStatus := &payload.ManifestStatus{} + if err := evt.DataAs(manifestStatus); err != nil { + continue + } + + if manifestStatus.Status != nil { + resourceStatus.ReconcileStatus.Conditions = manifestStatus.Status.Conditions + if meta.IsStatusConditionTrue(manifestStatus.Conditions, common.ManifestsDeleted) { + deletedCondition := meta.FindStatusCondition(manifestStatus.Conditions, common.ManifestsDeleted) + resourceStatus.ReconcileStatus.Conditions = append(resourceStatus.ReconcileStatus.Conditions, *deletedCondition) + } + for _, value := range manifestStatus.Status.StatusFeedbacks.Values { + if value.Name == "status" { + contentStatus := make(map[string]interface{}) + if err := json.Unmarshal([]byte(*value.Value.JsonRaw), &contentStatus); err != nil { + continue + } + resourceStatus.ContentStatus = contentStatus + } + } + } + } + }() + }) + + It("publish a resource spec using grpc client", func() { + + evt, err := helper.ManifestToEvent(1, source, "create_request", consumer_name, resourceID, 1, false) + Expect(err).ShouldNot(HaveOccurred()) + + pbEvt := &pbv1.CloudEvent{} + if err = grpcprotocol.WritePBMessage(context.Background(), binding.ToMessage(evt), pbEvt); err != nil { + log.Fatalf("failed to convert spec from cloudevent to protobuf: %v", err) + } + + _, err = grpcClient.Publish(context.Background(), &pbv1.PublishRequest{Event: pbEvt}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Subscribe to the resource status using grpc client", func() { + + Eventually(func() error { + if resourceStatus.ReconcileStatus == nil { + return fmt.Errorf("reconcile status is empty") + } + + if !meta.IsStatusConditionTrue(resourceStatus.ReconcileStatus.Conditions, "Applied") { + return fmt.Errorf("resource not applied") + } + + if !meta.IsStatusConditionTrue(resourceStatus.ReconcileStatus.Conditions, "Available") { + return fmt.Errorf("resource not Available") + } + + availableReplicas, ok := resourceStatus.ContentStatus["availableReplicas"] + if !ok { + return fmt.Errorf("available replicas not found in content status") + } + + if availableReplicas.(float64) != float64(1) { + return fmt.Errorf("unexpected available replicas, expected 1, got %d", availableReplicas) + } + + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the nginx deployment from cluster", func() { + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the resource with the maestro api", func() { + + gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), resourceID).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResource.Id).To(Equal(resourceID)) + Expect(*gotResource.Version).To(Equal(int32(1))) + }) + + It("publish a resource spec with update request using grpc client", func() { + + evt, err := helper.ManifestToEvent(2, source, "update_request", consumer_name, resourceID, 1, false) + Expect(err).ShouldNot(HaveOccurred()) + + pbEvt := &pbv1.CloudEvent{} + if err = grpcprotocol.WritePBMessage(context.Background(), binding.ToMessage(evt), pbEvt); err != nil { + log.Fatalf("failed to convert spec from cloudevent to protobuf: %v", err) + } + + _, err = grpcClient.Publish(context.Background(), &pbv1.PublishRequest{Event: pbEvt}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Subscribe to the resource status using grpc client", func() { + + Eventually(func() error { + if resourceStatus.ReconcileStatus == nil { + return fmt.Errorf("reconcile status is empty") + } + + if !meta.IsStatusConditionTrue(resourceStatus.ReconcileStatus.Conditions, "Applied") { + return fmt.Errorf("resource not applied") + } + + if !meta.IsStatusConditionTrue(resourceStatus.ReconcileStatus.Conditions, "Available") { + return fmt.Errorf("resource not Available") + } + + availableReplicas, ok := resourceStatus.ContentStatus["availableReplicas"] + if !ok { + return fmt.Errorf("available replicas not found in content status") + } + + if availableReplicas.(float64) != float64(2) { + return fmt.Errorf("unexpected available replicas, expected 2, got %d", availableReplicas) + } + + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the nginx deployment from cluster", func() { + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 2 { + return fmt.Errorf("unexpected replicas, expected 2, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the resource with the maestro api", func() { + + gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), resourceID).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResource.Id).To(Equal(resourceID)) + Expect(*gotResource.Version).To(Equal(int32(2))) + }) + + It("publish a resource spec with delete request using grpc client", func() { + + evt, err := helper.ManifestToEvent(2, source, "delete_request", consumer_name, resourceID, 1, true) + Expect(err).ShouldNot(HaveOccurred()) + + pbEvt := &pbv1.CloudEvent{} + if err = grpcprotocol.WritePBMessage(context.Background(), binding.ToMessage(evt), pbEvt); err != nil { + log.Fatalf("failed to convert spec from cloudevent to protobuf: %v", err) + } + + _, err = grpcClient.Publish(context.Background(), &pbv1.PublishRequest{Event: pbEvt}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Subscribe to the resource status using grpc client", func() { + + Eventually(func() error { + if resourceStatus.ReconcileStatus == nil { + return fmt.Errorf("reconcile status is empty") + } + + if !meta.IsStatusConditionTrue(resourceStatus.ReconcileStatus.Conditions, "Deleted") { + return fmt.Errorf("resource not deleted") + } + + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the nginx deployment from cluster", func() { + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the resource with the maestro api", func() { + + _, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), resourceID).Execute() + Expect(err).To(HaveOccurred(), "Expected 404") + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) + }) + + }) + + Context("GRPC Manifest Bundle Tests", func() { + + source := "grpc-e2e" + resourceID := uuid.NewString() + resourceBundleStatus := &api.ResourceBundleStatus{ + ManifestBundleStatus: &payload.ManifestBundleStatus{}, + } + + It("subscribe to resource bundle status with grpc client", func() { + + go func() { + subClient, err := grpcClient.Subscribe(context.Background(), &pbv1.SubscriptionRequest{Source: source}) + if err != nil { + return + } + + for { + pvEvt, err := subClient.Recv() + if err == io.EOF { + return + } + if err != nil { + return + } + evt, err := binding.ToEvent(context.Background(), grpcprotocol.NewMessage(pvEvt)) + if err != nil { + continue + } + + evtExtensions := evt.Context.GetExtensions() + resID, err := cetypes.ToString(evtExtensions[types.ExtensionResourceID]) + if err != nil { + continue + } + + if resID != resourceID { + continue + } + + resourceVersion, err := cetypes.ToInteger(evtExtensions[types.ExtensionResourceVersion]) + if err != nil { + continue + } + resourceBundleStatus.ObservedVersion = resourceVersion + + if err := evt.DataAs(resourceBundleStatus.ManifestBundleStatus); err != nil { + continue + } + } + }() + }) + + It("publish a resource bundle spec using grpc client", func() { + + evt, err := helper.ManifestsToBundleEvent(1, source, "create_request", consumer_name, resourceID, 1, false) + Expect(err).ShouldNot(HaveOccurred()) + + pbEvt := &pbv1.CloudEvent{} + if err = grpcprotocol.WritePBMessage(context.Background(), binding.ToMessage(evt), pbEvt); err != nil { + log.Fatalf("failed to convert spec from cloudevent to protobuf: %v", err) + } + + _, err = grpcClient.Publish(context.Background(), &pbv1.PublishRequest{Event: pbEvt}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Subscribe to the resource bundle status using grpc client", func() { + + Eventually(func() error { + if resourceBundleStatus.ManifestBundleStatus == nil { + return fmt.Errorf("resource bundle status is empty") + } + + if !meta.IsStatusConditionTrue(resourceBundleStatus.ManifestBundleStatus.Conditions, "Applied") { + return fmt.Errorf("resource bundle not applied") + } + + if !meta.IsStatusConditionTrue(resourceBundleStatus.ManifestBundleStatus.Conditions, "Available") { + return fmt.Errorf("resource bundle not Available") + } + + if len(resourceBundleStatus.ManifestBundleStatus.ResourceStatus) != 1 { + return fmt.Errorf("unexpected number of resource status, expected 1, got %d", len(resourceBundleStatus.ManifestBundleStatus.ResourceStatus)) + } + + resourceStatus := resourceBundleStatus.ManifestBundleStatus.ResourceStatus[0] + if len(resourceStatus.StatusFeedbacks.Values) != 1 { + return fmt.Errorf("unexpected number of status feedbacks, expected 1, got %d", len(resourceStatus.StatusFeedbacks.Values)) + } + + value := resourceStatus.StatusFeedbacks.Values[0] + contentStatus := make(map[string]interface{}) + if err := json.Unmarshal([]byte(*value.Value.JsonRaw), &contentStatus); err != nil { + return fmt.Errorf("failed to convert status feedback value to content status: %v", err) + } + + availableReplicas, ok := contentStatus["availableReplicas"] + if !ok { + return fmt.Errorf("available replicas not found in content status") + } + + if availableReplicas.(float64) != float64(1) { + return fmt.Errorf("unexpected available replicas, expected 1, got %d", availableReplicas) + } + + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the nginx deployment from cluster", func() { + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the resource bundle with the maestro api", func() { + + gotResourceBundle, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourceBundlesIdGet(context.Background(), resourceID).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResourceBundle.Id).To(Equal(resourceID)) + Expect(*gotResourceBundle.Version).To(Equal(int32(1))) + }) + + It("publish a resource bundle spec with update request using grpc client", func() { + + evt, err := helper.ManifestsToBundleEvent(2, source, "update_request", consumer_name, resourceID, 1, false) + Expect(err).ShouldNot(HaveOccurred()) + + pbEvt := &pbv1.CloudEvent{} + if err = grpcprotocol.WritePBMessage(context.Background(), binding.ToMessage(evt), pbEvt); err != nil { + log.Fatalf("failed to convert spec from cloudevent to protobuf: %v", err) + } + + _, err = grpcClient.Publish(context.Background(), &pbv1.PublishRequest{Event: pbEvt}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Subscribe to the resource bundle status using grpc client", func() { + + Eventually(func() error { + if resourceBundleStatus.ManifestBundleStatus == nil { + return fmt.Errorf("resource bundle status is empty") + } + + if !meta.IsStatusConditionTrue(resourceBundleStatus.ManifestBundleStatus.Conditions, "Applied") { + return fmt.Errorf("resource bundle not applied") + } + + if !meta.IsStatusConditionTrue(resourceBundleStatus.ManifestBundleStatus.Conditions, "Available") { + return fmt.Errorf("resource bundle not Available") + } + + if len(resourceBundleStatus.ManifestBundleStatus.ResourceStatus) != 1 { + return fmt.Errorf("unexpected number of resource status, expected 1, got %d", len(resourceBundleStatus.ManifestBundleStatus.ResourceStatus)) + } + + resourceStatus := resourceBundleStatus.ManifestBundleStatus.ResourceStatus[0] + if len(resourceStatus.StatusFeedbacks.Values) != 1 { + return fmt.Errorf("unexpected number of status feedbacks, expected 1, got %d", len(resourceStatus.StatusFeedbacks.Values)) + } + + value := resourceStatus.StatusFeedbacks.Values[0] + contentStatus := make(map[string]interface{}) + if err := json.Unmarshal([]byte(*value.Value.JsonRaw), &contentStatus); err != nil { + return fmt.Errorf("failed to convert status feedback value to content status: %v", err) + } + + availableReplicas, ok := contentStatus["availableReplicas"] + if !ok { + return fmt.Errorf("available replicas not found in content status") + } + + if availableReplicas.(float64) != float64(2) { + return fmt.Errorf("unexpected available replicas, expected 2, got %d", availableReplicas) + } + + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the nginx deployment from cluster", func() { + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 2 { + return fmt.Errorf("unexpected replicas, expected 2, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the resource bundle with the maestro api", func() { + + gotResourceBundle, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourceBundlesIdGet(context.Background(), resourceID).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResourceBundle.Id).To(Equal(resourceID)) + Expect(*gotResourceBundle.Version).To(Equal(int32(2))) + }) + + It("publish a resource bundle spec with delete request using grpc client", func() { + + evt, err := helper.ManifestsToBundleEvent(2, source, "delete_request", consumer_name, resourceID, 1, true) + Expect(err).ShouldNot(HaveOccurred()) + + pbEvt := &pbv1.CloudEvent{} + if err = grpcprotocol.WritePBMessage(context.Background(), binding.ToMessage(evt), pbEvt); err != nil { + log.Fatalf("failed to convert spec from cloudevent to protobuf: %v", err) + } + + _, err = grpcClient.Publish(context.Background(), &pbv1.PublishRequest{Event: pbEvt}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Subscribe to the resource bundle status using grpc client", func() { + + Eventually(func() error { + if resourceBundleStatus.ManifestBundleStatus == nil { + return fmt.Errorf("resource bundle status is empty") + } + + if !meta.IsStatusConditionTrue(resourceBundleStatus.ManifestBundleStatus.Conditions, "Deleted") { + return fmt.Errorf("resource bundle not applied") + } + + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the nginx deployment from cluster", func() { + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("get the resource with the maestro api", func() { + + _, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourceBundlesIdGet(context.Background(), resourceID).Execute() + Expect(err).To(HaveOccurred(), "Expected 404") + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) + }) + + }) + +}) diff --git a/test/e2e/pkg/resources_test.go b/test/e2e/pkg/resources_test.go index 2f93dead..d302571c 100644 --- a/test/e2e/pkg/resources_test.go +++ b/test/e2e/pkg/resources_test.go @@ -9,12 +9,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/api/openapi" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/api/openapi" ) var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { @@ -31,17 +30,29 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(err).ShouldNot(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusCreated)) Expect(*resource.Id).ShouldNot(BeEmpty()) + Expect(*resource.Version).To(Equal(int32(1))) Eventually(func() error { - _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) if err != nil { return err } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } return nil }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) - It("patch the nginx resource", func() { + It("get the nginx resource from the maestro api", func() { + gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResource.Id).To(Equal(*resource.Id)) + Expect(*gotResource.Version).To(Equal(*resource.Version)) + }) + + It("patch the nginx resource with the maestro api", func() { newRes := helper.NewAPIResource(consumer_name, 2) patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(context.Background(), *resource.Id). @@ -55,10 +66,10 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { if err != nil { return err } - if *deploy.Spec.Replicas == 2 { - return nil + if *deploy.Spec.Replicas != 2 { + return fmt.Errorf("unexpected replicas, expected 2, got %d", *deploy.Spec.Replicas) } - return fmt.Errorf("replicas is not 2") + return nil }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) @@ -93,10 +104,13 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(*resource.Id).ShouldNot(BeEmpty()) Eventually(func() error { - _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) if err != nil { return err } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } return nil }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) @@ -107,16 +121,16 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(err).ShouldNot(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + // ensure the "nginx" deployment in the "default" namespace is not deleted Consistently(func() error { - // Attempt to retrieve the "nginx" deployment in the "default" namespace _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) - // If an error occurs if err != nil { - // Return any other errors directly - return err + if errors.IsNotFound(err) { + return fmt.Errorf("nginx deployment is deleted") + } } return nil - }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + }, 30*time.Second, 2*time.Second).ShouldNot(HaveOccurred()) }) It("delete the nginx deployment", func() { @@ -150,10 +164,13 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(*resource.Id).ShouldNot(BeEmpty()) Eventually(func() error { - _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) if err != nil { return err } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } return nil }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) @@ -170,13 +187,13 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Consistently(func() error { deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) if err != nil { - return err - } - if *deploy.Spec.Replicas == 1 { return nil } - return fmt.Errorf("replicas is not 1") - }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 30*time.Second, 2*time.Second).ShouldNot(HaveOccurred()) }) It("delete the nginx resource", func() { @@ -202,8 +219,9 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { It("create a sample deployment in the target cluster", func() { nginxDeploy := &appsv1.Deployment{} - json.Unmarshal(helper.GetTestNginxJSON(1), nginxDeploy) - _, err := kubeClient.AppsV1().Deployments("default").Create(context.Background(), nginxDeploy, metav1.CreateOptions{}) + err := json.Unmarshal(helper.GetTestNginxJSON(1), nginxDeploy) + Expect(err).ShouldNot(HaveOccurred()) + _, err = kubeClient.AppsV1().Deployments("default").Create(context.Background(), nginxDeploy, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) @@ -241,7 +259,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { return nil } } - return fmt.Errorf("contentStatus should be empty") + return fmt.Errorf("contentStatus should not be empty") }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) @@ -265,5 +283,4 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) }) - }) diff --git a/test/e2e/pkg/spec_resync_test.go b/test/e2e/pkg/spec_resync_test.go new file mode 100644 index 00000000..7086796f --- /dev/null +++ b/test/e2e/pkg/spec_resync_test.go @@ -0,0 +1,375 @@ +package e2e_test + +import ( + "context" + "fmt" + "net/http" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api/openapi" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("Spec resync", Ordered, Label("e2e-tests-spec-resync"), func() { + + var resource *openapi.Resource + + Context("Resource resync created resource spec", func() { + + It("shut down maestro agent", func() { + + // patch marstro agent replicas to 0 + deploy, err := kubeClient.AppsV1().Deployments("maestro-agent").Patch(context.Background(), "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + + // ensure no running maestro agent pods + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro-agent", + }) + if err != nil { + return err + } + if len(pods.Items) > 0 { + return fmt.Errorf("maestro-agent pods still running") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("post the nginx resource to the maestro api", func() { + + res := helper.NewAPIResource(consumer_name, 1) + var resp *http.Response + var err error + resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusCreated)) + Expect(*resource.Id).ShouldNot(BeEmpty()) + }) + + It("ensure the resource is not created", func() { + + // ensure the "nginx" deployment in the "default" namespace is not created + Consistently(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err == nil { + return fmt.Errorf("nginx deployment is created") + } + return nil + }, 30*time.Second, 2*time.Second).ShouldNot(HaveOccurred()) + }) + + It("start maestro agent", func() { + + // patch marstro agent replicas to 1 + deploy, err := kubeClient.AppsV1().Deployments("maestro-agent").Patch(context.Background(), "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":1}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(1))) + + // ensure maestro agent pod is up and running + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro-agent", + }) + if err != nil { + return err + } + if len(pods.Items) != 1 { + return fmt.Errorf("unexpected maestro-agent pod count, expected 1, got %d", len(pods.Items)) + } + if pods.Items[0].Status.Phase != "Running" { + return fmt.Errorf("maestro-agent pod not in running state") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("ensure the resource is created", func() { + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("delete the nginx resource", func() { + + resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + }) + + Context("Resource resync updated resource spec", func() { + + It("post the nginx resource to the maestro api", func() { + + res := helper.NewAPIResource(consumer_name, 1) + var resp *http.Response + var err error + resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusCreated)) + Expect(*resource.Id).ShouldNot(BeEmpty()) + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("shut down maestro agent", func() { + + // patch marstro agent replicas to 0 + deploy, err := kubeClient.AppsV1().Deployments("maestro-agent").Patch(context.Background(), "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + + // ensure no running maestro agent pods + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro-agent", + }) + if err != nil { + return err + } + if len(pods.Items) > 0 { + return fmt.Errorf("maestro-agent pods still running") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("patch the nginx resource", func() { + + newRes := helper.NewAPIResource(consumer_name, 2) + patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(context.Background(), *resource.Id). + ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resource.Version, Manifest: newRes.Manifest}).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*patchedResource.Version).To(Equal(*resource.Version + 1)) + + }) + + It("ensure the resource is not updated", func() { + + // ensure the "nginx" deployment in the "default" namespace is not updated + Consistently(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return nil + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 30*time.Second, 2*time.Second).ShouldNot(HaveOccurred()) + }) + + It("start maestro agent", func() { + + // patch marstro agent replicas to 1 + deploy, err := kubeClient.AppsV1().Deployments("maestro-agent").Patch(context.Background(), "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":1}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(1))) + + // ensure maestro agent pod is up and running + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro-agent", + }) + if err != nil { + return err + } + if len(pods.Items) != 1 { + return fmt.Errorf("unexpected maestro-agent pod count, expected 1, got %d", len(pods.Items)) + } + if pods.Items[0].Status.Phase != "Running" { + return fmt.Errorf("maestro-agent pod not in running state") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("ensure the resource is updated", func() { + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 2 { + return fmt.Errorf("unexpected replicas, expected 2, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("delete the nginx resource", func() { + + resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + }) + + Context("Resource resync deleted resource spec", func() { + + It("post the nginx resource to the maestro api", func() { + + res := helper.NewAPIResource(consumer_name, 1) + var resp *http.Response + var err error + resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusCreated)) + Expect(*resource.Id).ShouldNot(BeEmpty()) + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("shut down maestro agent", func() { + + // patch marstro agent replicas to 0 + deploy, err := kubeClient.AppsV1().Deployments("maestro-agent").Patch(context.Background(), "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + + // ensure no running maestro agent pods + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro-agent", + }) + if err != nil { + return err + } + if len(pods.Items) > 0 { + return fmt.Errorf("maestro-agent pods still running") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("delete the nginx resource", func() { + + resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + }) + + It("ensure the resource is not deleted", func() { + + // ensure the "nginx" deployment in the "default" namespace is not deleted + Consistently(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("nginx deployment is deleted") + } + } + return nil + }, 30*time.Second, 2*time.Second).ShouldNot(HaveOccurred()) + }) + + It("start maestro agent", func() { + + // patch marstro agent replicas to 1 + deploy, err := kubeClient.AppsV1().Deployments("maestro-agent").Patch(context.Background(), "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":1}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(1))) + + // ensure maestro agent pod is up and running + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro-agent", + }) + if err != nil { + return err + } + if len(pods.Items) != 1 { + return fmt.Errorf("unexpected maestro-agent pod count, expected 1, got %d", len(pods.Items)) + } + if pods.Items[0].Status.Phase != "Running" { + return fmt.Errorf("maestro-agent pod not in running state") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("ensure the resource is deleted", func() { + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + }) +}) diff --git a/test/e2e/pkg/status_resync_test.go b/test/e2e/pkg/status_resync_test.go new file mode 100644 index 00000000..efb8d222 --- /dev/null +++ b/test/e2e/pkg/status_resync_test.go @@ -0,0 +1,165 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api/openapi" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), func() { + + var resource *openapi.Resource + + Context("Resource resync resource status", func() { + + It("post the nginx resource to the maestro api", func() { + + res := helper.NewAPIResource(consumer_name, 1) + var resp *http.Response + var err error + resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusCreated)) + Expect(*resource.Id).ShouldNot(BeEmpty()) + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + + gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResource.Id).To(Equal(*resource.Id)) + Expect(*gotResource.Version).To(Equal(*resource.Version)) + + statusJSON, err := json.Marshal(gotResource.Status) + Expect(err).ShouldNot(HaveOccurred()) + Expect(strings.Contains(string(statusJSON), "testKubeClient")).To(BeFalse()) + }) + + It("shut down maestro server", func() { + + // patch marstro server replicas to 0 + deploy, err := kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + + // ensure no running maestro server pods + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro", + }) + if err != nil { + return err + } + if len(pods.Items) > 0 { + return fmt.Errorf("maestro server pods still running") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("patch the resource in the cluster", func() { + + deploy, err := kubeClient.AppsV1().Deployments("default").Patch(context.Background(), "nginx", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + }) + + It("start maestro server", func() { + + // patch marstro server replicas to 1 + deploy, err := kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":1}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(1))) + + // ensure maestro server pod is up and running + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=maestro", + }) + if err != nil { + return err + } + if len(pods.Items) == 0 { + return fmt.Errorf("unable to find maestro server pod") + } + if pods.Items[0].Status.Phase != "Running" { + return fmt.Errorf("maestro server pod not in running state") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("ensure the resource status is resynced", func() { + Eventually(func() error { + gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute() + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code, expected 200, got %d", resp.StatusCode) + } + if *gotResource.Id != *resource.Id { + return fmt.Errorf("unexpected resource id, expected %s, got %s", *resource.Id, *gotResource.Id) + } + if *gotResource.Version != *resource.Version { + return fmt.Errorf("unexpected resource version, expected %d, got %d", *resource.Version, *gotResource.Version) + } + + statusJSON, err := json.Marshal(gotResource.Status) + if err != nil { + return err + } + // TODO: add a better check if the status is resynced + if !strings.Contains(string(statusJSON), "testKubeClient") { + return fmt.Errorf("unexpected status, expected testKubeClient, got %s", string(statusJSON)) + } + return nil + }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) + }) + + It("delete the nginx resource", func() { + + resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + }) + +}) diff --git a/test/e2e/pkg/suite_test.go b/test/e2e/pkg/suite_test.go index 9978e543..42d03684 100644 --- a/test/e2e/pkg/suite_test.go +++ b/test/e2e/pkg/suite_test.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "flag" "fmt" + "log" "net/http" "testing" "time" @@ -14,6 +15,10 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1" + "github.com/openshift-online/maestro/pkg/api/openapi" "github.com/openshift-online/maestro/test" ) @@ -25,6 +30,8 @@ var ( consumer_name string kubeClient *kubernetes.Clientset apiClient *openapi.APIClient + grpcConn *grpc.ClientConn + grpcClient pbv1.CloudEventServiceClient helper *test.Helper T *testing.T ) @@ -73,6 +80,13 @@ var _ = BeforeSuite(func() { } apiClient = openapi.NewAPIClient(cfg) + var err error + grpcConn, err = grpc.Dial(grpcServerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("fail to dial grpc server: %v", err) + } + grpcClient = pbv1.NewCloudEventServiceClient(grpcConn) + // validate the kubeconfig file restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { @@ -91,4 +105,5 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { // later... + grpcConn.Close() }) diff --git a/test/e2e/setup/e2e_setup.sh b/test/e2e/setup/e2e_setup.sh index 189ee557..2185ff24 100755 --- a/test/e2e/setup/e2e_setup.sh +++ b/test/e2e/setup/e2e_setup.sh @@ -72,6 +72,8 @@ make template \ # expose the maestro server via nodeport kubectl patch service maestro -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30080, "port": 8000, "targetPort": 8000}]}}' --type merge +# expose the maestro grpc server via nodeport +kubectl patch service maestro-grpc -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30090, "port": 8090, "targetPort": 8090}]}}' --type merge # expose the maestro grpc server via nodeport kubectl patch service maestro-grpc -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30090, "port": 8090, "targetPort": 8090}]}}' --type merge diff --git a/test/factories.go b/test/factories.go index 90265ecf..6ca2f65e 100755 --- a/test/factories.go +++ b/test/factories.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/openshift-online/maestro/pkg/api" @@ -249,3 +250,125 @@ func (helper *Helper) CreateConsumerList(count int) (consumers []*api.Consumer) } return consumers } + +// ManifestToEvent converts a manifest into a CloudEvent representation with manifest data. +func (helper *Helper) ManifestToEvent(replicas int, source, action, consumerName, resourceID string, + resourceVersion int64, deleting bool) (*cloudevents.Event, error) { + + testManifest := map[string]interface{}{} + if err := json.Unmarshal([]byte(fmt.Sprintf(testManifestJSON, replicas)), &testManifest); err != nil { + return nil, fmt.Errorf("error unmarshalling test manifest: %v", err) + } + + eventType := cetypes.CloudEventsType{ + CloudEventsDataType: workpayload.ManifestEventDataType, + SubResource: cetypes.SubResourceSpec, + Action: cetypes.EventAction(action), + } + evtBuilder := cetypes.NewEventBuilder(source, eventType). + WithClusterName(consumerName). + WithResourceID(resourceID). + WithResourceVersion(resourceVersion) + if deleting { + evtBuilder.WithDeletionTimestamp(time.Now()) + } + evt := evtBuilder.NewEvent() + + eventPayload := &workpayload.Manifest{ + Manifest: unstructured.Unstructured{Object: testManifest}, + DeleteOption: &workv1.DeleteOption{ + PropagationPolicy: workv1.DeletePropagationPolicyTypeForeground, + }, + ConfigOption: &workpayload.ManifestConfigOption{ + FeedbackRules: []workv1.FeedbackRule{ + { + Type: workv1.JSONPathsType, + JsonPaths: []workv1.JsonPath{ + { + Name: "status", + Path: ".status", + }, + }, + }, + }, + UpdateStrategy: &workv1.UpdateStrategy{ + Type: workv1.UpdateStrategyTypeServerSideApply, + }, + }, + } + + if err := evt.SetData(cloudevents.ApplicationJSON, eventPayload); err != nil { + return nil, fmt.Errorf("failed to set cloud event data: %v", err) + } + + return &evt, nil +} + +// ManifestsToBundleEvent converts a list of manifests into a CloudEvent representation with manifest bundle data. +func (helper *Helper) ManifestsToBundleEvent(replicas int, source, action, consumerName, resourceID string, + resourceVersion int64, deleting bool) (*cloudevents.Event, error) { + + testManifest := map[string]interface{}{} + if err := json.Unmarshal([]byte(fmt.Sprintf(testManifestJSON, replicas)), &testManifest); err != nil { + return nil, fmt.Errorf("error unmarshalling test manifest: %v", err) + } + + eventType := cetypes.CloudEventsType{ + CloudEventsDataType: workpayload.ManifestBundleEventDataType, + SubResource: cetypes.SubResourceSpec, + Action: cetypes.EventAction(action), + } + + // create a cloud event with the manifest as the data + evtBuilder := cetypes.NewEventBuilder(source, eventType). + WithClusterName(consumerName). + WithResourceID(resourceID). + WithResourceVersion(resourceVersion) + if deleting { + evtBuilder.WithDeletionTimestamp(time.Now()) + } + evt := evtBuilder.NewEvent() + + eventPayload := &workpayload.ManifestBundle{ + Manifests: []workv1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Object: &unstructured.Unstructured{Object: testManifest}, + }, + }, + }, + DeleteOption: &workv1.DeleteOption{ + PropagationPolicy: workv1.DeletePropagationPolicyTypeForeground, + }, + ManifestConfigs: []workv1.ManifestConfigOption{ + { + FeedbackRules: []workv1.FeedbackRule{ + { + Type: workv1.JSONPathsType, + JsonPaths: []workv1.JsonPath{ + { + Name: "status", + Path: ".status", + }, + }, + }, + }, + UpdateStrategy: &workv1.UpdateStrategy{ + Type: workv1.UpdateStrategyTypeServerSideApply, + }, + ResourceIdentifier: workv1.ResourceIdentifier{ + Group: "apps", + Resource: "deployments", + Name: "nginx", + Namespace: "default", + }, + }, + }, + } + + if err := evt.SetData(cloudevents.ApplicationJSON, eventPayload); err != nil { + return nil, fmt.Errorf("failed to set cloud event data: %v", err) + } + + return &evt, nil +}