Skip to content

Commit

Permalink
Upgrade to Kafka 3.8 (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Jan 8, 2025
1 parent 677c5a6 commit c062c09
Show file tree
Hide file tree
Showing 12 changed files with 45 additions and 30 deletions.
14 changes: 7 additions & 7 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ version=3.2.1-SNAPSHOT
org.gradle.caching=true
# running Kafka Streams in parallel causes problems with colliding consumer groups
org.gradle.parallel=false
kafkaVersion=3.7.1
kafkaVersion=3.8.1
testContainersVersion=1.20.4
confluentVersion=7.7.0
fluentKafkaVersion=2.15.0
junitVersion=5.10.2
mockitoVersion=5.11.0
assertJVersion=3.25.3
log4jVersion=2.23.1
confluentVersion=7.8.0
fluentKafkaVersion=2.16.0
junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
log4jVersion=2.24.3
org.gradle.jvmargs=-Xmx4096m
2 changes: 1 addition & 1 deletion streams-bootstrap-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {

dependencies {
api(project(":streams-bootstrap-core"))
api(group = "info.picocli", name = "picocli", version = "4.7.5")
api(group = "info.picocli", name = "picocli", version = "4.7.6")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class)
.from(outputTopic, TIMEOUT);
return records.stream()
.map(record -> new KeyValue<>(record.key(), record.value()))
.map(consumerRecord -> new KeyValue<>(consumerRecord.key(), consumerRecord.value()))
.collect(Collectors.toList());
}

Expand All @@ -213,7 +213,7 @@ private KafkaStreamsApplication<?> createWordCountApplication() {
private <T extends KafkaStreamsApplication<?>> T configure(final T application) {
application.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
application.setKafkaConfig(Map.of(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0",
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
return application;
Expand Down
10 changes: 5 additions & 5 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ dependencies {
val confluentVersion: String by project
implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion)
api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion)
api(
implementation(
group = "org.slf4j",
name = "slf4j-api",
version = "2.0.9"
) // required because other dependencies use Slf4j 1.x which is not properly resolved if this library is used in test scope
implementation(group = "org.jooq", name = "jool", version = "0.9.14")
version = "2.0.16"
)
implementation(group = "org.jooq", name = "jool", version = "0.9.15")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testImplementation(group = "org.junit-pioneer", name = "junit-pioneer", version = "2.2.0")
testImplementation(group = "org.junit-pioneer", name = "junit-pioneer", version = "2.3.0")
val assertJVersion: String by project
testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val mockitoVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public static ConsumerGroupClient create(final Map<String, Object> configs, fina
return new ConsumerGroupClient(AdminClient.create(configs), timeout);
}

/**
 * Build the exception thrown when deleting a consumer group fails.
 * Centralizes the message format so the interrupted/execution/timeout
 * catch branches of {@code deleteConsumerGroup} stay consistent.
 *
 * @param groupName name of the consumer group that could not be deleted
 * @param ex underlying cause; preserved as the exception cause
 * @return a {@link KafkaAdminException} wrapping {@code ex}
 */
private static KafkaAdminException failedToDeleteGroup(final String groupName, final Throwable ex) {
return new KafkaAdminException("Failed to delete consumer group " + groupName, ex);
}

/**
 * Build the exception thrown when listing consumer groups fails.
 * Centralizes the message so all catch branches of {@code listGroups}
 * raise an identical error.
 *
 * @param ex underlying cause; preserved as the exception cause
 * @return a {@link KafkaAdminException} wrapping {@code ex}
 */
private static KafkaAdminException failedToListGroups(final Throwable ex) {
return new KafkaAdminException("Failed to list consumer groups", ex);
}

/**
* Delete a consumer group.
*
Expand All @@ -74,14 +82,14 @@ public void deleteConsumerGroup(final String groupName) {
log.info("Deleted consumer group '{}'", groupName);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex);
throw failedToDeleteGroup(groupName, ex);
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
}
throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex);
throw failedToDeleteGroup(groupName, ex);
} catch (final TimeoutException ex) {
throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex);
throw failedToDeleteGroup(groupName, ex);
}
}

Expand Down Expand Up @@ -115,14 +123,14 @@ public Collection<ConsumerGroupListing> listGroups() {
.get(this.timeout.toSeconds(), TimeUnit.SECONDS);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new KafkaAdminException("Failed to list consumer groups", ex);
throw failedToListGroups(ex);
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
}
throw new KafkaAdminException("Failed to list consumer groups", ex);
throw failedToListGroups(ex);
} catch (final TimeoutException ex) {
throw new KafkaAdminException("Failed to list consumer groups", ex);
throw failedToListGroups(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private List<KeyValue<String, String>> readOutputTopic(final String outputTopic)
final List<ConsumerRecord<String, String>> records =
this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L));
return records.stream()
.map(record -> new org.apache.kafka.streams.KeyValue<>(record.key(), record.value()))
.map(StreamsCleanUpRunnerTest::toKeyValue)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private List<KeyValue<String, String>> readOutputTopic(final String outputTopic)
final List<ConsumerRecord<String, String>> records =
this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L));
return records.stream()
.map(record -> new KeyValue<>(record.key(), record.value()))
.map(StreamsCleanUpRunnerTest::toKeyValue)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class StreamsCleanUpRunnerTest extends KafkaTest {
@Mock
private TopicHook topicHook;

/**
 * Convert a consumed Kafka record into a Streams {@link KeyValue} pair,
 * keeping only key and value (offset, headers, and timestamp are dropped).
 * Shared helper for the {@code readOutputTopic} assertions in this test.
 *
 * @param consumerRecord record read from a topic
 * @param <K> key type
 * @param <V> value type
 * @return {@code KeyValue} holding the record's key and value
 */
static <K, V> KeyValue<K, V> toKeyValue(final ConsumerRecord<K, V> consumerRecord) {
return new KeyValue<>(consumerRecord.key(), consumerRecord.value());
}

private static ConfiguredStreamsApp<StreamsApp> createWordCountPatternApplication() {
return configureApp(new WordCountPattern(), StreamsTopicConfig.builder()
.inputPattern(Pattern.compile(".*_topic"))
Expand Down Expand Up @@ -547,7 +551,6 @@ void shouldDeleteSchemaOfInternalTopics()
}
}


@Test
void shouldDeleteSchemaOfIntermediateTopics()
throws InterruptedException, IOException, RestClientException {
Expand Down Expand Up @@ -717,7 +720,7 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class)
.from(outputTopic, TIMEOUT);
return records.stream()
.map(record -> new KeyValue<>(record.key(), record.value()))
.map(StreamsCleanUpRunnerTest::toKeyValue)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ static Thread run(final StreamsRunner runner) {

static ConfiguredStreamsApp<StreamsApp> configureApp(final StreamsApp app, final StreamsTopicConfig topics) {
final AppConfiguration<StreamsTopicConfig> configuration = new AppConfiguration<>(topics, Map.of(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0",
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
return new ConfiguredStreamsApp<>(app, configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@
@UtilityClass
public class TestUtil {
public static KafkaContainer newKafkaCluster() {
return new KafkaContainer(DockerImageName.parse("apache/kafka:3.7.1"));
return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1"));
}
}
2 changes: 1 addition & 1 deletion streams-bootstrap-large-messages/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ description = "Utils for using Large Message SerDe with your Kafka Streams Appli

dependencies {
api(project(":streams-bootstrap-core"))
implementation(group = "com.bakdata.kafka", name = "large-message-core", version = "2.6.0")
implementation(group = "com.bakdata.kafka", name = "large-message-core", version = "2.8.0")
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -77,12 +77,16 @@ public static <T> T registerTopicHook(

/**
 * {@link TopicHook} that removes backing files of large messages when their
 * topic is deleted, and releases the underlying storing client on close.
 */
@RequiredArgsConstructor
private static class LargeMessageTopicHook implements TopicHook {
// Client used to manage externally stored large-message payloads; closed via close() below.
private final @NonNull LargeMessageStoringClient storer;

/** Delete all externally stored files belonging to the deleted topic. */
@Override
public void deleted(final String topic) {
this.storer.deleteAllFiles(topic);
}

/** Release resources held by the storing client. */
@Override
public void close() {
this.storer.close();
}
}
}

0 comments on commit c062c09

Please sign in to comment.