Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SSL options to mssql #33071

Merged
merged 9 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------|
| 0.10.4 | 2023-12-20 | [\#33071](https://github.com/airbytehq/airbyte/pull/33071) | Add the ability to parse JDBC parameters with another delimiter than '&' |
| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage |
| 0.10.1 | 2023-12-21 | [\#33723](https://github.com/airbytehq/airbyte/pull/33723) | Make memory-manager log message less scary |
| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ public static URI keyStoreFromCertificate(final String certString,
return keyStoreFromCertificate(fromPEMString(certString), keyStorePassword, filesystem, directory);
}

public static URI keyStoreFromCertificate(final String certString,
final String keyStorePassword)
throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException {
return keyStoreFromCertificate(fromPEMString(certString), keyStorePassword, null, null);
}

public static URI keyStoreFromCertificate(final String certString, final String keyStorePassword, final String directory)
throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException {
return keyStoreFromCertificate(certString, keyStorePassword, FileSystems.getDefault(), directory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.10.3
version=0.10.4
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,12 @@ protected long getActualCursorRecordCount(final Connection connection,

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
return createDatabase(sourceConfig, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER);
}

public JdbcDatabase createDatabase(final JsonNode sourceConfig, String delimiter) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
Map<String, String> connectionProperties = JdbcDataSourceUtils.getConnectionProperties(sourceConfig);
Map<String, String> connectionProperties = JdbcDataSourceUtils.getConnectionProperties(sourceConfig, delimiter);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public static void assertCustomParametersDontOverwriteDefaultParameters(final Ma
* @return A mapping of connection properties
*/
public static Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY);
return getConnectionProperties(config, DEFAULT_JDBC_PARAMETERS_DELIMITER);
}

public static Map<String, String> getConnectionProperties(final JsonNode config, String parameterDelimiter) {
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY, parameterDelimiter);
final Map<String, String> defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config);
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
return MoreMaps.merge(customProperties, defaultProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.7'
cdkVersionRequired = '0.10.4'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 3.4.1
dockerImageTag: 3.5.0
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,17 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi
props.setProperty("driver.trustServerCertificate", "true");
} else if ("encrypted_verify_certificate".equals(sslMethod)) {
props.setProperty("driver.encrypt", "true");
props.setProperty("driver.trustServerCertificate", "false");
if (dbConfig.has("trustStore") && !dbConfig.get("trustStore").asText().isEmpty()) {
props.setProperty("database.ssl.truststore", dbConfig.get("trustStore").asText());
props.setProperty("database.trustStore", dbConfig.get("trustStore").asText());
}

if (dbConfig.has("trustStorePassword") && !dbConfig.get("trustStorePassword").asText().isEmpty()) {
props.setProperty("database.ssl.truststore.password", dbConfig.get("trustStorePassword").asText());
props.setProperty("database.trustStorePassword", dbConfig.get("trustStorePassword").asText());
}

if (dbConfig.has("hostNameInCertificate") && !dbConfig.get("hostNameInCertificate").asText().isEmpty()) {
props.setProperty("driver.hostNameInCertificate", dbConfig.get("hostNameInCertificate").asText());
props.setProperty("database.hostNameInCertificate", dbConfig.get("hostNameInCertificate").asText());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.cdk.db.util.SSLCertificateUtils;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.Source;
import io.airbyte.cdk.integrations.base.adaptive.AdaptiveSourceRunner;
Expand All @@ -51,7 +52,11 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.connector.sqlserver.Lsn;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand All @@ -67,6 +72,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -94,6 +100,8 @@ SELECT CAST(IIF(EXISTS(SELECT TOP 1 1 FROM "%s"."%s" WHERE "%s" IS NULL), 1, 0)
public static final String NO_TUNNEL = "NO_TUNNEL";
public static final String SSL_METHOD = "ssl_method";
public static final String SSL_METHOD_UNENCRYPTED = "unencrypted";

public static final String JDBC_DELIMITER = ";";
private List<String> schemas;

public static Source sshWrappedSource(MssqlSource source) {
Expand Down Expand Up @@ -599,30 +607,33 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
private void readSsl(final JsonNode sslMethod, final List<String> additionalParameters) {
final JsonNode config = sslMethod.get("ssl_method");
switch (config.get("ssl_method").asText()) {
case "unencrypted" -> additionalParameters.add("encrypt=false");
case "unencrypted" -> {
additionalParameters.add("encrypt=false");
additionalParameters.add("trustServerCertificate=true");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only added for clarity. By default, encrypt=false implies trustServerCertificate=true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trustServerCertificate is trust all essentially.

This will trust any server certificate. I assume for the exchange of credentials which is still encrypted event when encrypt=false (you mentioned this offline)

}
case "encrypted_trust_server_certificate" -> {
additionalParameters.add("encrypt=true");
additionalParameters.add("trustServerCertificate=true");
}
case "encrypted_verify_certificate" -> {
additionalParameters.add("encrypt=true");

// trust store location code found at https://stackoverflow.com/a/56570588
final String trustStoreLocation = Optional
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was really not doing anything. If anything, it'd guarantee that the connection would fail (because with this, the sever certificate would have to be signed by one of the root authorities in the default trust store)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old behavior is equivalent to not pushing anything as a trustStorelocation JDBC parameer

.ofNullable(System.getProperty("javax.net.ssl.trustStore"))
.orElseGet(() -> System.getProperty("java.home") + "/lib/security/cacerts");
final File trustStoreFile = new File(trustStoreLocation);
if (!trustStoreFile.exists()) {
throw new RuntimeException(
"Unable to locate the Java TrustStore: the system property javax.net.ssl.trustStore is undefined or "
+ trustStoreLocation + " does not exist.");
}
final String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
additionalParameters.add("trustStore=" + trustStoreLocation);
if (trustStorePassword != null && !trustStorePassword.isEmpty()) {
additionalParameters.add("trustServerCertificate=false");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added for clarity. By default, encrypt=true implies trustServerCertificate=false


if (config.has("certificate")) {
stephane-airbyte marked this conversation as resolved.
Show resolved Hide resolved
String certificate = config.get("certificate").asText();
String password = RandomStringUtils.randomAlphanumeric(100);
final URI keyStoreUri;
try {
keyStoreUri = SSLCertificateUtils.keyStoreFromCertificate(certificate, password, null, null);
} catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
throw new RuntimeException(e);
}
additionalParameters
.add("trustStorePassword=" + config.get("trustStorePassword").asText());
.add("trustStore=" + keyStoreUri.getPath());
additionalParameters
.add("trustStorePassword=" + password);
}

if (config.has("hostNameInCertificate")) {
additionalParameters
.add("hostNameInCertificate=" + config.get("hostNameInCertificate").asText());
Expand All @@ -639,6 +650,11 @@ public Duration getConnectionTimeoutMssql(final Map<String, String> connectionPr
return getConnectionTimeout(connectionProperties);
}

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
return createDatabase(sourceConfig, JDBC_DELIMITER);
}

public static void main(final String[] args) throws Exception {
final Source source = MssqlSource.sshWrappedSource(new MssqlSource());
LOGGER.info("starting source: {}", MssqlSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"required": ["host", "port", "database", "username", "password"],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the password is required in the parameter parsing logic in MssqlSource.java. Making it mandatory in the UI as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a breaking change? What is the expected behavior here?
I'm thinking of existing configs for example saved without a password

"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down Expand Up @@ -90,7 +90,7 @@
{
"title": "Encrypted (verify certificate)",
"description": "Verify and use the certificate provided by the server.",
"required": ["ssl_method", "trustStoreName", "trustStorePassword"],
"required": ["ssl_method"],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless I'm missing something, the previous required fields didn't even exist in the spec.json. Removing them...

"properties": {
"ssl_method": {
"type": "string",
Expand All @@ -100,7 +100,15 @@
"title": "Host Name In Certificate",
"type": "string",
"description": "Specifies the host name of the server. The value of this property must match the subject property of the certificate.",
"order": 7
"order": 0
},
"certificate": {
"title": "Certificate",
"type": "string",
"description": "certificate of the server, or of the CA that signed the server certificate",
"order": 1,
"airbyte_secret": true,
"multiline": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"host": "default",
"port": 5555,
"database": "default",
"username": "default"
"username": "default",
"password": "default"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that the password is a mandatory field, it needs to be added here too

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"required": ["host", "port", "database", "username", "password"],
"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down Expand Up @@ -90,7 +90,7 @@
{
"title": "Encrypted (verify certificate)",
"description": "Verify and use the certificate provided by the server.",
"required": ["ssl_method", "trustStoreName", "trustStorePassword"],
"required": ["ssl_method"],
"properties": {
"ssl_method": {
"type": "string",
Expand All @@ -100,7 +100,15 @@
"title": "Host Name In Certificate",
"type": "string",
"description": "Specifies the host name of the server. The value of this property must match the subject property of the certificate.",
"order": 7
"order": 0
},
"certificate": {
"title": "Certificate",
"type": "string",
"description": "certificate of the server, or of the CA that signed the server certificate",
"order": 1,
"airbyte_secret": true,
"multiline": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.cdk.integrations.JdbcConnector;
import io.airbyte.cdk.integrations.debezium.CdcSourceTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
Expand All @@ -47,9 +48,12 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.utility.DockerImageName;

@TestInstance(Lifecycle.PER_CLASS)
public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestDatabase> {

static private final String CDC_ROLE_NAME = "cdc_selector";
Expand All @@ -58,30 +62,37 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData

// Deliberately do not share this test container, as we're going to mutate the global SQL Server
// state.
static private final MSSQLServerContainer<?> UNSHARED_CONTAINER = new MsSQLContainerFactory()
.createNewContainer(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"));
private static final Duration CONNECTION_TIME = Duration.ofSeconds(60);
protected final MSSQLServerContainer<?> privateContainer;

private DataSource testDataSource;

CdcMssqlSourceTest() {
this.privateContainer = createContainer();
}

protected MSSQLServerContainer<?> createContainer() {
return new MsSQLContainerFactory()
.createNewContainer(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"));
}

@BeforeAll
static public void beforeAll() {
new MsSQLContainerFactory().withAgent(UNSHARED_CONTAINER);
UNSHARED_CONTAINER.start();
public void beforeAll() {
new MsSQLContainerFactory().withAgent(privateContainer);
privateContainer.start();
}

@AfterAll
static void afterAll() {
UNSHARED_CONTAINER.close();
void afterAll() {
privateContainer.close();
}

private String testUserName() {
protected final String testUserName() {
return testdb.withNamespace(TEST_USER_NAME_PREFIX);
}

@Override
protected MsSQLTestDatabase createTestDatabase() {
final var testdb = new MsSQLTestDatabase(UNSHARED_CONTAINER);
final var testdb = new MsSQLTestDatabase(privateContainer);
return testdb
.withConnectionProperty("encrypt", "false")
.withConnectionProperty("databaseName", testdb.getDatabaseName())
Expand Down Expand Up @@ -139,13 +150,17 @@ protected void setup() {
.with("USE [%s]", testdb.getDatabaseName())
.with("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, testUserName());

testDataSource = DataSourceFactory.create(
testDataSource = createTestDataSource();
}

protected DataSource createTestDataSource() {
return DataSourceFactory.create(
testUserName(),
testdb.getPassword(),
testdb.getDatabaseDriver().getDriverClassName(),
testdb.getJdbcUrl(),
Map.of("encrypt", "false"),
CONNECTION_TIME);
JdbcConnector.CONNECT_TIMEOUT_DEFAULT);
}

@Override
Expand Down
Loading
Loading