Skip to content

Commit

Permalink
update kafka clients and mario client, switch to shaded mario client (#…
Browse files Browse the repository at this point in the history
…160)

Signed-off-by: Radai Rosenblatt <[email protected]>
  • Loading branch information
radai-rosenblatt authored Nov 25, 2019
1 parent ef41616 commit 174bc75
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ project.ext {
url "https://github.com/linkedin/li-apache-kafka-clients"
}
}
liKafkaVersion = "2.0.0.21"
marioVersion = "0.0.23"
liKafkaVersion = "2.0.0.23"
marioVersion = "0.0.24"
}

subprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public Properties overridingProps() {
Properties props = new Properties();
props.setProperty(KafkaConfig.NumPartitionsProp(), Integer.toString(NUM_PARTITIONS));
props.setProperty(KafkaConfig.LogRetentionTimeMillisProp(), "" + TimeUnit.DAYS.toMillis(1));
props.setProperty(KafkaConfig.LogRollTimeMillisProp(), "" + TimeUnit.SECONDS.toMillis(5)); //makes retention kick-in faster
props.setProperty(KafkaConfig.LogDeleteDelayMsProp(), "100"); //makes retention kick-in faster
props.setProperty(KafkaConfig.LogCleanerBackoffMsProp(), "" + TimeUnit.SECONDS.toMillis(1)); //makes retention kick-in faster
props.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), "" + TimeUnit.SECONDS.toMillis(10)); //makes retention kick-in faster

return props;
}

Expand Down Expand Up @@ -1497,15 +1502,16 @@ public void testFallOffStartWithLiClosest() throws Exception {
long currentLso = initialLso;
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(initialLso, String.valueOf(consumer.safeOffset(tp)))));
//wait for broker to truncate data that we have not read (we never called poll())
long giveUp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
long timeout = TimeUnit.MINUTES.toMillis(5);
long giveUp = System.currentTimeMillis() + timeout;
while (currentLso == initialLso && System.currentTimeMillis() < giveUp) {
Thread.sleep(5000);
Thread.sleep(1000);
currentLso = consumer.beginningOffsets(tpAsCollection).get(tp);
produceRecordsWithKafkaProducer();
}
if (currentLso == initialLso) {
throw new IllegalStateException("nothing was truncated broker-side within timeout. LogStartOffset = " +
currentLso + " remains the same after " + giveUp + "ms.");
currentLso + " remains the same after " + timeout + "ms.");
}
truncatedStartOffset = currentLso;
}
Expand Down
4 changes: 2 additions & 2 deletions li-apache-kafka-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ plugins {
}

dependencies {
compile "org.apache.httpcomponents:httpclient:4.5.7"
compile "com.linkedin.kafka:kafka-clients:${rootProject.ext.liKafkaVersion}"
compile "com.linkedin.mario:mario-client:${rootProject.ext.marioVersion}"
compile "com.linkedin.mario:mario-client-all:${rootProject.ext.marioVersion}"
compile "com.linkedin.mario:common:${rootProject.ext.marioVersion}"

testCompile "org.mockito:mockito-core:2.24.0"
}

Expand Down

0 comments on commit 174bc75

Please sign in to comment.