Skip to content

Commit

Permalink
java CDK, source-postgres: parallelize tests (#32314)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored Nov 8, 2023
1 parent 49043b5 commit 5013f95
Show file tree
Hide file tree
Showing 40 changed files with 1,419 additions and 1,617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
public static final String CONCURRENT_SOURCE_STREAM_READ = "CONCURRENT_SOURCE_STREAM_READ";
public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";
public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE";

@Override
public boolean useStreamCapableState() {
Expand Down Expand Up @@ -63,6 +64,11 @@ public String strictComparisonNormalizationTag() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison2", (arg) -> arg);
}

@Override
public String deploymentMode() {
return getEnvOrDefault(DEPLOYMENT_MODE, "", (arg) -> arg);
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
final String value = System.getenv(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ public interface FeatureFlags {
*/
String strictComparisonNormalizationTag();

/**
* Get the deployment mode used to deploy a connector.
*
* @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode.
*/
String deploymentMode();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.features;

public class FeatureFlagsWrapper implements FeatureFlags {

/**
* Overrides the {@link FeatureFlags#useStreamCapableState} method in the feature flags.
*/
static public FeatureFlags overridingUseStreamCapableState(
final FeatureFlags wrapped,
final boolean useStreamCapableState) {
return new FeatureFlagsWrapper(wrapped) {

@Override
public boolean useStreamCapableState() {
return useStreamCapableState;
}

};
}

/**
* Overrides the {@link FeatureFlags#deploymentMode} method in the feature flags.
*/
static public FeatureFlags overridingDeploymentMode(
final FeatureFlags wrapped,
final String deploymentMode) {
return new FeatureFlagsWrapper(wrapped) {

@Override
public String deploymentMode() {
return deploymentMode;
}

};
}

private final FeatureFlags wrapped;

public FeatureFlagsWrapper(FeatureFlags wrapped) {
this.wrapped = wrapped;
}

@Override
public boolean useStreamCapableState() {
return wrapped.useStreamCapableState();
}

@Override
public boolean autoDetectSchema() {
return wrapped.autoDetectSchema();
}

@Override
public boolean logConnectorMessages() {
return wrapped.logConnectorMessages();
}

@Override
public boolean concurrentSourceStreamRead() {
return wrapped.concurrentSourceStreamRead();
}

@Override
public boolean applyFieldSelection() {
return wrapped.applyFieldSelection();
}

@Override
public String fieldSelectionWorkspaces() {
return wrapped.fieldSelectionWorkspaces();
}

@Override
public String strictComparisonNormalizationWorkspaces() {
return wrapped.strictComparisonNormalizationWorkspaces();
}

@Override
public String strictComparisonNormalizationTag() {
return wrapped.strictComparisonNormalizationTag();
}

@Override
public String deploymentMode() {
return wrapped.deploymentMode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.cdk.integrations.base.IntegrationCliParser;
import io.airbyte.cdk.integrations.base.IntegrationConfig;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -23,7 +24,7 @@ public class AdaptiveDestinationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(AdaptiveDestinationRunner.class);

private static final String DEPLOYMENT_MODE_KEY = "DEPLOYMENT_MODE";
private static final String DEPLOYMENT_MODE_KEY = EnvVariableFeatureFlags.DEPLOYMENT_MODE;
private static final String CLOUD_MODE = "CLOUD";

public static OssDestinationBuilder baseOnEnv() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.Source;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,7 +18,7 @@ public class AdaptiveSourceRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(AdaptiveSourceRunner.class);

public static final String DEPLOYMENT_MODE_KEY = "DEPLOYMENT_MODE";
public static final String DEPLOYMENT_MODE_KEY = EnvVariableFeatureFlags.DEPLOYMENT_MODE;
public static final String CLOUD_MODE = "CLOUD";

public static OssSourceBuilder baseOnEnv() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.4.5
version=0.4.6
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Connection createConnection(final JsonNode jdbcConfig) throws SQLE
validateReplicationConnection(connection);
return connection;
} catch (final PSQLException exception) {
if (exception.getMessage().equals("FATAL: must be superuser or replication role to start walsender")) {
if ("42501".equals(exception.getSQLState())) { // insufficient_privilege
throw new ConfigErrorException(String.format(REPLICATION_PRIVILEGE_ERROR_MESSAGE, jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText()));
}
throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.cdk.db.AbstractDatabase;
Expand Down Expand Up @@ -75,8 +76,14 @@ public abstract class AbstractDbSource<DataType, Database extends AbstractDataba
public static final String READ_TRACE_OPERATION_NAME = "read-operation";

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDbSource.class);

// TODO: Remove when the flag is not use anymore
private final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
protected FeatureFlags featureFlags = new EnvVariableFeatureFlags();

@VisibleForTesting
public void setFeatureFlags(FeatureFlags featureFlags) {
this.featureFlags = featureFlags;
}

@Override
@Trace(operationName = CHECK_TRACE_OPERATION_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ public void setup() throws Exception {
});
}

protected void maybeSetShorterConnectionTimeout() {
// Optionally implement this to speed up test cases which will result in a connection timeout.
}

protected DataSource getDataSource(final JsonNode jdbcConfig) {
return DataSourceFactory.create(
jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(),
Expand Down Expand Up @@ -316,6 +320,7 @@ void testCheckSuccess() throws Exception {

@Test
void testCheckFailure() throws Exception {
maybeSetShorterConnectionTimeout();
((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake");
final AirbyteConnectionStatus actual = source.check(config);
assertEquals(Status.FAILED, actual.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.api.client.model.generated.DiscoverCatalogResult;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.configoss.JobGetSpecConfig;
import io.airbyte.configoss.StandardCheckConnectionInput;
Expand Down Expand Up @@ -45,6 +46,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -142,12 +144,14 @@ public void setUpInternal() throws Exception {
when(mSourceApi.writeDiscoverCatalogResult(any()))
.thenReturn(new DiscoverCatalogResult().catalogId(CATALOG_ID));
mConnectorConfigUpdater = mock(ConnectorConfigUpdater.class);
var envMap = new HashMap<>(new TestEnvConfigs().getJobDefaultEnvMap());
envMap.put(EnvVariableFeatureFlags.DEPLOYMENT_MODE, featureFlags().deploymentMode());
processFactory = new DockerProcessFactory(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
"host",
new TestEnvConfigs().getJobDefaultEnvMap());
envMap);

postSetup();
}
Expand All @@ -163,26 +167,30 @@ public void tearDownInternal() throws Exception {
tearDown(environment);
}

protected FeatureFlags featureFlags() {
return new EnvVariableFeatureFlags();
}

protected ConnectorSpecification runSpec() throws TestHarnessException {
final io.airbyte.protocol.models.ConnectorSpecification spec = new DefaultGetSpecTestHarness(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false,
new EnvVariableFeatureFlags()))
featureFlags()))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec();
return convertProtocolObject(spec, ConnectorSpecification.class);
}

protected StandardCheckConnectionOutput runCheck() throws Exception {
return new DefaultCheckConnectionTestHarness(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false,
new EnvVariableFeatureFlags()),
featureFlags()),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot).getCheckConnection();
}

protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exception {
return new DefaultCheckConnectionTestHarness(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false,
new EnvVariableFeatureFlags()),
featureFlags()),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString();
}
Expand All @@ -191,7 +199,7 @@ protected UUID runDiscover() throws Exception {
final UUID toReturn = new DefaultDiscoverCatalogTestHarness(
mAirbyteApiClient,
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false,
new EnvVariableFeatureFlags()),
featureFlags()),
mConnectorConfigUpdater)
.run(new StandardDiscoverCatalogInput().withSourceId(SOURCE_ID.toString()).withConnectionConfiguration(getConfig()), jobRoot)
.getDiscoverCatalogId();
Expand Down Expand Up @@ -222,12 +230,10 @@ protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog catalog, f
.withState(state == null ? null : new State().withState(state))
.withCatalog(convertProtocolObject(catalog, io.airbyte.protocol.models.ConfiguredAirbyteCatalog.class));

final var featureFlags = new EnvVariableFeatureFlags();

final AirbyteSource source = new DefaultAirbyteSource(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false,
featureFlags),
featureFlags);
featureFlags()),
featureFlags());
final List<AirbyteMessage> messages = new ArrayList<>();
source.start(sourceConfig, jobRoot);
while (!source.isFinished()) {
Expand Down Expand Up @@ -266,7 +272,6 @@ protected Map<String, Integer> runReadVerifyNumberOfReceivedMsgs(final Configure
}

private AirbyteSource prepareAirbyteSource() {
final var featureFlags = new EnvVariableFeatureFlags();
final var integrationLauncher = new AirbyteIntegrationLauncher(
JOB_ID,
JOB_ATTEMPT,
Expand All @@ -275,8 +280,8 @@ private AirbyteSource prepareAirbyteSource() {
null,
null,
false,
featureFlags);
return new DefaultAirbyteSource(integrationLauncher, featureFlags);
featureFlags());
return new DefaultAirbyteSource(integrationLauncher, featureFlags());
}

private static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
Expand Down
Loading

0 comments on commit 5013f95

Please sign in to comment.