diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 3d9cbae11..6e940820d 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -453,15 +453,6 @@ func (c *Cluster) syncStream(appId string) error { if stream.Spec.ApplicationId != appId { continue } - if streamExists { - c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId) - if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil { - c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err) - } else { - c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId) - } - continue - } streamExists = true desiredStreams := c.generateFabricEventStream(appId) if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { @@ -484,6 +475,7 @@ func (c *Cluster) syncStream(appId string) error { c.Streams[appId] = updatedStream c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) } + break } if !streamExists { diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 92d28663e..77710aa19 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -90,7 +90,7 @@ var ( Namespace: namespace, Labels: map[string]string{ "application": "spilo", - "cluster-name": fmt.Sprintf("%s-2", clusterName), + "cluster-name": clusterName, "team": "acid", }, OwnerReferences: []metav1.OwnerReference{ @@ -494,14 +494,13 @@ func TestSyncStreams(t *testing.T) { OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", Resources: config.Resources{ - ClusterLabels: map[string]string{"application": "spilo"}, - ClusterNameLabel: "cluster-name", - DefaultCPURequest: "300m", - DefaultCPULimit: "300m", - DefaultMemoryRequest: "300Mi", - DefaultMemoryLimit: "300Mi", - EnableOwnerReferences: util.True(), - PodRoleLabel: "spilo-role", + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", }, }, }, client, pg, logger, eventRecorder) @@ -514,33 +513,17 @@ func TestSyncStreams(t *testing.T) { err = cluster.syncStream(appId) assert.NoError(t, err) - // create a second stream with same spec but with different name - createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( - context.TODO(), fes, metav1.CreateOptions{}) + // sync the stream again + err = cluster.syncStream(appId) assert.NoError(t, err) - assert.Equal(t, createdStream.Spec.ApplicationId, appId) - // check that two streams exist + // check that only one stream remains after sync listOptions := metav1.ListOptions{ LabelSelector: cluster.labelsSet(true).String(), } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items)) - - // sync the stream which should remove the redundant stream - err = cluster.syncStream(appId) - assert.NoError(t, err) - - // check that only one stream remains after sync - streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) - - // check owner references - if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) { - t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences) - } } func TestSameStreams(t *testing.T) { @@ -663,13 +646,14 @@ func TestUpdateStreams(t *testing.T) { OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", Resources: config.Resources{ - ClusterLabels: map[string]string{"application": "spilo"}, - ClusterNameLabel: "cluster-name", - DefaultCPURequest: "300m", - DefaultCPULimit: "300m", - DefaultMemoryRequest: "300Mi", - DefaultMemoryLimit: "300Mi", - PodRoleLabel: "spilo-role", + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + EnableOwnerReferences: util.True(), + PodRoleLabel: "spilo-role", }, }, }, client, pg, logger, eventRecorder) @@ -678,10 +662,31 @@ func TestUpdateStreams(t *testing.T) { context.TODO(), &pg, metav1.CreateOptions{}) assert.NoError(t, err) - // create the stream + // create stream with different owner reference + fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name) + fes.ObjectMeta.Labels["cluster-name"] = pg.Name + createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( + context.TODO(), fes, metav1.CreateOptions{}) + assert.NoError(t, err) + assert.Equal(t, createdStream.Spec.ApplicationId, appId) + + // sync the stream which should update the owner reference err = cluster.syncStream(appId) assert.NoError(t, err) + // check that only one stream exists after sync + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(true).String(), + } + streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) + + // compare owner references + if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) { + t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences) + } + // change specs of streams and patch CRD for i, stream := range pg.Spec.Streams { if stream.ApplicationId == appId { @@ -694,10 +699,7 @@ func TestUpdateStreams(t *testing.T) { } // compare stream returned from API with expected stream - listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), - } - streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) + streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) result := cluster.generateFabricEventStream(appId) if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) @@ -716,9 +718,51 @@ func TestUpdateStreams(t *testing.T) { if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) } +} - mockClient := k8sutil.NewMockKubernetesClient() - cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter +func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) { + patchData, err := specPatch(pgSpec) + assert.NoError(t, err) + + pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( + context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") + assert.NoError(t, err) + + cluster.Postgresql.Spec = pgPatched.Spec + err = cluster.syncStream(appId) + assert.NoError(t, err) + + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + + return streams +} + +func TestDeleteStreams(t *testing.T) { + pg.Name = fmt.Sprintf("%s-4", pg.Name) + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + _, err := cluster.KubeClient.Postgresqls(namespace).Create( + context.TODO(), &pg, metav1.CreateOptions{}) + assert.NoError(t, err) + + // create the stream + err = cluster.syncStream(appId) + assert.NoError(t, err) // remove streams from manifest pg.Spec.Streams = nil @@ -729,26 +773,32 @@ func TestUpdateStreams(t *testing.T) { appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams) cluster.cleanupRemovedStreams(appIds) - streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) - if len(streams.Items) > 0 || err != nil { - t.Errorf("stream resource has not been removed or unexpected error %v", err) + // check that streams have been deleted + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(true).String(), } -} - -func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) { - patchData, err := specPatch(pgSpec) + streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) + assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items)) - pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( - context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") + // create stream to test deleteStreams code + fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name) + fes.ObjectMeta.Labels["cluster-name"] = pg.Name + _, err = cluster.KubeClient.FabricEventStreams(namespace).Create( + context.TODO(), fes, metav1.CreateOptions{}) assert.NoError(t, err) - cluster.Postgresql.Spec = pgPatched.Spec + // sync it once to cluster struct err = cluster.syncStream(appId) assert.NoError(t, err) + // we need a mock client because deleteStreams checks for CRD existance + mockClient := k8sutil.NewMockKubernetesClient() + cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter + cluster.deleteStreams() + + // check that streams have been deleted streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - - return streams + assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items)) }