From c1c9cb00a4f46fd57a9e93d130273d37c6fd3cc6 Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Thu, 4 Jun 2020 17:30:38 +0200 Subject: [PATCH] Add jflyte-aws Implement S3 FileSystem --- Dockerfile | 1 + .../flyte/jflyte/api/FileSystemRegistrar.java | 4 +- jflyte-aws/pom.xml | 123 ++++++++++++++++ .../org/flyte/jflyte/aws/S3FileSystem.java | 132 ++++++++++++++++++ .../jflyte/aws/S3FileSystemRegistrar.java | 32 +++++ .../jflyte/aws/S3WritableByteChannel.java | 79 +++++++++++ .../org/flyte/jflyte/aws/package-info.java | 19 +++ .../org/flyte/jflyte/aws/S3FileSystemIT.java | 91 ++++++++++++ jflyte-build/pom.xml | 4 + .../jflyte/gcp/GcsFileSystemRegistrar.java | 4 +- .../org/flyte/jflyte/FileSystemLoader.java | 5 +- pom.xml | 6 + scripts/jflyte | 5 +- 13 files changed, 499 insertions(+), 6 deletions(-) create mode 100644 jflyte-aws/pom.xml create mode 100644 jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystem.java create mode 100644 jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystemRegistrar.java create mode 100644 jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3WritableByteChannel.java create mode 100644 jflyte-aws/src/main/java/org/flyte/jflyte/aws/package-info.java create mode 100644 jflyte-aws/src/test/java/org/flyte/jflyte/aws/S3FileSystemIT.java diff --git a/Dockerfile b/Dockerfile index 056d1347f..007e9ff95 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,7 @@ ARG FLYTE_INTERNAL_IMAGE COPY jflyte/target/lib /jflyte/ # plugins +COPY jflyte-aws/target/lib /jflyte/modules/jflyte-aws COPY jflyte-google-cloud/target/lib /jflyte/modules/jflyte-google-cloud ENV FLYTE_INTERNAL_MODULE_DIR "/jflyte/modules" diff --git a/jflyte-api/src/main/java/org/flyte/jflyte/api/FileSystemRegistrar.java b/jflyte-api/src/main/java/org/flyte/jflyte/api/FileSystemRegistrar.java index 4d6b2c388..480d250c0 100644 --- a/jflyte-api/src/main/java/org/flyte/jflyte/api/FileSystemRegistrar.java +++ b/jflyte-api/src/main/java/org/flyte/jflyte/api/FileSystemRegistrar.java @@ -16,7 
+16,9 @@ */ package org.flyte.jflyte.api; +import java.util.Map; + /** A registrar that creates {@link FileSystem} instances. */ public abstract class FileSystemRegistrar { - public abstract Iterable load(); + public abstract Iterable load(Map env); } diff --git a/jflyte-aws/pom.xml b/jflyte-aws/pom.xml new file mode 100644 index 000000000..eaa66bf9b --- /dev/null +++ b/jflyte-aws/pom.xml @@ -0,0 +1,123 @@ + + + 4.0.0 + + + org.flyte + flytekit-parent + 0.2.0-SNAPSHOT + + + jflyte-aws + + + + + + com.amazonaws + aws-java-sdk-bom + 1.11.327 + pom + import + + + org.testcontainers + testcontainers-bom + 1.14.3 + pom + import + + + + + commons-logging + commons-logging + 1.2 + + + + + + + + com.amazonaws + aws-java-sdk-s3 + + + + + org.flyte + jflyte-api + provided + + + com.google.auto.service + auto-service-annotations + provided + + + com.google.code.findbugs + jsr305 + provided + + + + + org.junit.jupiter + junit-jupiter + test + + + org.hamcrest + hamcrest + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + localstack + test + + + + + + + maven-jar-plugin + + + maven-dependency-plugin + + + + diff --git a/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystem.java b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystem.java new file mode 100644 index 000000000..8c28f32e2 --- /dev/null +++ b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystem.java @@ -0,0 +1,132 @@ +/* + * Copyright 2020 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.jflyte.aws; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.AmazonS3URI; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectId; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.flyte.jflyte.api.FileSystem; +import org.flyte.jflyte.api.Manifest; + +public class S3FileSystem implements FileSystem { + private final AmazonS3 s3; + + private static final Logger LOG = Logger.getLogger(S3FileSystem.class.getName()); + + static { + // enable all levels for the actual handler to pick up + LOG.setLevel(Level.ALL); + } + + public S3FileSystem(AmazonS3 s3) { + this.s3 = s3; + } + + public static S3FileSystem create(Map env) { + String endpoint = env.get("FLYTE_AWS_ENDPOINT"); + String accessKeyId = env.get("FLYTE_AWS_ACCESS_KEY_ID"); + + AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); + + if (accessKeyId != null) { + LOG.fine(String.format("Using FLYTE_AWS_ACCESS_KEY_ID [%s]", accessKeyId)); + + String secretAccessKey = env.get("FLYTE_AWS_SECRET_ACCESS_KEY"); + BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey); + + builder.withCredentials(new AWSStaticCredentialsProvider(credentials)); 
+ } + + if (endpoint != null) { + LOG.fine(String.format("Using FLYTE_AWS_ENDPOINT [%s]", endpoint)); + + // assume it's minio from this point, it doesn't work without signer override + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSS3V4SignerType"); + + builder.withClientConfiguration(clientConfiguration); + builder.withPathStyleAccessEnabled(true); + + builder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(endpoint, Regions.US_EAST_1.name())); + } else { + builder.withRegion(Regions.DEFAULT_REGION); + } + + return new S3FileSystem(builder.build()); + } + + @Override + public String getScheme() { + return "s3"; + } + + @Override + public ReadableByteChannel reader(String uri) { + AmazonS3URI s3Uri = new AmazonS3URI(uri); + S3ObjectId objectId = new S3ObjectId(s3Uri.getBucket(), s3Uri.getKey(), s3Uri.getVersionId()); + S3Object object = s3.getObject(new GetObjectRequest(objectId)); + + return Channels.newChannel(object.getObjectContent()); + } + + @Override + public WritableByteChannel writer(String uri) { + AmazonS3URI s3Uri = new AmazonS3URI(uri); + + LOG.fine("bucket=" + s3Uri.getBucket() + " key=" + s3Uri.getKey()); + LOG.fine(s3.listBuckets() + ""); + + try { + return S3WritableByteChannel.create( + s3, /* bucketName= */ s3Uri.getBucket(), /* key= */ s3Uri.getKey()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Nullable + @Override + public Manifest getManifest(String uri) { + AmazonS3URI s3Uri = new AmazonS3URI(uri); + + if (!s3.doesObjectExist(s3Uri.getBucket(), s3Uri.getKey())) { + return null; + } + + // TODO once we have fields in Manifest, populate them + + return Manifest.create(); + } +} diff --git a/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystemRegistrar.java b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystemRegistrar.java new file mode 100644 index 000000000..eea912fb9 --- /dev/null +++ 
b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystemRegistrar.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.jflyte.aws; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.Map; +import org.flyte.jflyte.api.FileSystem; +import org.flyte.jflyte.api.FileSystemRegistrar; + +@AutoService(FileSystemRegistrar.class) +public class S3FileSystemRegistrar extends FileSystemRegistrar { + + @Override + public Iterable load(Map env) { + return Collections.singletonList(S3FileSystem.create(env)); + } +} diff --git a/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3WritableByteChannel.java b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3WritableByteChannel.java new file mode 100644 index 000000000..e1e1c2a46 --- /dev/null +++ b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3WritableByteChannel.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.jflyte.aws; + +import com.amazonaws.services.s3.AmazonS3; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; + +class S3WritableByteChannel implements WritableByteChannel { + private final File file; + private final String bucketName; + private final String key; + private final WritableByteChannel fileChannel; + private final AmazonS3 s3; + + // TODO ideally, we should buffer into file only if it's necessary, but for now we + // go with a simple option of always creating a file given that the code isn't + // performance-critical + + S3WritableByteChannel( + AmazonS3 s3, String bucketName, String key, File file, WritableByteChannel fileChannel) { + this.s3 = s3; + this.bucketName = bucketName; + this.key = key; + this.file = file; + this.fileChannel = fileChannel; + } + + public static S3WritableByteChannel create(AmazonS3 s3, String bucketName, String key) + throws IOException { + String fileName = key.replaceAll("\\W+", "_"); + File file = File.createTempFile("s3-upload", fileName); + file.deleteOnExit(); + + WritableByteChannel fileChannel = Files.newByteChannel(file.toPath(), StandardOpenOption.WRITE); + + return new S3WritableByteChannel( + s3, + /* bucketName= */ bucketName, + /* key= */ key, + /* file= */ file, + /* fileChannel= */ fileChannel); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return fileChannel.write(src); + } + + @Override + public boolean isOpen() { + return fileChannel.isOpen(); + } + + @Override + public void close() throws IOException { + fileChannel.close(); + + s3.putObject(/* bucketName= */ bucketName, /* key= */ key, file); + } +} diff --git a/jflyte-aws/src/main/java/org/flyte/jflyte/aws/package-info.java 
b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/package-info.java new file mode 100644 index 000000000..b5ebb1da2 --- /dev/null +++ b/jflyte-aws/src/main/java/org/flyte/jflyte/aws/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2020 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** Plugin for Amazon Web Services (AWS). */ +package org.flyte.jflyte.aws; diff --git a/jflyte-aws/src/test/java/org/flyte/jflyte/aws/S3FileSystemIT.java b/jflyte-aws/src/test/java/org/flyte/jflyte/aws/S3FileSystemIT.java new file mode 100644 index 000000000..ee4597a6f --- /dev/null +++ b/jflyte-aws/src/test/java/org/flyte/jflyte/aws/S3FileSystemIT.java @@ -0,0 +1,91 @@ +/* + * Copyright 2020 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.flyte.jflyte.aws; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.util.IOUtils; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ThreadLocalRandom; +import org.flyte.jflyte.api.Manifest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class S3FileSystemIT { + + @Container public LocalStackContainer localStack = new LocalStackContainer().withServices(S3); + + private AmazonS3 s3; + + @BeforeEach + public void setUp() { + s3 = + AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(localStack.getEndpointConfiguration(S3)) + .withCredentials(localStack.getDefaultCredentialsProvider()) + .build(); + + s3.createBucket("flyteorg"); + } + + @Test + public void testWriteAndRead() throws IOException { + S3FileSystem fileSystem = new S3FileSystem(s3); + String uri = "s3://flyteorg/0z/9bea2470f7a802f23abf84dc64cd8982"; + + byte[] inputBytes = new byte[42]; + ThreadLocalRandom.current().nextBytes(inputBytes); + + try (WritableByteChannel writer = fileSystem.writer(uri)) { + ByteArrayInputStream input = new ByteArrayInputStream(inputBytes); + IOUtils.copy(input, Channels.newOutputStream(writer)); + } + + byte[] outputBytes; + try (ReadableByteChannel reader = fileSystem.reader(uri)) { + outputBytes 
= IOUtils.toByteArray(Channels.newInputStream(reader)); + } + + Manifest manifest = fileSystem.getManifest(uri); + + assertArrayEquals(inputBytes, outputBytes); + assertEquals(Manifest.create(), manifest); + } + + @Test + public void testFileNotExists() { + S3FileSystem fileSystem = new S3FileSystem(s3); + String uri = "s3://flyteorg/0z/9bea2470f7a802f23abf84dc64cd8982"; + + Manifest manifest = fileSystem.getManifest(uri); + + assertNull(manifest); + } +} diff --git a/jflyte-build/pom.xml b/jflyte-build/pom.xml index 780f8d937..4a795d246 100644 --- a/jflyte-build/pom.xml +++ b/jflyte-build/pom.xml @@ -41,6 +41,10 @@ org.flyte jflyte + + org.flyte + jflyte-aws + org.flyte jflyte-google-cloud diff --git a/jflyte-google-cloud/src/main/java/org/flyte/jflyte/gcp/GcsFileSystemRegistrar.java b/jflyte-google-cloud/src/main/java/org/flyte/jflyte/gcp/GcsFileSystemRegistrar.java index 0a793b2ba..10ec17b6e 100644 --- a/jflyte-google-cloud/src/main/java/org/flyte/jflyte/gcp/GcsFileSystemRegistrar.java +++ b/jflyte-google-cloud/src/main/java/org/flyte/jflyte/gcp/GcsFileSystemRegistrar.java @@ -18,14 +18,16 @@ import com.google.auto.service.AutoService; import java.util.Collections; +import java.util.Map; import org.flyte.jflyte.api.FileSystem; import org.flyte.jflyte.api.FileSystemRegistrar; /** Registrar for {@link GcsFileSystem}. 
*/ @AutoService(FileSystemRegistrar.class) public class GcsFileSystemRegistrar extends FileSystemRegistrar { + @Override - public Iterable load() { + public Iterable load(Map env) { return Collections.singletonList(new GcsFileSystem()); } } diff --git a/jflyte/src/main/java/org/flyte/jflyte/FileSystemLoader.java b/jflyte/src/main/java/org/flyte/jflyte/FileSystemLoader.java index fcbbe9810..4b1ddbfbc 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/FileSystemLoader.java +++ b/jflyte/src/main/java/org/flyte/jflyte/FileSystemLoader.java @@ -17,6 +17,7 @@ package org.flyte.jflyte; import com.google.common.base.Verify; +import com.google.common.collect.ImmutableMap; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -55,8 +56,10 @@ static List loadFileSystems() { List fileSystems = new ArrayList<>(); + Map env = ImmutableMap.copyOf(System.getenv()); + for (FileSystemRegistrar registrar : loader) { - for (FileSystem fileSystem : registrar.load()) { + for (FileSystem fileSystem : registrar.load(env)) { LOG.debug(String.format("Discovered FileSystem [%s]", fileSystem.getClass().getName())); fileSystems.add(fileSystem); diff --git a/pom.xml b/pom.xml index d195f41d0..31afa5f80 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,7 @@ flytekit-examples-scala jflyte jflyte-api + jflyte-aws jflyte-build jflyte-google-cloud @@ -94,6 +95,11 @@ jflyte ${project.version} + + org.flyte + jflyte-aws + ${project.version} + org.flyte jflyte-google-cloud diff --git a/scripts/jflyte b/scripts/jflyte index 12f446e8d..84ef5c7dc 100755 --- a/scripts/jflyte +++ b/scripts/jflyte @@ -12,12 +12,11 @@ fi FLYTE_INTERNAL_IMAGE=${FLYTE_INTERNAL_IMAGE:-$CUSTOM_FLYTE_INTERNAL_IMAGE} docker run -it \ + --env-file=.env.local \ -e "FLYTE_INTERNAL_IMAGE=${FLYTE_INTERNAL_IMAGE}" \ - -e "FLYTE_PLATFORM_URL=${FLYTE_PLATFORM_URL}" \ - -e "FLYTE_STAGING_LOCATION=${FLYTE_STAGING_LOCATION}" \ - -e "FLYTE_PLATFORM_INSECURE=${FLYTE_PLATFORM_INSECURE}" \ -v "$(pwd):/workdir:ro" \ -v 
"$HOME/.config/gcloud/:/root/.config/gcloud:ro" \ -w "/workdir" \ + --net=host \ "$FLYTE_INTERNAL_IMAGE" \ jflyte $*