Create v3 #250

Merged on Aug 20, 2024 (20 commits)

Commits
- 3c8880c Separate CLI and application (#207) (philipp94831, Jul 19, 2024)
- 632b7fe Remove guava dependency (#237) (philipp94831, Jul 23, 2024)
- be8ffc8 Use KAFKA_ as prefix for environment Kafka config (#209) (philipp94831, Jul 23, 2024)
- 8dc843c Add HostInfo to ImprovedStreamsConfig (#230) (raminqaf, Jul 23, 2024)
- 9b67bc7 Remove log4j dependency and debug parameter (#238) (philipp94831, Jul 24, 2024)
- cc19d5e Add default serialization config to apps (#239) (philipp94831, Jul 25, 2024)
- 1a4b565 Rename streams section to kafka in Helm charts (#241) (philipp94831, Jul 25, 2024)
- 3a4af0b Rename extra topics to labeled topics (#240) (philipp94831, Jul 26, 2024)
- ecf41c8 Rename `--brokers` to `--bootstrap-servers` (#242) (philipp94831, Jul 26, 2024)
- 878e22b Add CLI parameter to specify streams application ID (#243) (philipp94831, Jul 29, 2024)
- c347337 Remove unnecessary schema registry configurations in tests (#248) (philipp94831, Jul 29, 2024)
- 957e4c1 Replace Guava usages (#246) (philipp94831, Jul 29, 2024)
- a88a343 Rename TestTopologyFactory (#249) (philipp94831, Jul 29, 2024)
- d1bfa89 Make CleanUpRunner closeable (#247) (philipp94831, Jul 29, 2024)
- 8b34869 Rename Streams section in Helm chart docs (#252) (philipp94831, Aug 5, 2024)
- 935b61d Fix Sonarqube issues (#253) (philipp94831, Aug 7, 2024)
- 6c1db53 Validate autoscaling mandatory fields when it is enabled (#254) (raminqaf, Aug 8, 2024)
- 8a75197 Validate persistence mandatory chart values (persistence.size) (#255) (raminqaf, Aug 8, 2024)
- 49e7122 Add hook to prepare running of app (#256) (philipp94831, Aug 20, 2024)
- a41aa8a Pre-bump version to 3.0.1-SNAPSHOT (#257) (philipp94831, Aug 20, 2024)
README.md (150 changes: 94 additions & 56 deletions)
@@ -26,7 +26,13 @@ You can add streams-bootstrap via Maven Central.
#### Gradle

```gradle
-compile group: 'com.bakdata.kafka', name: 'streams-bootstrap', version: '2.1.1'
+implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap-cli', version: '3.0.0'
```

+With Kotlin DSL
+
+```gradle
+implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = "3.0.0")
+```

#### Maven
@@ -35,8 +41,8 @@

<dependency>
    <groupId>com.bakdata.kafka</groupId>
-   <artifactId>streams-bootstrap</artifactId>
-   <version>2.1.1</version>
+   <artifactId>streams-bootstrap-cli</artifactId>
+   <version>3.0.0</version>
</dependency>
```

@@ -52,46 +58,61 @@ and `getUniqueAppId()`. You can define the topology of your application in `buildTopology()`.

```java
import com.bakdata.kafka.KafkaStreamsApplication;
-import java.util.Properties;
-import org.apache.kafka.streams.StreamsBuilder;
+import com.bakdata.kafka.SerdeConfig;
+import com.bakdata.kafka.StreamsApp;
+import com.bakdata.kafka.StreamsTopicConfig;
+import com.bakdata.kafka.TopologyBuilder;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

-public class StreamsBootstrapApplication extends KafkaStreamsApplication {
+public class MyStreamsApplication extends KafkaStreamsApplication {
    public static void main(final String[] args) {
-        startApplication(new StreamsBootstrapApplication(), args);
+        startApplication(new MyStreamsApplication(), args);
    }

    @Override
-    public void buildTopology(final StreamsBuilder builder) {
-        final KStream<String, String> input =
-                builder.<String, String>stream(this.getInputTopics());
+    public StreamsApp createApp(final boolean cleanUp) {
+        return new StreamsApp() {
+            @Override
+            public void buildTopology(final TopologyBuilder builder) {
+                final KStream<String, String> input = builder.streamInput();

-        // your topology
+                // your topology

-        input.to(this.getOutputTopic());
-    }
+                input.to(builder.getTopics().getOutputTopic());
+            }

-    @Override
-    public String getUniqueAppId() {
-        return "streams-bootstrap-app";
-    }
+            @Override
+            public String getUniqueAppId(final StreamsTopicConfig topics) {
+                return "streams-bootstrap-app-" + topics.getOutputTopic();
+            }

-    // Optionally you can override the default streams bootstrap Kafka properties
-    @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
+            @Override
+            public SerdeConfig defaultSerializationConfig() {
+                return new SerdeConfig(StringSerde.class, StringSerde.class);
+            }

-        return kafkaProperties;
+            // Optionally you can define custom Kafka properties
+            @Override
+            public Map<String, Object> createKafkaProperties() {
+                return Map.of(
+                        // your config
+                );
+            }
+        };
    }
}
```
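To make the `// your topology` placeholder concrete, here is a minimal sketch of a transformation using only the API shown above (it assumes the String serdes configured via `defaultSerializationConfig`; the upper-casing step is purely illustrative):

```java
final KStream<String, String> input = builder.streamInput();
// read every record, upper-case its value, and forward it to the output topic
input.mapValues(value -> value.toUpperCase())
        .to(builder.getTopics().getOutputTopic());
```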

The following configuration options are available:

-- `--brokers`: List of Kafka brokers (comma-separated) (**required**)
+- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**)

- `--schema-registry-url`: The URL of the Schema Registry

+- `--kafka-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)
+
- `--input-topics`: List of input topics (comma-separated)

- `--input-pattern`: Pattern of input topics

@@ -100,74 +121,91 @@ The following configuration options are available:

- `--error-topic`: A topic to write errors to

-- `--streams-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)
-
-- `--extra-input-topics`: Additional named input topics if you need to specify multiple topics with different message
-  types (`<String=String>[,<String=String>...]`)
+- `--labeled-input-topics`: Additional labeled input topics if you need to specify multiple topics with different
+  message types (`<String=String>[,<String=String>...]`)

-- `--extra-input-patterns`: Additional named input patterns if you need to specify multiple topics with different
+- `--labeled-input-patterns`: Additional labeled input patterns if you need to specify multiple topics with different
  message types (`<String=String>[,<String=String>...]`)

-- `--extra-output-topics`: Additional named output topics if you need to specify multiple topics with different message
-  types (`<String=String>[,<String=String>...]`)
+- `--labeled-output-topics`: Additional labeled output topics if you need to specify multiple topics with different
+  message types (`<String=String>[,<String=String>...]`)

+- `--application-id`: Unique application ID to use for Kafka Streams. Can also be provided by
+  implementing `StreamsApp#getUniqueAppId()`
+
- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

-- `--clean-up`: Whether the state of the Kafka Streams app, i.e., offsets and state stores and auto-created topics,
-  should be cleared instead of running the app
-
-- `--delete-output`: Whether the output topics with their associated schemas and the consumer group should be deleted
-  during the cleanup
-
-- `--debug`: Configure logging to debug
+Additionally, the following commands are available:
+
+- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
+  topics associated with the Kafka Streams application.
+
+- `reset`: Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams
+  application.
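As a usage sketch for the v3 flags and subcommands (the jar name, addresses, and topic names are placeholders, and option-before-subcommand ordering is an assumption):

```bash
# Run the application
java -jar my-streams-app.jar \
  --bootstrap-servers localhost:9092 \
  --schema-registry-url http://localhost:8081 \
  --input-topics my-input \
  --output-topic my-output \
  --kafka-config max.poll.records=500

# Tear it down again: 'clean' and 'reset' replace the removed --clean-up/--delete-output flags
java -jar my-streams-app.jar \
  --bootstrap-servers localhost:9092 \
  --input-topics my-input \
  --output-topic my-output \
  clean
```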

#### Kafka producer

Create a subclass of `KafkaProducerApplication`.

```java
import com.bakdata.kafka.KafkaProducerApplication;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import com.bakdata.kafka.ProducerApp;
+import com.bakdata.kafka.ProducerBuilder;
+import com.bakdata.kafka.ProducerRunnable;
+import com.bakdata.kafka.SerializerConfig;
+import java.util.Map;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringSerializer;

-public class StreamsBootstrapApplication extends KafkaProducerApplication {
+public class MyProducerApplication extends KafkaProducerApplication {
    public static void main(final String[] args) {
-        startApplication(new StreamsBootstrapApplication(), args);
+        startApplication(new MyProducerApplication(), args);
    }

    @Override
-    protected void runApplication() {
-        try (final KafkaProducer<Object, Object> producer = this.createProducer()) {
-            // your producer
-        }
-    }
+    public ProducerApp createApp(final boolean cleanUp) {
+        return new ProducerApp() {
+            @Override
+            public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
+                return () -> {
+                    try (final Producer<Object, Object> producer = builder.createProducer()) {
+                        // your producer
+                    }
+                };
+            }

-    // Optionally you can override the default streams bootstrap Kafka properties
-    @Override
-    protected Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
+            @Override
+            public SerializerConfig defaultSerializationConfig() {
+                return new SerializerConfig(StringSerializer.class, StringSerializer.class);
+            }

-        return kafkaProperties;
+            // Optionally you can define custom Kafka properties
+            @Override
+            public Map<String, Object> createKafkaProperties() {
+                return Map.of(
+                        // your config
+                );
+            }
+        };
    }
}
```
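For illustration, the `// your producer` placeholder could send a record like this (a sketch; it additionally requires `org.apache.kafka.clients.producer.ProducerRecord`, the topic name is hard-coded as a placeholder, and key and value must match `defaultSerializationConfig`):

```java
try (final Producer<Object, Object> producer = builder.createProducer()) {
    // "my-output" stands in for the configured output topic in this sketch
    producer.send(new ProducerRecord<>("my-output", "key", "value"));
    producer.flush(); // make sure the record is sent before the producer closes
}
```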

The following configuration options are available:

-- `--brokers`: List of Kafka brokers (comma-separated) (**required**)
+- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**)

- `--schema-registry-url`: The URL of the Schema Registry

-- `--output-topic`: The output topic
+- `--kafka-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)

-- `--streams-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)
+- `--output-topic`: The output topic

-- `--extra-output-topics`: Additional named output topics (`<String=String>[,<String=String>...]`)
+- `--labeled-output-topics`: Additional labeled output topics (`<String=String>[,<String=String>...]`)

-- `--clean-up`: Whether the output topics and associated schemas of the producer app should be deleted instead of
-  running the app
-
-- `--debug`: Configure logging to debug
+Additionally, the following commands are available:
+
+- `clean`: Delete all output topics associated with the Kafka Producer application.
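And a corresponding invocation sketch for the producer app (names are placeholders; ordering assumed as above):

```bash
# Produce records
java -jar my-producer-app.jar \
  --bootstrap-servers localhost:9092 \
  --output-topic my-output

# Delete the output topics again; the 'clean' subcommand replaces the old --clean-up flag
java -jar my-producer-app.jar \
  --bootstrap-servers localhost:9092 \
  --output-topic my-output \
  clean
```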

### Helm Charts

build.gradle.kts (3 changes: 2 additions & 1 deletion)
@@ -1,7 +1,7 @@
plugins {
    id("com.bakdata.release") version "1.4.0"
    id("com.bakdata.sonar") version "1.4.0"
-   id("com.bakdata.sonatype") version "1.4.0"
+   id("com.bakdata.sonatype") version "1.4.1"
    id("io.freefair.lombok") version "8.4"
}

@@ -16,6 +16,7 @@ allprojects {
    repositories {
        mavenCentral()
        maven(url = "https://packages.confluent.io/maven/")
+       maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
    }
}

charts/producer-app-cleanup-job/templates/job.yaml (32 changes: 14 additions & 18 deletions)
@@ -55,34 +55,30 @@ spec:
          imagePullPolicy: "{{ .Values.imagePullPolicy }}"
          resources:
{{ toYaml .Values.resources | indent 12 }}
+          args:
+            - clean
          env:
            - name: ENV_PREFIX
              value: {{ .Values.configurationEnvPrefix }}_
-            {{- range $key, $value := .Values.streams.config }}
-            - name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }}
+            {{- range $key, $value := .Values.kafka.config }}
+            - name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }}
              value: {{ $value | quote }}
            {{- end }}
-            {{- if hasKey .Values.streams "brokers" }}
-            - name: "{{ .Values.configurationEnvPrefix }}_BROKERS"
-              value: {{ .Values.streams.brokers | quote }}
+            {{- if hasKey .Values.kafka "bootstrapServers" }}
+            - name: "{{ .Values.configurationEnvPrefix }}_BOOTSTRAP_SERVERS"
+              value: {{ .Values.kafka.bootstrapServers | quote }}
            {{- end }}
-            {{- if hasKey .Values.streams "schemaRegistryUrl" }}
+            {{- if hasKey .Values.kafka "schemaRegistryUrl" }}
            - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
-              value: {{ .Values.streams.schemaRegistryUrl | quote }}
+              value: {{ .Values.kafka.schemaRegistryUrl | quote }}
            {{- end }}
-            {{- if hasKey .Values "debug" }}
-            - name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
-              value: {{ .Values.debug | quote }}
-            {{- end }}
-            - name: "{{ .Values.configurationEnvPrefix }}_CLEAN_UP"
-              value: "true"
-            {{- if hasKey .Values.streams "outputTopic" }}
+            {{- if hasKey .Values.kafka "outputTopic" }}
            - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
-              value: {{ .Values.streams.outputTopic | quote }}
+              value: {{ .Values.kafka.outputTopic | quote }}
            {{- end }}
-            {{- if and (hasKey .Values.streams "extraOutputTopics") (.Values.streams.extraOutputTopics) }}
-            - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_OUTPUT_TOPICS"
-              value: "{{- range $key, $value := .Values.streams.extraOutputTopics }}{{ $key }}={{ $value }},{{- end }}"
+            {{- if and (hasKey .Values.kafka "labeledOutputTopics") (.Values.kafka.labeledOutputTopics) }}
+            - name: "{{ .Values.configurationEnvPrefix }}_LABELED_OUTPUT_TOPICS"
+              value: "{{- range $key, $value := .Values.kafka.labeledOutputTopics }}{{ $key }}={{ $value }},{{- end }}"
            {{- end }}
            {{- range $key, $value := .Values.secrets }}
            - name: "{{ $key }}"
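To illustrate the renamed section and the new `KAFKA_` prefix: each key under `kafka.config` is upper-cased with dots replaced by underscores, while `bootstrapServers` is exposed under the `configurationEnvPrefix`. A hypothetical values file such as

```yaml
kafka:
  bootstrapServers: "test:9092"
  config:
    max.poll.records: "500"
```

would render roughly the following container environment (a sketch; `APP` assumes the chart's default `configurationEnvPrefix`):

```yaml
env:
  - name: "KAFKA_MAX_POLL_RECORDS"
    value: "500"
  - name: "APP_BOOTSTRAP_SERVERS"
    value: "test:9092"
```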
charts/producer-app-cleanup-job/values.yaml (10 changes: 4 additions & 6 deletions)
@@ -15,22 +15,20 @@ files: {}
# mountPath: app/resources
# content: "foo bar"

-streams:
-  # brokers: "test:9092"
+kafka:
+  # bootstrapServers: "test:9092"
  # schemaRegistryUrl: "url:1234"
  config: {}
  # max.poll.records: 500
  # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
  # max.request.size: "1000000"
  # outputTopic: output
-  extraOutputTopics: {}
-  # role: output
+  labeledOutputTopics: {}
+  # label: output

commandLine: {}
# MY_CLI_PARAM: "foo-bar"

-debug: false
-
env: {}
# MY_ENV_VARIABLE: foo-bar
