update mario and kafka to latest (#151)
Signed-off-by: Radai Rosenblatt <[email protected]>
radai-rosenblatt authored Oct 8, 2019
1 parent a329fd5 commit bd85b38
Showing 13 changed files with 189 additions and 55 deletions.
6 changes: 4 additions & 2 deletions build.gradle
@@ -50,7 +50,7 @@ project.ext {
}
}
liKafkaVersion = "2.0.0.21"
marioVersion = "0.0.19"
marioVersion = "0.0.20"
}

subprojects {
@@ -70,6 +70,8 @@ subprojects {

dependencies {
testCompile 'org.testng:testng:6.11'
testRuntime 'log4j:log4j:1.2.17'
testRuntime 'org.slf4j:slf4j-log4j12:1.7.26'
}

compileJava {
@@ -86,7 +88,7 @@ subprojects {
useTestNG()
maxHeapSize = "1024m"
testLogging {
events "passed", "failed", "skipped"
events "started", "passed", "failed", "skipped"
}
}

4 changes: 3 additions & 1 deletion integration-tests/build.gradle
@@ -9,7 +9,9 @@ plugins {
}

dependencies {
testCompile project(':kafka-test-harness')
testCompile (project(":kafka-test-harness")) {
exclude group: 'org.slf4j', module:'slf4j-log4j12'
}
testCompile ("com.linkedin.mario:mario-integration-tests:${rootProject.ext.marioVersion}") {
exclude group: 'org.apache.kafka'
}
@@ -39,6 +39,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -114,6 +116,7 @@ public void tearDown() {
@Test
public void advanceOffsetsWhenLargeMessageCanNotBeAssembled() throws Exception {
String topic = "testOffsetDeliveryIssue";
createTopic(topic);
produceSyntheticMessages(topic);

// Bury under a bunch of non-large messages, some of the internal trimming logic is only triggered when
@@ -171,8 +174,9 @@ public void advanceOffsetsWhenLargeMessageCanNotBeAssembled() throws Exception {
}

@Test
public void advanceOffsetsWhenMessagesNotDelivered() {
public void advanceOffsetsWhenMessagesNotDelivered() throws Exception {
String topic = "testUle";
createTopic(topic);
produceSyntheticMessages(topic);

// Commit offset with very large high watermark
@@ -221,8 +225,9 @@ private void seedBadOffset(String topic, Properties openSourceConsumerProperties
}
}
@Test
public void testSeek() {
public void testSeek() throws Exception {
String topic = "testSeek";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -295,8 +300,9 @@ private void verifyMessagesAfterSeek(LiKafkaConsumer<String, String> consumer,
}

@Test
public void testCommit() {
public void testCommit() throws Exception {
String topic = "testCommit";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -369,8 +375,9 @@ public void testCommit() {
}

@Test
public void testCommitWithOffsetMap() {
public void testCommitWithOffsetMap() throws Exception {
String topic = "testCommitWithOffsetMap";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -441,8 +448,9 @@ public void testCommitWithOffsetMap() {
}

@Test(expectedExceptions = TimeoutException.class)
public void testCommitWithTimeout() {
public void testCommitWithTimeout() throws Exception {
String topic = "testCommitWithTimeout";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -469,8 +477,9 @@ public void testCommitWithTimeout() {
}

@Test
public void testSeekToBeginningAndEnd() {
public void testSeekToBeginningAndEnd() throws Exception {
String topic = "testSeekToBeginningAndEnd";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testSeekToBeginningAndEnd");
@@ -523,8 +532,9 @@ public void testSeekToBeginningAndEnd() {
* 9: M5_SEG1(END)
*/
@Test
public void testSeekToCommitted() {
public void testSeekToCommitted() throws Exception {
String topic = "testSeekToCommitted";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -577,6 +587,7 @@ private ConsumerRecords<String, String> waitForData(LiKafkaConsumer<String, Stri
@Test
public void testOffsetCommitCallback() throws Exception {
String topic = "testOffsetCommitCallback";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -641,9 +652,10 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffs
}

@Test
public void testCommittedOnOffsetsCommittedByRawConsumer() {
public void testCommittedOnOffsetsCommittedByRawConsumer() throws Exception {
String topic = "testCommittedOnOffsetsCommittedByRawConsumer";
TopicPartition tp = new TopicPartition(topic, 0);
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -684,7 +696,9 @@ public void testCommittedOnOffsetsCommittedByRawConsumer() {
}

@Test
public void testSeekAfterAssignmentChange() {
public void testSeekAfterAssignmentChange() throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -719,7 +733,9 @@ public void testSeekAfterAssignmentChange() {
}

@Test
public void testUnsubscribe() {
public void testUnsubscribe() throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -752,8 +768,9 @@ public void testUnsubscribe() {
}

@Test
public void testPosition() {
public void testPosition() throws Exception {
String topic = "testSeek";
createTopic(topic);
TopicPartition tp = new TopicPartition(topic, 0);
TopicPartition tp1 = new TopicPartition(topic, 1);
produceSyntheticMessages(topic);
@@ -795,6 +812,8 @@ public void testPosition() {
*/
@Test
public void testRebalance() throws Throwable {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer(true);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -865,7 +884,9 @@ public void testRebalance() throws Throwable {
* @throws InterruptedException
*/
@Test
public void testCommitAndResume() throws InterruptedException {
public void testCommitAndResume() throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
// Make sure we start to consume from the beginning.
@@ -928,13 +949,15 @@ public void testCommitAndResume() throws InterruptedException {
* Test search offset by timestamp
*/
@Test
public void testSearchOffsetByTimestamp() {
public void testSearchOffsetByTimestamp() throws Exception {
Properties props = new Properties();
// Make sure we start to consume from the beginning.
props.setProperty("auto.offset.reset", "earliest");
// All the consumers should have the same group id.
props.setProperty("group.id", "testSearchOffsetByTimestamp");
props.setProperty("enable.auto.commit", "false");
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();

try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
@@ -1108,20 +1131,26 @@ public void testExceptionHandlingAndProcessing() {
testFuncList.forEach(
testFunc -> {
String topic = UUID.randomUUID().toString();
try {
createTopic(topic);
} catch (Exception e) {
fail("unable to create topic " + topic, e);
}
produceSyntheticMessages(topic);
LiKafkaConsumer<byte[], byte[]> consumer = createConsumerForExceptionProcessingTest();
try {
testExceptionProcessingByFunction(topic, consumer, testFunc);
} catch (Exception e) {
fail("failed with unexpected exception");
fail("failed with unexpected exception", e);
}
}
);
}

@Test
public void testExceptionInProcessingLargeMessage() {
public void testExceptionInProcessingLargeMessage() throws Exception {
String topic = "testExceptionInProcessing";
createTopic(topic);
produceSyntheticMessages(topic);
Properties props = new Properties();
// All the consumers should have the same group id.
@@ -1189,6 +1218,7 @@ public void testGiganticLargeMessages() throws Exception {
new UUIDFactory.DefaultUUIDFactory<>());

String topic = "testGiganticLargeMessages";
createTopic(topic);
TopicPartition tp = new TopicPartition(topic, 0);
Collection<TopicPartition> tps = new ArrayList<>(Collections.singletonList(tp));

@@ -1267,6 +1297,7 @@ public void testExceptionOnLargeMsgDropped() throws Exception {
new UUIDFactory.DefaultUUIDFactory<>());

String topic = "testExceptionOnLargeMsgDropped";
createTopic(topic);
TopicPartition tp = new TopicPartition(topic, 0);
Collection<TopicPartition> tps = new ArrayList<>(Collections.singletonList(tp));

@@ -1350,7 +1381,9 @@ public static Object[][] offsetResetStrategies() {
}

@Test(dataProvider = "offsetResetStrategies")
public void testOffsetOutOfRangeForStrategy(LiOffsetResetStrategy strategy) {
public void testOffsetOutOfRangeForStrategy(LiOffsetResetStrategy strategy) throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
props.setProperty("auto.offset.reset", strategy.name());
@@ -1409,7 +1442,9 @@ public void testOffsetOutOfRangeForStrategy(LiOffsetResetStrategy strategy) {
* if the committed consumer offset expired. This test reproduces the former case (bootstrap) to assert the behavior.
*/
@Test
public void testBootstrapWithLiClosest() {
public void testBootstrapWithLiClosest() throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
props.setProperty("auto.offset.reset", LiOffsetResetStrategy.LICLOSEST.name());
@@ -1443,7 +1478,9 @@ public void testBootstrapWithLiClosest() {
* - Verify that the offset reset returns the earliest available record
*/
@Test
public void testFallOffStartWithLiClosest() throws InterruptedException {
public void testFallOffStartWithLiClosest() throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
props.setProperty("auto.offset.reset", LiOffsetResetStrategy.LICLOSEST.name());
@@ -1477,7 +1514,7 @@ public void testFallOffStartWithLiClosest() throws InterruptedException {
consumer.assign(tpAsCollection);
long giveUp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
while (consumerRecords.isEmpty() && System.currentTimeMillis() < giveUp) {
consumerRecords = consumer.poll(1000);
consumerRecords = consumer.poll(Duration.ofMillis(100));
}
if (consumerRecords.isEmpty()) {
throw new IllegalStateException("failed to consume any records within timeout");
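The only code change in the hunk above swaps consumer.poll(1000) for consumer.poll(Duration.ofMillis(100)), in line with the Kafka 2.x consumer API, where poll(long) is deprecated in favour of poll(Duration) (KIP-266). A minimal, self-contained sketch of that poll-until-data loop using the plain KafkaConsumer; the broker address, group id, topic name, and class name here are illustrative assumptions, not part of the commit:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PollLoopSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Illustrative connection settings; the integration tests get theirs from the embedded harness.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "poll-loop-sketch");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("example-topic"));
          ConsumerRecords<String, String> records = ConsumerRecords.empty();
          long giveUp = System.currentTimeMillis() + 30_000;
          // poll(Duration) bounds the consumer's total blocking time, whereas the older
          // poll(long) could block past its timeout while waiting for metadata.
          while (records.isEmpty() && System.currentTimeMillis() < giveUp) {
            records = consumer.poll(Duration.ofMillis(100));
          }
          System.out.println("fetched " + records.count() + " records");
        }
      }
    }
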
@@ -1702,6 +1739,12 @@ private void produceSyntheticMessages(String topic) {
producer.close();
}

private void createTopic(String topicName) throws Exception {
try (AdminClient adminClient = createRawAdminClient(null)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, NUM_PARTITIONS, (short) 1))).all().get(1, TimeUnit.MINUTES);
}
}

private class RebalanceTestConsumerThread extends Thread {
private final LiKafkaConsumer<String, String> _consumer;
private final int _id;
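The recurring change in the test file above is an explicit createTopic(topic) call before each test produces messages, backed by the createTopic helper added near the end of the diff, which wraps Kafka's AdminClient and blocks until the topic exists, presumably so the tests no longer rely on broker-side auto topic creation. A minimal, commented sketch of that AdminClient pattern outside the test harness; the broker address, topic name, and class name are illustrative assumptions, not part of the commit:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class TopicCreationSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative broker address; the tests obtain theirs from the embedded harness.
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
          // One partition, replication factor 1 -- enough for a single-broker test cluster.
          NewTopic topic = new NewTopic("example-topic", 1, (short) 1);
          // createTopics() returns immediately; all().get(...) blocks until the brokers
          // accept the new topic or the one-minute timeout expires, mirroring the helper in the diff.
          adminClient.createTopics(Collections.singletonList(topic)).all().get(1, TimeUnit.MINUTES);
        }
      }
    }
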
@@ -23,6 +23,8 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -53,7 +55,7 @@ public void tearDown() {
@Test
public void testConsumerLiveConfigReload() throws Exception {
String topic = "testConsumerLiveConfigReload";

createTopic(topic, 1);
Producer<byte[], byte[]> producer = createRawProducer();
for (int i = 0; i < 1000; i++) {
byte[] key = new byte[1024];
@@ -83,9 +85,8 @@ public void testConsumerLiveConfigReload() throws Exception {
LiKafkaInstrumentedConsumerImpl<byte[], byte[]> consumer = new LiKafkaInstrumentedConsumerImpl<>(
baseConsumerConfig,
null,
(baseConfig, overrideConfig) ->
new LiKafkaConsumerImpl(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
() -> mario.getUrl());
(baseConfig, overrideConfig) -> new LiKafkaConsumerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
mario::getUrl);

consumer.subscribe(Collections.singletonList(topic));
AtomicReference<ConsumerRecords<byte[], byte[]>> recordsRef = new AtomicReference<>(null);
@@ -141,4 +142,10 @@ public void testConsumerLiveConfigReload() throws Exception {
consumer.close(Duration.ofSeconds(30));
mario.close();
}

private void createTopic(String topicName, int numPartitions) throws Exception {
try (AdminClient adminClient = createRawAdminClient(null)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, (short) 1))).all().get(1, TimeUnit.MINUTES);
}
}
}
@@ -13,6 +13,10 @@
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -68,7 +72,12 @@ public void tearDown() {
}

@Test
public void testLargeMessage() {
public void testLargeMessage() throws Exception {
//create the test topic
try (AdminClient adminClient = createRawAdminClient(null)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1))).all().get(1, TimeUnit.MINUTES);
}

long startTime = System.currentTimeMillis();
Properties props = new Properties();
props.setProperty("large.message.enabled", "true");