Skip to content

Commit

Permalink
Merge in original GCS destination PR (airbytehq#4329)
Browse files Browse the repository at this point in the history
Merge in the original PR to include author @MaxwellJK as an Airbyte contributor.

Author: Marco Fontana <[email protected]>
  • Loading branch information
MaxwellJK authored Aug 20, 2021
1 parent 825c358 commit 05a5503
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ resources/examples/airflow/logs/*

# Cloud Demo
!airbyte-webapp/src/packages/cloud/data

6 changes: 6 additions & 0 deletions airbyte-integrations/connectors/destination-gcs/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Destination Google Cloud Storage (GCS)

In order to test the GCS destination, you need a Google Cloud Platform account.

You need to generate the HMAC Keys for a service account then use them as keys to connect.
Obviously you also need to provide the right grants to the user in order to allow it to create and delete buckets and files in GCS.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.integrations.destination.s3.avro;
package io.airbyte.integrations.destination.gcs.avro;

import alex.mojaki.s3upload.MultiPartOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,49 @@

public class GcsConfig {

private final String projectId;
private final String bucketName;
private final String credentialsJson;
private final String bucketPath;
private final String accessKeyId;
private final String secretAccessKey;
private final String region;

public GcsConfig(String projectId, String bucketName, String credentialsJson) {
this.projectId = projectId;
public GcsConfig(String bucketName, String bucketPath, String accessKeyId, String secretAccessKey, String region) {
this.bucketName = bucketName;
this.credentialsJson = credentialsJson;
}

public String getProjectId() {
return projectId;
this.bucketPath = bucketPath;
this.accessKeyId = accessKeyId;
this.secretAccessKey = secretAccessKey;
this.region = region;
}

public String getBucketName() {
return bucketName;
}

public String getCredentialsJson() {
return credentialsJson;
public String getbucketPath() {
return bucketPath;
}

public static GcsConfig getGcsConfig(JsonNode config) {
public String getAccessKeyId() {
return accessKeyId;
}

public String getSecretAccessKey() {
return secretAccessKey;
}

public String getRegion() {
return region;
}

public static GcsConfig getGcsConfig(JsonNode config) {

return new GcsConfig(
config.get("loading_method").get("project_id").asText(),
config.get("loading_method").get("bucket_name").asText(),
config.get("loading_method").get("credentials_json").asText());
config.get("gcs_bucket_name").asText(),
config.get("gcs_bucket_path").asText(),
config.get("access_key_id").asText(),
config.get("secret_access_key").asText(),
config.get("gcs_bucket_region").asText()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder;


public abstract class GcsStreamCopier implements StreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);
Expand Down Expand Up @@ -180,25 +188,38 @@ public static void attemptWriteToPersistence(GcsConfig gcsConfig) throws IOExcep
attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName);
}

public static void attemptGcsWriteAndDelete(GcsConfig gcsConfig) throws IOException {
final String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName);
}

private static void attemptWriteAndDeleteGcsObject(GcsConfig gcsConfig, String outputTableName) throws IOException {
var storage = getStorageClient(gcsConfig);
var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
var blobInfo = BlobInfo.newBuilder(blobId).build();
var storage = getStorageClient(gcsConfig);
// var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
// var blobInfo = BlobInfo.newBuilder(blobId).build();

storage.create(blobInfo, "".getBytes());
storage.delete(blobId);
}
// storage.create(blobInfo, "".getBytes());
// storage.delete(blobId);
var gcsBucket = gcsConfig.getBucketName();

public static Storage getStorageClient(GcsConfig gcsConfig) throws IOException {
InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
return StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(gcsConfig.getProjectId())
.build()
.getService();
storage.putObject(gcsBucket, outputTableName, "check-content");
storage.deleteObject(gcsBucket, outputTableName);
}

public static AmazonS3 getStorageClient(GcsConfig gcsConfig) throws IOException {
var region = gcsConfig.getRegion();
var accessKeyId = gcsConfig.getAccessKeyId();
var secretAccessKey = gcsConfig.getSecretAccessKey();

var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"https://storage.googleapis.com", region))
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}

public abstract void copyGcsCsvFileIntoTable(JdbcDatabase database,
String gcsFileLocation,
String schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder;

public abstract class GcsStreamCopierFactory implements StreamCopierFactory<GcsConfig> {

/**
Expand All @@ -56,15 +63,28 @@ public StreamCopier create(String configuredSchema,
var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
var schema = getSchema(stream, configuredSchema, nameTransformer);

InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
Storage storageClient = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(gcsConfig.getProjectId())
.build()
.getService();
var region = gcsConfig.getRegion();
var accessKeyId = gcsConfig.getAccessKeyId();
var secretAccessKey = gcsConfig.getSecretAccessKey();

// InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
// GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
// Storage storageClient = StorageOptions.newBuilder()
// .setCredentials(credentials)
// .setProjectId(gcsConfig.getProjectId())
// .build()
// .getService();

var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"https://storage.googleapis.com", region))
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();

return create(stagingFolder, syncMode, schema, pair.getName(), storageClient, db, gcsConfig, nameTransformer, sqlOperations);
return create(stagingFolder, syncMode, schema, pair.getName(), //storageClient
s3Client, db, gcsConfig, nameTransformer, sqlOperations);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -77,7 +97,7 @@ public abstract StreamCopier create(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
String streamName,
Storage storageClient,
AmazonS3 s3Client,
JdbcDatabase db,
GcsConfig gcsConfig,
ExtendedNameTransformer nameTransformer,
Expand Down
1 change: 1 addition & 0 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ case "`uname`" in
esac

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-14.jdk/Contents/Home


# Determine the Java command to use to start the JVM.
Expand Down

0 comments on commit 05a5503

Please sign in to comment.