Skip to content

Commit

Permalink
Merge pull request #2540 from ozangunalp/otel_instance_injection
Browse files Browse the repository at this point in the history
Retreive OpenTelemetry instance through CDI injection instead of relying on GlobalOpenTelemetry.get
  • Loading branch information
ozangunalp committed Mar 18, 2024
2 parents efcc5ac + 30493b4 commit 39f6b25
Show file tree
Hide file tree
Showing 48 changed files with 438 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
Expand Down Expand Up @@ -121,6 +122,9 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt
@Any
private Instance<SSLContext> clientSslContexts;

@Inject
private Instance<OpenTelemetry> openTelemetryInstance;

private final List<AmqpClient> clients = new CopyOnWriteArrayList<>();

/**
Expand Down Expand Up @@ -230,7 +234,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
AmqpFailureHandler onNack = createFailureHandler(ic);

if (tracing && amqpInstrumenter == null) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector();
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance);
}

Multi<? extends Message<?>> multi = holder.getOrEstablishConnection()
Expand Down Expand Up @@ -318,7 +322,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
this,
holder,
oc,
getSender);
getSender,
openTelemetryInstance);
processors.put(oc.getChannel(), processor);

return MultiUtils.via(processor, m -> m.onFailure().invoke(t -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
Expand Down Expand Up @@ -57,7 +60,8 @@ public class AmqpCreditBasedSender implements Processor<Message<?>, Message<?>>,
private volatile boolean creditRetrievalInProgress = false;

public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender) {
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender,
Instance<OpenTelemetry> openTelemetryInstance) {
this.connector = connector;
this.holder = holder;
this.retrieveSender = retrieveSender;
Expand All @@ -75,7 +79,7 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
this.retryInterval = configuration.getReconnectInterval();

if (tracingEnabled) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender();
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender(openTelemetryInstance);
} else {
amqpInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.smallrye.reactive.messaging.amqp.tracing;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -19,20 +21,20 @@ private AmqpOpenTelemetryInstrumenter(Instrumenter<AmqpMessage<?>, Void> instrum
this.instrumenter = instrumenter;
}

public static AmqpOpenTelemetryInstrumenter createForConnector() {
return create(false);
public static AmqpOpenTelemetryInstrumenter createForConnector(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
}

public static AmqpOpenTelemetryInstrumenter createForSender() {
return create(true);
public static AmqpOpenTelemetryInstrumenter createForSender(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
}

private static AmqpOpenTelemetryInstrumenter create(boolean sender) {
private static AmqpOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean sender) {
MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE;
AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor();
MessagingAttributesGetter<AmqpMessage<?>, Void> messagingAttributesGetter = amqpAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(openTelemetry,
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -161,6 +162,9 @@ public SpanBuilder spanBuilder(final String spanName) {
@Any
Instance<KafkaFailureHandler.Factory> failureHandlerFactories;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Inject
KafkaCDIEvents kafkaCDIEvents;

Expand Down Expand Up @@ -209,7 +213,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
});

if (partitions == 1) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic,
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, -1);
Expand All @@ -231,7 +235,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
// create an instance of source per partitions.
List<Publisher<? extends Message<?>>> streams = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic,
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, i);
Expand Down Expand Up @@ -268,7 +272,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
if (oc.getHealthReadinessTimeout().isPresent()) {
log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout");
}
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers, producerInterceptors);
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
sinks.add(sink);
return sink.getSink();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
Expand Down Expand Up @@ -78,7 +79,9 @@ public class KafkaSink {

private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter;

public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents,
public KafkaSink(KafkaConnectorOutgoingConfiguration config,
KafkaCDIEvents kafkaCDIEvents,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<SerializationFailureHandler<?>> serializationFailureHandlers,
Instance<ProducerInterceptor<?, ?>> producerInterceptors) {
this.isTracingEnabled = config.getTracingEnabled();
Expand Down Expand Up @@ -134,7 +137,7 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk
}));

if (isTracingEnabled) {
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink();
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
kafkaInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Header;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class KafkaSource<K, V> {
public KafkaSource(Vertx vertx,
String consumerGroup,
KafkaConnectorIncomingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<KafkaCommitHandler.Factory> commitHandlerFactories,
Instance<KafkaFailureHandler.Factory> failureHandlerFactories,
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners,
Expand Down Expand Up @@ -227,7 +229,7 @@ public KafkaSource(Vertx vertx,
}

if (isTracingEnabled) {
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource();
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
kafkaInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Channel;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.EmitterFactory;
Expand Down Expand Up @@ -75,10 +76,13 @@ public class KafkaRequestReplyFactory implements EmitterFactory<KafkaRequestRepl
@Any
Instance<Map<String, Object>> configurations;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Override
public KafkaRequestReplyImpl<Object, Object> createEmitter(EmitterConfiguration configuration, long defaultBufferSize) {
return new KafkaRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), configurations, holder.vertx(),
kafkaCDIEvents, commitStrategyFactories, failureStrategyFactories, failureHandlers,
kafkaCDIEvents, openTelemetryInstance, commitStrategyFactories, failureStrategyFactories, failureHandlers,
correlationIdHandlers, replyFailureHandlers, rebalanceListeners);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -81,6 +82,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
Instance<Map<String, Object>> configurations,
Vertx vertx,
KafkaCDIEvents kafkaCDIEvents,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<KafkaCommitHandler.Factory> commitHandlerFactory,
Instance<KafkaFailureHandler.Factory> failureHandlerFactories,
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers,
Expand Down Expand Up @@ -117,7 +119,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
String consumerGroup = consumerConfig.getGroupId().orElseGet(() -> UUID.randomUUID().toString());
this.waitForPartitions = getWaitForPartitions(consumerConfig);
this.gracefulShutdown = consumerConfig.getGracefulShutdown();
this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig,
this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig, openTelemetryInstance,
commitHandlerFactory, failureHandlerFactories, rebalanceListeners, kafkaCDIEvents,
deserializationFailureHandlers, -1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -23,22 +25,22 @@ private KafkaOpenTelemetryInstrumenter(Instrumenter<KafkaTrace, Void> instrument
this.instrumenter = instrumenter;
}

public static KafkaOpenTelemetryInstrumenter createForSource() {
return create(true);
public static KafkaOpenTelemetryInstrumenter createForSource(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
}

public static KafkaOpenTelemetryInstrumenter createForSink() {
return create(false);
public static KafkaOpenTelemetryInstrumenter createForSink(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
}

private static KafkaOpenTelemetryInstrumenter create(boolean source) {
private static KafkaOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) {

MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;

KafkaAttributesExtractor kafkaAttributesExtractor = new KafkaAttributesExtractor();
MessagingAttributesGetter<KafkaTrace, Void> messagingAttributesGetter = kafkaAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<KafkaTrace, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
InstrumenterBuilder<KafkaTrace, Void> builder = Instrumenter.builder(openTelemetry,
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testSinkUsingInteger() {
.with("channel-name", "testSinkUsingInteger");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand All @@ -84,6 +85,7 @@ public void testSinkUsingIntegerAndChannelName() {
.with("partition", 0);
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand All @@ -106,6 +108,7 @@ public void testSinkUsingString() {
.with("channel-name", "testSinkUsingString");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand Down Expand Up @@ -237,7 +240,8 @@ public void testInvalidPayloadType() {
.with("retries", 0L); // disable retry.
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents();
sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());
sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());

await().until(() -> {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
Expand Down Expand Up @@ -286,8 +290,8 @@ public void testInvalidTypeWithDefaultInflightMessages() {
.with("retries", 0L)
.with("channel-name", "testInvalidTypeWithDefaultInflightMessages");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents,
UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());

Flow.Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 5)
Expand Down
Loading

0 comments on commit 39f6b25

Please sign in to comment.