Skip to content

Commit

Permalink
feat: PKMode with PKField validation
Browse files Browse the repository at this point in the history
Signed-off-by: Joel Hanson <[email protected]>
Signed-off-by: Joel Hanson <[email protected]>
  • Loading branch information
Joel-hanson authored and Joel Hanson committed Jun 7, 2024
1 parent e312a8f commit a79a78f
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 5 deletions.
8 changes: 5 additions & 3 deletions src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -68,8 +68,10 @@ public ConfigDef config() {

@Override
public Config validate(final Map<String, String> connectorConfigs) {
// TODO cross-fields validation here: pkFields against the pkMode
return super.validate(connectorConfigs);
final Config config = super.validate(connectorConfigs);

JdbcSinkConfig.validatePKModeAgainstPKFields(config);
return config;
}

@Override
Expand Down
65 changes: 64 additions & 1 deletion src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -29,8 +29,10 @@
import java.util.TimeZone;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;

import io.aiven.connect.jdbc.config.JdbcConfig;
import io.aiven.connect.jdbc.util.StringUtils;
Expand Down Expand Up @@ -437,4 +439,65 @@ public static void main(final String... args) {
System.out.println();
System.out.println(CONFIG_DEF.toEnrichedRst());
}

public static void validatePKModeAgainstPKFields(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
final ConfigValue pkFieldsConfigValue = configValues.get(JdbcSinkConfig.PK_FIELDS);

if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
return; // If either pkMode or pkFields are not configured, there's nothing to validate
}

final String pkMode = (String) pkModeConfigValue.value();
final List<String> pkFields = (List<String>) pkFieldsConfigValue.value();

if (pkMode == null) {
return; // If pkMode is null, skip validation
}

switch (pkMode.toLowerCase()) {
case "none":
validateNoPKFields(pkFieldsConfigValue, pkFields);
break;
case "kafka":
validateKafkaPKFields(pkFieldsConfigValue, pkFields);
break;
case "record_key":
case "record_value":
validatePKFieldsRequired(pkFieldsConfigValue, pkFields);
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
break;
}
}

private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields != null && !pkFields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields should not be set when pkMode is 'none'."
);
}
}

private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields == null || pkFields.size() != 3) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set with three fields "
+ "(topic, partition, offset) when pkMode is 'kafka'."
);
}
}

private static void validatePKFieldsRequired(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields == null || pkFields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'."
);
}
}
}
87 changes: 86 additions & 1 deletion src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -20,13 +20,18 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigException;

import io.aiven.connect.jdbc.JdbcSinkConnector;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.entry;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class JdbcSinkConfigTest {

Expand Down Expand Up @@ -74,4 +79,84 @@ public void shouldThrowExceptionForEmptyMappingFormat() {
.isInstanceOf(ConfigException.class);
}

@Test
public void shouldValidatePKModeNoneWithPKFieldsSet() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "none");
props.put(JdbcSinkConfig.PK_FIELDS, "id");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("Primary key fields should not be set when pkMode is 'none'")));
}

@Test
public void shouldValidatePKModeKafkaWithInvalidPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "kafka");
props.put(JdbcSinkConfig.PK_FIELDS, "topic,partition");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set with three fields "
+ "(topic, partition, offset) when pkMode is 'kafka'"
)));
}

@Test
public void shouldValidatePKModeRecordKeyWithNoPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_key");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'"
)));
}

@Test
public void shouldValidatePKModeRecordValueWithNoPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_value");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'"
)));
}

@Test
public void shouldValidateValidPKModeAndPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_key");
props.put(JdbcSinkConfig.PK_FIELDS, "id");

final Config config = new JdbcSinkConnector().validate(props);

assertFalse(config.configValues().stream().allMatch(cv -> cv.errorMessages().size() > 0));
}
}

0 comments on commit a79a78f

Please sign in to comment.