Skip to content

Commit

Permalink
mssql-source: remove LEGACY state flag (#33481)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Dec 19, 2023
1 parent 5b915c6 commit 7c9394f
Show file tree
Hide file tree
Showing 16 changed files with 25 additions and 85 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.5'
cdkVersionRequired = '0.7.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 3.3.2
dockerImageTag: 3.4.0
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ public MssqlSource() {

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
if (!featureFlags.useStreamCapableState()) {
return AirbyteStateType.LEGACY;
}

return MssqlCdcHelper.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

Expand Down Expand Up @@ -639,6 +635,10 @@ private boolean cloudDeploymentMode() {
return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(featureFlags.deploymentMode());
}

public Duration getConnectionTimeoutMssql(final Map<String, String> connectionProperties) {
return getConnectionTimeout(connectionProperties);
}

public static void main(final String[] args) throws Exception {
final Source source = MssqlSource.sshWrappedSource(new MssqlSource());
LOGGER.info("starting source: {}", MssqlSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
Expand All @@ -33,11 +31,6 @@ public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAccepta
private static final String STREAM_NAME = "dbo.id_and_name";
private static final String STREAM_NAME2 = "dbo.starships";

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

public abstract SshTunnel.TunnelMethod getTunnelMethod();

protected MsSQLTestDatabase testdb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
Expand Down Expand Up @@ -47,11 +45,6 @@ protected String getImageName() {
return "airbyte/source-mssql:dev";
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.getSpecAndInjectSsh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;

Expand All @@ -22,11 +20,6 @@ protected JsonNode getConfig() {
.build();
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected Database setupDatabase() {
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.AGENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.protocol.models.Field;
Expand Down Expand Up @@ -42,11 +40,6 @@ public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest {

protected MsSQLTestDatabase testdb;

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws SQLException {
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;

public class MssqlSourceDatatypeTest extends AbstractMssqlSourceDatatypeTest {
Expand All @@ -18,11 +16,6 @@ protected Database setupDatabase() {
return testdb.getDatabase();
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected JsonNode getConfig() {
return testdb.integrationTestConfigBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import java.util.Map;

public class SslEnabledMssqlSourceAcceptanceTest extends MssqlSourceAcceptanceTest {
Expand All @@ -19,11 +17,6 @@ protected JsonNode getConfig() {
.build();
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) {
final var container = new MsSQLContainerFactory().shared("mcr.microsoft.com/mssql/server:2022-latest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.cdk.integrations.debezium.CdcSourceTest;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -62,6 +60,7 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
// state.
static private final MSSQLServerContainer<?> UNSHARED_CONTAINER = new MsSQLContainerFactory()
.createNewContainer(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"));
private static final Duration CONNECTION_TIME = Duration.ofSeconds(60);

private DataSource testDataSource;

Expand Down Expand Up @@ -94,9 +93,7 @@ protected MsSQLTestDatabase createTestDatabase() {

@Override
protected MssqlSource source() {
final var source = new MssqlSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new MssqlSource();
}

@Override
Expand Down Expand Up @@ -147,7 +144,8 @@ protected void setup() {
testdb.getPassword(),
testdb.getDatabaseDriver().getDriverClassName(),
testdb.getJdbcUrl(),
Map.of("encrypt", "false"));
Map.of("encrypt", "false"),
CONNECTION_TIME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.google.common.collect.Lists;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.protocol.models.Field;
Expand Down Expand Up @@ -148,9 +146,7 @@ private ConfiguredAirbyteCatalog getConfiguredCatalog() {
}

private MssqlSource source() {
final var source = new MssqlSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new MssqlSource();
}

private JsonNode config() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ public class MssqlDataSourceFactoryTest {

@Test
protected void testCreatingDataSourceWithConnectionTimeoutSetBelowDefault() {
MSSQLServerContainer container = new MsSQLContainerFactory().shared("mcr.microsoft.com/mssql/server:2019-latest");
Map<String, String> connectionProperties = Map.of("loginTimeout", String.valueOf(5));
final MSSQLServerContainer container = new MsSQLContainerFactory().shared("mcr.microsoft.com/mssql/server:2019-latest");
final Map<String, String> connectionProperties = Map.of("loginTimeout", String.valueOf(5));
final DataSource dataSource = DataSourceFactory.create(
container.getUsername(),
container.getPassword(),
container.getDriverClassName(),
container.getJdbcUrl(),
connectionProperties);
connectionProperties,
new MssqlSource().getConnectionTimeoutMssql(connectionProperties));
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(5000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
Expand Down Expand Up @@ -45,9 +43,7 @@ protected JsonNode config() {

@Override
protected MssqlSource source() {
final MssqlSource source = new MssqlSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new MssqlSource();
}

@Override
Expand Down Expand Up @@ -157,9 +153,4 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.protocol.models.Field;
Expand Down Expand Up @@ -42,9 +40,7 @@ class MssqlSourceTest {
private MsSQLTestDatabase testdb;

private MssqlSource source() {
final MssqlSource source = new MssqlSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new MssqlSource();
}

// how to interact with the mssql test container manaully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.cdk.integrations.source.jdbc.test.JdbcStressTest;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import java.sql.JDBCType;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import javax.sql.DataSource;
Expand All @@ -31,6 +30,7 @@
@Disabled
public class MssqlStressTest extends JdbcStressTest {

private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60);
private static MSSQLServerContainer<?> dbContainer;
private JsonNode config;

Expand All @@ -56,7 +56,8 @@ public void setup() throws Exception {
String.format("jdbc:sqlserver://%s:%d;",
configWithoutDbName.get(JdbcUtils.HOST_KEY).asText(),
configWithoutDbName.get(JdbcUtils.PORT_KEY).asInt()),
Map.of("encrypt", "false"));
Map.of("encrypt", "false"),
CONNECTION_TIMEOUT);

try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
Expand Down Expand Up @@ -92,9 +93,7 @@ public JsonNode getConfig() {

@Override
public AbstractJdbcSource<JDBCType> getSource() {
final MssqlSource source = new MssqlSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new MssqlSource();
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,9 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.2 | 2023-12-14 | [33505](https://github.com/airbytehq/airbyte/pull/33225) | Using the released CDK. |
| 3.3.1 | 2023-12-12 | [33225](https://github.com/airbytehq/airbyte/pull/33225) | extracting MsSql specific files out of the CDK. |
| 3.4.0 | 2023-12-19 | [33481](https://github.com/airbytehq/airbyte/pull/33481) | Remove LEGACY state flag |
| 3.3.2 | 2023-12-14 | [33505](https://github.com/airbytehq/airbyte/pull/33225) | Using the released CDK. |
| 3.3.1 | 2023-12-12 | [33225](https://github.com/airbytehq/airbyte/pull/33225) | extracting MsSql specific files out of the CDK. |
| 3.3.0 | 2023-12-12 | [33018](https://github.com/airbytehq/airbyte/pull/33018) | Migrate to Per-stream/Global states and away from Legacy states |
| 3.2.1 | 2023-12-11 | [33330](https://github.com/airbytehq/airbyte/pull/33330) | Parse DatetimeOffset fields with the correct format when used as cursor |
| 3.2.0 | 2023-12-07 | [33225](https://github.com/airbytehq/airbyte/pull/33225) | CDC : Enable compression of schema history blob in state. |
Expand Down

0 comments on commit 7c9394f

Please sign in to comment.