diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 81d48982e2..ccf516828b 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -202,7 +202,32 @@ func ResyncOnStatefulSetChange(ctx context.Context, handle func(interface{})) { ss := obj.(*appsv1.StatefulSet) return ss.GetNamespace() == systemNamespace && kafkainternals.IsKnownStatefulSet(ss.GetName()) }, - Handler: controller.HandleAll(handle), + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: handle, + UpdateFunc: func(oldObj, newObj interface{}) { + o, ok := oldObj.(*appsv1.StatefulSet) + if !ok { + return + } + n, ok := newObj.(*appsv1.StatefulSet) + if !ok { + return + } + + // This should never happen, but we check for nil to be sure. + if o.Spec.Replicas == nil || n.Spec.Replicas == nil { + return + } + + // Only handle when replicas change + if *o.Spec.Replicas == *n.Spec.Replicas && o.Status.ReadyReplicas == n.Status.ReadyReplicas { + return + } + + handle(newObj) + }, + DeleteFunc: handle, + }, }) }