diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 14fc3aaf0..616a6828e 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -467,7 +467,9 @@ func (c *Cluster) syncStream(appId string) error { c.setProcessName("syncing stream with applicationId %s", appId) c.logger.Debugf("syncing stream with applicationId %s", appId) - listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()} + listOptions := metav1.ListOptions{ + LabelSelector: c.labelsSet(false).String(), + } streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) if err != nil { return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err) @@ -492,7 +494,8 @@ func (c *Cluster) syncStream(appId string) error { } if match, reason := c.compareStreams(&stream, desiredStreams); !match { c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) - desiredStreams.ObjectMeta = stream.ObjectMeta + // make sure to keep the old name with randomly generated suffix + desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name updatedStream, err := c.updateStreams(desiredStreams) if err != nil { return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) @@ -527,6 +530,11 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) } + if !reflect.DeepEqual(curEventStreams.ObjectMeta.Labels, newEventStreams.ObjectMeta.Labels) { + match = false + reasons = append(reasons, "new streams labels do not match the current ones") + } + if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed { match = false reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason)) diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 86fd235c7..dac3615c8 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -490,7 +490,7 @@ func TestGenerateFabricEventStream(t *testing.T) { } listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), + LabelSelector: cluster.labelsSet(false).String(), } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) @@ -529,7 +529,8 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin } func TestSyncStreams(t *testing.T) { - pg.Name = fmt.Sprintf("%s-2", pg.Name) + newClusterName := fmt.Sprintf("%s-2", pg.Name) + pg.Name = newClusterName var cluster = New( Config{ OpConfig: config.Config{ @@ -560,7 +561,7 @@ func TestSyncStreams(t *testing.T) { // check that only one stream remains after sync listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), + LabelSelector: cluster.labelsSet(false).String(), } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) @@ -812,6 +813,49 @@ func TestDeleteStreams(t *testing.T) { err = cluster.syncStream(appId) assert.NoError(t, err) + // change specs of streams and patch CRD + for i, stream := range pg.Spec.Streams { + if stream.ApplicationId == appId { + streamTable := stream.Tables["data.bar"] + streamTable.EventType = "stream-type-c" + stream.Tables["data.bar"] = streamTable + stream.BatchSize = k8sutil.UInt32ToPointer(uint32(250)) + pg.Spec.Streams[i] = stream + } + } + + // compare stream returned from API with expected stream + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(false).String(), + } + streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) + result := cluster.generateFabricEventStream(appId) + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { + t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) + } + + // change teamId and check that stream is updated + pg.Spec.TeamID = "new-team" + streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) + result = cluster.generateFabricEventStream(appId) + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { + t.Errorf("Malformed FabricEventStream after updating teamId, expected %#v, got %#v", streams.Items[0].ObjectMeta.Labels, result.ObjectMeta.Labels) + } + + // disable recovery + for idx, stream := range pg.Spec.Streams { + if stream.ApplicationId == appId { + stream.EnableRecovery = util.False() + pg.Spec.Streams[idx] = stream + } + } + + streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) + result = cluster.generateFabricEventStream(appId) + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { + t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) + } + // remove streams from manifest pg.Spec.Streams = nil pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update( @@ -822,10 +866,7 @@ func TestDeleteStreams(t *testing.T) { cluster.cleanupRemovedStreams(appIds) // check that streams have been deleted - listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), - } - streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))