Skip to content

Commit

Permalink
postgres-source: remove LEGACY state flag (#33437)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Dec 19, 2023
1 parent bbec0a1 commit b3cb243
Show file tree
Hide file tree
Showing 19 changed files with 14 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.1'
cdkVersionRequired = '0.7.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.2.27
dockerImageTag: 3.3.0
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,15 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
final Map<String, String> connectionProperties = getConnectionProperties(sourceConfig);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null,
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClass,
driverClassName,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
getConnectionProperties(sourceConfig));
connectionProperties,
getConnectionTimeout(connectionProperties, driverClassName));
// Record the data source so that it can be closed.
dataSources.add(dataSource);

Expand Down Expand Up @@ -688,9 +690,6 @@ protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set<String>

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
if (!featureFlags.useStreamCapableState()) {
return AirbyteStateType.LEGACY;
}
return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
Expand All @@ -34,11 +32,6 @@ public abstract class AbstractPostgresSourceSSLCertificateAcceptanceTest extends

protected PostgresTestDatabase testdb;

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
testdb = PostgresTestDatabase.in(BaseImage.POSTGRES_16, ContainerModifier.CERT)
Expand Down Expand Up @@ -105,9 +98,4 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshBastionContainer;
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
Expand Down Expand Up @@ -77,11 +75,6 @@ private static Database getDatabaseFromConfig(final JsonNode config) {

public abstract SshTunnel.TunnelMethod getTunnelMethod();

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

// todo (cgardens) - dynamically create data by generating a database with a random name instead of
// requiring data to already be in place.
@Override
Expand Down Expand Up @@ -141,9 +134,4 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.integrations.standardtest.source.TestDataHolder;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.ContainerModifier;
Expand All @@ -18,11 +16,6 @@ public class CdcInitialSnapshotPostgresSourceDatatypeTest extends AbstractPostgr

private static final String SCHEMA_NAME = "test";

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected Database setupDatabase() throws Exception {
testdb = PostgresTestDatabase.in(BaseImage.POSTGRES_16, ContainerModifier.CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
Expand Down Expand Up @@ -42,11 +40,6 @@ public class CdcPostgresSourceAcceptanceTest extends AbstractPostgresSourceAccep

protected PostgresTestDatabase testdb;

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
testdb = PostgresTestDatabase.in(getServerImage(), ContainerModifier.CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.integrations.standardtest.source.TestDataHolder;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
Expand Down Expand Up @@ -66,11 +64,6 @@ protected void postSetup() throws Exception {
}
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected Database setupDatabase() {
testdb = PostgresTestDatabase.in(BaseImage.POSTGRES_16, ContainerModifier.CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ public class CloudDeploymentPostgresSourceAcceptanceTest extends SourceAcceptanc
@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingDeploymentMode(
FeatureFlagsWrapper.overridingUseStreamCapableState(
super.featureFlags(),
true),
super.featureFlags(),
AdaptiveSourceRunner.CLOUD_MODE);
}

Expand Down Expand Up @@ -123,9 +121,4 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
Expand Down Expand Up @@ -43,11 +41,6 @@ public class PostgresSourceAcceptanceTest extends AbstractPostgresSourceAcceptan
private PostgresTestDatabase testdb;
private JsonNode config;

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
testdb = PostgresTestDatabase.in(getServerImage());
Expand Down Expand Up @@ -98,11 +91,6 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

@Test
public void testFullRefreshWithRevokingSchemaPermissions() throws Exception {
prepareEnvForUserWithoutPermissions(testdb.getDatabase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,13 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.ContainerModifier;
import java.sql.SQLException;

public class PostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected Database setupDatabase() throws SQLException {
testdb = PostgresTestDatabase.in(BaseImage.POSTGRES_16, ContainerModifier.CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
Expand Down Expand Up @@ -40,11 +38,6 @@ protected JsonNode getConfig() throws Exception {
.build();
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
testdb = PostgresTestDatabase.in(BaseImage.POSTGRES_12)
Expand Down Expand Up @@ -100,9 +93,4 @@ protected JsonNode getState() throws Exception {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import io.airbyte.cdk.integrations.debezium.internals.postgres.PostgresReplicationConnection;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -82,9 +80,7 @@ protected PostgresTestDatabase createTestDatabase() {

@Override
protected PostgresSource source() {
final var source = new PostgresSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new PostgresSource();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ private Source source() {
PostgresSource source = new PostgresSource();
source.setFeatureFlags(
FeatureFlagsWrapper.overridingDeploymentMode(
FeatureFlagsWrapper.overridingUseStreamCapableState(
new EnvVariableFeatureFlags(),
true),
AdaptiveSourceRunner.CLOUD_MODE));
new EnvVariableFeatureFlags(), AdaptiveSourceRunner.CLOUD_MODE));
return PostgresSource.sshWrappedSource(source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
package io.airbyte.integrations.source.postgres;

import io.airbyte.cdk.integrations.debug.DebugUtil;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;

public class PostgresDebugger {

@SuppressWarnings({"unchecked", "deprecation", "resource"})
public static void main(final String[] args) throws Exception {
final PostgresSource postgresSource = new PostgresSource();
postgresSource.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
DebugUtil.debug(postgresSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreIterators;
Expand Down Expand Up @@ -83,9 +81,7 @@ protected JsonNode config() {

@Override
protected PostgresSource source() {
final var source = new PostgresSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new PostgresSource();
}

@Override
Expand Down Expand Up @@ -367,11 +363,6 @@ void incrementalTimestampCheck() throws Exception {
getTestMessages().get(2)));
}

@Override
protected boolean supportsPerStream() {
return true;
}

/**
* Postgres Source Error Codes:
* <p>
Expand Down Expand Up @@ -447,7 +438,8 @@ public void testUserHasNoPermissionToDataBase() throws Exception {
}

@Test
void testReadMultipleTablesIncrementally() throws Exception {
@Override
protected void testReadMultipleTablesIncrementally() throws Exception {
final var config = config();
((ObjectNode) config).put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1);
final String namespace = getDefaultNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
Expand Down Expand Up @@ -155,9 +153,7 @@ void tearDown() {
}

public PostgresSource source() {
final var source = new PostgresSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new PostgresSource();
}

private static DSLContext getDslContextWithSpecifiedUser(final JsonNode config, final String username, final String password) {
Expand Down
Loading

0 comments on commit b3cb243

Please sign in to comment.