diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.java index 6423c4024e96e..bf4d507ab30ea 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.java @@ -8,11 +8,7 @@ import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.db.util.SSLCertificateUtils; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URI; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; @@ -26,14 +22,12 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class JdbcSSLConnectionUtils { - public static final String SSL_MODE = "sslMode"; public static final String TRUST_KEY_STORE_URL = "trustCertificateKeyStoreUrl"; public static final String TRUST_KEY_STORE_PASS = "trustCertificateKeyStorePassword"; @@ -75,11 +69,28 @@ private static Optional<SslMode> bySpec(final String spec) { public static final String PARAM_CLIENT_KEY = "client_key"; public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password"; + public enum ConfigKeys { + SSL_MODE("ssl_mode"), + TRUST_KEY_STORE_URI("trustCertificateKeyStoreUri"), + TRUST_KEY_STORE_PASS("trustCertificateKeyStorePassword"), + TRUST_KEY_STORE_TYPE("trustCertificateKeyStoreType"), + CLIENT_KEY_STORE_URI("clientCertificateKeyStoreUri"), + CLIENT_KEY_STORE_PASS("clientCertificateKeyStorePassword"), + CLIENT_KEY_STORE_TYPE("clientCertificateKeyStoreType"), - public static record SSLConfig(SslMode sslMode, URL keyStoreUrl, String keyStorePassword, String keyStoreType) { + CA_CERTIFICATE_PATH("ca_certificate_path"), + ; + ConfigKeys(String val) { + + } + } + + public static record KeyStoreInfo (URI uri, String password, String type) {} + + public static record SSLConfig(SslMode sslMode, KeyStoreInfo clientKeyStoreInfo, KeyStoreInfo caKeyStoreInfo) { SSLConfig(SslMode sslMode) { - this(sslMode, null, null, null); + this(sslMode, null, null); } } @@ -90,94 +101,45 @@ public static record SSLConfig(SslMode sslMode, URL keyStoreUrl, String keyStore * @return map containing relevant parsed values including location of keystore or an empty map */ @Deprecated - public static Map<String, String> parseSSLConfigDeprecated(final JsonNode config) { + public static Map<ConfigKeys, String> parseSSLConfig(final JsonNode config) { LOGGER.debug("source config: {}", config); - Pair<URI, String> caCertKeyStorePair = null; - Pair<URI, String> clientCertKeyStorePair = null; - final Map<String, String> additionalParameters = new HashMap<>(); + final Map<ConfigKeys, String> additionalParameters = new HashMap<>(); // assume ssl if not explicitly mentioned. if (!config.has(JdbcUtils.SSL_KEY) || config.get(JdbcUtils.SSL_KEY).asBoolean()) { if (config.has(JdbcUtils.SSL_MODE_KEY)) { final String specMode = config.get(JdbcUtils.SSL_MODE_KEY).get(PARAM_MODE).asText(); - additionalParameters.put(SSL_MODE, + additionalParameters.put(ConfigKeys.SSL_MODE, SslMode.bySpec(specMode).orElseThrow(() -> new IllegalArgumentException("unexpected ssl mode")).name()); - if (Objects.isNull(caCertKeyStorePair)) { - caCertKeyStorePair = JdbcSSLConnectionUtils.prepareCACertificateKeyStore(config); - } + KeyStoreInfo caCertKeyStorePair = JdbcSSLConnectionUtils.prepareCACertificateKeyStore(config); if (Objects.nonNull(caCertKeyStorePair)) { - LOGGER.debug("uri for ca cert keystore: {}", caCertKeyStorePair.getLeft().toString()); - try { - additionalParameters.putAll(Map.of( - TRUST_KEY_STORE_URL, caCertKeyStorePair.getLeft().toURL().toString(), - TRUST_KEY_STORE_PASS, caCertKeyStorePair.getRight(), - TRUST_KEY_STORE_TYPE, KEY_STORE_TYPE_PKCS12)); - } catch (final MalformedURLException e) { - throw new RuntimeException("Unable to get a URL for trust key store"); - } + LOGGER.debug("uri for ca cert keystore: {}", caCertKeyStorePair.uri().toString()); - } + additionalParameters.putAll(Map.of( + ConfigKeys.TRUST_KEY_STORE_URI, caCertKeyStorePair.uri.toString(), + ConfigKeys.TRUST_KEY_STORE_PASS, caCertKeyStorePair.password, + ConfigKeys.TRUST_KEY_STORE_TYPE, caCertKeyStorePair.type)); - if (Objects.isNull(clientCertKeyStorePair)) { - clientCertKeyStorePair = JdbcSSLConnectionUtils.prepareClientCertificateKeyStore(config); } + KeyStoreInfo clientCertKeyStorePair = JdbcSSLConnectionUtils.prepareClientCertificateKeyStore(config); if (Objects.nonNull(clientCertKeyStorePair)) { - LOGGER.debug("uri for client cert keystore: {} / {}", clientCertKeyStorePair.getLeft().toString(), clientCertKeyStorePair.getRight()); - try { - additionalParameters.putAll(Map.of( - CLIENT_KEY_STORE_URL, clientCertKeyStorePair.getLeft().toURL().toString(), - CLIENT_KEY_STORE_PASS, clientCertKeyStorePair.getRight(), - CLIENT_KEY_STORE_TYPE, KEY_STORE_TYPE_PKCS12)); - } catch (final MalformedURLException e) { - throw new RuntimeException("Unable to get a URL for client key store"); - } + LOGGER.debug("uri for client cert keystore: {} / {}", clientCertKeyStorePair.uri, clientCertKeyStorePair.password); + additionalParameters.putAll(Map.of( + ConfigKeys.CLIENT_KEY_STORE_URI, clientCertKeyStorePair.uri.toString(), + ConfigKeys.CLIENT_KEY_STORE_PASS, clientCertKeyStorePair.password, + ConfigKeys.CLIENT_KEY_STORE_TYPE, clientCertKeyStorePair.type)); } } else { - additionalParameters.put(SSL_MODE, SslMode.DISABLED.name()); + additionalParameters.put(ConfigKeys.SSL_MODE, SslMode.DISABLED.name()); } } LOGGER.debug("additional params: {}", additionalParameters); return additionalParameters; } - public static SSLConfig parseSSLConfig(final JsonNode config) { - LOGGER.debug("source config: {}", config); - // assume ssl if not explicitly mentioned. - if (config.has(JdbcUtils.SSL_KEY) && ! config.get(JdbcUtils.SSL_KEY).asBoolean()) { - return new SSLConfig(SslMode.DISABLED); - } - if (!config.has(JdbcUtils.SSL_MODE_KEY)) { - return new SSLConfig(SslMode.DISABLED); - } - - JsonNode sslConfigAsJson = config.get(JdbcUtils.SSL_MODE_KEY); - final String specMode = sslConfigAsJson.get(PARAM_MODE).asText(); - SslMode sslMode = SslMode.bySpec(specMode).orElseThrow(() -> new IllegalArgumentException("unexpected ssl mode")); - - Pair<URI, String> certificateKeyStore = prepareClientCertificateKeyStore(sslConfigAsJson); - if (certificateKeyStore == null) { - LOGGER.debug("no client certificate keystore found. Looking for CA certificate key store."); - certificateKeyStore = JdbcSSLConnectionUtils.prepareCACertificateKeyStore(sslConfigAsJson); - } - if (certificateKeyStore == null) { - LOGGER.debug("no CA certificate keystore found. Returning SSLConfig."); - return new SSLConfig(sslMode); - } - - LOGGER.debug("uri for cert keystore: {}", certificateKeyStore.getLeft().toString()); - try { - return new SSLConfig(sslMode, - certificateKeyStore.getLeft().toURL(), - certificateKeyStore.getRight(), - KEY_STORE_TYPE_PKCS12); - } catch (final MalformedURLException e) { - throw new RuntimeException("Unable to get a URL for trust key store"); - } - } - - private static @Nullable Pair<URI, String> prepareCACertificateKeyStore(final JsonNode sslConfigAsJson) { + private static @Nullable KeyStoreInfo prepareCACertificateKeyStore(final JsonNode sslConfigAsJson) { // if has CA cert - make keystore // if has client cert // if has client password - make keystore using password @@ -191,7 +153,7 @@ public static SSLConfig parseSSLConfig(final JsonNode config) { clientKeyPassword, null, null); - return new ImmutablePair<>(caCertKeyStoreUri, clientKeyPassword); + return new KeyStoreInfo(caCertKeyStoreUri, clientKeyPassword, KEY_STORE_TYPE_PKCS12); } catch (final CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException e) { throw new RuntimeException("Failed to create keystore for CA certificate", e); } @@ -210,7 +172,7 @@ private static String getOrGeneratePassword(final JsonNode sslModeConfig) { return clientKeyPassword; } - private static @Nullable Pair<URI, String> prepareClientCertificateKeyStore(final @Nonnull JsonNode sslConfigAsJson) { + private static @Nullable KeyStoreInfo prepareClientCertificateKeyStore(final @Nonnull JsonNode sslConfigAsJson) { Pair<URI, String> clientCertKeyStorePair = null; if (sslConfigAsJson.has(PARAM_CLIENT_CERTIFICATE) && !sslConfigAsJson.get(PARAM_CLIENT_CERTIFICATE).asText().isEmpty() && sslConfigAsJson.has(PARAM_CLIENT_KEY) && !sslConfigAsJson.get(PARAM_CLIENT_KEY).asText().isEmpty()) { @@ -219,7 +181,7 @@ private static String getOrGeneratePassword(final JsonNode sslModeConfig) { final URI clientCertKeyStoreUri = SSLCertificateUtils.keyStoreFromClientCertificate(sslConfigAsJson.get(PARAM_CLIENT_CERTIFICATE).asText(), sslConfigAsJson.get(PARAM_CLIENT_KEY).asText(), clientKeyPassword, null); - return new ImmutablePair<>(clientCertKeyStoreUri, clientKeyPassword); + return new KeyStoreInfo(clientCertKeyStoreUri, clientKeyPassword, KEY_STORE_TYPE_PKCS12); } catch (final CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException | InvalidKeySpecException | InterruptedException e) { @@ -228,16 +190,4 @@ private static String getOrGeneratePassword(final JsonNode sslModeConfig) { } return null; } - - // SGX TODO: should be private - public static Path fileFromCertPem(final String certPem) { - try { - final Path path = Files.createTempFile(null, ".crt"); - Files.writeString(path, certPem); - path.toFile().deleteOnExit(); - return path; - } catch (final IOException e) { - throw new RuntimeException("Cannot save root certificate to file", e); - } - } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 75dc72c4ee242..9f5d8ae3720c7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -58,6 +58,8 @@ import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.cdk.integrations.source.jdbc.JdbcDataSourceUtils; import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils; +import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.ConfigKeys; +import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SSLConfig; import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode; import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto; import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; @@ -86,6 +88,7 @@ import io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.XminStreams; import io.airbyte.integrations.source.postgres.xmin.XminStateManager; import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.Config; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -97,8 +100,10 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.ConnectorSpecification; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.Files; import java.nio.file.Path; import java.sql.Connection; import java.sql.PreparedStatement; @@ -108,6 +113,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -129,7 +135,6 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements public static final String SSL_ROOT_CERT = "sslrootcert"; static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName(); - public static final String CA_CERTIFICATE_PATH = "ca_certificate_path"; public static final String SSL_KEY = "sslkey"; public static final String SSL_PASSWORD = "sslpassword"; public static final String MODE = "mode"; @@ -169,6 +174,16 @@ public ConnectorSpecification spec() throws Exception { return super.spec(); } + private static Path createFileForCertPem(final String certPem) { + try { + final Path path = Files.createTempFile(null, ".crt"); + Files.writeString(path, certPem); + path.toFile().deleteOnExit(); + return path; + } catch (final IOException e) { + throw new RuntimeException("Cannot save root certificate to file", e); + } + } @Override public JsonNode toDatabaseConfig(final JsonNode config) { final List<String> additionalParameters = new ArrayList<>(); @@ -187,11 +202,10 @@ public JsonNode toDatabaseConfig(final JsonNode config) { jdbcUrl.append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()).append(AMPERSAND); } @SuppressWarnings("deprecation") - final Map<String, String> sslParameters = parseSSLConfigDeprecated(config); + final Map<ConfigKeys, String> sslParameters = parseSSLConfig(config); if (config.has(PARAM_SSL_MODE) && config.get(PARAM_SSL_MODE).has(PARAM_CA_CERTIFICATE)) { - sslParameters.put(CA_CERTIFICATE_PATH, - JdbcSSLConnectionUtils.fileFromCertPem(config.get(PARAM_SSL_MODE).get(PARAM_CA_CERTIFICATE).asText()).toString()); - LOGGER.debug("root ssl ca crt file: {}", sslParameters.get(CA_CERTIFICATE_PATH)); + Path fileForCertPem = createFileForCertPem(config.get(PARAM_SSL_MODE).get(PARAM_CA_CERTIFICATE).asText()); + LOGGER.debug("root ssl ca crt file: {}", sslParameters.get(ConfigKeys.CA_CERTIFICATE_PATH)); } if (config.has(JdbcUtils.SCHEMAS_KEY) && config.get(JdbcUtils.SCHEMAS_KEY).isArray()) { @@ -221,22 +235,18 @@ public JsonNode toDatabaseConfig(final JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } - public String toJDBCQueryParams(final Map<String, String> sslParams) { + public String toJDBCQueryParams(final Map<ConfigKeys, String> sslParams) { return Objects.isNull(sslParams) ? "" : sslParams.entrySet() .stream() .map((entry) -> { - try { - return switch (entry.getKey()) { - case JdbcSSLConnectionUtils.SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue())); - case CA_CERTIFICATE_PATH -> SSL_ROOT_CERT + EQUALS + entry.getValue(); - case CLIENT_KEY_STORE_URL -> SSL_KEY + EQUALS + Path.of(new URI(entry.getValue())); - case CLIENT_KEY_STORE_PASS -> SSL_PASSWORD + EQUALS + entry.getValue(); - default -> ""; - }; - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("unable to convert to URI", e); - } + return switch (entry.getKey()) { + case SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue())); + case TRUST_KEY_STORE_URI -> SSL_ROOT_CERT + EQUALS + entry.getValue(); + case CLIENT_KEY_STORE_URI -> SSL_KEY + EQUALS + entry.getValue(); + case CLIENT_KEY_STORE_PASS -> SSL_PASSWORD + EQUALS + entry.getValue(); + default -> ""; + }; }) .filter(s -> Objects.nonNull(s) && !s.isEmpty()) .collect(Collectors.joining(JdbcUtils.AMPERSAND));