Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ab2d-6030/Multipart upload to S3 #94

Merged
merged 7 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion attribution-data-file-share/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ repositories {
dependencies {
implementation 'com.amazonaws:aws-lambda-java-core:1.2.2'
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.529'
implementation 'software.amazon.awssdk:s3-transfer-manager:2.25.7'
implementation 'software.amazon.awssdk.crt:aws-crt:0.29.11'
implementation 'org.postgresql:postgresql:42.7.2'
implementation 'software.amazon.awssdk:s3:2.21.7'
implementation 'software.amazon.awssdk:ssm:2.25.7'
implementation 'software.amazon.awssdk:sts:2.25.6'
implementation project(path: ':lambda-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import gov.cms.ab2d.lambdalibs.lib.FileUtil;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.DriverManager;
Expand All @@ -36,10 +35,10 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
String fileFullPath = FILE_PATH + fileName;
var parameterStore = AttributionParameterStore.getParameterStore();
AttributionDataShareHelper helper = helperInit(fileName, fileFullPath, logger);
try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())){
try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())) {

helper.copyDataToFile(dbConnection);
helper.writeFileToFinalDestination(getS3Client(ENDPOINT, parameterStore));
helper.uploadToS3(getAsyncS3Client(ENDPOINT, parameterStore));

} catch (NullPointerException | URISyntaxException | SQLException ex) {
throwAttributionDataShareException(logger, ex);
Expand All @@ -49,10 +48,8 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
}
}

public S3Client getS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException {
var client = S3Client.builder()
.region(S3_REGION)
.endpointOverride(new URI(endpoint));
public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException {
var client = S3AsyncClient.crtCreate();

if (endpoint.equals(ENDPOINT)) {
var stsClient = StsClient
Expand All @@ -72,10 +69,17 @@ public S3Client getS3Client(String endpoint, AttributionParameterStore parameter
.refreshRequest(request)
.build();

client.credentialsProvider(credentials);
client =
S3AsyncClient.crtBuilder()
.credentialsProvider(credentials)
.region(S3_REGION)
.targetThroughputInGbps(20.0)
.minimumPartSizeInBytes(8 * 1025 * 1024L)
.build();
}
return client.build();
return client;
}

AttributionDataShareHelper helperInit(String fileName, String fileFullPath, LambdaLogger logger) {
return new AttributionDataShareHelper(fileName, fileFullPath, logger);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package gov.cms.ab2d.attributionDataShare;

import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -31,7 +33,9 @@ void copyDataToFile(Connection connection) {
String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date());
try (var stmt = connection.createStatement();
var writer = new BufferedWriter(new FileWriter(fileFullPath, true))) {

var rs = getExecuteQuery(stmt);

writer.write(FIRST_LINE + date);
writer.newLine();
long records = 0;
Expand All @@ -42,6 +46,7 @@ void copyDataToFile(Connection connection) {
records++;
}
writer.write(LAST_LINE + date + String.format("%010d", records));

} catch (SQLException | IOException ex) {
String errorMessage = "An error occurred while exporting data to a file. ";
logger.log(errorMessage + ex.getMessage());
Expand All @@ -63,19 +68,23 @@ String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOu
return result.toString();
}

void writeFileToFinalDestination(S3Client s3Client) {
try {
var objectRequest = PutObjectRequest.builder()
.bucket(getBucketName())
.key(getUploadPath() + fileName)
.build();
public String uploadToS3(S3AsyncClient s3AsyncClient) {
String currentDate = new SimpleDateFormat(REQ_FILE_NAME_PATTERN).format(new Date());
var key = REQ_FILE_NAME + currentDate;
S3TransferManager transferManager = S3TransferManager.builder()
.s3Client(s3AsyncClient)
.build();

s3Client.putObject(objectRequest, RequestBody.fromFile(new File(fileFullPath)));
} catch (AmazonS3Exception ex) {
var errorMessage = "Response AttributionDataShare file cannot be created. ";
logger.log(errorMessage + ex.getMessage());
throw new AttributionDataShareException(errorMessage, ex);
}
UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(getBucketName()).key(getUploadPath() + key))
.addTransferListener(LoggingTransferListener.create())
.source(Paths.get(fileFullPath))
.build();

FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest);

CompletedFileUpload uploadResult = fileUpload.completionFuture().join();
return uploadResult.response().eTag();
}

public String getBucketName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class AttributionDataShareHandlerTest {

@Test
void attributionDataShareInvoke() {
var mockParameterStore = mockStatic(AttributionParameterStore.class);
var mockParameterStore = mockStatic(AttributionParameterStore.class);
mockParameterStore
.when(AttributionParameterStore::getParameterStore)
.thenReturn(parameterStore);

Connection dbConnection = mock(Connection.class);
mockStatic(DriverManager.class)
.when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection);
.when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection);

when(handler.helperInit(anyString(), anyString(), any(LambdaLogger.class))).thenReturn(helper);
assertDoesNotThrow(() -> handler.handleRequest(null, System.out, new TestContext()));
Expand All @@ -51,6 +51,6 @@ void attributionDataShareExceptionTest() {

@Test
void getS3ClientTest() throws URISyntaxException {
assertNotNull(handler.getS3Client(TEST_ENDPOINT, parameterStore));
assertNotNull(handler.getAsyncS3Client(TEST_ENDPOINT, parameterStore));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void getResponseLineTest() {
@Test
void writeFileToFinalDestinationTest() throws IOException {
createTestFile();
helper.writeFileToFinalDestination(S3MockAPIExtension.S3_CLIENT);
helper.uploadToS3(S3MockAPIExtension.S3_CLIENT);
assertTrue(S3MockAPIExtension.isObjectExists(FILE_NAME));
S3MockAPIExtension.deleteFile(FILE_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.findify.s3mock.S3Mock;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.*;

import java.net.URI;
Expand All @@ -14,7 +14,7 @@
public class S3MockAPIExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {

private static final S3Mock API = S3Mock.create(8001, "/tmp/s3");
public static S3Client S3_CLIENT;
public static S3AsyncClient S3_CLIENT;
private static boolean STARTED = false;

@Override
Expand All @@ -25,7 +25,7 @@ public void beforeAll(ExtensionContext context) throws Exception {

API.start();

S3_CLIENT = S3Client.builder()
S3_CLIENT = S3AsyncClient.crtBuilder()
.region(S3_REGION)
.endpointOverride(new URI(TEST_ENDPOINT))
.build();
Expand Down
Loading