Skip to content

Commit

Permalink
Add --application-id parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 26, 2024
1 parent 0b020e0 commit 3865e13
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 23 deletions.
2 changes: 2 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ resources:

jobs:
- template: azure/gradle/build.yml@templates
parameters:
jdkVersion: '1.17'
- template: azure/gradle/create_tag_version.yml@templates
- template: azure/gradle/upload_release.yml@templates
- template: azure/gradle/upload_snapshot.yml@templates
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.7"
id("com.bakdata.sonatype") version "1.1.7"
id("com.bakdata.sonar") version "1.1.9"
id("com.bakdata.sonatype") version "1.1.9"
id("org.hildan.github.changelog") version "1.12.1"
id("io.freefair.lombok") version "6.6.1"
}
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ configurationEnvPrefix: "APP"
streams:
# brokers: "test:9092"
# schemaRegistryUrl: "url:1234"
passApplicationId: false
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down Expand Up @@ -59,6 +60,9 @@ labels: {}

# serviceAccountName: foo

autoscaling: {}
# consumerGroup: foo

tolerations: []
# - key: "foo"
# operator: "Exists"
Expand Down
4 changes: 4 additions & 0 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.autoscaling.consumerGroup | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
1 change: 1 addition & 0 deletions charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ streams:
# schemaRegistryUrl: "url:1234"
staticMembership: false
optimizeLeaveGroupBehavior: true
passApplicationId: false
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams")
private String applicationId;
private KafkaStreams streams;
private Throwable lastException;

Expand Down Expand Up @@ -159,7 +161,9 @@ public void close() {
* This must be set to a unique value for every application interacting with your kafka cluster to ensure internal
* state encapsulation. Could be set to: className-inputTopic-outputTopic
*/
public abstract String getUniqueAppId();
public String getUniqueAppId() {
return null;
}

/**
* Create the topology of the Kafka Streams app
Expand Down Expand Up @@ -235,6 +239,23 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() {
return new DefaultStreamsUncaughtExceptionHandler();
}

public final String getStreamsApplicationId() {
final String uniqueAppId = this.getUniqueAppId();
if(uniqueAppId == null) {
if(this.applicationId == null) {
throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()");
}
return this.applicationId;
}
if(this.applicationId == null) {
return uniqueAppId;
}
if(!uniqueAppId.equals(this.applicationId)) {
throw new IllegalArgumentException("Application ID provided via --application-id does not match #getUniqueAppId()");
}
return uniqueAppId;
}

/**
* <p>This method should give a default configuration to run your streaming application with.</p>
* If {@link KafkaApplication#schemaRegistryUrl} is set {@link SpecificAvroSerde} is set as the default key, value
Expand Down Expand Up @@ -271,7 +292,7 @@ protected Properties createKafkaProperties() {
kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");

// topology
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId());
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getApplicationId());

this.configureDefaultSerde(kafkaConfig);
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers());
Expand Down Expand Up @@ -322,7 +343,7 @@ protected void runCleanUp() {
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final CleanUpRunner cleanUpRunner = CleanUpRunner.builder()
.topology(this.createTopology())
.appId(this.getUniqueAppId())
.appId(this.getApplicationId())
.adminClient(adminClient)
.streams(this.streams)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -40,7 +40,7 @@ class CleanUpRunnerTest {
void createTemporaryPropertiesFile() throws IOException {
final WordCount wordCount = new WordCount();
wordCount.setInputTopics(List.of("input"));
final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getUniqueAppId(),
final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getStreamsApplicationId(),
wordCount.getKafkaProperties());

assertThat(file).exists();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -172,7 +172,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group exists")
.contains(this.app.getUniqueAppId());
.contains(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error retrieving consumer groups", e);
}
Expand All @@ -184,7 +184,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group is deleted")
.doesNotContain(this.app.getUniqueAppId());
.doesNotContain(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error retrieving consumer groups", e);
}
Expand All @@ -211,20 +211,20 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group exists")
.contains(this.app.getUniqueAppId());
.contains(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error retrieving consumer groups", e);
}

delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);

try (final AdminClient adminClient = AdminClient.create(this.app.getKafkaProperties())) {
adminClient.deleteConsumerGroups(List.of(this.app.getUniqueAppId())).all()
adminClient.deleteConsumerGroups(List.of(this.app.getStreamsApplicationId())).all()
.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.extracting(ConsumerGroupListing::groupId)
.as("Consumer group is deleted")
.doesNotContain(this.app.getUniqueAppId());
.doesNotContain(this.app.getStreamsApplicationId());
} catch (final TimeoutException | ExecutionException e) {
throw new RuntimeException("Error deleting consumer group", e);
}
Expand All @@ -237,9 +237,9 @@ void shouldDeleteInternalTopics() throws InterruptedException {

final String inputTopic = this.app.getInputTopic();
final String internalTopic =
this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
final String backingTopic =
this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC;

final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
Expand Down Expand Up @@ -410,9 +410,9 @@ void shouldDeleteSchemaOfInternalTopics()

final String inputSubject = this.app.getInputTopic() + "-value";
final String internalSubject =
this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
final String backingSubject =
this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value";

final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient();
Expand Down Expand Up @@ -469,7 +469,7 @@ void shouldCallCleanupHookForInternalTopics() {
this.app = this.createComplexCleanUpHookApplication();

this.runCleanUp();
final String uniqueAppId = this.app.getUniqueAppId();
final String uniqueAppId = this.app.getStreamsApplicationId();
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
Expand All @@ -481,7 +481,7 @@ void shouldCallCleanUpHookForAllTopics() {
this.app = this.createComplexCleanUpHookApplication();

this.runCleanUpWithDeletion();
final String uniqueAppId = this.app.getUniqueAppId();
final String uniqueAppId = this.app.getStreamsApplicationId();
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -49,7 +49,7 @@ void setup() {
this.app = new ComplexTopologyApplication();
this.app.setInputTopics(List.of("input", "input2"));
this.app.setOutputTopic("output");
this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getUniqueAppId());
this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getStreamsApplicationId());
}

@Test
Expand Down Expand Up @@ -131,7 +131,7 @@ void shouldNotReturnInputTopics() {
void shouldReturnAllInternalTopics() {
assertThat(this.topologyInformation.getInternalTopics())
.hasSize(3)
.allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getUniqueAppId())
.allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getStreamsApplicationId())
|| topic.startsWith("KSTREAM-"))
.allMatch(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition"));
}
Expand Down

0 comments on commit 3865e13

Please sign in to comment.