From 40318f6d9219f9e512abd452ed4dfa1516ed220f Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 7 Jan 2025 13:29:49 +0100 Subject: [PATCH] Upgrade to Kafka 3.8 and Avro 1.12 --- ...rorCapturingFlatKeyValueMapperTopologyTest.java | 10 +++++----- .../ErrorCapturingFlatTransformerTopologyTest.java | 14 +++++++------- ...rCapturingFlatValueTransformerTopologyTest.java | 6 +++--- ...ingFlatValueTransformerWithKeyTopologyTest.java | 6 +++--- .../ErrorCapturingKeyValueMapperTopologyTest.java | 10 +++++----- .../kafka/ErrorCapturingProcessorTopologyTest.java | 12 ++++++------ .../ErrorCapturingTransformerTopologyTest.java | 14 +++++++------- .../ErrorCapturingValueProcessorTopologyTest.java | 8 ++++---- ...ErrorCapturingValueTransformerTopologyTest.java | 8 ++++---- ...pturingValueTransformerWithKeyTopologyTest.java | 8 ++++---- ...orDescribingFlatKeyValueMapperTopologyTest.java | 10 +++++----- ...scribingFlatValueMapperWithKeyTopologyTest.java | 2 +- .../ErrorDescribingKeyValueMapperTopologyTest.java | 10 +++++----- .../ErrorDescribingProcessorTopologyTest.java | 12 ++++++------ .../ErrorDescribingTransformerTopologyTest.java | 14 +++++++------- ...orDescribingValueMapperWithKeyTopologyTest.java | 2 +- .../ErrorDescribingValueProcessorTopologyTest.java | 10 +++++----- ...rrorDescribingValueTransformerTopologyTest.java | 8 ++++---- ...cribingValueTransformerWithKeyTopologyTest.java | 10 +++++----- .../kafka/ErrorHeaderProcessorTopologyTest.java | 6 +++--- .../kafka/ErrorHeaderTransformerTopologyTest.java | 6 +++--- ...ErrorLoggingFlatKeyValueMapperTopologyTest.java | 4 ++-- .../ErrorLoggingFlatTransformerTopologyTest.java | 10 +++++----- .../ErrorLoggingFlatValueMapperTopologyTest.java | 4 ++-- ...rorLoggingFlatValueTransformerTopologyTest.java | 6 +++--- ...ingFlatValueTransformerWithKeyTopologyTest.java | 6 +++--- .../ErrorLoggingKeyValueMapperTopologyTest.java | 10 +++++----- .../kafka/ErrorLoggingProcessorTopologyTest.java | 12 ++++++------ .../kafka/ErrorLoggingTransformerTopologyTest.java | 14 +++++++------- .../ErrorLoggingValueProcessorTopologyTest.java | 8 ++++---- .../ErrorLoggingValueTransformerTopologyTest.java | 8 ++++---- ...LoggingValueTransformerWithKeyTopologyTest.java | 8 ++++---- .../com/bakdata/kafka/ProcessingErrorTest.java | 6 +----- 33 files changed, 139 insertions(+), 143 deletions(-) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java index 01007de..05c0f6d 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java @@ -100,7 +100,7 @@ void shouldForwardRecoverableException(final SoftAssertions softly) { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -136,7 +136,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -179,7 +179,7 @@ void shouldHandleNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -204,7 +204,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -241,7 +241,7 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java index 6b4b5f1..d9ff9d9 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java @@ -125,7 +125,7 @@ public void close() { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -206,7 +206,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -266,7 +266,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -312,7 +312,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -355,7 +355,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -410,7 +410,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -456,7 +456,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java index 57cb37f..dbac5df 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java @@ -193,7 +193,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -251,7 +251,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -293,7 +293,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java index 770268a..e7e42e7 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java @@ -196,7 +196,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -254,7 +254,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -296,7 +296,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java index a52cc3d..e660206 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java @@ -100,7 +100,7 @@ void shouldForwardRecoverableException(final SoftAssertions softly) { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -134,7 +134,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -176,7 +176,7 @@ void shouldHandleNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -202,7 +202,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -238,7 +238,7 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java index fdb3937..5e3907f 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java @@ -121,7 +121,7 @@ public void close() { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -197,7 +197,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -262,7 +262,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -308,7 +308,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -366,7 +366,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -420,7 +420,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java index 2b2d60c..e458868 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java @@ -123,7 +123,7 @@ public void close() { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -203,7 +203,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -272,7 +272,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -318,7 +318,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -375,7 +375,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -427,7 +427,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -485,7 +485,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java index eed020f..4e9589b 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java @@ -195,7 +195,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -260,7 +260,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -306,7 +306,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -364,7 +364,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java index 9146d07..cd15b61 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java @@ -190,7 +190,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -252,7 +252,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -298,7 +298,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -353,7 +353,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java index d536419..c96f428 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java @@ -193,7 +193,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -255,7 +255,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -301,7 +301,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -356,7 +356,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatKeyValueMapperTopologyTest.java index b1fd07d..9b03a99 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatKeyValueMapperTopologyTest.java @@ -101,7 +101,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { "Cannot process ('" + ErrorUtil.toString(1) + "', '" + ErrorUtil.toString("foo") + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -121,7 +121,7 @@ void shouldHandleNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -141,10 +141,10 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -159,7 +159,7 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatValueMapperWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatValueMapperWithKeyTopologyTest.java index 59262ed..5eca42c 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatValueMapperWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingFlatValueMapperWithKeyTopologyTest.java @@ -134,7 +134,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingKeyValueMapperTopologyTest.java index e271851..3ddd052 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingKeyValueMapperTopologyTest.java @@ -99,7 +99,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { "Cannot process ('" + ErrorUtil.toString(1) + "', '" + ErrorUtil.toString("foo") + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -120,7 +120,7 @@ void shouldHandleNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -142,10 +142,10 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -160,7 +160,7 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java index ea7b21d..ec65f3c 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingProcessorTopologyTest.java @@ -127,7 +127,7 @@ public void process(final Record record) { "Cannot process ('" + ErrorUtil.toString(1) + "', '" + ErrorUtil.toString("foo") + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -165,7 +165,7 @@ public void process(final Record record) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -201,10 +201,10 @@ public void process(final Record record) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -235,7 +235,7 @@ public void process(final Record record) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -278,7 +278,7 @@ public void process(final Record record) { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingTransformerTopologyTest.java index f2daa89..c384d82 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingTransformerTopologyTest.java @@ -142,7 +142,7 @@ public KeyValue transform(final Integer key, final String value) { "Cannot process ('" + ErrorUtil.toString(1) + "', '" + ErrorUtil.toString("foo") + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -190,7 +190,7 @@ public KeyValue transform(final Integer key, final String value) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -232,10 +232,10 @@ public KeyValue transform(final Integer key, final String value) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -271,7 +271,7 @@ public KeyValue transform(final Integer key, final String value) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -318,7 +318,7 @@ public KeyValue transform(final Integer key, final String value) { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -371,7 +371,7 @@ public KeyValue transform(final Integer key, final String value) { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueMapperWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueMapperWithKeyTopologyTest.java index 6bd3963..d03920f 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueMapperWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueMapperWithKeyTopologyTest.java @@ -139,7 +139,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java index 67c84f7..58ed2e4 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueProcessorTopologyTest.java @@ -128,7 +128,7 @@ public void process(final FixedKeyRecord record) { "Cannot process ('" + ErrorUtil.toString(1) + "', '" + ErrorUtil.toString("foo") + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -166,7 +166,7 @@ public void process(final FixedKeyRecord record) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -202,10 +202,10 @@ public void process(final FixedKeyRecord record) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -236,7 +236,7 @@ public void process(final FixedKeyRecord record) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerTopologyTest.java index c9d70e3..dbb5719 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerTopologyTest.java @@ -134,7 +134,7 @@ public Long transform(final String value) { .hasMessage("Cannot process " + ErrorUtil.toString("foo")) ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -175,7 +175,7 @@ public Long transform(final String value) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -219,7 +219,7 @@ public Long transform(final String value) { .hasMessage("Cannot process " + ErrorUtil.toString(null)) ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -253,7 +253,7 @@ public Long transform(final String value) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerWithKeyTopologyTest.java index e3793e0..1296374 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorDescribingValueTransformerWithKeyTopologyTest.java @@ -135,7 +135,7 @@ public Long transform(final Integer key, final String value) { "Cannot process ('" + ErrorUtil.toString(1) + "', '" + ErrorUtil.toString("foo") + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -176,7 +176,7 @@ public Long transform(final Integer key, final String value) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -218,10 +218,10 @@ public Long transform(final Integer key, final String value) { .add(null, null)) .satisfies(e -> softly.assertThat(e.getCause()) .hasMessage("Cannot process ('" + ErrorUtil.toString(null) + "', '" + ErrorUtil.toString(null) - + "')") + + "')") ); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -255,7 +255,7 @@ public Long transform(final Integer key, final String value) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java index 46aafb4..829bf9f 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java @@ -90,7 +90,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -134,7 +134,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -160,7 +160,7 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(1, "foo"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderTransformerTopologyTest.java index 172283b..b087b01 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderTransformerTopologyTest.java @@ -90,7 +90,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -134,7 +134,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -160,7 +160,7 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(1, "foo"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java index e3ab0c9..bbb3df6 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java @@ -106,8 +106,8 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) { when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() - .withValueSerde(STRING_SERDE) - .add(1, "foo")) + .withValueSerde(STRING_SERDE) + .add(1, "foo")) .isEqualTo(throwable); } diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java index 37f8d7c..f775339 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java @@ -116,7 +116,7 @@ public void close() { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -184,7 +184,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -222,7 +222,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -260,7 +260,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -294,7 +294,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java index a22ec11..0a1ee9b 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java @@ -104,8 +104,8 @@ void shouldNotCaptureThrowable(final SoftAssertions softly) { when(this.mapper.apply("foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() - .withValueSerde(STRING_SERDE) - .add(1, "foo")) + .withValueSerde(STRING_SERDE) + .add(1, "foo")) .isEqualTo(throwable); } diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java index 7d17eea..fb36651 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java @@ -181,7 +181,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -218,7 +218,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -255,7 +255,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java index e3d0911..240c469 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java @@ -183,7 +183,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -220,7 +220,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -257,7 +257,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java index 9ee3d19..15f444c 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java @@ -93,7 +93,7 @@ void shouldForwardRecoverableException(final SoftAssertions softly) { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -121,7 +121,7 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -142,7 +142,7 @@ void shouldHandleNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -163,7 +163,7 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -178,7 +178,7 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java index 8c1e7da..99a2c22 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java @@ -113,7 +113,7 @@ public void close() { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -182,7 +182,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -225,7 +225,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -265,7 +265,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -301,7 +301,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -349,7 +349,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java index cee547d..f704da3 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java @@ -114,7 +114,7 @@ public void close() { .add(1, "foo")) .hasCause(throwable); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -188,7 +188,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -236,7 +236,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -277,7 +277,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -313,7 +313,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -360,7 +360,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -413,7 +413,7 @@ public void close() { .add(2, "bar") .add(3, "baz"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) + .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java index cb5f8c2..6edc0d2 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java @@ -182,7 +182,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -225,7 +225,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -265,7 +265,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -301,7 +301,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java index fa77f3b..bb7a605 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java @@ -181,7 +181,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -222,7 +222,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -263,7 +263,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -297,7 +297,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java index 3cb103b..65e7898 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java @@ -181,7 +181,7 @@ public void close() { .add(1, "foo") .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -222,7 +222,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -263,7 +263,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(null, null); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) @@ -297,7 +297,7 @@ public void close() { .withValueSerde(STRING_SERDE) .add(2, "bar"); final List> records = this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(INTEGER_SERDE) + .withKeySerde(INTEGER_SERDE) .withValueSerde(LONG_SERDE) .toList(); softly.assertThat(records) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ProcessingErrorTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ProcessingErrorTest.java index 96a13cd..c3d9914 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ProcessingErrorTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ProcessingErrorTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -26,11 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNullPointerException; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.when; -import java.io.PrintWriter; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock;