diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 48e1ce6907..f659070436 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -11,7 +11,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -54,14 +53,10 @@ public class KafkaBufferIT { private AcknowledgementSetManager acknowledgementSetManager; @Mock private BufferTopicConfig topicConfig; - - private PluginMetrics pluginMetrics; private String bootstrapServersCommaDelimited; @BeforeEach void setUp() { - pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); MessageFormat messageFormat = MessageFormat.JSON; @@ -94,7 +89,7 @@ void setUp() { } private KafkaBuffer createObjectUnderTest() { - return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null, null, null); + return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, null, null, null); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java index 9588ccc3bb..32c4ef6cd2 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java @@ -132,7 +132,7 @@ private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile @Test void test_otel_metrics_with_kafka_buffer() throws Exception { - KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, new OTelMetricDecoder(), null, null); + KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelMetricDecoder(), null, null); buffer = new KafkaDelegatingBuffer(kafkaBuffer); oTelMetricsGrpcService = new OTelMetricsGrpcService(10000, buffer, diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index fd8f7365da..7c8f1f7800 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -57,10 +57,10 @@ public class KafkaBuffer extends AbstractBuffer> { @DataPrepperPluginConstructor public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory, - final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager, final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier, final CircuitBreaker circuitBreaker) { - super(pluginSetting); + super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName()); SerializationFactory serializationFactory = new SerializationFactory(); final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier); this.byteDecoder = byteDecoder; @@ -74,7 +74,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka final List consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker); emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), - innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, null); + innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, null); this.executorService = Executors.newFixedThreadPool(consumers.size()); consumers.forEach(this.executorService::submit); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 9a984115d1..bb4f94d972 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -15,7 +15,6 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.breaker.CircuitBreaker; @@ -80,9 +79,6 @@ class KafkaBufferTest { @Mock private KafkaBufferConfig bufferConfig; - @Mock - private PluginMetrics pluginMetrics; - @Mock private AcknowledgementSetManager acknowledgementSetManager; @@ -151,7 +147,7 @@ public KafkaBuffer createObjectUnderTest(final List consume })) { executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executorService); - return new KafkaBuffer(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null, awsCredentialsSupplier, circuitBreaker); + return new KafkaBuffer(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, null, awsCredentialsSupplier, circuitBreaker); } } @@ -160,7 +156,6 @@ public KafkaBuffer createObjectUnderTest(final List consume @BeforeEach void setUp() { when(pluginSetting.getPipelineName()).thenReturn("pipeline"); - pluginMetrics = mock(PluginMetrics.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); when(topic1.getName()).thenReturn("topic1"); when(topic1.isCreateTopic()).thenReturn(true);