Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Store the resources in S3 buckets #611

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bundles/sirix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
api implLibraries.iouring
api implLibraries.lz4
api implLibraries.roaringbitmap
api implLibraries.amazonS3

implementation implLibraries.snappyJava
implementation implLibraries.browniesCollections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,22 @@ public final class AmazonS3Storage implements ICloudStorage {
public AmazonS3Storage(String bucketName, String awsProfile,
String region,
boolean shouldCreateBucketIfNotExists, final ResourceConfiguration resourceConfig,
AsyncCache<Integer, RevisionFileData> cache,
ByteHandlerPipeline byteHandlerPipeline) {
AsyncCache<Integer, RevisionFileData> cache) {
this.bucketName = bucketName;
this.awsProfile = awsProfile;
this.region = region;
this.cache = cache;
this.byteHandlerPipeline = byteHandlerPipeline;
this.byteHandlerPipeline = resourceConfig.byteHandlePipeline;
this.file = resourceConfig.resourcePath;
this.s3Client = getS3Client(); //this client is needed for the below checks, so initialize it here only.
if(!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) {
createBucket(bucketName);
}
this.reader = new AmazonS3StorageReader(bucketName,
s3Client,
getDataFilePath().toAbsolutePath().toString(),
getRevisionFilePath().toAbsolutePath().toString(),
new ByteHandlerPipeline(byteHandlerPipeline),
new ByteHandlerPipeline(this.byteHandlerPipeline),
SerializationType.DATA,
new PagePersister(),
cache.synchronous());
Expand All @@ -125,7 +128,7 @@ void createBucket(String bucketName) {
}

boolean isBucketExists(String bucketName) {
HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build();
HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build();

try {
s3Client.headBucket(headBucketRequest);
Expand All @@ -136,20 +139,17 @@ boolean isBucketExists(String bucketName) {
}

S3Client getS3Client() {
S3Client s3Client = null;
s3Client = S3Client.builder()
return this.s3Client==null ? S3Client.builder()
.region(Region.of(region))
.credentialsProvider(ProfileCredentialsProvider.create(awsProfile))
.build();
return s3Client;
.build() : this.s3Client;
}

S3AsyncClient getAsyncS3Client() {
S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
return S3AsyncClient.builder()
.region(Region.of(region))
.credentialsProvider(ProfileCredentialsProvider.create(awsProfile))
.build();
return s3AsyncClient;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ protected Path readObjectDataFromS3(String keyName) {

ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(objectRequest);
byte[] data = objectBytes.asByteArray();
String path = System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator() + keyName;
/*As the bucketName has to be same as the database name, it makes sense to use/create file on the local filesystem
* instead of in the tmp partition*/
String path = FileSystems.getDefault().getSeparator() + bucketName + FileSystems.getDefault().getSeparator() + keyName;
sband marked this conversation as resolved.
Show resolved Hide resolved
// Write the data to a local file.
File myFile = new File(path);
try(OutputStream os = new FileOutputStream(myFile)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ protected void writeObjectToS3(String keyName, File object, boolean isDataFile)
try {
if (response != null) {
LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName,bucketName));
object.delete();
/*No need to delete/cleanup the file as we are writing on the local file system, so this avoid
* unnecessarily filling up the filesystem space*/
} else {
// Handle error
error.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package org.sirix.io.combined;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.sirix.api.PageReadOnlyTrx;
import org.sirix.io.AbstractForwardingReader;
import org.sirix.io.Reader;
import org.sirix.io.RevisionFileData;
import org.sirix.io.Writer;
import org.sirix.io.cloud.amazon.AmazonS3StorageReader;
import org.sirix.page.PageReference;
import org.sirix.page.RevisionRootPage;
import org.sirix.page.interfaces.Page;
import org.sirix.utils.LogWrapper;
import org.slf4j.LoggerFactory;

import net.openhft.chronicle.bytes.Bytes;

public class CombinedStorageWriter extends AbstractForwardingReader implements Writer {

private Writer localStorageWriter, remoteStorageWriter;
private Reader storageReader;
/** Logger. */
private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(CombinedStorageWriter.class));

public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader storageReader) {
this.localStorageWriter = localWriter;
Expand All @@ -29,29 +31,61 @@ public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader sto
@Override
public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes<ByteBuffer> bufferedBytes) {
Writer writer = localStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes);
remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes);
CompletableFuture<Writer> remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes));
if (writer == null) {
sband marked this conversation as resolved.
Show resolved Hide resolved
sband marked this conversation as resolved.
Show resolved Hide resolved
try {
writer = remoteWriterTask.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Could not complete remote write operation, please check the error details");
e.printStackTrace();
}
}
return writer;
}

@Override
public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference,
Bytes<ByteBuffer> bufferedBytes) {
Writer writer = localStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes);
remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes);
CompletableFuture<Writer> remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes));
if (writer == null) {
sband marked this conversation as resolved.
Show resolved Hide resolved
try {
writer = remoteWriterTask.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Could not complete remote write operation, please check the error details");
e.printStackTrace();
}
}
return writer;
}

@Override
public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) {
Writer writer = localStorageWriter.truncateTo(pageReadOnlyTrx, revision);
remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision);
CompletableFuture<Writer> remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision));
sband marked this conversation as resolved.
Show resolved Hide resolved
if (writer == null) {
try {
writer = remoteWriterTask.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Could not complete remote write operation, please check the error details");
e.printStackTrace();
}
}
return writer;
}

@Override
public Writer truncate() {
Writer writer = localStorageWriter.truncate();
remoteStorageWriter.truncate();
CompletableFuture<Writer> remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncate());
if (writer == null) {
sband marked this conversation as resolved.
Show resolved Hide resolved
try {
writer = remoteWriterTask.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Could not complete remote write operation, please check the error details");
e.printStackTrace();
}
}
return writer;
}

Expand Down