Skip to content

Commit

Permalink
add testDockerFile for MS SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Dec 19, 2023
1 parent 26d3746 commit bb32bf5
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.connector.sqlserver.Lsn;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand All @@ -67,6 +77,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -599,30 +610,43 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
private void readSsl(final JsonNode sslMethod, final List<String> additionalParameters) {
final JsonNode config = sslMethod.get("ssl_method");
switch (config.get("ssl_method").asText()) {
case "unencrypted" -> additionalParameters.add("encrypt=false");
case "unencrypted" -> {
additionalParameters.add("encrypt=false");
additionalParameters.add("trustServerCertificate=true");
}
case "encrypted_trust_server_certificate" -> {
additionalParameters.add("encrypt=true");
additionalParameters.add("trustServerCertificate=true");
}
case "encrypted_verify_certificate" -> {
additionalParameters.add("encrypt=true");
additionalParameters.add("trustServerCertificate=false");

// trust store location code found at https://stackoverflow.com/a/56570588
final String trustStoreLocation = Optional
.ofNullable(System.getProperty("javax.net.ssl.trustStore"))
.orElseGet(() -> System.getProperty("java.home") + "/lib/security/cacerts");
final File trustStoreFile = new File(trustStoreLocation);
if (!trustStoreFile.exists()) {
throw new RuntimeException(
"Unable to locate the Java TrustStore: the system property javax.net.ssl.trustStore is undefined or "
+ trustStoreLocation + " does not exist.");
}
final String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
additionalParameters.add("trustStore=" + trustStoreLocation);
if (trustStorePassword != null && !trustStorePassword.isEmpty()) {
if (config.has("certificate")) {
byte[] certificate = config.get("certificate").asText().getBytes(StandardCharsets.US_ASCII);
String password = RandomStringUtils.randomAlphanumeric(100);
char[] pwdArray = password.toCharArray();
final File trustStoreFile;
try {
trustStoreFile = File.createTempFile("mssqlTrustStoreFile", "jks");
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, pwdArray);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
Certificate cert = cf.generateCertificate(new ByteArrayInputStream(certificate));
ks.setCertificateEntry("cert", cert);
try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) {
ks.store(fos, pwdArray);
}
} catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
throw new RuntimeException(e);
}
additionalParameters
.add("trustStore=" + trustStoreFile.getAbsolutePath());
additionalParameters
.add("trustStorePassword=" + config.get("trustStorePassword").asText());
.add("trustStorePassword=" + password);
}

if (config.has("hostNameInCertificate")) {
additionalParameters
.add("hostNameInCertificate=" + config.get("hostNameInCertificate").asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"required": ["host", "port", "database", "username", "password"],
"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down Expand Up @@ -90,7 +90,7 @@
{
"title": "Encrypted (verify certificate)",
"description": "Verify and use the certificate provided by the server.",
"required": ["ssl_method", "trustStoreName", "trustStorePassword"],
"required": ["ssl_method"],
"properties": {
"ssl_method": {
"type": "string",
Expand All @@ -100,7 +100,15 @@
"title": "Host Name In Certificate",
"type": "string",
"description": "Specifies the host name of the server. The value of this property must match the subject property of the certificate.",
"order": 7
"order": 0
},
"certificate": {
"title": "Certificate",
"type": "string",
"description": "certificate of the server, or of the CA that signed the server certificate",
"order": 1,
"airbyte_secret": true,
"multiline": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"host": "default",
"port": 5555,
"database": "default",
"username": "default"
"username": "default",
"password": "default"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mssql;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.CertificateKey;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import java.net.InetAddress;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MssqlSslSourceTest {

private MsSQLTestDatabase testDb;
private static final Logger LOGGER = LoggerFactory.getLogger(MssqlSslSourceTest.class);

@BeforeEach
void setup() {
testDb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.WITH_SSL_CERTIFICATES);

Check failure on line 33 in airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSslSourceTest.java

View workflow job for this annotation

GitHub Actions / Gradle Check

[Task :airbyte-integrations:connectors:source-mssql:compileTestJava FAILED] cannot find symbol testDb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.WITH_SSL_CERTIFICATES); ^ symbol: variable WITH_SSL_CERTIFICATES
}

@AfterEach
public void tearDown() {
testDb.close();
}

@ParameterizedTest
@EnumSource(CertificateKey.class)
public void testDiscoverWithCertificateTrustHostname(CertificateKey certificateKey) throws Exception {
String certificate = testDb.getCertificate(certificateKey);
JsonNode config = testDb.testConfigBuilder()
.withSsl(Map.of("ssl_method", "encrypted_verify_certificate",
"certificate", certificate))
.build();
try {
AirbyteCatalog catalog = new MssqlSource().discover(config);
assertTrue(certificateKey.isValid);
} catch (Exception e) {
if (certificateKey.isValid) {
throw e;
}
}
}

@ParameterizedTest
@EnumSource(CertificateKey.class)
public void testDiscoverWithCertificateNoTrustHostnameWrongHostname(CertificateKey certificateKey) throws Throwable {
if (certificateKey.isValid) {
String containerIp = InetAddress.getByName(testDb.getContainer().getHost()).getHostAddress();
String certificate = testDb.getCertificate(certificateKey);
JsonNode config = testDb.configBuilder()
.withSsl(Map.of("ssl_method", "encrypted_verify_certificate",
"certificate", certificate))
.with(JdbcUtils.HOST_KEY, containerIp)
.with(JdbcUtils.PORT_KEY, testDb.getContainer().getFirstMappedPort())
.withCredentials()
.withDatabase()
.build();
try {
AirbyteCatalog catalog = new MssqlSource().discover(config);
fail("discover should have failed!");
} catch (ConnectionErrorException e) {
String expectedMessage =
"Failed to validate the server name \"" + containerIp + "\"in a certificate during Secure Sockets Layer (SSL) initialization.";
if (!e.getExceptionMessage().contains(expectedMessage)) {
fail("exception message was " + e.getExceptionMessage() + "\n expected: " + expectedMessage);
}
}
}
}

@ParameterizedTest
@EnumSource(CertificateKey.class)
public void testDiscoverWithCertificateNoTrustHostnameAlternateHostname(CertificateKey certificateKey) throws Exception {
final String containerIp = InetAddress.getByName(testDb.getContainer().getHost()).getHostAddress();
if (certificateKey.isValid) {
String certificate = testDb.getCertificate(certificateKey);
JsonNode config = testDb.configBuilder()
.withSsl(Map.of("ssl_method", "encrypted_verify_certificate",
"certificate", certificate,
"hostNameInCertificate", testDb.getContainer().getHost()))
.with(JdbcUtils.HOST_KEY, containerIp)
.with(JdbcUtils.PORT_KEY, testDb.getContainer().getFirstMappedPort())
.withCredentials()
.withDatabase()
.build();
AirbyteCatalog catalog = new MssqlSource().discover(config);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.mssql;

import io.airbyte.cdk.testutils.ContainerFactory;
import org.apache.commons.lang3.StringUtils;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -35,4 +36,35 @@ public void withAgent(MSSQLServerContainer<?> container) {
container.addEnv("MSSQL_AGENT_ENABLED", "True");
}

public void withSslCertificates(MSSQLServerContainer<?> container) {
// yes, this is uglier than sin. The reason why I'm doing this is because there's no command to
// reload a SqlServer config. So I need to create all the necessary files before I start the
// SQL server. Hence this horror
String command = StringUtils.replace(
"""
mkdir /tmp/certs/ &&
openssl req -nodes -new -x509 -sha256 -keyout /tmp/certs/ca.key -out /tmp/certs/ca.crt -subj "/CN=ca" &&
openssl req -nodes -new -x509 -sha256 -keyout /tmp/certs/dummy_ca.key -out /tmp/certs/dummy_ca.crt -subj "/CN=ca" &&
openssl req -nodes -new -sha256 -keyout /tmp/certs/server.key -out /tmp/certs/server.csr -subj "/CN={hostName}" &&
openssl req -nodes -new -sha256 -keyout /tmp/certs/dummy_server.key -out /tmp/certs/dummy_server.csr -subj "/CN={hostName}" &&
openssl x509 -req -in /tmp/certs/server.csr -CA /tmp/certs/ca.crt -CAkey /tmp/certs/ca.key -out /tmp/certs/server.crt -days 365 -sha256 &&
openssl x509 -req -in /tmp/certs/dummy_server.csr -CA /tmp/certs/ca.crt -CAkey /tmp/certs/ca.key -out /tmp/certs/dummy_server.crt -days 365 -sha256 &&
openssl x509 -req -in /tmp/certs/server.csr -CA /tmp/certs/dummy_ca.crt -CAkey /tmp/certs/dummy_ca.key -out /tmp/certs/server_dummy_ca.crt -days 365 -sha256 &&
chmod 440 /tmp/certs/* &&
{
cat > /var/opt/mssql/mssql.conf <<- EOF
[network]
tlscert = /tmp/certs/server.crt
tlskey = /tmp/certs/server.key
tlsprotocols = 1.2
forceencryption = 1
EOF
} && /opt/mssql/bin/sqlservr
""",
"{hostName}", container.getHost());
container.withCommand("bash", "-c", command)
.withUrlParam("trustServerCertificate", "true");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.testutils.TestDatabase;
import io.debezium.connector.sqlserver.Lsn;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -201,6 +204,44 @@ public SQLDialect getSqlDialect() {
return SQLDialect.DEFAULT;
}

public static enum CertificateKey {

CA(true),
DUMMY_CA(false),
SERVER(true),
DUMMY_SERVER(false),
SERVER_DUMMY_CA(false),
;

public final boolean isValid;

CertificateKey(boolean isValid) {
this.isValid = isValid;
}

}

private Map<CertificateKey, String> cachedCerts;

public synchronized String getCertificate(CertificateKey certificateKey) {
if (cachedCerts == null) {
Map<CertificateKey, String> cachedCerts = new HashMap<>();
try {
for (CertificateKey key : CertificateKey.values()) {
String command = "cat /tmp/certs/" + key.name().toLowerCase() + ".crt";
String certificate = getContainer().execInContainer("bash", "-c", command).getStdout().trim();
cachedCerts.put(key, certificate);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.cachedCerts = cachedCerts;
}
return cachedCerts.get(certificateKey);
}

@Override
public MsSQLConfigBuilder configBuilder() {
return new MsSQLConfigBuilder(this);
Expand All @@ -209,7 +250,10 @@ public MsSQLConfigBuilder configBuilder() {
static public class MsSQLConfigBuilder extends ConfigBuilder<MsSQLTestDatabase, MsSQLConfigBuilder> {

protected MsSQLConfigBuilder(MsSQLTestDatabase testDatabase) {

super(testDatabase);
with(JdbcUtils.JDBC_URL_PARAMS_KEY, "loginTimeout=2");

}

public MsSQLConfigBuilder withCdcReplication() {
Expand Down

0 comments on commit bb32bf5

Please sign in to comment.