diff --git a/control-plane/pkg/reconciler/trigger/v2/controllerv2.go b/control-plane/pkg/reconciler/trigger/v2/controllerv2.go index a76be1ccdb..b0de9e0453 100644 --- a/control-plane/pkg/reconciler/trigger/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/trigger/v2/controllerv2.go @@ -119,7 +119,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf // Filter Brokers and enqueue associated Triggers brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: kafka.BrokerClassFilter(), - Handler: enqueueTriggers(logger, triggerLister, impl.Enqueue), + Handler: enqueueTriggers(logger, triggerLister, impl.Enqueue, coreFeatureStore), }) // ConsumerGroup changes and enqueue associated Trigger @@ -198,21 +198,29 @@ func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) b func enqueueTriggers( logger *zap.Logger, lister eventinglisters.TriggerLister, - enqueue func(obj interface{})) cache.ResourceEventHandler { + enqueue func(obj interface{}), + featureStore *feature.Store) cache.ResourceEventHandler { return controller.HandleAll(func(obj interface{}) { if broker, ok := obj.(*eventing.Broker); ok { + features := featureStore.Load() selector := labels.SelectorFromSet(map[string]string{apiseventing.BrokerLabelKey: broker.Name}) - triggers, err := lister.Triggers(broker.Namespace).List(selector) + triggers, err := lister.Triggers(metav1.NamespaceAll).List(selector) if err != nil { logger.Warn("Failed to list triggers", zap.Any("broker", broker), zap.Error(err)) return } for _, trigger := range triggers { - enqueue(trigger) + if features.IsCrossNamespaceEventLinks() { + if trigger.Spec.BrokerRef != nil && trigger.Spec.BrokerRef.Namespace == broker.Namespace { + enqueue(trigger) + } + } else if trigger.Namespace == broker.Namespace { + enqueue(trigger) + } } } }) diff --git a/control-plane/pkg/reconciler/trigger/v2/triggerv2.go b/control-plane/pkg/reconciler/trigger/v2/triggerv2.go index d933b17d64..14a0f8f129 100644 --- a/control-plane/pkg/reconciler/trigger/v2/triggerv2.go +++ b/control-plane/pkg/reconciler/trigger/v2/triggerv2.go @@ -94,7 +94,16 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigge return fmt.Errorf("failed to setup OIDC service account: %w", err) } - broker, err := r.BrokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + var brokerName, brokerNamespace string + if trigger.Spec.BrokerRef != nil && feature.FromContext(ctx).IsCrossNamespaceEventLinks() { + brokerName = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerName = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := r.BrokerLister.Brokers(brokerNamespace).Get(brokerName) if err != nil && !apierrors.IsNotFound(err) { trigger.Status.MarkBrokerFailed("Failed to get broker", "%v", err) return fmt.Errorf("failed to get broker: %w", err) @@ -104,7 +113,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigge // Actually check if the broker doesn't exist. // Note: do not introduce another `broker` variable with `:` - broker, err = r.EventingClient.EventingV1().Brokers(trigger.Namespace).Get(ctx, trigger.Spec.Broker, metav1.GetOptions{}) + broker, err = r.EventingClient.EventingV1().Brokers(brokerNamespace).Get(ctx, brokerName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return fmt.Errorf("failed to get broker: %w", err)