From e26b8994a3a8549f867d2b0e8915ee72b2858f12 Mon Sep 17 00:00:00 2001 From: Craig Day Date: Wed, 29 Mar 2023 11:22:34 -0700 Subject: [PATCH] introduce two CDI helper extension to add beans explicitly --- .../messaging/kafka/KafkaCDIExtension.java | 41 +++++++++++++++++++ .../ReactiveMessagingCDIExtension.java | 41 +++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaCDIExtension.java create mode 100644 smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ReactiveMessagingCDIExtension.java diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaCDIExtension.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaCDIExtension.java new file mode 100644 index 0000000000..2975201880 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaCDIExtension.java @@ -0,0 +1,41 @@ +package io.smallrye.reactive.messaging.kafka; + +import io.smallrye.reactive.messaging.kafka.commit.FileCheckpointStateStore; +import io.smallrye.reactive.messaging.kafka.commit.KafkaCheckpointCommit; +import io.smallrye.reactive.messaging.kafka.commit.KafkaIgnoreCommit; +import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit; +import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit; +import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; +import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop; +import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure; +import io.smallrye.reactive.messaging.kafka.impl.KafkaClientServiceImpl; +import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactionsFactory; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.enterprise.inject.spi.BeforeBeanDiscovery; +import jakarta.enterprise.inject.spi.Extension; +import java.util.Set; + +public class KafkaCDIExtension implements Extension { + + private static final Set> DEFAULT_BEAN_CLASSES = Set.of( + KafkaTransactionsFactory.class, + KafkaThrottledLatestProcessedCommit.Factory.class, + KafkaLatestCommit.Factory.class, + KafkaIgnoreCommit.Factory.class, + KafkaCheckpointCommit.Factory.class, + FileCheckpointStateStore.Factory.class, + KafkaFailStop.Factory.class, + KafkaIgnoreFailure.Factory.class, + KafkaDeadLetterQueue.Factory.class, + KafkaCDIEvents.class, + KafkaConnector.class, + KafkaClientServiceImpl.class + ); + + void addDefaultBeans(@Observes BeforeBeanDiscovery bbd, BeanManager bm) { + for (Class clazz : DEFAULT_BEAN_CLASSES) { + bbd.addAnnotatedType(bm.createAnnotatedType(clazz), clazz.getName()); + } + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ReactiveMessagingCDIExtension.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ReactiveMessagingCDIExtension.java new file mode 100644 index 0000000000..1a79a51dfb --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ReactiveMessagingCDIExtension.java @@ -0,0 +1,41 @@ +package io.smallrye.reactive.messaging.providers.extension; + +import io.smallrye.reactive.messaging.providers.MediatorFactory; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; +import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; +import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; +import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.locals.ContextDecorator; +import io.smallrye.reactive.messaging.providers.wiring.Wiring; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.enterprise.inject.spi.BeforeBeanDiscovery; +import jakarta.enterprise.inject.spi.Extension; +import java.util.Set; + +public class ReactiveMessagingCDIExtension implements Extension { + + private static final Set> DEFAULT_BEAN_CLASSES = Set.of( + MediatorFactory.class, + MediatorManager.class, + InternalChannelRegistry.class, + ConnectorFactories.class, + ConfiguredChannelFactory.class, + ChannelProducer.class, + ExecutionHolder.class, + WorkerPoolRegistry.class, + HealthCenter.class, + Wiring.class, + EmitterFactoryImpl.class, + MutinyEmitterFactoryImpl.class, + LegacyEmitterFactoryImpl.class, + ContextDecorator.class + ); + + void addDefaultBeans(@Observes BeforeBeanDiscovery bbd, BeanManager bm) { + for (Class clazz : DEFAULT_BEAN_CLASSES) { + bbd.addAnnotatedType(bm.createAnnotatedType(clazz), clazz.getName()); + } + } +}