Skip to content

Commit

Permalink
bump mario, simplify dependencies for tests (#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
radai-rosenblatt authored Oct 12, 2019
1 parent f095338 commit 7a4de22
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 9 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ project.ext {
}
}
liKafkaVersion = "2.0.0.21"
marioVersion = "0.0.21"
marioVersion = "0.0.22"
}

subprojects {
Expand All @@ -72,6 +72,7 @@ subprojects {
testCompile 'org.testng:testng:6.11'
testRuntime 'log4j:log4j:1.2.17'
testRuntime 'org.slf4j:slf4j-log4j12:1.7.26'
testRuntime 'org.apache.logging.log4j:log4j-to-slf4j:2.12.0'
}

compileJava {
Expand Down
7 changes: 5 additions & 2 deletions integration-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ dependencies {
testCompile (project(":kafka-test-harness")) {
exclude group: 'org.slf4j', module:'slf4j-log4j12'
}
testCompile ("com.linkedin.mario:mario-integration-tests:${rootProject.ext.marioVersion}") {
exclude group: 'org.apache.kafka'
testCompile ("com.linkedin.mario:mario-vertx:${rootProject.ext.marioVersion}") {

}
// testCompile ("com.linkedin.mario:mario-integration-tests:${rootProject.ext.marioVersion}") {
// exclude group: 'org.apache.kafka'
// }
testCompile "commons-lang:commons-lang:2.6"
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.google.common.collect.ImmutableMap;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils;
import com.linkedin.mario.common.models.v1.ClientConfigRule;
import com.linkedin.mario.common.models.v1.ClientConfigRules;
import com.linkedin.mario.common.models.v1.ClientPredicates;
import com.linkedin.mario.server.MarioApplication;
import com.linkedin.mario.test.TestUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.time.Duration;
Expand Down Expand Up @@ -91,7 +91,7 @@ public void testConsumerLiveConfigReload() throws Exception {
consumer.subscribe(Collections.singletonList(topic));
AtomicReference<ConsumerRecords<byte[], byte[]>> recordsRef = new AtomicReference<>(null);
AtomicReference<Consumer<byte[], byte[]>> delegateBeforeRef = new AtomicReference<>(null);
TestUtils.waitUntil("1st record batch", () -> {
KafkaTestUtils.waitUntil("1st record batch", () -> {
ConsumerRecords<byte[], byte[]> recs = consumer.poll(Duration.ofSeconds(10));
if (recs.count() > 0) {
recordsRef.set(recs);
Expand All @@ -115,12 +115,12 @@ public void testConsumerLiveConfigReload() throws Exception {
mario.setConfigPolicy(new ClientConfigRules(Collections.singletonList(
new ClientConfigRule(ClientPredicates.ALL, ImmutableMap.of("max.poll.records", "" + afterBatchSize)))));

TestUtils.waitUntil("delegate recreated", () -> {
KafkaTestUtils.waitUntil("delegate recreated", () -> {
Consumer<byte[], byte[]> delegateNow = consumer.getDelegate();
return delegateNow != delegate;
}, 1, 2, TimeUnit.MINUTES, false);

TestUtils.waitUntil("1nd record batch", () -> {
KafkaTestUtils.waitUntil("1nd record batch", () -> {
ConsumerRecords<byte[], byte[]> recs = consumer.poll(Duration.ofSeconds(10));
if (recs.count() > 0) {
recordsRef.set(recs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import com.google.common.collect.ImmutableMap;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils;
import com.linkedin.mario.common.models.v1.ClientConfigRule;
import com.linkedin.mario.common.models.v1.ClientConfigRules;
import com.linkedin.mario.common.models.v1.ClientPredicates;
import com.linkedin.mario.server.MarioApplication;
import com.linkedin.mario.test.TestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
Expand Down Expand Up @@ -91,7 +91,7 @@ record = new ProducerRecord<>(topic, 0, key, value);
mario.setConfigPolicy(new ClientConfigRules(Collections.singletonList(
new ClientConfigRule(ClientPredicates.ALL, ImmutableMap.of("max.request.size", "" + 9000)))));

TestUtils.waitUntil("delegate recreated", () -> {
KafkaTestUtils.waitUntil("delegate recreated", () -> {
Producer<byte[], byte[]> delegateNow = producer.getDelegate();
return delegateNow != delegate;
}, 1, 2, TimeUnit.MINUTES, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -131,6 +132,32 @@ public static void quietly(Task task) {
}
}

public static void waitUntil(
String description,
UsableSupplier<Boolean> condition,
long sleepIncrements,
long timeout, TimeUnit timeoutUnit,
boolean allowExceptions
) throws InterruptedException {
long start = System.currentTimeMillis();
long deadline = start + timeoutUnit.toMillis(timeout);
long now = start;
while (now < deadline) {
try {
if (condition.get()) {
return;
}
} catch (Exception e) {
if (!allowExceptions) {
throw new RuntimeException(e);
}
}
Thread.sleep(sleepIncrements);
now = System.currentTimeMillis();
}
throw new IllegalStateException("condition " + description + " did not turn true within " + timeout + " " + timeoutUnit);
}

public static String getRandomString(int length) {
char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
Random random = new Random();
Expand All @@ -155,4 +182,9 @@ public static String getExceptionString(int length) {
public interface Task {
void run() throws Exception;
}

@FunctionalInterface
public interface UsableSupplier<T> {
T get() throws Exception;
}
}

0 comments on commit 7a4de22

Please sign in to comment.