diff --git a/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java b/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java index 4c84a82e..caa12537 100644 --- a/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java +++ b/src/main/java/io/aiven/connect/jdbc/JdbcSinkConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -68,8 +68,10 @@ public ConfigDef config() { @Override public Config validate(final Map connectorConfigs) { - // TODO cross-fields validation here: pkFields against the pkMode - return super.validate(connectorConfigs); + final Config config = super.validate(connectorConfigs); + + JdbcSinkConfig.validatePKModeAgainstPKFields(config); + return config; } @Override diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index 04be94c1..2be238a6 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -29,8 +29,10 @@ import java.util.TimeZone; import java.util.stream.Collectors; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; import io.aiven.connect.jdbc.config.JdbcConfig; import io.aiven.connect.jdbc.util.StringUtils; @@ -437,4 +439,65 @@ public static void main(final String... args) { System.out.println(); System.out.println(CONFIG_DEF.toEnrichedRst()); } + + public static void validatePKModeAgainstPKFields(final Config config) { + // Collect all configuration values + final Map configValues = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, v -> v)); + + final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE); + final ConfigValue pkFieldsConfigValue = configValues.get(JdbcSinkConfig.PK_FIELDS); + + if (pkModeConfigValue == null || pkFieldsConfigValue == null) { + return; // If either pkMode or pkFields are not configured, there's nothing to validate + } + + final String pkMode = (String) pkModeConfigValue.value(); + final List pkFields = (List) pkFieldsConfigValue.value(); + + if (pkMode == null) { + return; // If pkMode is null, skip validation + } + + switch (pkMode.toLowerCase()) { + case "none": + validateNoPKFields(pkFieldsConfigValue, pkFields); + break; + case "kafka": + validateKafkaPKFields(pkFieldsConfigValue, pkFields); + break; + case "record_key": + case "record_value": + validatePKFieldsRequired(pkFieldsConfigValue, pkFields); + break; + default: + pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode); + break; + } + } + + private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, final List pkFields) { + if (pkFields != null && !pkFields.isEmpty()) { + pkFieldsConfigValue.addErrorMessage( + "Primary key fields should not be set when pkMode is 'none'." + ); + } + } + + private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, final List pkFields) { + if (pkFields == null || pkFields.size() != 3) { + pkFieldsConfigValue.addErrorMessage( + "Primary key fields must be set with three fields " + + "(topic, partition, offset) when pkMode is 'kafka'." + ); + } + } + + private static void validatePKFieldsRequired(final ConfigValue pkFieldsConfigValue, final List pkFields) { + if (pkFields == null || pkFields.isEmpty()) { + pkFieldsConfigValue.addErrorMessage( + "Primary key fields must be set when pkMode is 'record_key' or 'record_value'." + ); + } + } } diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java index b4e7527f..990f1e64 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -20,13 +20,18 @@ import java.util.HashMap; import java.util.Map; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigException; +import io.aiven.connect.jdbc.JdbcSinkConnector; + import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class JdbcSinkConfigTest { @@ -74,4 +79,84 @@ public void shouldThrowExceptionForEmptyMappingFormat() { .isInstanceOf(ConfigException.class); } + @Test + public void shouldValidatePKModeNoneWithPKFieldsSet() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.PK_MODE, "none"); + props.put(JdbcSinkConfig.PK_FIELDS, "id"); + + final Config config = new JdbcSinkConnector().validate(props); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("Primary key fields should not be set when pkMode is 'none'"))); + } + + @Test + public void shouldValidatePKModeKafkaWithInvalidPKFields() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.PK_MODE, "kafka"); + props.put(JdbcSinkConfig.PK_FIELDS, "topic,partition"); + + final Config config = new JdbcSinkConnector().validate(props); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains( + "Primary key fields must be set with three fields " + + "(topic, partition, offset) when pkMode is 'kafka'" + ))); + } + + @Test + public void shouldValidatePKModeRecordKeyWithNoPKFields() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.PK_MODE, "record_key"); + + final Config config = new JdbcSinkConnector().validate(props); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains( + "Primary key fields must be set when pkMode is 'record_key' or 'record_value'" + ))); + } + + @Test + public void shouldValidatePKModeRecordValueWithNoPKFields() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.PK_MODE, "record_value"); + + final Config config = new JdbcSinkConnector().validate(props); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains( + "Primary key fields must be set when pkMode is 'record_key' or 'record_value'" + ))); + } + + @Test + public void shouldValidateValidPKModeAndPKFields() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.PK_MODE, "record_key"); + props.put(JdbcSinkConfig.PK_FIELDS, "id"); + + final Config config = new JdbcSinkConnector().validate(props); + + assertFalse(config.configValues().stream().allMatch(cv -> cv.errorMessages().size() > 0)); + } }