diff --git a/airbyte-test-utils/build.gradle.kts b/airbyte-test-utils/build.gradle.kts index 25ea49c2deb..d627290b99b 100644 --- a/airbyte-test-utils/build.gradle.kts +++ b/airbyte-test-utils/build.gradle.kts @@ -26,6 +26,8 @@ dependencies { implementation(libs.bundles.kubernetes.client) implementation(libs.bundles.flyway) implementation(libs.temporal.sdk) + implementation(libs.google.cloud.api.client) + implementation(libs.google.cloud.sqladmin) api(libs.junit.jupiter.api) diff --git a/airbyte-test-utils/readme.md b/airbyte-test-utils/readme.md index f75ba4e74b0..fb22cc4b538 100644 --- a/airbyte-test-utils/readme.md +++ b/airbyte-test-utils/readme.md @@ -1,3 +1,20 @@ # airbyte-test-utils Shared Java code for executing TestContainers and other helpers. + +## Stage databases setup + +When we run acceptance tests on an environment that is not `stage`, a test container will be used for each connector that requires a database. Each test container will be used for only one test case, and it will be deleted once the test case completes. + +When we run acceptance tests on stage, things are slightly more complex, but we try to have the same behavior. Instead of using a test container for each connector that requires a database, we will use a CloudSQL database for each connector. Similarly to the test containers, each CloudSQL database will be used for only one test case, and it will be deleted once the test case completes. + +It's important to understand how are the different components communicating when running on stage. + +![Stage network setup](stage_network_setup.png) + +- It is possible to communicate with the `CloudSQL Instance` from both private IP and public ip +- One same `CloudSQL Instance` is use for all the tests, but each test case will create their own databases inside this instance. +- We run the acceptance tests from a `AWS Test Runner` (EC2 instances), which are behind Tailscale, so they can communicate with the CloudSQL instance using its private IP. We need to be able to access the CloudSQL instance from these test runners since the tests will access these databases to validate their content. +- The only IPs that are allowed to connect to the CloudSQL instance via its public IP are the ones that belong to stage Dataplanes (both `GCP Dataplane` and `AWS Dataplane`). Note that this is not a workaround for the sake of our tests, this is the same setup that real users have. + + diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java index f497fb8366f..a015f6bc48d 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java @@ -103,8 +103,6 @@ import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.test.container.AirbyteTestContainer; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClient; import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; @@ -114,6 +112,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.file.Path; +import java.security.GeneralSecurityException; import java.sql.SQLException; import java.time.Duration; import java.time.Instant; @@ -179,9 +178,6 @@ public class AcceptanceTestHarness { private static final String SOURCE_USERNAME = "sourceusername"; public static final String SOURCE_PASSWORD = "hunter2"; public static final String PUBLIC_SCHEMA_NAME = "public"; - public static final String STAGING_SCHEMA_NAME = "staging"; - public static final String COOL_EMPLOYEES_TABLE_NAME = "cool_employees"; - public static final String AWESOME_PEOPLE_TABLE_NAME = "awesome_people"; public static final String PUBLIC = "public"; private static final String DEFAULT_POSTGRES_INIT_SQL_FILE = "postgres_init.sql"; @@ -191,6 +187,8 @@ public class AcceptanceTestHarness { public static final int MAX_TRIES = 5; public static final int MAX_ALLOWED_SECOND_PER_RUN = 120; + private static final String CLOUD_SQL_DATABASE_PREFIX = "acceptance_test_"; + // NOTE: we include `INCOMPLETE` here because the job may still retry; see // https://docs.airbyte.com/understanding-airbyte/jobs/. public static final Set IN_PROGRESS_JOB_STATUSES = Set.of(JobStatus.PENDING, JobStatus.INCOMPLETE, JobStatus.RUNNING); @@ -203,6 +201,7 @@ public class AcceptanceTestHarness { private static boolean isMac; private static boolean useExternalDeployment; private static boolean ensureCleanSlate; + private CloudSqlDatabaseProvisioner cloudSqlDatabaseProvisioner; /** * When the acceptance tests are run against a local instance of docker-compose or KUBE then these @@ -211,6 +210,9 @@ public class AcceptanceTestHarness { */ private PostgreSQLContainer sourcePsql; private PostgreSQLContainer destinationPsql; + private String sourceDatabaseName; + private String destinationDatabaseName; + private AirbyteTestContainer airbyteTestContainer; private AirbyteApiClient apiClient; @@ -218,8 +220,6 @@ public class AcceptanceTestHarness { private final UUID defaultWorkspaceId; private final String postgresSqlInitFile; - private KubernetesClient kubernetesClient; - private final List sourceIds = Lists.newArrayList(); private final List connectionIds = Lists.newArrayList(); private final List destinationIds = Lists.newArrayList(); @@ -227,11 +227,13 @@ public class AcceptanceTestHarness { private final List sourceDefinitionIds = Lists.newArrayList(); private DataSource sourceDataSource; private DataSource destinationDataSource; - private String postgresPassword; - public KubernetesClient getKubernetesClient() { - return kubernetesClient; - } + private String gcpProjectId; + private String cloudSqlInstanceId; + private String cloudSqlInstanceUsername; + private String cloudSqlInstancePassword; + private String cloudSqlInstancePrivateIp; + private String cloudSqlInstancePublicIp; public void removeConnection(final UUID connection) { connectionIds.remove(connection); @@ -241,7 +243,7 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient, final WebBackendApi webBackendApi, final UUID defaultWorkspaceId, final String postgresSqlInitFile) - throws URISyntaxException, IOException, InterruptedException { + throws URISyntaxException, IOException, InterruptedException, GeneralSecurityException { // reads env vars to assign static variables assignEnvVars(); this.apiClient = apiClient; @@ -260,12 +262,16 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient, destinationPsql = new PostgreSQLContainer(DESTINATION_POSTGRES_IMAGE_NAME); destinationPsql.start(); - } - - if (isKube && !isGke) { - // TODO(mfsiega-airbyte): get the Kube client to work with GKE tests. We don't use it yet but we - // will want to someday. - kubernetesClient = new DefaultKubernetesClient(); + } else { + this.cloudSqlDatabaseProvisioner = new CloudSqlDatabaseProvisioner(); + sourceDatabaseName = cloudSqlDatabaseProvisioner.createDatabase( + gcpProjectId, + cloudSqlInstanceId, + generateRandomCloudSqlDatabaseName()); + destinationDatabaseName = cloudSqlDatabaseProvisioner.createDatabase( + gcpProjectId, + cloudSqlInstanceId, + generateRandomCloudSqlDatabaseName()); } // by default use airbyte deployment governed by a test container. @@ -289,7 +295,7 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient, } public AcceptanceTestHarness(final AirbyteApiClient apiClient, final WebBackendApi webBackendApi, final UUID defaultWorkspaceId) - throws URISyntaxException, IOException, InterruptedException { + throws URISyntaxException, IOException, InterruptedException, GeneralSecurityException { this(apiClient, webBackendApi, defaultWorkspaceId, DEFAULT_POSTGRES_INIT_SQL_FILE); } @@ -314,9 +320,17 @@ public void stopDbAndContainers() { public void setup() throws SQLException, URISyntaxException, IOException, ApiException { if (isGke) { // Prepare the database data sources. - LOGGER.info("postgresPassword: {}", postgresPassword); - sourceDataSource = GKEPostgresConfig.getSourceDataSource(postgresPassword); - destinationDataSource = GKEPostgresConfig.getDestinationDataSource(postgresPassword); + LOGGER.info("postgresPassword: {}", cloudSqlInstancePassword); + sourceDataSource = GKEPostgresConfig.getDataSource( + cloudSqlInstanceUsername, + cloudSqlInstancePassword, + cloudSqlInstancePrivateIp, + sourceDatabaseName); + destinationDataSource = GKEPostgresConfig.getDataSource( + cloudSqlInstanceUsername, + cloudSqlInstancePassword, + cloudSqlInstancePrivateIp, + destinationDatabaseName); // seed database. GKEPostgresConfig.runSqlScript(Path.of(MoreResources.readResourceAsFile(postgresSqlInitFile).toURI()), getSourceDatabase()); } else { @@ -362,6 +376,15 @@ public void cleanup() { if (isGke) { DataSourceFactory.close(sourceDataSource); DataSourceFactory.close(destinationDataSource); + + cloudSqlDatabaseProvisioner.deleteDatabase( + gcpProjectId, + cloudSqlInstanceId, + sourceDatabaseName); + cloudSqlDatabaseProvisioner.deleteDatabase( + gcpProjectId, + cloudSqlInstanceId, + destinationDatabaseName); } else { destinationPsql.stop(); sourcePsql.stop(); @@ -432,9 +455,12 @@ private void assignEnvVars() { && System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true"); ensureCleanSlate = System.getenv("ENSURE_CLEAN_SLATE") != null && System.getenv("ENSURE_CLEAN_SLATE").equalsIgnoreCase("true"); - postgresPassword = System.getenv("POSTGRES_PASSWORD") != null - ? System.getenv("POSTGRES_PASSWORD") - : "admin123"; + gcpProjectId = System.getenv("GCP_PROJECT_ID"); + cloudSqlInstanceId = System.getenv("CLOUD_SQL_INSTANCE_ID"); + cloudSqlInstanceUsername = System.getenv("CLOUD_SQL_INSTANCE_USERNAME"); + cloudSqlInstancePassword = System.getenv("CLOUD_SQL_INSTANCE_PASSWORD"); + cloudSqlInstancePrivateIp = System.getenv("CLOUD_SQL_INSTANCE_PRIVATE_IP"); + cloudSqlInstancePublicIp = System.getenv("CLOUD_SQL_INSTANCE_PUBLIC_IP"); } private WorkflowClient getWorkflowClient() { @@ -767,25 +793,29 @@ public List retrieveRecordsFromDatabase(final Database database, final } public JsonNode getSourceDbConfig() { - return getDbConfig(sourcePsql, false, false, Type.SOURCE); + return getDbConfig(sourcePsql, false, false, sourceDatabaseName); } public JsonNode getDestinationDbConfig() { - return getDbConfig(destinationPsql, false, true, Type.DESTINATION); + return getDbConfig(destinationPsql, false, true, destinationDatabaseName); } public JsonNode getDestinationDbConfigWithHiddenPassword() { - return getDbConfig(destinationPsql, true, true, Type.DESTINATION); + return getDbConfig(destinationPsql, true, true, destinationDatabaseName); } public JsonNode getDbConfig(final PostgreSQLContainer psql, final boolean hiddenPassword, final boolean withSchema, - final Type connectorType) { + final String databaseName) { try { final Map dbConfig = - (isKube && isGke) ? GKEPostgresConfig.dbConfig(connectorType, hiddenPassword ? null : postgresPassword, withSchema) - : localConfig(psql, hiddenPassword, withSchema); + (isKube && isGke) ? GKEPostgresConfig.dbConfig( + hiddenPassword ? null : cloudSqlInstancePassword, + withSchema, + cloudSqlInstanceUsername, + cloudSqlInstancePublicIp, + databaseName) : localConfig(psql, hiddenPassword, withSchema); final var config = Jsons.jsonNode(dbConfig); LOGGER.info("Using db config: {}", Jsons.toPrettyString(config)); return config; @@ -1227,14 +1257,6 @@ public StreamStatusReadList getStreamStatuses(UUID connectionId, Long jobId, Int "get stream statuses", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES); } - /** - * Connector type. - */ - public enum Type { - SOURCE, - DESTINATION - } - public void setIncrementalAppendSyncMode(final AirbyteCatalog airbyteCatalog, final List cursorField) { airbyteCatalog.getStreams().forEach(stream -> { stream.getConfig().syncMode(SyncMode.INCREMENTAL) @@ -1303,4 +1325,8 @@ public void compareCatalog(AirbyteCatalog actual) { assertEquals(expected, actual); } + private static String generateRandomCloudSqlDatabaseName() { + return CLOUD_SQL_DATABASE_PREFIX + UUID.randomUUID(); + } + } diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/CloudSqlDatabaseProvisioner.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/CloudSqlDatabaseProvisioner.java new file mode 100644 index 00000000000..3057ec467fc --- /dev/null +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/CloudSqlDatabaseProvisioner.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.test.utils; + +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.services.sqladmin.SQLAdmin; +import com.google.api.services.sqladmin.model.Database; +import com.google.api.services.sqladmin.model.Operation; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Creates and deletes GCP CloudSQL databases. + */ +public class CloudSqlDatabaseProvisioner { + + private static final Logger LOGGER = LoggerFactory.getLogger(CloudSqlDatabaseProvisioner.class); + + private static final String SQL_OPERATION_DONE_STATUS = "DONE"; + private static final int DEFAULT_MAX_POLL_ATTEMPTS = 10; + private static final int DEFAULT_MAX_API_CALL_ATTEMPTS = 10; + private static final String APPLICATION_NAME = "cloud-sql-database-provisioner"; + + private final SQLAdmin sqlAdmin; + private final int maxPollAttempts; + private final int maxApiCallAttempts; + + @VisibleForTesting + CloudSqlDatabaseProvisioner(SQLAdmin sqlAdmin, int maxPollAttempts, int maxApiCallAttempts) { + this.sqlAdmin = sqlAdmin; + this.maxPollAttempts = maxPollAttempts; + this.maxApiCallAttempts = maxApiCallAttempts; + } + + public CloudSqlDatabaseProvisioner() throws GeneralSecurityException, IOException { + this.sqlAdmin = new SQLAdmin.Builder( + GoogleNetHttpTransport.newTrustedTransport(), + GsonFactory.getDefaultInstance(), + new HttpCredentialsAdapter(GoogleCredentials.getApplicationDefault())).setApplicationName(APPLICATION_NAME).build(); + this.maxPollAttempts = DEFAULT_MAX_POLL_ATTEMPTS; + this.maxApiCallAttempts = DEFAULT_MAX_API_CALL_ATTEMPTS; + } + + public synchronized String createDatabase(String projectId, String instanceId, String databaseName) throws IOException, InterruptedException { + Database database = new Database().setName(databaseName); + Operation operation = runWithRetry(() -> sqlAdmin.databases().insert(projectId, instanceId, database).execute()); + pollOperation(projectId, operation.getName()); + + return databaseName; + } + + public synchronized void deleteDatabase(String projectId, String instanceId, String databaseName) throws IOException, InterruptedException { + Operation operation = runWithRetry(() -> sqlAdmin.databases().delete(projectId, instanceId, databaseName).execute()); + pollOperation(projectId, operation.getName()); + } + + /** + * Database operations are asynchronous. This method polls the operation until it is done. + */ + @VisibleForTesting + void pollOperation(String projectId, String operationName) throws IOException, InterruptedException { + int pollAttempts = 0; + while (pollAttempts < maxPollAttempts) { + Operation operation = sqlAdmin.operations().get(projectId, operationName).execute(); + if (operation.getStatus().equals(SQL_OPERATION_DONE_STATUS)) { + return; + } + Thread.sleep(1000); + pollAttempts += 1; + } + + throw new RuntimeException("Operation " + operationName + " did not complete successfully"); + } + + /** + * If there's another operation already in progress in one same cloudsql instance then the api will + * return a 409 error. This method will retry api calls that return a 409 error. + */ + @VisibleForTesting + Operation runWithRetry(Callable callable) throws InterruptedException { + int attempts = 0; + while (attempts < maxApiCallAttempts) { + try { + return callable.call(); + } catch (Exception e) { + if (e instanceof GoogleJsonResponseException && ((GoogleJsonResponseException) e).getStatusCode() == 409) { + attempts++; + LOGGER.info("Attempt " + attempts + " failed with 409 error"); + LOGGER.info("Exception thrown by API: " + e.getMessage()); + Thread.sleep(1000); + } else { + throw new RuntimeException(e); + } + } + } + throw new RuntimeException("Max retries exceeded. Could not complete operation."); + } + +} diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/GKEPostgresConfig.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/GKEPostgresConfig.java index 11d789d4787..9f18f42ba71 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/GKEPostgresConfig.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/GKEPostgresConfig.java @@ -8,7 +8,6 @@ import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.test.utils.AcceptanceTestHarness.Type; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -20,33 +19,24 @@ /** * This class is used to provide information related to the test databases for running the - * {@link AcceptanceTestHarness} on GKE. We launch 2 postgres databases in GKE as pods which act as - * source and destination and the tests run against them. In order to allow the test instance to - * connect to these databases we use port forwarding Refer - * tools/bin/gke-kube-acceptance-test/acceptance_test_kube_gke.sh for more info + * {@link AcceptanceTestHarness} on GKE. */ class GKEPostgresConfig { - // NOTE: these two hosts refer to services named `acceptance-test-postgres-[source|destination]-svc` - // in the `acceptance-tests` namespace, running in the same cluster as the check/discover/sync - // workers. - // - // The namespace here needs to be in sync with the namespaces created in - // tools/bin/gke-kube-acceptance-test/acceptance_test_kube_gke.sh. - private static final String SOURCE_HOST = "acceptance-test-postgres-source-svc.acceptance-tests.svc.cluster.local"; - private static final String DESTINATION_HOST = "acceptance-test-postgres-destination-svc.acceptance-tests.svc.cluster.local"; private static final Integer PORT = 5432; - private static final String USERNAME = "postgresadmin"; - private static final String DB = "postgresdb"; - static Map dbConfig(final Type connectorType, final String password, final boolean withSchema) { + static Map dbConfig(final String password, + final boolean withSchema, + String username, + String cloudSqlInstanceIp, + String databaseName) { final Map dbConfig = new HashMap<>(); - dbConfig.put(JdbcUtils.HOST_KEY, connectorType == Type.SOURCE ? SOURCE_HOST : DESTINATION_HOST); + dbConfig.put(JdbcUtils.HOST_KEY, cloudSqlInstanceIp); dbConfig.put(JdbcUtils.PASSWORD_KEY, password == null ? "**********" : password); dbConfig.put(JdbcUtils.PORT_KEY, PORT); - dbConfig.put(JdbcUtils.DATABASE_KEY, DB); - dbConfig.put(JdbcUtils.USERNAME_KEY, USERNAME); + dbConfig.put(JdbcUtils.DATABASE_KEY, databaseName); + dbConfig.put(JdbcUtils.USERNAME_KEY, username); dbConfig.put(JdbcUtils.JDBC_URL_PARAMS, "connectTimeout=60"); if (withSchema) { @@ -56,20 +46,9 @@ static Map dbConfig(final Type connectorType, final String passw return dbConfig; } - static DataSource getDestinationDataSource(final String password) { - // Note: we set the connection timeout to 30s. The underlying Hikari default is also 30s -- - // https://github.com/brettwooldridge/HikariCP#frequently-used -- but our DataSourceFactory - // overrides that to MAX_INTEGER unless we explicitly specify it. - return DataSourceFactory.create(USERNAME, password, DatabaseDriver.POSTGRESQL.getDriverClassName(), - "jdbc:postgresql://localhost:4000/postgresdb", Map.of(PGProperty.CONNECT_TIMEOUT.getName(), "60")); - } - - static DataSource getSourceDataSource(final String password) { - // Note: we set the connection timeout to 30s. The underlying Hikari default is also 30s -- - // https://github.com/brettwooldridge/HikariCP#frequently-used -- but our DataSourceFactory - // overrides that to MAX_INTEGER unless we explicitly specify it. - return DataSourceFactory.create(USERNAME, password, DatabaseDriver.POSTGRESQL.getDriverClassName(), - "jdbc:postgresql://localhost:2000/postgresdb", Map.of(PGProperty.CONNECT_TIMEOUT.getName(), "60")); + static DataSource getDataSource(final String username, final String password, String cloudSqlInstanceIp, String databaseName) { + return DataSourceFactory.create(username, password, DatabaseDriver.POSTGRESQL.getDriverClassName(), + "jdbc:postgresql://" + cloudSqlInstanceIp + ":5432/" + databaseName, Map.of(PGProperty.CONNECT_TIMEOUT.getName(), "60")); } static void runSqlScript(final Path scriptFilePath, final Database db) throws SQLException, IOException { diff --git a/airbyte-test-utils/src/test/java/io/airbyte/test/utils/CloudSqlDatabaseProvisionerTest.java b/airbyte-test-utils/src/test/java/io/airbyte/test/utils/CloudSqlDatabaseProvisionerTest.java new file mode 100644 index 00000000000..8bf975eb163 --- /dev/null +++ b/airbyte-test-utils/src/test/java/io/airbyte/test/utils/CloudSqlDatabaseProvisionerTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.test.utils; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.sqladmin.SQLAdmin; +import com.google.api.services.sqladmin.SQLAdmin.Operations; +import com.google.api.services.sqladmin.model.Database; +import com.google.api.services.sqladmin.model.Operation; +import java.io.IOException; +import java.util.concurrent.Callable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class CloudSqlDatabaseProvisionerTest { + + private static final String PROJECT_ID = "project-id"; + private static final String INSTANCE_ID = "instance-id"; + private static final String DATABASE_NAME = "database-name"; + private static final int POLL_ATTEMPTS = 2; + private static final int API_CALL_ATTEMPTS = 2; + + @Mock + private SQLAdmin sqlAdmin; + @Mock + private SQLAdmin.Databases databases; + @Mock + private Operations operations; + @Mock + private Operations.Get getOperation; + @Mock + private SQLAdmin.Databases.Insert insertDatabase; + @Mock + private SQLAdmin.Databases.Delete deleteDatabase; + @Mock + private Operation operation; + @Mock + private GoogleJsonResponseException googleJsonResponseException; + @Mock + private Callable callable; + + private CloudSqlDatabaseProvisioner provisioner; + + @BeforeEach + void setUp() { + provisioner = new CloudSqlDatabaseProvisioner(sqlAdmin, POLL_ATTEMPTS, API_CALL_ATTEMPTS); + } + + @Test + void testCreateDatabase() throws IOException, InterruptedException { + mockOperation(); + when(operation.getStatus()).thenReturn("DONE"); + when(sqlAdmin.databases()).thenReturn(databases); + when(databases.insert(anyString(), anyString(), any(Database.class))).thenReturn(insertDatabase); + when(insertDatabase.execute()).thenReturn(operation); + when(operation.getName()).thenReturn("operation-name"); + + provisioner.createDatabase(PROJECT_ID, INSTANCE_ID, DATABASE_NAME); + + verify(databases).insert(PROJECT_ID, INSTANCE_ID, new Database().setName(DATABASE_NAME)); + verify(insertDatabase).execute(); + } + + @Test + void testDeleteDatabase() throws IOException, InterruptedException { + mockOperation(); + when(operation.getStatus()).thenReturn("DONE"); + when(sqlAdmin.databases()).thenReturn(databases); + when(databases.delete(anyString(), anyString(), anyString())).thenReturn(deleteDatabase); + when(deleteDatabase.execute()).thenReturn(operation); + when(operation.getName()).thenReturn("operation-name"); + + provisioner.deleteDatabase(PROJECT_ID, INSTANCE_ID, DATABASE_NAME); + + verify(databases).delete(PROJECT_ID, INSTANCE_ID, DATABASE_NAME); + verify(deleteDatabase).execute(); + } + + @Test + void testPollOperationNotDoneAfterMaxStatusChecks() throws IOException { + mockOperation(); + when(operation.getStatus()) + .thenReturn("PENDING") + .thenReturn("RUNNING") + .thenReturn("DONE"); + assertThrows(RuntimeException.class, () -> provisioner.pollOperation(PROJECT_ID, "operation-name")); + } + + @Test + void testPollOperationDoneBeforeMaxStatusChecks() throws IOException { + mockOperation(); + when(operation.getStatus()) + .thenReturn("PENDING") + .thenReturn("DONE"); + assertDoesNotThrow(() -> provisioner.pollOperation(PROJECT_ID, "operation-name")); + } + + private void mockOperation() throws IOException { + when(sqlAdmin.operations()).thenReturn(operations); + when(operations.get(eq(PROJECT_ID), anyString())).thenReturn(getOperation); + when(getOperation.execute()).thenReturn(operation); + } + + @Test + void testMoreThanMaxAttempts() throws Exception { + when(callable.call()).thenThrow(googleJsonResponseException); + when(googleJsonResponseException.getStatusCode()).thenReturn(409); + assertThrows(RuntimeException.class, () -> provisioner.runWithRetry(callable)); + } + + @Test + void testNoRetry() throws Exception { + when(callable.call()).thenThrow(new RuntimeException()); + assertThrows(RuntimeException.class, () -> provisioner.runWithRetry(callable)); + } + + @Test + void testOneRetry() throws Exception { + when(googleJsonResponseException.getStatusCode()).thenReturn(409); + when(callable.call()) + .thenThrow(googleJsonResponseException) + .thenReturn(null); + + assertDoesNotThrow(() -> provisioner.runWithRetry(callable)); + } + +} diff --git a/airbyte-test-utils/stage_network_setup.png b/airbyte-test-utils/stage_network_setup.png new file mode 100644 index 00000000000..4ac000d9b58 Binary files /dev/null and b/airbyte-test-utils/stage_network_setup.png differ diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AcceptanceTestsResources.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AcceptanceTestsResources.java index 0cbb62fa5aa..1f932c4cd57 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AcceptanceTestsResources.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AcceptanceTestsResources.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.security.GeneralSecurityException; import java.sql.SQLException; import java.time.Duration; import java.util.Collections; @@ -276,7 +277,7 @@ void runSmallSyncForAWorkspaceId(final UUID workspaceId) throws Exception { StreamStatusJobType.SYNC); } - void init() throws URISyntaxException, IOException, InterruptedException, ApiException { + void init() throws URISyntaxException, IOException, InterruptedException, ApiException, GeneralSecurityException { // TODO(mfsiega-airbyte): clean up and centralize the way we do config. final boolean isGke = System.getenv().containsKey(IS_GKE); // Set up the API client. diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java index 7331bcb527b..3f5bab6be82 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java @@ -18,7 +18,6 @@ import com.google.common.collect.Sets; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.invoker.generated.ApiClient; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.AirbyteStream; import io.airbyte.api.client.model.generated.AttemptInfoRead; @@ -42,9 +41,7 @@ import io.airbyte.test.utils.AcceptanceTestHarness; import io.airbyte.test.utils.Asserts; import io.airbyte.test.utils.TestConnectionCreate; -import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -90,7 +87,7 @@ class AdvancedAcceptanceTests { private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001"); @BeforeAll - static void init() throws URISyntaxException, IOException, InterruptedException, ApiException { + static void init() throws Exception { final URI url = new URI(AIRBYTE_SERVER_HOST); final var apiClient = new AirbyteApiClient( new ApiClient().setScheme(url.getScheme()) diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ApiAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ApiAcceptanceTests.java index ba8df5af9b7..85000ecf2fb 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ApiAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ApiAcceptanceTests.java @@ -37,9 +37,6 @@ import io.airbyte.test.utils.AcceptanceTestHarness; import io.airbyte.test.utils.Asserts; import io.airbyte.test.utils.TestConnectionCreate; -import java.io.IOException; -import java.net.URISyntaxException; -import java.sql.SQLException; import java.util.List; import java.util.Set; import java.util.UUID; @@ -81,7 +78,7 @@ class ApiAcceptanceTests { private UUID workspaceId; @BeforeEach - void setup() throws SQLException, URISyntaxException, IOException, ApiException, InterruptedException { + void setup() throws Exception { testResources = new AcceptanceTestsResources(); testResources.init(); testHarness = testResources.getTestHarness(); diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ConnectorBuilderTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ConnectorBuilderTests.java index f1f51799a75..6f28346caa0 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ConnectorBuilderTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/ConnectorBuilderTests.java @@ -32,10 +32,7 @@ import io.airbyte.test.utils.AcceptanceTestHarness; import io.airbyte.test.utils.Databases; import io.airbyte.test.utils.SchemaTableNamePair; -import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; -import java.sql.SQLException; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -166,7 +163,7 @@ public class ConnectorBuilderTests { } @BeforeAll - static void init() throws URISyntaxException, IOException, InterruptedException, ApiException, SQLException { + static void init() throws Exception { final URI url = new URI(AIRBYTE_SERVER_HOST); final var underlyingApiClient = new ApiClient().setScheme(url.getScheme()) .setHost(url.getHost()) diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java index 7c74b999435..9ea731cf60d 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.security.GeneralSecurityException; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -114,7 +115,7 @@ private void createTestConnections() throws Exception { .build()); } - void init() throws ApiException, URISyntaxException, IOException, InterruptedException { + void init() throws ApiException, URISyntaxException, IOException, InterruptedException, GeneralSecurityException { // TODO(mfsiega-airbyte): clean up and centralize the way we do config. final boolean isGke = System.getenv().containsKey(IS_GKE); // Set up the API client. diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SyncAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SyncAcceptanceTests.java index a5c029a1fb6..173e1de5210 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SyncAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SyncAcceptanceTests.java @@ -72,9 +72,6 @@ import io.airbyte.test.utils.SchemaTableNamePair; import io.airbyte.test.utils.TestConnectionCreate; import io.temporal.client.WorkflowQueryException; -import java.io.IOException; -import java.net.URISyntaxException; -import java.sql.SQLException; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -139,7 +136,7 @@ class SyncAcceptanceTests { UUID workspaceId; @BeforeEach - void setup() throws SQLException, URISyntaxException, IOException, ApiException, InterruptedException { + void setup() throws Exception { testResources = new AcceptanceTestsResources(); testResources.init(); testHarness = testResources.getTestHarness(); diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java index 00a2828429f..572f662f8dd 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java @@ -10,13 +10,9 @@ import static io.airbyte.test.acceptance.AcceptanceTestsResources.TRUE; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.CheckConnectionRead; import io.airbyte.api.client.model.generated.CheckConnectionRead.StatusEnum; -import java.io.IOException; -import java.net.URISyntaxException; -import java.sql.SQLException; import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -39,7 +35,7 @@ public class WorkloadBasicAcceptanceTests { static final UUID RUN_DISCOVER_WITH_WORKLOAD_WORKSPACE_ID = UUID.fromString("3851861d-ac0b-440c-bd60-408cf9e7fc0e"); @BeforeEach - void setup() throws SQLException, URISyntaxException, IOException, ApiException, InterruptedException { + void setup() throws Exception { testResources.init(); testResources.setup(); } diff --git a/deps.toml b/deps.toml index 5cba7fa2e83..721b7946ccc 100644 --- a/deps.toml +++ b/deps.toml @@ -84,6 +84,8 @@ glassfish = { module = "org.glassfish.jersey:jackson-bom", version.ref = "glassf google-auth-library-oauth2-http = { module = "com.google.auth:google-auth-library-oauth2-http", version = "1.4.0" } google-cloud-storage = { module = "com.google.cloud:google-cloud-storage", version = "2.17.2" } google-cloud-storage-secretmanager = { module = "com.google.cloud:google-cloud-secretmanager", version = "2.0.5" } +google-cloud-sqladmin = { module = "com.google.apis:google-api-services-sqladmin", version = "v1-rev20240317-2.0.0" } +google-cloud-api-client = { module = "com.google.api-client:google-api-client", version = "2.4.0" } guava = { module = "com.google.guava:guava", version = "31.1-jre" } gson = { module = "com.google.code.gson:gson", version = "2.10.1" } hamcrest-all = { module = "org.hamcrest:hamcrest-all", version = "1.3" }