Skip to content

Commit

Permalink
Pass custom metric prefix if present to AbstractBuffer when using Kaf…
Browse files Browse the repository at this point in the history
…kaBuffer (opensearch-project#3638)

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored Nov 12, 2023
1 parent cf6b8ee commit 504485c
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand Down Expand Up @@ -54,14 +53,10 @@ public class KafkaBufferIT {
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private BufferTopicConfig topicConfig;

private PluginMetrics pluginMetrics;
private String bootstrapServersCommaDelimited;

@BeforeEach
void setUp() {
pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString());

when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString());

MessageFormat messageFormat = MessageFormat.JSON;
Expand Down Expand Up @@ -94,7 +89,7 @@ void setUp() {
}

private KafkaBuffer createObjectUnderTest() {
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null, null, null);
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, null, null, null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile

@Test
void test_otel_metrics_with_kafka_buffer() throws Exception {
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, new OTelMetricDecoder(), null, null);
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelMetricDecoder(), null, null);
buffer = new KafkaDelegatingBuffer(kafkaBuffer);
oTelMetricsGrpcService = new OTelMetricsGrpcService(10000,
buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {

@DataPrepperPluginConstructor
public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier,
final CircuitBreaker circuitBreaker) {
super(pluginSetting);
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName());
SerializationFactory serializationFactory = new SerializationFactory();
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier);
this.byteDecoder = byteDecoder;
Expand All @@ -74,7 +74,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, null);
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, null);
this.executorService = Executors.newFixedThreadPool(consumers.size());
consumers.forEach(this.executorService::submit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
Expand Down Expand Up @@ -80,9 +79,6 @@ class KafkaBufferTest {
@Mock
private KafkaBufferConfig bufferConfig;

@Mock
private PluginMetrics pluginMetrics;

@Mock
private AcknowledgementSetManager acknowledgementSetManager;

Expand Down Expand Up @@ -151,7 +147,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
})) {

executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executorService);
return new KafkaBuffer(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null, awsCredentialsSupplier, circuitBreaker);
return new KafkaBuffer(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, null, awsCredentialsSupplier, circuitBreaker);
}
}

Expand All @@ -160,7 +156,6 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
@BeforeEach
void setUp() {
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
pluginMetrics = mock(PluginMetrics.class);
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
when(topic1.getName()).thenReturn("topic1");
when(topic1.isCreateTopic()).thenReturn(true);
Expand Down

0 comments on commit 504485c

Please sign in to comment.