Commit 89701bb

use cloudsql on stage acceptance tests (#11894)

jpefaur committed Apr 2, 2024
1 parent 415897e commit 89701bb
Showing 15 changed files with 357 additions and 95 deletions.
2 changes: 2 additions & 0 deletions airbyte-test-utils/build.gradle.kts
@@ -26,6 +26,8 @@ dependencies {
implementation(libs.bundles.kubernetes.client)
implementation(libs.bundles.flyway)
implementation(libs.temporal.sdk)
+ implementation(libs.google.cloud.api.client)
+ implementation(libs.google.cloud.sqladmin)

api(libs.junit.jupiter.api)

17 changes: 17 additions & 0 deletions airbyte-test-utils/readme.md
@@ -1,3 +1,20 @@
# airbyte-test-utils

Shared Java code for executing Testcontainers and other helpers.

## Stage databases setup

When we run acceptance tests in an environment other than `stage`, a test container is used for each connector that requires a database. Each test container serves a single test case and is deleted once that test case completes, as sketched below.
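
A rough illustration of that lifecycle (a hypothetical JUnit test class, not the harness's actual wiring; the image name is a placeholder):

```java
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

class ExampleSourceAcceptanceTest {

  private PostgreSQLContainer<?> sourcePsql;

  @BeforeEach
  void setUp() {
    // A fresh, throwaway Postgres container per test case ("postgres:13-alpine" is a placeholder image).
    sourcePsql = new PostgreSQLContainer<>("postgres:13-alpine");
    sourcePsql.start();
  }

  @AfterEach
  void tearDown() {
    // The container and all of its data are discarded once the test case completes.
    sourcePsql.stop();
  }

  @Test
  void readsSeededRecords() {
    // ... point the connector under test at sourcePsql.getJdbcUrl() and assert on the synced data ...
  }
}
```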

When we run acceptance tests on stage, things are slightly more complex, but we aim for the same behavior. Instead of a test container for each connector that requires a database, we use a CloudSQL database for each connector. As with the test containers, each CloudSQL database serves a single test case and is deleted once that test case completes.
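
A minimal sketch of that flow, using the `CloudSqlDatabaseProvisioner` introduced by this commit (the project and instance IDs are placeholders, and exception handling is elided):

```java
import java.util.UUID;

// Sketch only: one throwaway database per test case on the shared CloudSQL instance.
void runTestAgainstProvisionedDatabase() throws Exception {
  CloudSqlDatabaseProvisioner provisioner = new CloudSqlDatabaseProvisioner();
  String databaseName = provisioner.createDatabase(
      "my-gcp-project",        // placeholder GCP project id
      "my-cloudsql-instance",  // placeholder CloudSQL instance id
      "acceptance_test_" + UUID.randomUUID());
  try {
    // ... run the test case against the new database ...
  } finally {
    // Delete the database once the test case completes, mirroring the test-container behavior.
    provisioner.deleteDatabase("my-gcp-project", "my-cloudsql-instance", databaseName);
  }
}
```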

It's important to understand how the different components communicate when running on stage.

![Stage network setup](stage_network_setup.png)

- The `CloudSQL Instance` can be reached via both its private and its public IP.
- A single `CloudSQL Instance` is used for all the tests, but each test case creates its own databases inside that instance.
- We run the acceptance tests from `AWS Test Runner` EC2 instances, which sit behind Tailscale and can therefore reach the CloudSQL instance via its private IP. The test runners need this access because the tests read the databases to validate their contents.
- The only IPs allowed to connect to the CloudSQL instance via its public IP are those of the stage dataplanes (both `GCP Dataplane` and `AWS Dataplane`). Note that this is not a workaround for the sake of our tests; it is the same setup that real users have. The sketch below contrasts the two connection paths.
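
In code, the two paths look roughly like this (a simplified sketch of the `GKEPostgresConfig.getDataSource` and `GKEPostgresConfig.dbConfig` calls made by `AcceptanceTestHarness` below):

```java
import java.util.Map;
import javax.sql.DataSource;

// Test-runner side: validate database contents over the private IP, reachable through Tailscale.
static DataSource testRunnerDataSource(String username, String password, String privateIp, String databaseName) {
  return GKEPostgresConfig.getDataSource(username, password, privateIp, databaseName);
}

// Connector side: the source/destination configs handed to connectors use the public IP,
// which only the stage dataplanes are allow-listed to reach.
static Map<Object, Object> connectorDbConfig(String username, String password, String publicIp, String databaseName) {
  return GKEPostgresConfig.dbConfig(password, true /* withSchema */, username, publicIp, databaseName);
}
```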


AcceptanceTestHarness.java
@@ -103,8 +103,6 @@
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.test.container.AirbyteTestContainer;
- import io.fabric8.kubernetes.client.DefaultKubernetesClient;
- import io.fabric8.kubernetes.client.KubernetesClient;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
@@ -114,6 +112,7 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.Path;
+ import java.security.GeneralSecurityException;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
@@ -179,9 +178,6 @@ public class AcceptanceTestHarness {
private static final String SOURCE_USERNAME = "sourceusername";
public static final String SOURCE_PASSWORD = "hunter2";
public static final String PUBLIC_SCHEMA_NAME = "public";
- public static final String STAGING_SCHEMA_NAME = "staging";
- public static final String COOL_EMPLOYEES_TABLE_NAME = "cool_employees";
- public static final String AWESOME_PEOPLE_TABLE_NAME = "awesome_people";
public static final String PUBLIC = "public";

private static final String DEFAULT_POSTGRES_INIT_SQL_FILE = "postgres_init.sql";
@@ -191,6 +187,8 @@ public class AcceptanceTestHarness {
public static final int MAX_TRIES = 5;
public static final int MAX_ALLOWED_SECOND_PER_RUN = 120;

+ private static final String CLOUD_SQL_DATABASE_PREFIX = "acceptance_test_";

// NOTE: we include `INCOMPLETE` here because the job may still retry; see
// https://docs.airbyte.com/understanding-airbyte/jobs/.
public static final Set<JobStatus> IN_PROGRESS_JOB_STATUSES = Set.of(JobStatus.PENDING, JobStatus.INCOMPLETE, JobStatus.RUNNING);
@@ -203,6 +201,7 @@ public class AcceptanceTestHarness {
private static boolean isMac;
private static boolean useExternalDeployment;
private static boolean ensureCleanSlate;
+ private CloudSqlDatabaseProvisioner cloudSqlDatabaseProvisioner;

/**
* When the acceptance tests are run against a local instance of docker-compose or KUBE then these
@@ -211,27 +210,30 @@
*/
private PostgreSQLContainer sourcePsql;
private PostgreSQLContainer destinationPsql;
+ private String sourceDatabaseName;
+ private String destinationDatabaseName;

private AirbyteTestContainer airbyteTestContainer;
private AirbyteApiClient apiClient;

private WebBackendApi webBackendApi;
private final UUID defaultWorkspaceId;
private final String postgresSqlInitFile;

- private KubernetesClient kubernetesClient;

private final List<UUID> sourceIds = Lists.newArrayList();
private final List<UUID> connectionIds = Lists.newArrayList();
private final List<UUID> destinationIds = Lists.newArrayList();
private final List<UUID> operationIds = Lists.newArrayList();
private final List<UUID> sourceDefinitionIds = Lists.newArrayList();
private DataSource sourceDataSource;
private DataSource destinationDataSource;
- private String postgresPassword;

- public KubernetesClient getKubernetesClient() {
-   return kubernetesClient;
- }
+ private String gcpProjectId;
+ private String cloudSqlInstanceId;
+ private String cloudSqlInstanceUsername;
+ private String cloudSqlInstancePassword;
+ private String cloudSqlInstancePrivateIp;
+ private String cloudSqlInstancePublicIp;

public void removeConnection(final UUID connection) {
connectionIds.remove(connection);
@@ -241,7 +243,7 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient,
final WebBackendApi webBackendApi,
final UUID defaultWorkspaceId,
final String postgresSqlInitFile)
-     throws URISyntaxException, IOException, InterruptedException {
+     throws URISyntaxException, IOException, InterruptedException, GeneralSecurityException {
// reads env vars to assign static variables
assignEnvVars();
this.apiClient = apiClient;
@@ -260,12 +262,16 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient,

destinationPsql = new PostgreSQLContainer(DESTINATION_POSTGRES_IMAGE_NAME);
destinationPsql.start();
-   }

-   if (isKube && !isGke) {
-     // TODO(mfsiega-airbyte): get the Kube client to work with GKE tests. We don't use it yet but we
-     // will want to someday.
-     kubernetesClient = new DefaultKubernetesClient();
+   } else {
+     this.cloudSqlDatabaseProvisioner = new CloudSqlDatabaseProvisioner();
+     sourceDatabaseName = cloudSqlDatabaseProvisioner.createDatabase(
+         gcpProjectId,
+         cloudSqlInstanceId,
+         generateRandomCloudSqlDatabaseName());
+     destinationDatabaseName = cloudSqlDatabaseProvisioner.createDatabase(
+         gcpProjectId,
+         cloudSqlInstanceId,
+         generateRandomCloudSqlDatabaseName());
}

// by default use airbyte deployment governed by a test container.
@@ -289,7 +295,7 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient,
}

public AcceptanceTestHarness(final AirbyteApiClient apiClient, final WebBackendApi webBackendApi, final UUID defaultWorkspaceId)
-     throws URISyntaxException, IOException, InterruptedException {
+     throws URISyntaxException, IOException, InterruptedException, GeneralSecurityException {
this(apiClient, webBackendApi, defaultWorkspaceId, DEFAULT_POSTGRES_INIT_SQL_FILE);
}

@@ -314,9 +320,17 @@ public void stopDbAndContainers() {
public void setup() throws SQLException, URISyntaxException, IOException, ApiException {
if (isGke) {
// Prepare the database data sources.
LOGGER.info("postgresPassword: {}", postgresPassword);
sourceDataSource = GKEPostgresConfig.getSourceDataSource(postgresPassword);
destinationDataSource = GKEPostgresConfig.getDestinationDataSource(postgresPassword);
LOGGER.info("postgresPassword: {}", cloudSqlInstancePassword);
sourceDataSource = GKEPostgresConfig.getDataSource(
cloudSqlInstanceUsername,
cloudSqlInstancePassword,
cloudSqlInstancePrivateIp,
sourceDatabaseName);
destinationDataSource = GKEPostgresConfig.getDataSource(
cloudSqlInstanceUsername,
cloudSqlInstancePassword,
cloudSqlInstancePrivateIp,
destinationDatabaseName);
// seed database.
GKEPostgresConfig.runSqlScript(Path.of(MoreResources.readResourceAsFile(postgresSqlInitFile).toURI()), getSourceDatabase());
} else {
@@ -362,6 +376,15 @@ public void cleanup() {
if (isGke) {
DataSourceFactory.close(sourceDataSource);
DataSourceFactory.close(destinationDataSource);

+     cloudSqlDatabaseProvisioner.deleteDatabase(
+         gcpProjectId,
+         cloudSqlInstanceId,
+         sourceDatabaseName);
+     cloudSqlDatabaseProvisioner.deleteDatabase(
+         gcpProjectId,
+         cloudSqlInstanceId,
+         destinationDatabaseName);
} else {
destinationPsql.stop();
sourcePsql.stop();
@@ -432,9 +455,12 @@ private void assignEnvVars() {
&& System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
ensureCleanSlate = System.getenv("ENSURE_CLEAN_SLATE") != null
&& System.getenv("ENSURE_CLEAN_SLATE").equalsIgnoreCase("true");
-   postgresPassword = System.getenv("POSTGRES_PASSWORD") != null
-       ? System.getenv("POSTGRES_PASSWORD")
-       : "admin123";
+   gcpProjectId = System.getenv("GCP_PROJECT_ID");
+   cloudSqlInstanceId = System.getenv("CLOUD_SQL_INSTANCE_ID");
+   cloudSqlInstanceUsername = System.getenv("CLOUD_SQL_INSTANCE_USERNAME");
+   cloudSqlInstancePassword = System.getenv("CLOUD_SQL_INSTANCE_PASSWORD");
+   cloudSqlInstancePrivateIp = System.getenv("CLOUD_SQL_INSTANCE_PRIVATE_IP");
+   cloudSqlInstancePublicIp = System.getenv("CLOUD_SQL_INSTANCE_PUBLIC_IP");
}

private WorkflowClient getWorkflowClient() {
@@ -767,25 +793,29 @@ public List<JsonNode> retrieveRecordsFromDatabase(final Database database, final
}

public JsonNode getSourceDbConfig() {
-   return getDbConfig(sourcePsql, false, false, Type.SOURCE);
+   return getDbConfig(sourcePsql, false, false, sourceDatabaseName);
}

public JsonNode getDestinationDbConfig() {
-   return getDbConfig(destinationPsql, false, true, Type.DESTINATION);
+   return getDbConfig(destinationPsql, false, true, destinationDatabaseName);
}

public JsonNode getDestinationDbConfigWithHiddenPassword() {
-   return getDbConfig(destinationPsql, true, true, Type.DESTINATION);
+   return getDbConfig(destinationPsql, true, true, destinationDatabaseName);
}

public JsonNode getDbConfig(final PostgreSQLContainer psql,
final boolean hiddenPassword,
final boolean withSchema,
-   final Type connectorType) {
+   final String databaseName) {
try {
final Map<Object, Object> dbConfig =
(isKube && isGke) ? GKEPostgresConfig.dbConfig(connectorType, hiddenPassword ? null : postgresPassword, withSchema)
: localConfig(psql, hiddenPassword, withSchema);
(isKube && isGke) ? GKEPostgresConfig.dbConfig(
hiddenPassword ? null : cloudSqlInstancePassword,
withSchema,
cloudSqlInstanceUsername,
cloudSqlInstancePublicIp,
databaseName) : localConfig(psql, hiddenPassword, withSchema);
final var config = Jsons.jsonNode(dbConfig);
LOGGER.info("Using db config: {}", Jsons.toPrettyString(config));
return config;
@@ -1227,14 +1257,6 @@ public StreamStatusReadList getStreamStatuses(UUID connectionId, Long jobId, Int
"get stream statuses", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES);
}

- /**
-  * Connector type.
-  */
- public enum Type {
-   SOURCE,
-   DESTINATION
- }

public void setIncrementalAppendSyncMode(final AirbyteCatalog airbyteCatalog, final List<String> cursorField) {
airbyteCatalog.getStreams().forEach(stream -> {
stream.getConfig().syncMode(SyncMode.INCREMENTAL)
@@ -1303,4 +1325,8 @@ public void compareCatalog(AirbyteCatalog actual) {
assertEquals(expected, actual);
}

+ private static String generateRandomCloudSqlDatabaseName() {
+   return CLOUD_SQL_DATABASE_PREFIX + UUID.randomUUID();
+ }

}
CloudSqlDatabaseProvisioner.java
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.test.utils;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.sqladmin.SQLAdmin;
import com.google.api.services.sqladmin.model.Database;
import com.google.api.services.sqladmin.model.Operation;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Creates and deletes GCP CloudSQL databases.
*/
public class CloudSqlDatabaseProvisioner {

private static final Logger LOGGER = LoggerFactory.getLogger(CloudSqlDatabaseProvisioner.class);

private static final String SQL_OPERATION_DONE_STATUS = "DONE";
private static final int DEFAULT_MAX_POLL_ATTEMPTS = 10;
private static final int DEFAULT_MAX_API_CALL_ATTEMPTS = 10;
private static final String APPLICATION_NAME = "cloud-sql-database-provisioner";

private final SQLAdmin sqlAdmin;
private final int maxPollAttempts;
private final int maxApiCallAttempts;

@VisibleForTesting
CloudSqlDatabaseProvisioner(SQLAdmin sqlAdmin, int maxPollAttempts, int maxApiCallAttempts) {
this.sqlAdmin = sqlAdmin;
this.maxPollAttempts = maxPollAttempts;
this.maxApiCallAttempts = maxApiCallAttempts;
}

public CloudSqlDatabaseProvisioner() throws GeneralSecurityException, IOException {
this.sqlAdmin = new SQLAdmin.Builder(
GoogleNetHttpTransport.newTrustedTransport(),
GsonFactory.getDefaultInstance(),
new HttpCredentialsAdapter(GoogleCredentials.getApplicationDefault())).setApplicationName(APPLICATION_NAME).build();
this.maxPollAttempts = DEFAULT_MAX_POLL_ATTEMPTS;
this.maxApiCallAttempts = DEFAULT_MAX_API_CALL_ATTEMPTS;
}

public synchronized String createDatabase(String projectId, String instanceId, String databaseName) throws IOException, InterruptedException {
Database database = new Database().setName(databaseName);
Operation operation = runWithRetry(() -> sqlAdmin.databases().insert(projectId, instanceId, database).execute());
pollOperation(projectId, operation.getName());

return databaseName;
}

public synchronized void deleteDatabase(String projectId, String instanceId, String databaseName) throws IOException, InterruptedException {
Operation operation = runWithRetry(() -> sqlAdmin.databases().delete(projectId, instanceId, databaseName).execute());
pollOperation(projectId, operation.getName());
}

/**
* Database operations are asynchronous. This method polls the operation until it is done.
*/
@VisibleForTesting
void pollOperation(String projectId, String operationName) throws IOException, InterruptedException {
int pollAttempts = 0;
while (pollAttempts < maxPollAttempts) {
Operation operation = sqlAdmin.operations().get(projectId, operationName).execute();
if (operation.getStatus().equals(SQL_OPERATION_DONE_STATUS)) {
return;
}
Thread.sleep(1000);
pollAttempts += 1;
}

throw new RuntimeException("Operation " + operationName + " did not complete successfully");
}

/**
 * If another operation is already in progress on the same CloudSQL instance, the API returns a
 * 409 error. This method retries API calls that fail with a 409.
 */
@VisibleForTesting
Operation runWithRetry(Callable<Operation> callable) throws InterruptedException {
int attempts = 0;
while (attempts < maxApiCallAttempts) {
try {
return callable.call();
} catch (Exception e) {
if (e instanceof GoogleJsonResponseException && ((GoogleJsonResponseException) e).getStatusCode() == 409) {
attempts++;
LOGGER.info("Attempt " + attempts + " failed with 409 error");
LOGGER.info("Exception thrown by API: " + e.getMessage());
Thread.sleep(1000);
} else {
throw new RuntimeException(e);
}
}
}
throw new RuntimeException("Max retries exceeded. Could not complete operation.");
}

}
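
The package-private `@VisibleForTesting` constructor exists so the polling and retry budgets can be exercised against a mocked `SQLAdmin`. A hypothetical unit-test sketch, assuming Mockito is on the test classpath and the test lives in the same package:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.services.sqladmin.SQLAdmin;
import com.google.api.services.sqladmin.model.Database;
import com.google.api.services.sqladmin.model.Operation;
import org.junit.jupiter.api.Test;

class CloudSqlDatabaseProvisionerTest {

  @Test
  void createsDatabaseOnceOperationIsDone() throws Exception {
    // Deep stubs let us stub the fluent sqlAdmin.databases().insert(...).execute() chain.
    SQLAdmin sqlAdmin = mock(SQLAdmin.class, RETURNS_DEEP_STUBS);
    Operation done = new Operation().setName("op-1").setStatus("DONE");
    when(sqlAdmin.databases().insert(eq("project"), eq("instance"), any(Database.class)).execute())
        .thenReturn(done);
    when(sqlAdmin.operations().get("project", "op-1").execute()).thenReturn(done);

    // Tight budgets: a single API attempt and a single poll attempt suffice here.
    CloudSqlDatabaseProvisioner provisioner = new CloudSqlDatabaseProvisioner(sqlAdmin, 1, 1);
    assertEquals("mydb", provisioner.createDatabase("project", "instance", "mydb"));
  }
}
```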
(Diffs for the remaining 11 changed files are not shown.)