From 3865e1344c4358b4c25f044fbb0d80f267453536 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jan 2024 09:18:32 +0100 Subject: [PATCH] Add --application-id parameter --- azure-pipelines.yml | 2 ++ build.gradle.kts | 4 +-- .../templates/job.yaml | 4 +++ charts/streams-app-cleanup-job/values.yaml | 4 +++ charts/streams-app/templates/deployment.yaml | 4 +++ charts/streams-app/values.yaml | 1 + .../kafka/KafkaStreamsApplication.java | 29 ++++++++++++++++--- .../com/bakdata/kafka/CleanUpRunnerTest.java | 4 +-- .../kafka/integration/StreamsCleanUpTest.java | 24 +++++++-------- .../kafka/util/TopologyInformationTest.java | 6 ++-- 10 files changed, 59 insertions(+), 23 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 989872b1..fc9eca7b 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -23,6 +23,8 @@ resources: jobs: - template: azure/gradle/build.yml@templates + parameters: + jdkVersion: '1.17' - template: azure/gradle/create_tag_version.yml@templates - template: azure/gradle/upload_release.yml@templates - template: azure/gradle/upload_snapshot.yml@templates diff --git a/build.gradle.kts b/build.gradle.kts index 2e6f7933..2184f083 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,7 +1,7 @@ plugins { id("net.researchgate.release") version "3.0.2" - id("com.bakdata.sonar") version "1.1.7" - id("com.bakdata.sonatype") version "1.1.7" + id("com.bakdata.sonar") version "1.1.9" + id("com.bakdata.sonatype") version "1.1.9" id("org.hildan.github.changelog") version "1.12.1" id("io.freefair.lombok") version "6.6.1" } diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index d074490a..eaee0047 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -112,6 +112,10 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := 
.Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} + {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} + - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" + value: {{ .Values.autoscaling.consumerGroup | quote }} + {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" valueFrom: diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index 9fa76774..cb091e05 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -15,6 +15,7 @@ configurationEnvPrefix: "APP" streams: # brokers: "test:9092" # schemaRegistryUrl: "url:1234" + passApplicationId: false config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. @@ -59,6 +60,9 @@ labels: {} # serviceAccountName: foo +autoscaling: {} + # consumerGroup: foo + tolerations: [] # - key: "foo" # operator: "Exists" diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index cd900fa2..9f357aad 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -175,6 +175,10 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS" value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} + {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }} + - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID" + value: {{ .Values.autoscaling.consumerGroup | quote }} + {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" valueFrom: diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 0309adb6..479bb98b 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -27,6 +27,7 
@@ streams: # schemaRegistryUrl: "url:1234" staticMembership: false optimizeLeaveGroupBehavior: true + passApplicationId: false config: {} # max.poll.records: 500 # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 33427103..4e66b77e 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; + @CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams") + private String applicationId; private KafkaStreams streams; private Throwable lastException; @@ -159,7 +161,9 @@ public void close() { * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal * state encapsulation. 
Could be set to: className-inputTopic-outputTopic */ - public abstract String getUniqueAppId(); + public String getUniqueAppId() { + return null; + } /** * Create the topology of the Kafka Streams app @@ -235,6 +239,23 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() { return new DefaultStreamsUncaughtExceptionHandler(); } + public final String getStreamsApplicationId() { + final String uniqueAppId = this.getUniqueAppId(); + if(uniqueAppId == null) { + if(this.applicationId == null) { + throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()"); + } + return this.applicationId; + } + if(this.applicationId == null) { + return uniqueAppId; + } + if(!uniqueAppId.equals(this.applicationId)) { + throw new IllegalArgumentException("Application ID provided via --application-id does not match #getUniqueAppId()"); + } + return uniqueAppId; + } + /** *

This method should give a default configuration to run your streaming application with.

* If {@link KafkaApplication#schemaRegistryUrl} is set {@link SpecificAvroSerde} is set as the default key, value @@ -271,7 +292,7 @@ protected Properties createKafkaProperties() { kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip"); // topology - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId()); + kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getStreamsApplicationId()); this.configureDefaultSerde(kafkaConfig); kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers()); @@ -322,7 +343,7 @@ protected void runCleanUp() { try (final ImprovedAdminClient adminClient = this.createAdminClient()) { final CleanUpRunner cleanUpRunner = CleanUpRunner.builder() .topology(this.createTopology()) - .appId(this.getUniqueAppId()) + .appId(this.getStreamsApplicationId()) .adminClient(adminClient) .streams(this.streams) .build(); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java index 667028a7..1553cc3e 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -40,7 +40,7 @@ class CleanUpRunnerTest { void createTemporaryPropertiesFile() throws IOException { final WordCount wordCount = new WordCount(); wordCount.setInputTopics(List.of("input")); - final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getUniqueAppId(), + final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getStreamsApplicationId(), wordCount.getKafkaProperties()); assertThat(file).exists(); diff 
--git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 8e21f9eb..03be66ee 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -172,7 +172,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group exists") - .contains(this.app.getUniqueAppId()); + .contains(this.app.getStreamsApplicationId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -184,7 +184,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group is deleted") - .doesNotContain(this.app.getUniqueAppId()); + .doesNotContain(this.app.getStreamsApplicationId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -211,7 +211,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group exists") - .contains(this.app.getUniqueAppId()); + .contains(this.app.getStreamsApplicationId()); } catch (final 
TimeoutException | ExecutionException e) { throw new RuntimeException("Error retrieving consumer groups", e); } @@ -219,12 +219,12 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); try (final AdminClient adminClient = AdminClient.create(this.app.getKafkaProperties())) { - adminClient.deleteConsumerGroups(List.of(this.app.getUniqueAppId())).all() + adminClient.deleteConsumerGroups(List.of(this.app.getStreamsApplicationId())).all() .get(TIMEOUT_SECONDS, TimeUnit.SECONDS); this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) .extracting(ConsumerGroupListing::groupId) .as("Consumer group is deleted") - .doesNotContain(this.app.getUniqueAppId()); + .doesNotContain(this.app.getStreamsApplicationId()); } catch (final TimeoutException | ExecutionException e) { throw new RuntimeException("Error deleting consumer group", e); } @@ -237,9 +237,9 @@ void shouldDeleteInternalTopics() throws InterruptedException { final String inputTopic = this.app.getInputTopic(); final String internalTopic = - this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"; + this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"; final String backingTopic = - this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; + this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); @@ -410,9 +410,9 @@ void shouldDeleteSchemaOfInternalTopics() final String inputSubject = this.app.getInputTopic() + "-value"; final String internalSubject = - this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value"; + this.app.getStreamsApplicationId() + 
"-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value"; final String backingSubject = - this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value"; + this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value"; final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); @@ -469,7 +469,7 @@ void shouldCallCleanupHookForInternalTopics() { this.app = this.createComplexCleanUpHookApplication(); this.runCleanUp(); - final String uniqueAppId = this.app.getUniqueAppId(); + final String uniqueAppId = this.app.getStreamsApplicationId(); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); @@ -481,7 +481,7 @@ void shouldCallCleanUpHookForAllTopics() { this.app = this.createComplexCleanUpHookApplication(); this.runCleanUpWithDeletion(); - final String uniqueAppId = this.app.getUniqueAppId(); + final String uniqueAppId = this.app.getStreamsApplicationId(); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java index 6ea870a3..1fc24a5a 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java +++ 
b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -49,7 +49,7 @@ void setup() { this.app = new ComplexTopologyApplication(); this.app.setInputTopics(List.of("input", "input2")); this.app.setOutputTopic("output"); - this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getUniqueAppId()); + this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getStreamsApplicationId()); } @Test @@ -131,7 +131,7 @@ void shouldNotReturnInputTopics() { void shouldReturnAllInternalTopics() { assertThat(this.topologyInformation.getInternalTopics()) .hasSize(3) - .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getUniqueAppId()) + .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getStreamsApplicationId()) || topic.startsWith("KSTREAM-")) .allMatch(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition")); }