Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 9, 2024
1 parent 335d736 commit 9c35f21
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
*/
@Override
public void clean() {
final ImprovedAdminClient adminClient = this.createAdminClient();
final Task task = new Task(adminClient);
task.clean();
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final Task task = new Task(adminClient);
task.clean();
}
}

private ImprovedAdminClient createAdminClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,21 @@ private static Collection<String> filterExistingTopics(final Collection<String>
*/
@Override
public void clean() {
final ImprovedAdminClient adminClient = this.createAdminClient();
final Task task = new Task(adminClient);
task.cleanAndReset();
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final Task task = new Task(adminClient);
task.cleanAndReset();
}
}

/**
* Clean up your Streams app by resetting all state stores, consumer group offsets, and internal topics, deleting
* local state.
*/
public void reset() {
final ImprovedAdminClient adminClient = this.createAdminClient();
final Task task = new Task(adminClient);
task.reset();
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final Task task = new Task(adminClient);
task.reset();
}
}

private Map<String, Object> getKafkaProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@

package com.bakdata.kafka.util;

import static com.bakdata.kafka.util.SchemaTopicClient.createSchemaRegistryClient;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand All @@ -41,11 +46,12 @@
* Provide methods for common operations when performing administrative actions on a Kafka cluster
*/
@Builder(access = AccessLevel.PRIVATE)
public final class ImprovedAdminClient {
public final class ImprovedAdminClient implements Closeable {

private static final Duration ADMIN_TIMEOUT = Duration.ofSeconds(10L);
private final @NonNull Supplier<AdminClient> adminClientFactory;
private final @NonNull Supplier<Optional<SchemaRegistryClient>> schemaRegistryClientFactory;
@Getter
private final @NonNull AdminClient adminClient;
private final SchemaRegistryClient schemaRegistryClient;
private final @NonNull Duration timeout;

/**
Expand All @@ -67,37 +73,43 @@ public static ImprovedAdminClient create(@NonNull final Map<String, Object> prop
@NonNull final Duration timeout) {
Preconditions.checkNotNull(properties.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
"%s must be specified in properties", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
final AdminClient adminClient = AdminClient.create(properties);
final String schemaRegistryUrl =
(String) properties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
final SchemaRegistryClient schemaRegistryClient =
schemaRegistryUrl == null ? null : createSchemaRegistryClient(properties, schemaRegistryUrl);
return builder()
.adminClientFactory(() -> AdminClient.create(properties))
.schemaRegistryClientFactory(() -> createSchemaRegistryClient(properties))
.adminClient(adminClient)
.schemaRegistryClient(schemaRegistryClient)
.timeout(timeout)
.build();
}

private static Optional<SchemaRegistryClient> createSchemaRegistryClient(final Map<String, Object> properties) {
final String schemaRegistryUrl =
(String) properties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
return schemaRegistryUrl == null ? Optional.empty()
: Optional.of(SchemaTopicClient.createSchemaRegistryClient(properties, schemaRegistryUrl));
}

public Optional<SchemaRegistryClient> getSchemaRegistryClient() {
return this.schemaRegistryClientFactory.get();
return Optional.ofNullable(this.schemaRegistryClient);
}

public SchemaTopicClient getSchemaTopicClient() {
return new SchemaTopicClient(this.getTopicClient(), this.getSchemaRegistryClient().orElse(null));
return new SchemaTopicClient(this.getTopicClient(), this.schemaRegistryClient);
}

public TopicClient getTopicClient() {
return new TopicClient(this.getAdminClient(), this.timeout);
return new TopicClient(this.adminClient, this.timeout);
}

public ConsumerGroupClient getConsumerGroupClient() {
return new ConsumerGroupClient(this.getAdminClient(), this.timeout);
return new ConsumerGroupClient(this.adminClient, this.timeout);
}

public AdminClient getAdminClient() {
return this.adminClientFactory.get();
@Override
public void close() {
this.adminClient.close();
if (this.schemaRegistryClient != null) {
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ public void resetSchemaRegistry(final String topic) {
@Override
public void close() {
this.topicClient.close();
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
if (this.schemaRegistryClient != null) {
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
}
}
}
}

0 comments on commit 9c35f21

Please sign in to comment.