Skip to content

Commit

Permalink
Add jflyte-aws
Browse files Browse the repository at this point in the history
Implement S3 FileSystem
  • Loading branch information
kanterov committed Jun 16, 2020
1 parent 7be868a commit c1c9cb0
Show file tree
Hide file tree
Showing 13 changed files with 499 additions and 6 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ARG FLYTE_INTERNAL_IMAGE
COPY jflyte/target/lib /jflyte/

# plugins
COPY jflyte-aws/target/lib /jflyte/modules/jflyte-aws
COPY jflyte-google-cloud/target/lib /jflyte/modules/jflyte-google-cloud

ENV FLYTE_INTERNAL_MODULE_DIR "/jflyte/modules"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.flyte.jflyte.api;

import java.util.Map;

/** A registrar that creates {@link FileSystem} instances. */
public abstract class FileSystemRegistrar {
/** Returns the {@link FileSystem} instances provided by this registrar. */
public abstract Iterable<FileSystem> load();
/**
 * Returns the {@link FileSystem} instances provided by this registrar.
 *
 * @param env string key/value settings handed to the registrar; implementations may read their
 *     configuration from it
 */
public abstract Iterable<FileSystem> load(Map<String, String> env);
}
123 changes: 123 additions & 0 deletions jflyte-aws/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
<!--
Copyright 2020 Spotify AB.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.flyte</groupId>
<artifactId>flytekit-parent</artifactId>
<version>0.2.0-SNAPSHOT</version>
</parent>

<artifactId>jflyte-aws</artifactId>

<dependencyManagement>
<dependencies>
<!--
NB: we intentionally don't add this BOM to the root pom,
because jflyte-aws is loaded in a separate class loader
so that its dependencies don't conflict with the user code
-->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.327</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.14.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- dependency conflict resolution -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- compile -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<!-- provided -->
<dependency>
<groupId>org.flyte</groupId>
<artifactId>jflyte-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<!--
NOTE(review): both plugins are declared without a version or configuration,
so presumably those are inherited from the parent pom (flytekit-parent); confirm there.
-->
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
132 changes: 132 additions & 0 deletions jflyte-aws/src/main/java/org/flyte/jflyte/aws/S3FileSystem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2020 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.jflyte.aws;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3URI;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectId;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.flyte.jflyte.api.FileSystem;
import org.flyte.jflyte.api.Manifest;

/**
 * {@link FileSystem} implementation for the {@code s3} scheme, backed by an {@link AmazonS3}
 * client.
 */
public class S3FileSystem implements FileSystem {
  private static final Logger LOG = Logger.getLogger(S3FileSystem.class.getName());

  static {
    // enable all levels for the actual handler to pick up
    LOG.setLevel(Level.ALL);
  }

  private final AmazonS3 s3;

  /**
   * Creates a file system that performs all operations through the given client.
   *
   * @param s3 client used for every read, write and existence check
   */
  public S3FileSystem(AmazonS3 s3) {
    this.s3 = s3;
  }

  /**
   * Builds an {@link S3FileSystem} from string settings.
   *
   * <p>Recognized keys:
   *
   * <ul>
   *   <li>{@code FLYTE_AWS_ACCESS_KEY_ID} and {@code FLYTE_AWS_SECRET_ACCESS_KEY}: when the access
   *       key id is present, static credentials are built from the pair; otherwise the builder's
   *       default credentials are used.
   *   <li>{@code FLYTE_AWS_ENDPOINT}: when present, the client targets this endpoint with
   *       path-style access and an explicit V4 signer; otherwise {@link Regions#DEFAULT_REGION}
   *       is used.
   * </ul>
   *
   * @param env settings to read the keys above from
   * @return a file system backed by a newly built client
   */
  public static S3FileSystem create(Map<String, String> env) {
    String endpoint = env.get("FLYTE_AWS_ENDPOINT");
    String accessKeyId = env.get("FLYTE_AWS_ACCESS_KEY_ID");

    AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();

    if (accessKeyId != null) {
      LOG.fine(String.format("Using FLYTE_AWS_ACCESS_KEY_ID [%s]", accessKeyId));

      String secretAccessKey = env.get("FLYTE_AWS_SECRET_ACCESS_KEY");
      BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);

      builder.withCredentials(new AWSStaticCredentialsProvider(credentials));
    }

    if (endpoint != null) {
      LOG.fine(String.format("Using FLYTE_AWS_ENDPOINT [%s]", endpoint));

      // assume it's minio from this point, it doesn't work without signer override
      ClientConfiguration clientConfiguration = new ClientConfiguration();
      clientConfiguration.setSignerOverride("AWSS3V4SignerType");

      builder.withClientConfiguration(clientConfiguration);
      builder.withPathStyleAccessEnabled(true);

      // The signing region must be the AWS region id ("us-east-1"), which is what getName()
      // returns; Enum#name() would yield the Java constant identifier "US_EAST_1" instead.
      builder.withEndpointConfiguration(
          new AwsClientBuilder.EndpointConfiguration(endpoint, Regions.US_EAST_1.getName()));
    } else {
      builder.withRegion(Regions.DEFAULT_REGION);
    }

    return new S3FileSystem(builder.build());
  }

  @Override
  public String getScheme() {
    return "s3";
  }

  /**
   * Opens the object addressed by {@code uri} for reading.
   *
   * @param uri S3 URI of the object; a version id in the URI is honored
   * @return channel over the object's content; closing it closes the underlying content stream
   */
  @Override
  public ReadableByteChannel reader(String uri) {
    AmazonS3URI s3Uri = new AmazonS3URI(uri);
    S3ObjectId objectId = new S3ObjectId(s3Uri.getBucket(), s3Uri.getKey(), s3Uri.getVersionId());
    S3Object object = s3.getObject(new GetObjectRequest(objectId));

    return Channels.newChannel(object.getObjectContent());
  }

  /**
   * Opens a channel that writes to the object addressed by {@code uri}.
   *
   * @param uri S3 URI of the object to write
   * @return channel created by {@link S3WritableByteChannel#create}
   * @throws UncheckedIOException if the channel can't be created
   */
  @Override
  public WritableByteChannel writer(String uri) {
    AmazonS3URI s3Uri = new AmazonS3URI(uri);

    // Only log what we already know locally: listing buckets here would cost an extra network
    // call on every write and would fail the write when the credentials can't list buckets.
    LOG.fine("bucket=" + s3Uri.getBucket() + " key=" + s3Uri.getKey());

    try {
      return S3WritableByteChannel.create(
          s3, /* bucketName= */ s3Uri.getBucket(), /* key= */ s3Uri.getKey());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Returns a manifest for the object addressed by {@code uri}.
   *
   * @param uri S3 URI of the object
   * @return an empty manifest if the object exists, {@code null} otherwise
   */
  @Nullable
  @Override
  public Manifest getManifest(String uri) {
    AmazonS3URI s3Uri = new AmazonS3URI(uri);

    if (!s3.doesObjectExist(s3Uri.getBucket(), s3Uri.getKey())) {
      return null;
    }

    // TODO once we have fields in Manifest, populate them

    return Manifest.create();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.jflyte.aws;

import com.google.auto.service.AutoService;
import java.util.Collections;
import java.util.Map;
import org.flyte.jflyte.api.FileSystem;
import org.flyte.jflyte.api.FileSystemRegistrar;

/** {@link FileSystemRegistrar} that contributes a single {@link S3FileSystem}. */
@AutoService(FileSystemRegistrar.class)
public class S3FileSystemRegistrar extends FileSystemRegistrar {

  /**
   * Creates one S3-backed file system configured from {@code env}.
   *
   * @param env settings forwarded to {@link S3FileSystem#create(Map)}
   * @return a single-element list holding the created file system
   */
  @Override
  public Iterable<FileSystem> load(Map<String, String> env) {
    FileSystem s3FileSystem = S3FileSystem.create(env);

    return Collections.singletonList(s3FileSystem);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.jflyte.aws;

import com.amazonaws.services.s3.AmazonS3;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

/**
 * {@link WritableByteChannel} that buffers everything written to it in a local temporary file and
 * uploads that file to S3 as a single object when the channel is closed.
 */
class S3WritableByteChannel implements WritableByteChannel {
  // Upper bound on the key-derived temp-file suffix: object keys can be much longer than what
  // file systems accept as a file name, and the suffix is only there to ease debugging.
  private static final int MAX_SUFFIX_LENGTH = 64;

  private final AmazonS3 s3;
  private final String bucketName;
  private final String key;
  private final File file;
  private final WritableByteChannel fileChannel;

  // TODO ideally, we should buffer into a file only if it's necessary, but for now we
  // go with the simple option of always creating a file given that the code isn't
  // performance-critical

  /**
   * @param s3 client used for the upload on close
   * @param bucketName destination bucket
   * @param key destination object key
   * @param file local file that backs {@code fileChannel}; uploaded and then deleted on close
   * @param fileChannel open channel writing into {@code file}
   */
  S3WritableByteChannel(
      AmazonS3 s3, String bucketName, String key, File file, WritableByteChannel fileChannel) {
    this.s3 = s3;
    this.bucketName = bucketName;
    this.key = key;
    this.file = file;
    this.fileChannel = fileChannel;
  }

  /**
   * Creates a channel that buffers into a fresh temporary file.
   *
   * @param s3 client used for the upload on close
   * @param bucketName destination bucket
   * @param key destination object key
   * @return an open channel; nothing is sent to S3 until it is closed
   * @throws IOException if the temporary file can't be created or opened
   */
  public static S3WritableByteChannel create(AmazonS3 s3, String bucketName, String key)
      throws IOException {
    String fileName = key.replaceAll("\\W+", "_");
    if (fileName.length() > MAX_SUFFIX_LENGTH) {
      // keep the tail of the key, it usually carries the most specific part of the name
      fileName = fileName.substring(fileName.length() - MAX_SUFFIX_LENGTH);
    }

    File file = File.createTempFile("s3-upload", fileName);
    // fallback in case the channel is never closed
    file.deleteOnExit();

    WritableByteChannel fileChannel;
    try {
      fileChannel = Files.newByteChannel(file.toPath(), StandardOpenOption.WRITE);
    } catch (IOException e) {
      // don't leave the just-created file behind when it can't be opened
      file.delete();
      throw e;
    }

    return new S3WritableByteChannel(
        s3,
        /* bucketName= */ bucketName,
        /* key= */ key,
        /* file= */ file,
        /* fileChannel= */ fileChannel);
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    return fileChannel.write(src);
  }

  @Override
  public boolean isOpen() {
    return fileChannel.isOpen();
  }

  /**
   * Closes the local buffer file, uploads it to {@code bucketName}/{@code key} and removes it.
   *
   * <p>Calling this method on an already closed channel has no effect, so the object is uploaded
   * at most once.
   *
   * @throws IOException if closing the local file channel fails; nothing is uploaded in that case
   */
  @Override
  public void close() throws IOException {
    if (!fileChannel.isOpen()) {
      // already closed: a second upload would be wrong, and the file is gone anyway
      return;
    }

    try {
      fileChannel.close();

      s3.putObject(/* bucketName= */ bucketName, /* key= */ key, file);
    } finally {
      // Best effort: the file is of no use once the channel is closed, whether or not the
      // upload succeeded. A failed delete is left to any deleteOnExit hook registered for it.
      file.delete();
    }
  }
}
Loading

0 comments on commit c1c9cb0

Please sign in to comment.