Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Nov 14, 2023
1 parent 3d01d1b commit 821d7cb
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.util.SSLCertificateUtils;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
Expand All @@ -26,14 +22,12 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSSLConnectionUtils {

public static final String SSL_MODE = "sslMode";

public static final String TRUST_KEY_STORE_URL = "trustCertificateKeyStoreUrl";
public static final String TRUST_KEY_STORE_PASS = "trustCertificateKeyStorePassword";
Expand Down Expand Up @@ -75,11 +69,28 @@ private static Optional<SslMode> bySpec(final String spec) {
public static final String PARAM_CLIENT_KEY = "client_key";
public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password";

public enum ConfigKeys {
SSL_MODE("ssl_mode"),
TRUST_KEY_STORE_URI("trustCertificateKeyStoreUri"),
TRUST_KEY_STORE_PASS("trustCertificateKeyStorePassword"),
TRUST_KEY_STORE_TYPE("trustCertificateKeyStoreType"),

CLIENT_KEY_STORE_URI("clientCertificateKeyStoreUri"),
CLIENT_KEY_STORE_PASS("clientCertificateKeyStorePassword"),
CLIENT_KEY_STORE_TYPE("clientCertificateKeyStoreType"),

public static record SSLConfig(SslMode sslMode, URL keyStoreUrl, String keyStorePassword, String keyStoreType) {
CA_CERTIFICATE_PATH("ca_certificate_path"),
;
ConfigKeys(String val) {

}
}

public static record KeyStoreInfo (URI uri, String password, String type) {}

public static record SSLConfig(SslMode sslMode, KeyStoreInfo clientKeyStoreInfo, KeyStoreInfo caKeyStoreInfo) {
SSLConfig(SslMode sslMode) {
this(sslMode, null, null, null);
this(sslMode, null, null);
}
}

Expand All @@ -90,94 +101,45 @@ public static record SSLConfig(SslMode sslMode, URL keyStoreUrl, String keyStore
* @return map containing relevant parsed values including location of keystore or an empty map
*/
@Deprecated
public static Map<String, String> parseSSLConfigDeprecated(final JsonNode config) {
public static Map<ConfigKeys, String> parseSSLConfig(final JsonNode config) {
LOGGER.debug("source config: {}", config);

Pair<URI, String> caCertKeyStorePair = null;
Pair<URI, String> clientCertKeyStorePair = null;
final Map<String, String> additionalParameters = new HashMap<>();
final Map<ConfigKeys, String> additionalParameters = new HashMap<>();
// assume ssl if not explicitly mentioned.
if (!config.has(JdbcUtils.SSL_KEY) || config.get(JdbcUtils.SSL_KEY).asBoolean()) {
if (config.has(JdbcUtils.SSL_MODE_KEY)) {
final String specMode = config.get(JdbcUtils.SSL_MODE_KEY).get(PARAM_MODE).asText();
additionalParameters.put(SSL_MODE,
additionalParameters.put(ConfigKeys.SSL_MODE,
SslMode.bySpec(specMode).orElseThrow(() -> new IllegalArgumentException("unexpected ssl mode")).name());
if (Objects.isNull(caCertKeyStorePair)) {
caCertKeyStorePair = JdbcSSLConnectionUtils.prepareCACertificateKeyStore(config);
}

KeyStoreInfo caCertKeyStorePair = JdbcSSLConnectionUtils.prepareCACertificateKeyStore(config);
if (Objects.nonNull(caCertKeyStorePair)) {
LOGGER.debug("uri for ca cert keystore: {}", caCertKeyStorePair.getLeft().toString());
try {
additionalParameters.putAll(Map.of(
TRUST_KEY_STORE_URL, caCertKeyStorePair.getLeft().toURL().toString(),
TRUST_KEY_STORE_PASS, caCertKeyStorePair.getRight(),
TRUST_KEY_STORE_TYPE, KEY_STORE_TYPE_PKCS12));
} catch (final MalformedURLException e) {
throw new RuntimeException("Unable to get a URL for trust key store");
}
LOGGER.debug("uri for ca cert keystore: {}", caCertKeyStorePair.uri().toString());

}
additionalParameters.putAll(Map.of(
ConfigKeys.TRUST_KEY_STORE_URI, caCertKeyStorePair.uri.toString(),
ConfigKeys.TRUST_KEY_STORE_PASS, caCertKeyStorePair.password,
ConfigKeys.TRUST_KEY_STORE_TYPE, caCertKeyStorePair.type));

if (Objects.isNull(clientCertKeyStorePair)) {
clientCertKeyStorePair = JdbcSSLConnectionUtils.prepareClientCertificateKeyStore(config);
}

KeyStoreInfo clientCertKeyStorePair = JdbcSSLConnectionUtils.prepareClientCertificateKeyStore(config);
if (Objects.nonNull(clientCertKeyStorePair)) {
LOGGER.debug("uri for client cert keystore: {} / {}", clientCertKeyStorePair.getLeft().toString(), clientCertKeyStorePair.getRight());
try {
additionalParameters.putAll(Map.of(
CLIENT_KEY_STORE_URL, clientCertKeyStorePair.getLeft().toURL().toString(),
CLIENT_KEY_STORE_PASS, clientCertKeyStorePair.getRight(),
CLIENT_KEY_STORE_TYPE, KEY_STORE_TYPE_PKCS12));
} catch (final MalformedURLException e) {
throw new RuntimeException("Unable to get a URL for client key store");
}
LOGGER.debug("uri for client cert keystore: {} / {}", clientCertKeyStorePair.uri, clientCertKeyStorePair.password);
additionalParameters.putAll(Map.of(
ConfigKeys.CLIENT_KEY_STORE_URI, clientCertKeyStorePair.uri.toString(),
ConfigKeys.CLIENT_KEY_STORE_PASS, clientCertKeyStorePair.password,
ConfigKeys.CLIENT_KEY_STORE_TYPE, clientCertKeyStorePair.type));
}
} else {
additionalParameters.put(SSL_MODE, SslMode.DISABLED.name());
additionalParameters.put(ConfigKeys.SSL_MODE, SslMode.DISABLED.name());
}
}
LOGGER.debug("additional params: {}", additionalParameters);
return additionalParameters;
}

public static SSLConfig parseSSLConfig(final JsonNode config) {
LOGGER.debug("source config: {}", config);
// assume ssl if not explicitly mentioned.
if (config.has(JdbcUtils.SSL_KEY) && ! config.get(JdbcUtils.SSL_KEY).asBoolean()) {
return new SSLConfig(SslMode.DISABLED);
}
if (!config.has(JdbcUtils.SSL_MODE_KEY)) {
return new SSLConfig(SslMode.DISABLED);
}

JsonNode sslConfigAsJson = config.get(JdbcUtils.SSL_MODE_KEY);
final String specMode = sslConfigAsJson.get(PARAM_MODE).asText();
SslMode sslMode = SslMode.bySpec(specMode).orElseThrow(() -> new IllegalArgumentException("unexpected ssl mode"));

Pair<URI, String> certificateKeyStore = prepareClientCertificateKeyStore(sslConfigAsJson);
if (certificateKeyStore == null) {
LOGGER.debug("no client certificate keystore found. Looking for CA certificate key store.");
certificateKeyStore = JdbcSSLConnectionUtils.prepareCACertificateKeyStore(sslConfigAsJson);
}
if (certificateKeyStore == null) {
LOGGER.debug("no CA certificate keystore found. Returning SSLConfig.");
return new SSLConfig(sslMode);
}

LOGGER.debug("uri for cert keystore: {}", certificateKeyStore.getLeft().toString());
try {
return new SSLConfig(sslMode,
certificateKeyStore.getLeft().toURL(),
certificateKeyStore.getRight(),
KEY_STORE_TYPE_PKCS12);
} catch (final MalformedURLException e) {
throw new RuntimeException("Unable to get a URL for trust key store");
}
}

private static @Nullable Pair<URI, String> prepareCACertificateKeyStore(final JsonNode sslConfigAsJson) {
private static @Nullable KeyStoreInfo prepareCACertificateKeyStore(final JsonNode sslConfigAsJson) {
// if has CA cert - make keystore
// if has client cert
// if has client password - make keystore using password
Expand All @@ -191,7 +153,7 @@ public static SSLConfig parseSSLConfig(final JsonNode config) {
clientKeyPassword,
null,
null);
return new ImmutablePair<>(caCertKeyStoreUri, clientKeyPassword);
return new KeyStoreInfo(caCertKeyStoreUri, clientKeyPassword, KEY_STORE_TYPE_PKCS12);
} catch (final CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to create keystore for CA certificate", e);
}
Expand All @@ -210,7 +172,7 @@ private static String getOrGeneratePassword(final JsonNode sslModeConfig) {
return clientKeyPassword;
}

private static @Nullable Pair<URI, String> prepareClientCertificateKeyStore(final @Nonnull JsonNode sslConfigAsJson) {
private static @Nullable KeyStoreInfo prepareClientCertificateKeyStore(final @Nonnull JsonNode sslConfigAsJson) {
Pair<URI, String> clientCertKeyStorePair = null;
if (sslConfigAsJson.has(PARAM_CLIENT_CERTIFICATE) && !sslConfigAsJson.get(PARAM_CLIENT_CERTIFICATE).asText().isEmpty()
&& sslConfigAsJson.has(PARAM_CLIENT_KEY) && !sslConfigAsJson.get(PARAM_CLIENT_KEY).asText().isEmpty()) {
Expand All @@ -219,7 +181,7 @@ private static String getOrGeneratePassword(final JsonNode sslModeConfig) {
final URI clientCertKeyStoreUri = SSLCertificateUtils.keyStoreFromClientCertificate(sslConfigAsJson.get(PARAM_CLIENT_CERTIFICATE).asText(),
sslConfigAsJson.get(PARAM_CLIENT_KEY).asText(),
clientKeyPassword, null);
return new ImmutablePair<>(clientCertKeyStoreUri, clientKeyPassword);
return new KeyStoreInfo(clientCertKeyStoreUri, clientKeyPassword, KEY_STORE_TYPE_PKCS12);
} catch (final CertificateException | IOException
| KeyStoreException | NoSuchAlgorithmException
| InvalidKeySpecException | InterruptedException e) {
Expand All @@ -228,16 +190,4 @@ private static String getOrGeneratePassword(final JsonNode sslModeConfig) {
}
return null;
}

// SGX TODO: should be private
public static Path fileFromCertPem(final String certPem) {
try {
final Path path = Files.createTempFile(null, ".crt");
Files.writeString(path, certPem);
path.toFile().deleteOnExit();
return path;
} catch (final IOException e) {
throw new RuntimeException("Cannot save root certificate to file", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.cdk.integrations.source.jdbc.JdbcDataSourceUtils;
import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.ConfigKeys;
import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SSLConfig;
import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
Expand Down Expand Up @@ -86,6 +88,7 @@
import io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.XminStreams;
import io.airbyte.integrations.source.postgres.xmin.XminStateManager;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Config;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
Expand All @@ -97,8 +100,10 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -108,6 +113,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -129,7 +135,6 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_ROOT_CERT = "sslrootcert";

static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();
public static final String CA_CERTIFICATE_PATH = "ca_certificate_path";
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
Expand Down Expand Up @@ -169,6 +174,16 @@ public ConnectorSpecification spec() throws Exception {
return super.spec();
}

private static Path createFileForCertPem(final String certPem) {
try {
final Path path = Files.createTempFile(null, ".crt");
Files.writeString(path, certPem);
path.toFile().deleteOnExit();
return path;
} catch (final IOException e) {
throw new RuntimeException("Cannot save root certificate to file", e);
}
}
@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();
Expand All @@ -187,11 +202,10 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
jdbcUrl.append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()).append(AMPERSAND);
}
@SuppressWarnings("deprecation")
final Map<String, String> sslParameters = parseSSLConfigDeprecated(config);
final Map<ConfigKeys, String> sslParameters = parseSSLConfig(config);
if (config.has(PARAM_SSL_MODE) && config.get(PARAM_SSL_MODE).has(PARAM_CA_CERTIFICATE)) {
sslParameters.put(CA_CERTIFICATE_PATH,
JdbcSSLConnectionUtils.fileFromCertPem(config.get(PARAM_SSL_MODE).get(PARAM_CA_CERTIFICATE).asText()).toString());
LOGGER.debug("root ssl ca crt file: {}", sslParameters.get(CA_CERTIFICATE_PATH));
Path fileForCertPem = createFileForCertPem(config.get(PARAM_SSL_MODE).get(PARAM_CA_CERTIFICATE).asText());
LOGGER.debug("root ssl ca crt file: {}", sslParameters.get(ConfigKeys.CA_CERTIFICATE_PATH));
}

if (config.has(JdbcUtils.SCHEMAS_KEY) && config.get(JdbcUtils.SCHEMAS_KEY).isArray()) {
Expand Down Expand Up @@ -221,22 +235,18 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

public String toJDBCQueryParams(final Map<String, String> sslParams) {
public String toJDBCQueryParams(final Map<ConfigKeys, String> sslParams) {
return Objects.isNull(sslParams) ? ""
: sslParams.entrySet()
.stream()
.map((entry) -> {
try {
return switch (entry.getKey()) {
case JdbcSSLConnectionUtils.SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue()));
case CA_CERTIFICATE_PATH -> SSL_ROOT_CERT + EQUALS + entry.getValue();
case CLIENT_KEY_STORE_URL -> SSL_KEY + EQUALS + Path.of(new URI(entry.getValue()));
case CLIENT_KEY_STORE_PASS -> SSL_PASSWORD + EQUALS + entry.getValue();
default -> "";
};
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("unable to convert to URI", e);
}
return switch (entry.getKey()) {
case SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue()));
case TRUST_KEY_STORE_URI -> SSL_ROOT_CERT + EQUALS + entry.getValue();
case CLIENT_KEY_STORE_URI -> SSL_KEY + EQUALS + entry.getValue();
case CLIENT_KEY_STORE_PASS -> SSL_PASSWORD + EQUALS + entry.getValue();
default -> "";
};
})
.filter(s -> Objects.nonNull(s) && !s.isEmpty())
.collect(Collectors.joining(JdbcUtils.AMPERSAND));
Expand Down

0 comments on commit 821d7cb

Please sign in to comment.