diff --git a/build.gradle b/build.gradle index fbedeba..44c2c3b 100644 --- a/build.gradle +++ b/build.gradle @@ -50,7 +50,7 @@ project.ext { } } liKafkaVersion = "2.0.0.21" - marioVersion = "0.0.21" + marioVersion = "0.0.22" } subprojects { @@ -72,6 +72,7 @@ subprojects { testCompile 'org.testng:testng:6.11' testRuntime 'log4j:log4j:1.2.17' testRuntime 'org.slf4j:slf4j-log4j12:1.7.26' + testRuntime 'org.apache.logging.log4j:log4j-to-slf4j:2.12.0' } compileJava { diff --git a/integration-tests/build.gradle b/integration-tests/build.gradle index 6349bb9..3051e35 100644 --- a/integration-tests/build.gradle +++ b/integration-tests/build.gradle @@ -12,8 +12,11 @@ dependencies { testCompile (project(":kafka-test-harness")) { exclude group: 'org.slf4j', module:'slf4j-log4j12' } - testCompile ("com.linkedin.mario:mario-integration-tests:${rootProject.ext.marioVersion}") { - exclude group: 'org.apache.kafka' + testCompile ("com.linkedin.mario:mario-vertx:${rootProject.ext.marioVersion}") { + } +// testCompile ("com.linkedin.mario:mario-integration-tests:${rootProject.ext.marioVersion}") { +// exclude group: 'org.apache.kafka' +// } testCompile "commons-lang:commons-lang:2.6" } \ No newline at end of file diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java index 788d7d0..50002c2 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java @@ -7,11 +7,11 @@ import com.google.common.collect.ImmutableMap; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness; +import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils; import com.linkedin.mario.common.models.v1.ClientConfigRule; import com.linkedin.mario.common.models.v1.ClientConfigRules; import com.linkedin.mario.common.models.v1.ClientPredicates; import com.linkedin.mario.server.MarioApplication; -import com.linkedin.mario.test.TestUtils; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.time.Duration; @@ -91,7 +91,7 @@ public void testConsumerLiveConfigReload() throws Exception { consumer.subscribe(Collections.singletonList(topic)); AtomicReference> recordsRef = new AtomicReference<>(null); AtomicReference> delegateBeforeRef = new AtomicReference<>(null); - TestUtils.waitUntil("1st record batch", () -> { + KafkaTestUtils.waitUntil("1st record batch", () -> { ConsumerRecords recs = consumer.poll(Duration.ofSeconds(10)); if (recs.count() > 0) { recordsRef.set(recs); @@ -115,12 +115,12 @@ public void testConsumerLiveConfigReload() throws Exception { mario.setConfigPolicy(new ClientConfigRules(Collections.singletonList( new ClientConfigRule(ClientPredicates.ALL, ImmutableMap.of("max.poll.records", "" + afterBatchSize))))); - TestUtils.waitUntil("delegate recreated", () -> { + KafkaTestUtils.waitUntil("delegate recreated", () -> { Consumer delegateNow = consumer.getDelegate(); return delegateNow != delegate; }, 1, 2, TimeUnit.MINUTES, false); - TestUtils.waitUntil("1nd record batch", () -> { + KafkaTestUtils.waitUntil("1nd record batch", () -> { ConsumerRecords recs = consumer.poll(Duration.ofSeconds(10)); if (recs.count() > 0) { recordsRef.set(recs); diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java index 6a5b40c..74c36e7 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java @@ -8,11 +8,11 @@ import com.google.common.collect.ImmutableMap; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness; +import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils; import com.linkedin.mario.common.models.v1.ClientConfigRule; import com.linkedin.mario.common.models.v1.ClientConfigRules; import com.linkedin.mario.common.models.v1.ClientPredicates; import com.linkedin.mario.server.MarioApplication; -import com.linkedin.mario.test.TestUtils; import java.time.Duration; import java.util.Collections; import java.util.Properties; @@ -91,7 +91,7 @@ record = new ProducerRecord<>(topic, 0, key, value); mario.setConfigPolicy(new ClientConfigRules(Collections.singletonList( new ClientConfigRule(ClientPredicates.ALL, ImmutableMap.of("max.request.size", "" + 9000))))); - TestUtils.waitUntil("delegate recreated", () -> { + KafkaTestUtils.waitUntil("delegate recreated", () -> { Producer delegateNow = producer.getDelegate(); return delegateNow != delegate; }, 1, 2, TimeUnit.MINUTES, false); diff --git a/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/KafkaTestUtils.java b/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/KafkaTestUtils.java index cb012e5..1eeff4c 100644 --- a/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/KafkaTestUtils.java +++ b/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/KafkaTestUtils.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.Properties; import java.util.Random; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.admin.AdminClient; @@ -131,6 +132,32 @@ public static void quietly(Task task) { } } + public static void waitUntil( + String description, + UsableSupplier condition, + long sleepIncrements, + long timeout, TimeUnit timeoutUnit, + boolean allowExceptions + ) throws InterruptedException { + long start = System.currentTimeMillis(); + long deadline = start + timeoutUnit.toMillis(timeout); + long now = start; + while (now < deadline) { + try { + if (condition.get()) { + return; + } + } catch (Exception e) { + if (!allowExceptions) { + throw new RuntimeException(e); + } + } + Thread.sleep(sleepIncrements); + now = System.currentTimeMillis(); + } + throw new IllegalStateException("condition " + description + " did not turn true within " + timeout + " " + timeoutUnit); + } + public static String getRandomString(int length) { char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; Random random = new Random(); @@ -155,4 +182,9 @@ public static String getExceptionString(int length) { public interface Task { void run() throws Exception; } + + @FunctionalInterface + public interface UsableSupplier { + T get() throws Exception; + } }