diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 7fa7758ebfab..ae108cbd31a0 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -51,7 +51,17 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.SyncMode; import io.debezium.connector.sqlserver.Lsn; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; import java.sql.Connection; import java.sql.JDBCType; import java.sql.PreparedStatement; @@ -67,6 +77,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -599,30 +610,43 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { private void readSsl(final JsonNode sslMethod, final List additionalParameters) { final JsonNode config = sslMethod.get("ssl_method"); switch (config.get("ssl_method").asText()) { - case "unencrypted" -> additionalParameters.add("encrypt=false"); + case "unencrypted" -> { + additionalParameters.add("encrypt=false"); + additionalParameters.add("trustServerCertificate=true"); + } case "encrypted_trust_server_certificate" -> { additionalParameters.add("encrypt=true"); additionalParameters.add("trustServerCertificate=true"); } case "encrypted_verify_certificate" -> { additionalParameters.add("encrypt=true"); + additionalParameters.add("trustServerCertificate=false"); // trust store location code found at https://stackoverflow.com/a/56570588 - final String trustStoreLocation = Optional - .ofNullable(System.getProperty("javax.net.ssl.trustStore")) - .orElseGet(() -> System.getProperty("java.home") + "/lib/security/cacerts"); - final File trustStoreFile = new File(trustStoreLocation); - if (!trustStoreFile.exists()) { - throw new RuntimeException( - "Unable to locate the Java TrustStore: the system property javax.net.ssl.trustStore is undefined or " - + trustStoreLocation + " does not exist."); - } - final String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); - additionalParameters.add("trustStore=" + trustStoreLocation); - if (trustStorePassword != null && !trustStorePassword.isEmpty()) { + if (config.has("certificate")) { + byte[] certificate = config.get("certificate").asText().getBytes(StandardCharsets.US_ASCII); + String password = RandomStringUtils.randomAlphanumeric(100); + char[] pwdArray = password.toCharArray(); + final File trustStoreFile; + try { + trustStoreFile = File.createTempFile("mssqlTrustStoreFile", "jks"); + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, pwdArray); + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Certificate cert = cf.generateCertificate(new ByteArrayInputStream(certificate)); + ks.setCertificateEntry("cert", cert); + try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) { + ks.store(fos, pwdArray); + } + } catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) { + throw new RuntimeException(e); + } + additionalParameters + .add("trustStore=" + trustStoreFile.getAbsolutePath()); additionalParameters - .add("trustStorePassword=" + config.get("trustStorePassword").asText()); + .add("trustStorePassword=" + password); } + if (config.has("hostNameInCertificate")) { additionalParameters .add("hostNameInCertificate=" + config.get("hostNameInCertificate").asText()); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json index 4ee4554d009e..f4ed5b698924 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json @@ -4,7 +4,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "MSSQL Source Spec", "type": "object", - "required": ["host", "port", "database", "username"], + "required": ["host", "port", "database", "username", "password"], "properties": { "host": { "description": "The hostname of the database.", @@ -90,7 +90,7 @@ { "title": "Encrypted (verify certificate)", "description": "Verify and use the certificate provided by the server.", - "required": ["ssl_method", "trustStoreName", "trustStorePassword"], + "required": ["ssl_method"], "properties": { "ssl_method": { "type": "string", @@ -100,7 +100,15 @@ "title": "Host Name In Certificate", "type": "string", "description": "Specifies the host name of the server. The value of this property must match the subject property of the certificate.", - "order": 7 + "order": 0 + }, + "certificate": { + "title": "Certificate", + "type": "string", + "description": "certificate of the server, or of the CA that signed the server certificate", + "order": 1, + "airbyte_secret": true, + "multiline": true } } } diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/resources/dummy_config.json b/airbyte-integrations/connectors/source-mssql/src/test-integration/resources/dummy_config.json index 560e55333378..1f42c042e746 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/resources/dummy_config.json +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/resources/dummy_config.json @@ -2,5 +2,6 @@ "host": "default", "port": 5555, "database": "default", - "username": "default" + "username": "default", + "password": "default" } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSslSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSslSourceTest.java new file mode 100644 index 000000000000..c59d6a925ed5 --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSslSourceTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mssql; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.commons.exceptions.ConnectionErrorException; +import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage; +import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.CertificateKey; +import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier; +import io.airbyte.protocol.models.v0.AirbyteCatalog; +import java.net.InetAddress; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MssqlSslSourceTest { + + private MsSQLTestDatabase testDb; + private static final Logger LOGGER = LoggerFactory.getLogger(MssqlSslSourceTest.class); + + @BeforeEach + void setup() { + testDb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.WITH_SSL_CERTIFICATES); + } + + @AfterEach + public void tearDown() { + testDb.close(); + } + + @ParameterizedTest + @EnumSource(CertificateKey.class) + public void testDiscoverWithCertificateTrustHostname(CertificateKey certificateKey) throws Exception { + String certificate = testDb.getCertificate(certificateKey); + JsonNode config = testDb.testConfigBuilder() + .withSsl(Map.of("ssl_method", "encrypted_verify_certificate", + "certificate", certificate)) + .build(); + try { + AirbyteCatalog catalog = new MssqlSource().discover(config); + assertTrue(certificateKey.isValid); + } catch (Exception e) { + if (certificateKey.isValid) { + throw e; + } + } + } + + @ParameterizedTest + @EnumSource(CertificateKey.class) + public void testDiscoverWithCertificateNoTrustHostnameWrongHostname(CertificateKey certificateKey) throws Throwable { + if (certificateKey.isValid) { + String containerIp = InetAddress.getByName(testDb.getContainer().getHost()).getHostAddress(); + String certificate = testDb.getCertificate(certificateKey); + JsonNode config = testDb.configBuilder() + .withSsl(Map.of("ssl_method", "encrypted_verify_certificate", + "certificate", certificate)) + .with(JdbcUtils.HOST_KEY, containerIp) + .with(JdbcUtils.PORT_KEY, testDb.getContainer().getFirstMappedPort()) + .withCredentials() + .withDatabase() + .build(); + try { + AirbyteCatalog catalog = new MssqlSource().discover(config); + fail("discover should have failed!"); + } catch (ConnectionErrorException e) { + String expectedMessage = + "Failed to validate the server name \"" + containerIp + "\"in a certificate during Secure Sockets Layer (SSL) initialization."; + if (!e.getExceptionMessage().contains(expectedMessage)) { + fail("exception message was " + e.getExceptionMessage() + "\n expected: " + expectedMessage); + } + } + } + } + + @ParameterizedTest + @EnumSource(CertificateKey.class) + public void testDiscoverWithCertificateNoTrustHostnameAlternateHostname(CertificateKey certificateKey) throws Exception { + final String containerIp = InetAddress.getByName(testDb.getContainer().getHost()).getHostAddress(); + if (certificateKey.isValid) { + String certificate = testDb.getCertificate(certificateKey); + JsonNode config = testDb.configBuilder() + .withSsl(Map.of("ssl_method", "encrypted_verify_certificate", + "certificate", certificate, + "hostNameInCertificate", testDb.getContainer().getHost())) + .with(JdbcUtils.HOST_KEY, containerIp) + .with(JdbcUtils.PORT_KEY, testDb.getContainer().getFirstMappedPort()) + .withCredentials() + .withDatabase() + .build(); + AirbyteCatalog catalog = new MssqlSource().discover(config); + } + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLContainerFactory.java b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLContainerFactory.java index c28cf5a96dd2..74f6cce2c3f4 100644 --- a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLContainerFactory.java +++ b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLContainerFactory.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.mssql; import io.airbyte.cdk.testutils.ContainerFactory; +import org.apache.commons.lang3.StringUtils; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.Network; import org.testcontainers.utility.DockerImageName; @@ -35,4 +36,35 @@ public void withAgent(MSSQLServerContainer container) { container.addEnv("MSSQL_AGENT_ENABLED", "True"); } + public void withSslCertificates(MSSQLServerContainer container) { + // yes, this is uglier than sin. The reason why I'm doing this is because there's no command to + // reload a SqlServer config. So I need to create all the necessary files before I start the + // SQL server. Hence this horror + String command = StringUtils.replace( + """ + mkdir /tmp/certs/ && + openssl req -nodes -new -x509 -sha256 -keyout /tmp/certs/ca.key -out /tmp/certs/ca.crt -subj "/CN=ca" && + openssl req -nodes -new -x509 -sha256 -keyout /tmp/certs/dummy_ca.key -out /tmp/certs/dummy_ca.crt -subj "/CN=ca" && + openssl req -nodes -new -sha256 -keyout /tmp/certs/server.key -out /tmp/certs/server.csr -subj "/CN={hostName}" && + openssl req -nodes -new -sha256 -keyout /tmp/certs/dummy_server.key -out /tmp/certs/dummy_server.csr -subj "/CN={hostName}" && + + openssl x509 -req -in /tmp/certs/server.csr -CA /tmp/certs/ca.crt -CAkey /tmp/certs/ca.key -out /tmp/certs/server.crt -days 365 -sha256 && + openssl x509 -req -in /tmp/certs/dummy_server.csr -CA /tmp/certs/ca.crt -CAkey /tmp/certs/ca.key -out /tmp/certs/dummy_server.crt -days 365 -sha256 && + openssl x509 -req -in /tmp/certs/server.csr -CA /tmp/certs/dummy_ca.crt -CAkey /tmp/certs/dummy_ca.key -out /tmp/certs/server_dummy_ca.crt -days 365 -sha256 && + chmod 440 /tmp/certs/* && + { + cat > /var/opt/mssql/mssql.conf <<- EOF + [network] + tlscert = /tmp/certs/server.crt + tlskey = /tmp/certs/server.key + tlsprotocols = 1.2 + forceencryption = 1 + EOF + } && /opt/mssql/bin/sqlservr + """, + "{hostName}", container.getHost()); + container.withCommand("bash", "-c", command) + .withUrlParam("trustServerCertificate", "true"); + } + } diff --git a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java index 873f42cd40a8..8b570846cb69 100644 --- a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java @@ -8,7 +8,10 @@ import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.testutils.TestDatabase; import io.debezium.connector.sqlserver.Lsn; +import java.io.IOException; +import java.io.UncheckedIOException; import java.sql.SQLException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -201,6 +204,44 @@ public SQLDialect getSqlDialect() { return SQLDialect.DEFAULT; } + public static enum CertificateKey { + + CA(true), + DUMMY_CA(false), + SERVER(true), + DUMMY_SERVER(false), + SERVER_DUMMY_CA(false), + ; + + public final boolean isValid; + + CertificateKey(boolean isValid) { + this.isValid = isValid; + } + + } + + private Map cachedCerts; + + public synchronized String getCertificate(CertificateKey certificateKey) { + if (cachedCerts == null) { + Map cachedCerts = new HashMap<>(); + try { + for (CertificateKey key : CertificateKey.values()) { + String command = "cat /tmp/certs/" + key.name().toLowerCase() + ".crt"; + String certificate = getContainer().execInContainer("bash", "-c", command).getStdout().trim(); + cachedCerts.put(key, certificate); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.cachedCerts = cachedCerts; + } + return cachedCerts.get(certificateKey); + } + @Override public MsSQLConfigBuilder configBuilder() { return new MsSQLConfigBuilder(this); @@ -209,7 +250,10 @@ public MsSQLConfigBuilder configBuilder() { static public class MsSQLConfigBuilder extends ConfigBuilder { protected MsSQLConfigBuilder(MsSQLTestDatabase testDatabase) { + super(testDatabase); + with(JdbcUtils.JDBC_URL_PARAMS_KEY, "loginTimeout=2"); + } public MsSQLConfigBuilder withCdcReplication() {