Skip to content

Commit

Permalink
Upgrade to Kafka 3.8 and Avro 1.12
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 7, 2025
1 parent 5a9a531 commit 89e73bc
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 44 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.1"
id("io.freefair.lombok") version "8.4"
}

Expand All @@ -16,6 +16,7 @@ allprojects {
repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
}
}

Expand Down
1 change: 0 additions & 1 deletion error-handling-avro/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ dependencies {
val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(testFixtures(project(":error-handling-core")))
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
val assertJVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -30,7 +30,7 @@
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -44,7 +44,6 @@
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -81,8 +80,8 @@ protected void buildTopology(final StreamsBuilder builder) {
}

@Override
protected Properties getKafkaProperties() {
final Properties kafkaProperties = super.getKafkaProperties();
protected Map<String, Object> getKafkaProperties() {
final Map<String, Object> kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
return kafkaProperties;
}
Expand All @@ -95,14 +94,14 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.add(1, "foo")
.add(2, "bar");

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE)
.toList();
this.softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, DeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class))
final List<ProducerRecord<Integer, DeadLetter>> errors = this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class)
.toList();

this.softly.assertThat(errors)
Expand Down
4 changes: 2 additions & 2 deletions error-handling-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ dependencies {
api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
val avroVersion: String by project
implementation(group = "org.apache.avro", name = "avro", version = avroVersion)
implementation(group = "org.jooq", name = "jool", version = "0.9.14")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.14.0")
implementation(group = "org.jooq", name = "jool", version = "0.9.15")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.17.0")

val junitVersion: String by project
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -25,7 +25,8 @@
package com.bakdata.kafka;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serdes.IntegerSerde;
Expand All @@ -48,20 +49,20 @@ void tearDown() {
}
}

protected Properties getKafkaProperties() {
final Properties kafkaConfig = new Properties();
protected Map<String, Object> getKafkaProperties() {
final Map<String, Object> kafkaConfig = new HashMap<>();

// exactly once and order
kafkaConfig.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);

kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

// topology
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "fake");
kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "fake");
kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, IntegerSerde.class);
kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TestDeadLetterSerde.class);
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "fake");
kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "fake");

return kafkaConfig;
}
Expand Down
3 changes: 1 addition & 2 deletions error-handling-proto/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
description = "Transform dead letters in Kafka Streams applications to protobuf."

plugins {
id("com.google.protobuf") version "0.9.1"
id("com.google.protobuf") version "0.9.4"
}

val protobufVersion: String by project
Expand All @@ -11,7 +11,6 @@ dependencies {
val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(testFixtures(project(":error-handling-core")))
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
val assertJVersion: String by project
Expand Down
2 changes: 1 addition & 1 deletion error-handling-proto/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
protobufVersion=3.21.12
protobufVersion=3.25.5
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -40,7 +40,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -55,7 +54,6 @@
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -92,9 +90,9 @@ protected void buildTopology(final StreamsBuilder builder) {
}

@Override
protected Properties getKafkaProperties() {
final Properties kafkaProperties = super.getKafkaProperties();
kafkaProperties.setProperty(
protected Map<String, Object> getKafkaProperties() {
final Map<String, Object> kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class.getName());
kafkaProperties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
ProtoDeadLetter.class);
Expand All @@ -106,7 +104,7 @@ protected void createTopology() {
final StreamsBuilder builder = new StreamsBuilder();
this.buildTopology(builder);
final Topology topology = builder.build();
final Properties kafkaProperties = this.getKafkaProperties();
final Map<String, Object> kafkaProperties = this.getKafkaProperties();
final SchemaRegistryMock schemaRegistryMock = new SchemaRegistryMock(List.of(new ProtobufSchemaProvider()));
this.topology = new TestTopology<Integer, String>(topology, kafkaProperties)
.withSchemaRegistryMock(schemaRegistryMock);
Expand All @@ -124,14 +122,14 @@ void shouldConvertAndSerializeProtoDeadLetter() {
.add(1, "foo")
.add(2, "bar");

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE)
.toList();
this.softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, ProtoDeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(ProtoDeadLetter.class))
final List<ProducerRecord<Integer, ProtoDeadLetter>> errors = this.topology.streamOutput(ERROR_TOPIC)
.withValueType(ProtoDeadLetter.class)
.toList();

this.softly.assertThat(errors)
Expand Down
18 changes: 9 additions & 9 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ version=1.5.1-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx2048m
kafkaVersion=3.5.2
avroVersion=1.11.3
confluentVersion=7.5.1
jacksonVersion=2.17.0
junitVersion=5.10.2
mockitoVersion=5.11.0
log4jVersion=2.23.1
kafkaStreamsTestsVersion=2.11.1
assertJVersion=3.25.3
kafkaVersion=3.8.1
avroVersion=1.12.0
confluentVersion=7.8.0
jacksonVersion=2.18.2
junitVersion=5.11.4
mockitoVersion=5.15.2
log4jVersion=2.24.3
kafkaStreamsTestsVersion=2.15.1-SNAPSHOT
assertJVersion=3.27.2

0 comments on commit 89e73bc

Please sign in to comment.