Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 19, 2024
1 parent 776832c commit 6b58995
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 88 deletions.
35 changes: 22 additions & 13 deletions streams-bootstrap/src/main/java/com/bakdata/kafka/Configurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.Preconfigured.key;
import static com.bakdata.kafka.Preconfigured.value;
import static com.bakdata.kafka.Preconfigured.create;

import java.util.Map;
import lombok.NonNull;
Expand All @@ -48,7 +47,7 @@ public class Configurator {
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureForValues(final Serde<T> serde) {
return this.configure(value(serde));
return this.configureForValues(create(serde));
}

/**
Expand All @@ -59,7 +58,7 @@ public <T> Serde<T> configureForValues(final Serde<T> serde) {
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureForValues(final Serde<T> serde, final Map<String, Object> configOverrides) {
return this.configure(value(serde, configOverrides));
return this.configureForValues(create(serde, configOverrides));
}

/**
Expand All @@ -69,7 +68,7 @@ public <T> Serde<T> configureForValues(final Serde<T> serde, final Map<String, O
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureForKeys(final Serde<T> serde) {
return this.configure(key(serde));
return this.configureForKeys(create(serde));
}

/**
Expand All @@ -80,7 +79,7 @@ public <T> Serde<T> configureForKeys(final Serde<T> serde) {
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureForKeys(final Serde<T> serde, final Map<String, Object> configOverrides) {
return this.configure(key(serde, configOverrides));
return this.configureForKeys(create(serde, configOverrides));
}

/**
Expand All @@ -90,7 +89,7 @@ public <T> Serde<T> configureForKeys(final Serde<T> serde, final Map<String, Obj
* @param <T> type to be (de-)serialized
*/
public <T> Serializer<T> configureForValues(final Serializer<T> serializer) {
return this.configure(value(serializer));
return this.configureForValues(create(serializer));
}

/**
Expand All @@ -102,7 +101,7 @@ public <T> Serializer<T> configureForValues(final Serializer<T> serializer) {
*/
public <T> Serializer<T> configureForValues(final Serializer<T> serializer,
final Map<String, Object> configOverrides) {
return this.configure(value(serializer, configOverrides));
return this.configureForValues(create(serializer, configOverrides));
}

/**
Expand All @@ -112,7 +111,7 @@ public <T> Serializer<T> configureForValues(final Serializer<T> serializer,
* @param <T> type to be (de-)serialized
*/
public <T> Serializer<T> configureForKeys(final Serializer<T> serializer) {
return this.configure(key(serializer));
return this.configureForKeys(create(serializer));
}

/**
Expand All @@ -124,17 +123,27 @@ public <T> Serializer<T> configureForKeys(final Serializer<T> serializer) {
*/
public <T> Serializer<T> configureForKeys(final Serializer<T> serializer,
final Map<String, Object> configOverrides) {
return this.configure(key(serializer, configOverrides));
return this.configureForKeys(create(serializer, configOverrides));
}

/**
* Configure a {@code Preconfigured} object using {@link #kafkaProperties}
* Configure a {@code Preconfigured} for values object using {@link #kafkaProperties}
* @param preconfigured pre-configured {@link Serde} or {@link Serializer}
* @return configured instance
* @param <T> type of configured instance
*/
public <T> T configure(final Preconfigured<T> preconfigured) {
return preconfigured.configure(this.kafkaProperties);
public <T> T configureForValues(final Preconfigured<T> preconfigured) {
return preconfigured.configureForValues(this.kafkaProperties);
}

/**
* Configure a {@code Preconfigured} for keys object using {@link #kafkaProperties}
* @param preconfigured pre-configured {@link Serde} or {@link Serializer}
* @return configured instance
* @param <T> type of configured instance
*/
public <T> T configureForKeys(final Preconfigured<T> preconfigured) {
return preconfigured.configureForKeys(this.kafkaProperties);
}

}
106 changes: 31 additions & 75 deletions streams-bootstrap/src/main/java/com/bakdata/kafka/Preconfigured.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,102 +43,53 @@
public final class Preconfigured<T> {
private final @NonNull Configurable<T> configurable;
private final @NonNull Map<String, Object> configOverrides;
private final boolean isKey;

/**
* Pre-configure a {@code Serde} for values
* Pre-configure a {@code Serde}
* @param serde {@code Serde} to pre-configure
* @return pre-configured serde
* @param <S> type of {@link Serde}
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <S extends Serde<T>, T> Preconfigured<S> value(final S serde) {
return value(configurable(serde));
public static <S extends Serde<T>, T> Preconfigured<S> create(final S serde) {
return create(configurable(serde));
}

/**
* Pre-configure a {@code Serde} for values with config overrides
* Pre-configure a {@code Serde} with config overrides
* @param serde {@code Serde} to pre-configure
* @param configOverrides configs passed to {@link Serde#configure(Map, boolean)}
* @return pre-configured serde
* @param <S> type of {@link Serde}
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <S extends Serde<T>, T> Preconfigured<S> value(final S serde,
public static <S extends Serde<T>, T> Preconfigured<S> create(final S serde,
final Map<String, Object> configOverrides) {
return value(configurable(serde), configOverrides);
return create(configurable(serde), configOverrides);
}

/**
* Pre-configure a {@code Serde} for keys
* @param serde {@code Serde} to pre-configure
* @return pre-configured serde
* @param <S> type of {@link Serde}
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <S extends Serde<T>, T> Preconfigured<S> key(final S serde) {
return key(configurable(serde));
}

/**
* Pre-configure a {@code Serde} for keys with config overrides
* @param serde {@code Serde} to pre-configure
* @param configOverrides configs passed to {@link Serde#configure(Map, boolean)}
* @return pre-configured serde
* @param <S> type of {@link Serde}
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <S extends Serde<T>, T> Preconfigured<S> key(final S serde,
final Map<String, Object> configOverrides) {
return key(configurable(serde), configOverrides);
}

/**
* Pre-configure a {@code Serializer} for values
* Pre-configure a {@code Serializer}
* @param serializer {@code Serializer} to pre-configure
* @return pre-configured serializer
* @param <S> type of {@link Serializer}
* @param <T> type serialized by the {@code Serializer}
*/
public static <S extends Serializer<T>, T> Preconfigured<S> value(final S serializer) {
return value(configurable(serializer));
public static <S extends Serializer<T>, T> Preconfigured<S> create(final S serializer) {
return create(configurable(serializer));
}

/**
* Pre-configure a {@code Serializer} for values
* Pre-configure a {@code Serializer}
* @param serializer {@code Serializer} to pre-configure
* @param configOverrides configs passed to {@link Serializer#configure(Map, boolean)}
* @return pre-configured serializer
* @param <S> type of {@link Serializer}
* @param <T> type serialized by the {@code Serializer}
*/
public static <S extends Serializer<T>, T> Preconfigured<S> value(final S serializer,
public static <S extends Serializer<T>, T> Preconfigured<S> create(final S serializer,
final Map<String, Object> configOverrides) {
return value(configurable(serializer), configOverrides);
}

/**
* Pre-configure a {@code Serializer} for keys
* @param serializer {@code Serializer} to pre-configure
* @return pre-configured serializer
* @param <S> type of {@link Serializer}
* @param <T> type serialized by the {@code Serializer}
*/
public static <S extends Serializer<T>, T> Preconfigured<S> key(final S serializer) {
return key(configurable(serializer));
}

/**
* Pre-configure a {@code Serializer} for keys
* @param serializer {@code Serializer} to pre-configure
* @param configOverrides configs passed to {@link Serializer#configure(Map, boolean)}
* @return pre-configured serializer
* @param <S> type of {@link Serializer}
* @param <T> type serialized by the {@code Serializer}
*/
public static <S extends Serializer<T>, T> Preconfigured<S> key(final S serializer,
final Map<String, Object> configOverrides) {
return key(configurable(serializer), configOverrides);
return create(configurable(serializer), configOverrides);
}

private static <S extends Serde<T>, T> ConfigurableSerde<S, T> configurable(final S serde) {
Expand All @@ -149,32 +100,37 @@ private static <S extends Serializer<T>, T> ConfigurableSerializer<S, T> configu
return new ConfigurableSerializer<>(serializer);
}

private static <T> Preconfigured<T> key(final Configurable<T> configurable) {
return key(configurable, emptyMap());
private static <T> Preconfigured<T> create(final Configurable<T> configurable) {
return create(configurable, emptyMap());
}

private static <T> Preconfigured<T> key(final Configurable<T> configurable,
private static <T> Preconfigured<T> create(final Configurable<T> configurable,
final Map<String, Object> configOverrides) {
return new Preconfigured<>(configurable, configOverrides, true);
}

private static <T> Preconfigured<T> value(final Configurable<T> configurable) {
return value(configurable, emptyMap());
return new Preconfigured<>(configurable, configOverrides);
}

private static <T> Preconfigured<T> value(final Configurable<T> configurable,
final Map<String, Object> configOverrides) {
return new Preconfigured<>(configurable, configOverrides, false);
/**
* Configure using a base config for values
* @param baseConfig Base config. {@link #configOverrides} override properties of base config.
* @return configured instance
*/
public T configureForValues(final Map<String, Object> baseConfig) {
final boolean isKey = false;
return this.configure(baseConfig, isKey);
}

/**
* Configure using a base config
* Configure using a base config for keys
* @param baseConfig Base config. {@link #configOverrides} override properties of base config.
* @return configured instance
*/
public T configure(final Map<String, Object> baseConfig) {
public T configureForKeys(final Map<String, Object> baseConfig) {
return this.configure(baseConfig, true);
}

private T configure(final Map<String, Object> baseConfig, final boolean isKey) {
final Map<String, Object> serializerConfig = this.mergeConfig(baseConfig);
return this.configurable.configure(serializerConfig, this.isKey);
return this.configurable.configure(serializerConfig, isKey);
}

private Map<String, Object> mergeConfig(final Map<String, Object> baseConfig) {
Expand Down

0 comments on commit 6b58995

Please sign in to comment.