Skip to content

Commit

Permalink
do not use extra labels to list stream CRDs (#2803)
Browse files Browse the repository at this point in the history
* do not use extra labels to list stream CRDs
* add diff on labels for streams + unit test coverage
  • Loading branch information
FxKu authored Dec 17, 2024
1 parent 80ef38f commit d44bfab
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 9 deletions.
12 changes: 10 additions & 2 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ func (c *Cluster) syncStream(appId string) error {
c.setProcessName("syncing stream with applicationId %s", appId)
c.logger.Debugf("syncing stream with applicationId %s", appId)

listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()}
listOptions := metav1.ListOptions{
LabelSelector: c.labelsSet(false).String(),
}
streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
if err != nil {
return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
Expand All @@ -492,7 +494,8 @@ func (c *Cluster) syncStream(appId string) error {
}
if match, reason := c.compareStreams(&stream, desiredStreams); !match {
c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
desiredStreams.ObjectMeta = stream.ObjectMeta
// make sure to keep the old name with randomly generated suffix
desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name
updatedStream, err := c.updateStreams(desiredStreams)
if err != nil {
return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
Expand Down Expand Up @@ -527,6 +530,11 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
}

if !reflect.DeepEqual(curEventStreams.ObjectMeta.Labels, newEventStreams.ObjectMeta.Labels) {
match = false
reasons = append(reasons, "new streams labels do not match the current ones")
}

if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
Expand Down
55 changes: 48 additions & 7 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
}

listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
LabelSelector: cluster.labelsSet(false).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
Expand Down Expand Up @@ -529,7 +529,8 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin
}

func TestSyncStreams(t *testing.T) {
pg.Name = fmt.Sprintf("%s-2", pg.Name)
newClusterName := fmt.Sprintf("%s-2", pg.Name)
pg.Name = newClusterName
var cluster = New(
Config{
OpConfig: config.Config{
Expand Down Expand Up @@ -560,7 +561,7 @@ func TestSyncStreams(t *testing.T) {

// check that only one stream remains after sync
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
LabelSelector: cluster.labelsSet(false).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
Expand Down Expand Up @@ -812,6 +813,49 @@ func TestDeleteStreams(t *testing.T) {
err = cluster.syncStream(appId)
assert.NoError(t, err)

// change specs of streams and patch CRD
for i, stream := range pg.Spec.Streams {
if stream.ApplicationId == appId {
streamTable := stream.Tables["data.bar"]
streamTable.EventType = "stream-type-c"
stream.Tables["data.bar"] = streamTable
stream.BatchSize = k8sutil.UInt32ToPointer(uint32(250))
pg.Spec.Streams[i] = stream
}
}

// compare stream returned from API with expected stream
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(false).String(),
}
streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
result := cluster.generateFabricEventStream(appId)
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
}

// change teamId and check that stream is updated
pg.Spec.TeamID = "new-team"
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
result = cluster.generateFabricEventStream(appId)
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after updating teamId, expected %#v, got %#v", streams.Items[0].ObjectMeta.Labels, result.ObjectMeta.Labels)
}

// disable recovery
for idx, stream := range pg.Spec.Streams {
if stream.ApplicationId == appId {
stream.EnableRecovery = util.False()
pg.Spec.Streams[idx] = stream
}
}

streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
result = cluster.generateFabricEventStream(appId)
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
}

// remove streams from manifest
pg.Spec.Streams = nil
pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update(
Expand All @@ -822,10 +866,7 @@ func TestDeleteStreams(t *testing.T) {
cluster.cleanupRemovedStreams(appIds)

// check that streams have been deleted
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))

Expand Down

0 comments on commit d44bfab

Please sign in to comment.