diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 7ed06b3fa0..ec82ecebf0 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -13,6 +13,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -30,6 +31,8 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; @@ -40,9 +43,9 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -52,6 +55,7 @@ import static org.mockito.Mockito.when; public class KafkaSourceJsonTypeIT { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceJsonTypeIT.class); private static final int TEST_ID = 123456; @Mock private KafkaSourceConfig sourceConfig; @@ -92,7 +96,7 @@ 
public KafkaSource createObjectUnderTest() { } @BeforeEach - public void setup() { + public void setup() throws Throwable { sourceConfig = mock(KafkaSourceConfig.class); pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); @@ -108,15 +112,16 @@ public void setup() { when(pipelineDescription.getPipelineName()).thenReturn("testPipeline"); try { doAnswer(args -> { - Collection> bufferedRecords = (Collection>)args.getArgument(0); + Collection> bufferedRecords = (Collection>) args.getArgument(0); receivedRecords.addAll(bufferedRecords); return null; }).when(buffer).writeAll(any(Collection.class), any(Integer.class)); - } catch (Exception e){} + } catch (Exception e) { + } testKey = RandomStringUtils.randomAlphabetic(5); - testGroup = "TestGroup_"+RandomStringUtils.randomAlphabetic(6); - testTopic = "TestJsonTopic_"+RandomStringUtils.randomAlphabetic(5); + testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(6); + testTopic = "TestJsonTopic_" + RandomStringUtils.randomAlphabetic(5); jsonTopic = mock(TopicConfig.class); when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); @@ -129,8 +134,42 @@ public void setup() { when(jsonTopic.getAutoOffsetReset()).thenReturn("earliest"); when(jsonTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); + LOG.info("Using Kafka bootstrap servers: {}", bootstrapServers); when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + AtomicBoolean created = new AtomicBoolean(false); + Throwable[] createThrowable = new Throwable[1]; + try (AdminClient adminClient = AdminClient.create(props)) { + adminClient.createTopics( + Collections.singleton(new 
NewTopic(testTopic, 1, (short) 1))) + .all().whenComplete((v, throwable) -> { + created.set(true); + createThrowable[0] = throwable; + }); + } + await().atMost(Duration.ofSeconds(30)) + .until(created::get); + + if(createThrowable[0] != null) + throw createThrowable[0]; + } + + @AfterEach + void tearDown() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + AtomicBoolean deleted = new AtomicBoolean(false); + Throwable[] createThrowable = new Throwable[1]; + final String topicName = jsonTopic.getName(); + try (AdminClient adminClient = AdminClient.create(props)) { + adminClient.deleteTopics(Collections.singleton(topicName)) + .all().whenComplete((v, throwable) -> deleted.set(true)); + } + await().atMost(Duration.ofSeconds(30)) + .until(deleted::get); } @Test @@ -143,26 +182,9 @@ public void TestJsonRecordsWithNullKey() throws Exception { when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = jsonTopic.getName(); - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.createTopics( - Collections.singleton(new NewTopic(topicName, 1, (short)1))) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(true); - } - while (created.get() != true) { - Thread.sleep(1000); - } kafkaSource.start(buffer); testKey = null; - produceJsonRecords(bootstrapServers, topicName, numRecords); + produceJsonRecords(bootstrapServers, testTopic, numRecords); int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { Thread.sleep(1000); @@ -170,28 +192,16 @@ public void TestJsonRecordsWithNullKey() throws Exception { assertThat(receivedRecords.size(), equalTo(numRecords)); for (int i = 0; 
i < numRecords; i++) { Record record = receivedRecords.get(i); - Event event = (Event)record.getData(); + Event event = (Event) record.getData(); EventMetadata metadata = event.getMetadata(); Map map = event.toMap(); - assertThat(map.get("name"), equalTo("testName"+i)); - assertThat(map.get("id"), equalTo(TEST_ID+i)); + assertThat(map.get("name"), equalTo("testName" + i)); + assertThat(map.get("id"), equalTo(TEST_ID + i)); assertThat(map.get("status"), equalTo(true)); assertThat(map.get("kafka_key"), equalTo(null)); - assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName)); + assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); } - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } } @Test @@ -205,25 +215,8 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception { when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(true); kafkaSource = createObjectUnderTest(); - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = jsonTopic.getName(); - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.createTopics( - Collections.singleton(new NewTopic(topicName, 1, (short)1))) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(true); - } - while (created.get() != true) { - Thread.sleep(1000); - } kafkaSource.start(buffer); - produceJsonRecords(bootstrapServers, topicName, numRecords); + produceJsonRecords(bootstrapServers, testTopic, numRecords); 
int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { Thread.sleep(1000); @@ -231,13 +224,13 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception { assertThat(receivedRecords.size(), equalTo(numRecords)); for (int i = 0; i < numRecords; i++) { Record record = receivedRecords.get(i); - Event event = (Event)record.getData(); + Event event = (Event) record.getData(); EventMetadata metadata = event.getMetadata(); Map map = event.toMap(); - assertThat(map.get("name"), equalTo("testName"+i)); - assertThat(map.get("id"), equalTo(TEST_ID+i)); + assertThat(map.get("name"), equalTo("testName" + i)); + assertThat(map.get("id"), equalTo(TEST_ID + i)); assertThat(map.get("status"), equalTo(true)); - assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName)); + assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); event.getEventHandle().release(false); } @@ -249,28 +242,16 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception { assertThat(receivedRecords.size(), equalTo(numRecords)); for (int i = 0; i < numRecords; i++) { Record record = receivedRecords.get(i); - Event event = (Event)record.getData(); + Event event = (Event) record.getData(); EventMetadata metadata = event.getMetadata(); Map map = event.toMap(); - assertThat(map.get("name"), equalTo("testName"+i)); - assertThat(map.get("id"), equalTo(TEST_ID+i)); + assertThat(map.get("name"), equalTo("testName" + i)); + assertThat(map.get("id"), equalTo(TEST_ID + i)); assertThat(map.get("status"), equalTo(true)); - assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName)); + assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); event.getEventHandle().release(true); } - try (AdminClient adminClient = 
AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } } @Test @@ -283,25 +264,8 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception { when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = jsonTopic.getName(); - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.createTopics( - Collections.singleton(new NewTopic(topicName, 1, (short)1))) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(true); - } - while (created.get() != true) { - Thread.sleep(1000); - } kafkaSource.start(buffer); - produceJsonRecords(bootstrapServers, topicName, numRecords); + produceJsonRecords(bootstrapServers, testTopic, numRecords); int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { Thread.sleep(1000); @@ -309,27 +273,15 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception { assertThat(receivedRecords.size(), equalTo(numRecords)); for (int i = 0; i < numRecords; i++) { Record record = receivedRecords.get(i); - Event event = (Event)record.getData(); + Event event = (Event) record.getData(); EventMetadata metadata = event.getMetadata(); Map map = event.toMap(); - assertThat(map.get("name"), equalTo("testName"+i)); - assertThat(map.get("id"), equalTo(TEST_ID+i)); + assertThat(map.get("name"), equalTo("testName" + i)); + assertThat(map.get("id"), equalTo(TEST_ID + i)); assertThat(map.get("status"), equalTo(true)); - assertThat(metadata.getAttributes().get("kafka_topic"), 
equalTo(topicName)); + assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); } - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } } @Test @@ -342,25 +294,8 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception { when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = jsonTopic.getName(); - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.createTopics( - Collections.singleton(new NewTopic(topicName, 1, (short)1))) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(true); - } - while (created.get() != true) { - Thread.sleep(1000); - } kafkaSource.start(buffer); - produceJsonRecords(bootstrapServers, topicName, numRecords); + produceJsonRecords(bootstrapServers, testTopic, numRecords); int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { Thread.sleep(1000); @@ -368,28 +303,16 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception { assertThat(receivedRecords.size(), equalTo(numRecords)); for (int i = 0; i < numRecords; i++) { Record record = receivedRecords.get(i); - Event event = (Event)record.getData(); + Event event = (Event) record.getData(); EventMetadata metadata = event.getMetadata(); Map map = event.toMap(); - assertThat(map.get("name"), equalTo("testName"+i)); - assertThat(map.get("id"), equalTo(TEST_ID+i)); + 
assertThat(map.get("name"), equalTo("testName" + i)); + assertThat(map.get("id"), equalTo(TEST_ID + i)); assertThat(map.get("status"), equalTo(true)); assertThat(map.get("kafka_key"), equalTo(testKey)); - assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName)); + assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); } - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } } @Test @@ -402,26 +325,9 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception { when(sourceConfig.getAuthConfig()).thenReturn(null); kafkaSource = createObjectUnderTest(); - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - AtomicBoolean created = new AtomicBoolean(false); - final String topicName = jsonTopic.getName(); - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.createTopics( - Collections.singleton(new NewTopic(topicName, 1, (short)1))) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(true); - } - while (created.get() != true) { - Thread.sleep(1000); - } kafkaSource.start(buffer); assertThat(kafkaSource.getConsumer().groupMetadata().groupId(), equalTo(testGroup)); - produceJsonRecords(bootstrapServers, topicName, numRecords); + produceJsonRecords(bootstrapServers, testTopic, numRecords); int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { Thread.sleep(1000); @@ -429,46 +335,35 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception { assertThat(receivedRecords.size(), 
equalTo(numRecords)); for (int i = 0; i < numRecords; i++) { Record record = receivedRecords.get(i); - Event event = (Event)record.getData(); + Event event = (Event) record.getData(); EventMetadata metadata = event.getMetadata(); Map map = event.toMap(); - assertThat(map.get("name"), equalTo("testName"+i)); - assertThat(map.get("id"), equalTo(TEST_ID+i)); + assertThat(map.get("name"), equalTo("testName" + i)); + assertThat(map.get("id"), equalTo(TEST_ID + i)); assertThat(map.get("status"), equalTo(true)); assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey)); - assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName)); + assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); } - try (AdminClient adminClient = AdminClient.create(props)) { - try { - adminClient.deleteTopics(Collections.singleton(topicName)) - .all().get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - created.set(false); - } - while (created.get() != false) { - Thread.sleep(1000); - } } public void produceJsonRecords(final String servers, final String topicName, final int numRecords) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.StringSerializer.class); + org.apache.kafka.common.serialization.StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - org.apache.kafka.common.serialization.StringSerializer.class); + org.apache.kafka.common.serialization.StringSerializer.class); KafkaProducer producer = new KafkaProducer(props); for (int i = 0; i < numRecords; i++) { - String value = "{\"name\":\"testName"+i+"\", \"id\":"+(TEST_ID+i)+", \"status\":true}"; + String value = "{\"name\":\"testName" + i + "\", \"id\":" + (TEST_ID + i) + ", \"status\":true}"; 
            ProducerRecord<String, String> record =
-                new ProducerRecord<>(topicName, testKey, value);
+                    new ProducerRecord<>(topicName, testKey, value);
             producer.send(record);
             try {
                 Thread.sleep(100);
-            } catch (Exception e){}
+            } catch (Exception e) {
+            }
         }
         producer.close();
     }