Skip to content

Commit

Permalink
Ab2d-6030/Multipart upload to S3 (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
smirnovaae authored Mar 15, 2024
1 parent f4ac5ba commit 37949ee
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 35 deletions.
3 changes: 2 additions & 1 deletion attribution-data-file-share/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ repositories {
dependencies {
implementation 'com.amazonaws:aws-lambda-java-core:1.2.2'
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.529'
implementation 'software.amazon.awssdk:s3-transfer-manager:2.25.7'
implementation 'software.amazon.awssdk.crt:aws-crt:0.29.11'
implementation 'org.postgresql:postgresql:42.7.2'
implementation 'software.amazon.awssdk:s3:2.21.7'
implementation 'software.amazon.awssdk:ssm:2.25.7'
implementation 'software.amazon.awssdk:sts:2.25.6'
implementation project(path: ':lambda-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import gov.cms.ab2d.lambdalibs.lib.FileUtil;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.DriverManager;
Expand All @@ -36,10 +35,10 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
String fileFullPath = FILE_PATH + fileName;
var parameterStore = AttributionParameterStore.getParameterStore();
AttributionDataShareHelper helper = helperInit(fileName, fileFullPath, logger);
try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())){
try (var dbConnection = DriverManager.getConnection(parameterStore.getDbHost(), parameterStore.getDbUser(), parameterStore.getDbPassword())) {

helper.copyDataToFile(dbConnection);
helper.writeFileToFinalDestination(getS3Client(ENDPOINT, parameterStore));
helper.uploadToS3(getAsyncS3Client(ENDPOINT, parameterStore));

} catch (NullPointerException | URISyntaxException | SQLException ex) {
throwAttributionDataShareException(logger, ex);
Expand All @@ -49,10 +48,8 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
}
}

public S3Client getS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException {
var client = S3Client.builder()
.region(S3_REGION)
.endpointOverride(new URI(endpoint));
public S3AsyncClient getAsyncS3Client(String endpoint, AttributionParameterStore parameterStore) throws URISyntaxException {
var client = S3AsyncClient.crtCreate();

if (endpoint.equals(ENDPOINT)) {
var stsClient = StsClient
Expand All @@ -72,10 +69,17 @@ public S3Client getS3Client(String endpoint, AttributionParameterStore parameter
.refreshRequest(request)
.build();

client.credentialsProvider(credentials);
client =
S3AsyncClient.crtBuilder()
.credentialsProvider(credentials)
.region(S3_REGION)
.targetThroughputInGbps(20.0)
.minimumPartSizeInBytes(8 * 1025 * 1024L)
.build();
}
return client.build();
return client;
}

AttributionDataShareHelper helperInit(String fileName, String fileFullPath, LambdaLogger logger) {
return new AttributionDataShareHelper(fileName, fileFullPath, logger);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package gov.cms.ab2d.attributionDataShare;

import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -31,7 +33,9 @@ void copyDataToFile(Connection connection) {
String date = new SimpleDateFormat(EFFECTIVE_DATE_PATTERN).format(new Date());
try (var stmt = connection.createStatement();
var writer = new BufferedWriter(new FileWriter(fileFullPath, true))) {

var rs = getExecuteQuery(stmt);

writer.write(FIRST_LINE + date);
writer.newLine();
long records = 0;
Expand All @@ -42,6 +46,7 @@ void copyDataToFile(Connection connection) {
records++;
}
writer.write(LAST_LINE + date + String.format("%010d", records));

} catch (SQLException | IOException ex) {
String errorMessage = "An error occurred while exporting data to a file. ";
logger.log(errorMessage + ex.getMessage());
Expand All @@ -63,19 +68,23 @@ String getResponseLine(String currentMbi, Timestamp effectiveDate, Boolean optOu
return result.toString();
}

void writeFileToFinalDestination(S3Client s3Client) {
try {
var objectRequest = PutObjectRequest.builder()
.bucket(getBucketName())
.key(getUploadPath() + fileName)
.build();
public String uploadToS3(S3AsyncClient s3AsyncClient) {
String currentDate = new SimpleDateFormat(REQ_FILE_NAME_PATTERN).format(new Date());
var key = REQ_FILE_NAME + currentDate;
S3TransferManager transferManager = S3TransferManager.builder()
.s3Client(s3AsyncClient)
.build();

s3Client.putObject(objectRequest, RequestBody.fromFile(new File(fileFullPath)));
} catch (AmazonS3Exception ex) {
var errorMessage = "Response AttributionDataShare file cannot be created. ";
logger.log(errorMessage + ex.getMessage());
throw new AttributionDataShareException(errorMessage, ex);
}
UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(getBucketName()).key(getUploadPath() + key))
.addTransferListener(LoggingTransferListener.create())
.source(Paths.get(fileFullPath))
.build();

FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest);

CompletedFileUpload uploadResult = fileUpload.completionFuture().join();
return uploadResult.response().eTag();
}

public String getBucketName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class AttributionDataShareHandlerTest {

@Test
void attributionDataShareInvoke() {
var mockParameterStore = mockStatic(AttributionParameterStore.class);
var mockParameterStore = mockStatic(AttributionParameterStore.class);
mockParameterStore
.when(AttributionParameterStore::getParameterStore)
.thenReturn(parameterStore);

Connection dbConnection = mock(Connection.class);
mockStatic(DriverManager.class)
.when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection);
.when(() -> DriverManager.getConnection(anyString(), anyString(), anyString())).thenReturn(dbConnection);

when(handler.helperInit(anyString(), anyString(), any(LambdaLogger.class))).thenReturn(helper);
assertDoesNotThrow(() -> handler.handleRequest(null, System.out, new TestContext()));
Expand All @@ -51,6 +51,6 @@ void attributionDataShareExceptionTest() {

@Test
void getS3ClientTest() throws URISyntaxException {
assertNotNull(handler.getS3Client(TEST_ENDPOINT, parameterStore));
assertNotNull(handler.getAsyncS3Client(TEST_ENDPOINT, parameterStore));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void getResponseLineTest() {
@Test
void writeFileToFinalDestinationTest() throws IOException {
createTestFile();
helper.writeFileToFinalDestination(S3MockAPIExtension.S3_CLIENT);
helper.uploadToS3(S3MockAPIExtension.S3_CLIENT);
assertTrue(S3MockAPIExtension.isObjectExists(FILE_NAME));
S3MockAPIExtension.deleteFile(FILE_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.findify.s3mock.S3Mock;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.*;

import java.net.URI;
Expand All @@ -14,7 +14,7 @@
public class S3MockAPIExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {

private static final S3Mock API = S3Mock.create(8001, "/tmp/s3");
public static S3Client S3_CLIENT;
public static S3AsyncClient S3_CLIENT;
private static boolean STARTED = false;

@Override
Expand All @@ -25,7 +25,7 @@ public void beforeAll(ExtensionContext context) throws Exception {

API.start();

S3_CLIENT = S3Client.builder()
S3_CLIENT = S3AsyncClient.crtBuilder()
.region(S3_REGION)
.endpointOverride(new URI(TEST_ENDPOINT))
.build();
Expand Down

0 comments on commit 37949ee

Please sign in to comment.