diff --git a/.gitignore b/.gitignore
index c7c506ae45ad..ce23ae809b2c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -39,3 +39,4 @@ resources/examples/airflow/logs/*
 
 # Cloud Demo
 !airbyte-webapp/src/packages/cloud/data
+
diff --git a/airbyte-integrations/connectors/destination-gcs/readme.md b/airbyte-integrations/connectors/destination-gcs/readme.md
new file mode 100644
index 000000000000..2900370f2d61
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-gcs/readme.md
@@ -0,0 +1,6 @@
+# Destination Google Cloud Storage (GCS)
+
+Testing the GCS destination requires a Google Cloud Platform account.
+
+Generate HMAC keys for a service account and use them as the access keys when connecting.
+The service account also needs permissions that allow it to create and delete buckets and objects in GCS.
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java
index 60b7d30b8a0d..22681e46d1c1 100644
--- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java
+++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java
@@ -22,6 +22,7 @@
  * SOFTWARE.
  */
 
+package io.airbyte.integrations.destination.s3.avro;
 package io.airbyte.integrations.destination.gcs.avro;
 
 import alex.mojaki.s3upload.MultiPartOutputStream;
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java
index bcddf1efa61e..f2366935bf38 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java
@@ -28,33 +28,49 @@
 public class GcsConfig {
 
-  private final String projectId;
   private final String bucketName;
-  private final String credentialsJson;
+  private final String bucketPath;
+  private final String accessKeyId;
+  private final String secretAccessKey;
+  private final String region;
 
-  public GcsConfig(String projectId, String bucketName, String credentialsJson) {
-    this.projectId = projectId;
+  public GcsConfig(String bucketName, String bucketPath, String accessKeyId, String secretAccessKey, String region) {
     this.bucketName = bucketName;
-    this.credentialsJson = credentialsJson;
-  }
-
-  public String getProjectId() {
-    return projectId;
+    this.bucketPath = bucketPath;
+    this.accessKeyId = accessKeyId;
+    this.secretAccessKey = secretAccessKey;
+    this.region = region;
   }
 
   public String getBucketName() {
     return bucketName;
   }
 
-  public String getCredentialsJson() {
-    return credentialsJson;
+  public String getBucketPath() {
+    return bucketPath;
   }
 
-  public static GcsConfig getGcsConfig(JsonNode config) {
-    return new GcsConfig(
-        config.get("loading_method").get("project_id").asText(),
-        config.get("loading_method").get("bucket_name").asText(),
-        config.get("loading_method").get("credentials_json").asText());
-  }
+  public String getAccessKeyId() {
+    return accessKeyId;
+  }
+
+  public String getSecretAccessKey() {
+    return secretAccessKey;
+  }
+
+  public String getRegion() {
+    return region;
+  }
+
+  public static GcsConfig getGcsConfig(JsonNode config) {
+    return new GcsConfig(
+        config.get("gcs_bucket_name").asText(),
+        config.get("gcs_bucket_path").asText(),
+        config.get("access_key_id").asText(),
+        config.get("secret_access_key").asText(),
+        config.get("gcs_bucket_region").asText());
+  }
 
 }
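For reference, here is a minimal sketch of the flat connector config that the reworked `getGcsConfig` above expects. The JSON field names come from the diff; the sample values and the `GcsConfigExample` class are hypothetical placeholders, not real credentials:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig;

public class GcsConfigExample {

  public static void main(String[] args) throws Exception {
    // Placeholder values; real HMAC keys belong to a GCS service account.
    String json = "{"
        + "\"gcs_bucket_name\": \"my-airbyte-bucket\","
        + "\"gcs_bucket_path\": \"staging\","
        + "\"access_key_id\": \"GOOG1EXAMPLEACCESSID\","
        + "\"secret_access_key\": \"exampleSecretKey\","
        + "\"gcs_bucket_region\": \"us-central1\""
        + "}";

    JsonNode config = new ObjectMapper().readTree(json);
    GcsConfig gcsConfig = GcsConfig.getGcsConfig(config);

    // Prints "my-airbyte-bucket/staging".
    System.out.println(gcsConfig.getBucketName() + "/" + gcsConfig.getBucketPath());
  }

}
```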
config.get("loading_method").get("project_id").asText(), - config.get("loading_method").get("bucket_name").asText(), - config.get("loading_method").get("credentials_json").asText()); + config.get("gcs_bucket_name").asText(), + config.get("gcs_bucket_path").asText(), + config.get("access_key_id").asText(), + config.get("secret_access_key").asText(), + config.get("gcs_bucket_region").asText() + ); } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index c74ee13e8389..890475c72b01 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -50,6 +50,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.client.builder.AwsClientBuilder; + + public abstract class GcsStreamCopier implements StreamCopier { private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class); @@ -180,25 +188,38 @@ public static void attemptWriteToPersistence(GcsConfig gcsConfig) throws IOExcep attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName); } + public static void attemptGcsWriteAndDelete(GcsConfig gcsConfig) throws IOException { + final String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); + attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName); + } + private static void attemptWriteAndDeleteGcsObject(GcsConfig gcsConfig, String outputTableName) throws IOException { - var storage = getStorageClient(gcsConfig); - var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName); - var blobInfo = BlobInfo.newBuilder(blobId).build(); + var storage = getStorageClient(gcsConfig); + // var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName); + // var blobInfo = BlobInfo.newBuilder(blobId).build(); - storage.create(blobInfo, "".getBytes()); - storage.delete(blobId); - } + // storage.create(blobInfo, "".getBytes()); + // storage.delete(blobId); + var gcsBucket = gcsConfig.getBucketName(); - public static Storage getStorageClient(GcsConfig gcsConfig) throws IOException { - InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes()); - GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream); - return StorageOptions.newBuilder() - .setCredentials(credentials) - .setProjectId(gcsConfig.getProjectId()) - .build() - .getService(); + storage.putObject(gcsBucket, outputTableName, "check-content"); + storage.deleteObject(gcsBucket, outputTableName); } + public static AmazonS3 getStorageClient(GcsConfig gcsConfig) throws IOException { + var region = gcsConfig.getRegion(); + var accessKeyId = gcsConfig.getAccessKeyId(); + var secretAccessKey = gcsConfig.getSecretAccessKey(); + + var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + return AmazonS3ClientBuilder.standard() + 
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java
index 594eaf85285f..78d5497e76b8 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java
@@ -38,6 +38,13 @@
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.client.builder.AwsClientBuilder;
+
 public abstract class GcsStreamCopierFactory implements StreamCopierFactory<GcsConfig> {
 
   /**
@@ -56,15 +63,28 @@ public StreamCopier create(String configuredSchema,
       var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
       var schema = getSchema(stream, configuredSchema, nameTransformer);
 
-      InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
-      GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
-      Storage storageClient = StorageOptions.newBuilder()
-          .setCredentials(credentials)
-          .setProjectId(gcsConfig.getProjectId())
-          .build()
-          .getService();
+      var region = gcsConfig.getRegion();
+      var accessKeyId = gcsConfig.getAccessKeyId();
+      var secretAccessKey = gcsConfig.getSecretAccessKey();
+
+      // InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
+      // GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
+      // Storage storageClient = StorageOptions.newBuilder()
+      //     .setCredentials(credentials)
+      //     .setProjectId(gcsConfig.getProjectId())
+      //     .build()
+      //     .getService();
+
+      var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
+      AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+          .withEndpointConfiguration(
+              new AwsClientBuilder.EndpointConfiguration(
+                  "https://storage.googleapis.com", region))
+          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+          .build();
 
-      return create(stagingFolder, syncMode, schema, pair.getName(), storageClient, db, gcsConfig, nameTransformer, sqlOperations);
+      return create(stagingFolder, syncMode, schema, pair.getName(), //storageClient
+          s3Client, db, gcsConfig, nameTransformer, sqlOperations);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -77,7 +97,7 @@ public abstract StreamCopier create(String stagingFolder,
                                       DestinationSyncMode syncMode,
                                       String schema,
                                       String streamName,
-                                      Storage storageClient,
+                                      AmazonS3 s3Client,
                                       JdbcDatabase db,
                                       GcsConfig gcsConfig,
                                       ExtendedNameTransformer nameTransformer,
diff --git a/gradlew b/gradlew
index fbd7c515832d..2b5857a1c082 100755
--- a/gradlew
+++ b/gradlew
@@ -81,6 +81,7 @@ case "`uname`" in
 esac
 
 CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-14.jdk/Contents/Home
 
 # Determine the Java command to use to start the JVM.