Skip to content

Commit d8e530a

Browse files
committed
feat: add support for s3 multipart upload
Signed-off-by: Gustav Grusell <[email protected]>
1 parent 65de9bc commit d8e530a

File tree

9 files changed

+167
-81
lines changed

9 files changed

+167
-81
lines changed

encore-common/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ dependencies {
1616

1717
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
1818
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j")
19-
implementation(platform("software.amazon.awssdk:bom:2.29.2"))
19+
implementation(platform("software.amazon.awssdk:bom:2.33.9"))
2020
implementation("software.amazon.awssdk:s3")
21+
implementation("software.amazon.awssdk:s3-transfer-manager")
2122

2223
testImplementation(project(":encore-web"))
2324
testImplementation("org.springframework.security:spring-security-test")

encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import software.amazon.awssdk.regions.Region
1717
import software.amazon.awssdk.services.s3.S3AsyncClient
1818
import software.amazon.awssdk.services.s3.S3Configuration
1919
import software.amazon.awssdk.services.s3.presigner.S3Presigner
20+
import software.amazon.awssdk.transfer.s3.S3TransferManager
2021
import java.net.URI
2122

2223
@ConditionalOnProperty("remote-files.s3.enabled", havingValue = "true")
@@ -28,6 +29,12 @@ class S3RemoteFilesConfiguration {
2829
fun s3Region() =
2930
Region.of(System.getProperty("aws.region") ?: System.getenv("AWS_REGION") ?: "us-east-1")
3031

32+
@Bean
33+
fun transferManager(s3Client: S3AsyncClient, s3Properties: S3Properties): S3TransferManager =
34+
S3TransferManager.builder()
35+
.s3Client(s3Client)
36+
.build()
37+
3138
@Bean
3239
fun s3Client(s3Region: Region, s3Properties: S3Properties) = S3AsyncClient.builder()
3340
.region(s3Region)
@@ -42,11 +49,11 @@ class S3RemoteFilesConfiguration {
4249
if (s3Properties.anonymousAccess) {
4350
AnonymousCredentialsProvider.create()
4451
} else {
45-
DefaultCredentialsProvider.create()
52+
DefaultCredentialsProvider.builder().build()
4653
},
4754
)
4855
.apply {
49-
if (!s3Properties.endpoint.isNullOrBlank()) {
56+
if (s3Properties.endpoint.isNotBlank()) {
5057
endpointOverride(URI.create(s3Properties.endpoint))
5158
}
5259
}
@@ -76,6 +83,7 @@ class S3RemoteFilesConfiguration {
7683
s3Presigner: S3Presigner,
7784
s3Properties: S3Properties,
7885
s3UriConverter: S3UriConverter,
86+
transferManager: S3TransferManager,
7987
) =
80-
S3RemoteFileHandler(s3Client, s3Presigner, s3Properties, s3UriConverter)
88+
S3RemoteFileHandler(s3Client, s3Presigner, s3Properties, s3UriConverter, transferManager)
8189
}

encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import java.time.Duration
1111
data class S3Properties(
1212
val enabled: Boolean = false,
1313
val anonymousAccess: Boolean = false,
14+
val usePathStyle: Boolean = false,
1415
val endpoint: String = "",
1516
val presignDurationSeconds: Long = Duration.ofHours(12).seconds,
1617
val uploadTimeoutSeconds: Long = Duration.ofHours(1).seconds,

encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest
1111
import software.amazon.awssdk.services.s3.model.PutObjectRequest
1212
import software.amazon.awssdk.services.s3.presigner.S3Presigner
1313
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest
14+
import software.amazon.awssdk.transfer.s3.S3TransferManager
15+
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest
1416
import java.net.URI
1517
import java.nio.file.Paths
1618
import java.util.concurrent.TimeUnit
@@ -22,6 +24,7 @@ class S3RemoteFileHandler(
2224
private val presigner: S3Presigner,
2325
private val s3Properties: S3Properties,
2426
private val s3UriConverter: S3UriConverter,
27+
private val transferManager: S3TransferManager,
2528
) : RemoteFileHandler {
2629

2730
override fun getAccessUri(uri: String): String {
@@ -54,11 +57,30 @@ class S3RemoteFileHandler(
5457
log.info { "Uploading $localFile to $remoteFile" }
5558
val s3Uri = URI.create(remoteFile)
5659
val (bucket, key) = s3UriConverter.getBucketAndKey(s3Uri)
60+
if (s3Properties.anonymousAccess) {
61+
standardUpload(localFile, bucket, key)
62+
} else {
63+
transferManagerUpload(localFile, bucket, key)
64+
}
65+
}
66+
67+
private fun transferManagerUpload(localFile: String, bucket: String, key: String) {
68+
val uploadRequest = UploadFileRequest.builder()
69+
.putObjectRequest { por -> por.bucket(bucket).key(key) }
70+
.source(Paths.get(localFile))
71+
.build()
72+
val fileUpload = transferManager.uploadFile(uploadRequest)
73+
val res = fileUpload.completionFuture().get(s3Properties.uploadTimeoutSeconds, TimeUnit.SECONDS)
74+
log.info { "Upload completed: $res" }
75+
}
76+
77+
private fun standardUpload(localFile: String, bucket: String, key: String) {
5778
val putObjectRequest: PutObjectRequest = PutObjectRequest.builder()
5879
.bucket(bucket)
5980
.key(key)
6081
.build()
61-
val res = client.putObject(putObjectRequest, Paths.get(localFile)).get(s3Properties.presignDurationSeconds, TimeUnit.SECONDS)
82+
val res = client.putObject(putObjectRequest, Paths.get(localFile))
83+
.get(s3Properties.presignDurationSeconds, TimeUnit.SECONDS)
6284
log.info { "Upload result: $res" }
6385
}
6486

encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3UriConverter.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package se.svt.oss.encore.service.remotefiles.s3
66

7+
import org.springframework.web.util.UriComponentsBuilder
78
import software.amazon.awssdk.regions.Region
89
import java.net.URI
910

@@ -20,7 +21,17 @@ class S3UriConverter(
2021
val key = s3Uri.path.stripLeadingSlash()
2122

2223
if (s3Properties.endpoint.isNotBlank()) {
23-
return "https://$bucket.${s3Properties.endpoint}/$key"
24+
val endpointUri = URI.create(s3Properties.endpoint)
25+
val uriBuilder = UriComponentsBuilder.fromUri(endpointUri)
26+
val pathSegments = key.split("/").toTypedArray()
27+
if (s3Properties.usePathStyle) {
28+
uriBuilder.pathSegment(bucket, *pathSegments)
29+
} else {
30+
uriBuilder
31+
.host("$bucket.${endpointUri.host}")
32+
.pathSegment(*pathSegments)
33+
}
34+
return uriBuilder.toUriString()
2435
}
2536
return "https://$bucket.s3.$region.amazonaws.com/$key"
2637
}

encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import mu.KotlinLogging
1111
import org.awaitility.Durations
1212
import org.junit.jupiter.api.AfterEach
1313
import org.junit.jupiter.api.BeforeEach
14+
import org.junit.jupiter.api.Nested
1415
import org.junit.jupiter.api.Test
1516
import org.junit.jupiter.api.extension.ExtendWith
1617
import org.junit.jupiter.api.io.TempDir
1718
import org.springframework.beans.factory.annotation.Autowired
1819
import org.springframework.test.context.ActiveProfiles
20+
import org.springframework.test.context.TestPropertySource
1921
import se.svt.oss.encore.Assertions.assertThat
2022
import se.svt.oss.encore.model.Status
2123
import se.svt.oss.encore.model.callback.JobProgress
@@ -25,9 +27,7 @@ import java.io.File
2527
import java.nio.file.Paths
2628

2729
@ExtendWith(S3StorageExtension::class)
28-
@ActiveProfiles(profiles = ["test-local", "test-s3"])
29-
@WireMockTest
30-
class EncoreS3IntegrationTest(wireMockRuntimeInfo: WireMockRuntimeInfo) : EncoreIntegrationTestBase(wireMockRuntimeInfo) {
30+
abstract class EncoreS3IntegrationTest(wireMockRuntimeInfo: WireMockRuntimeInfo) : EncoreIntegrationTestBase(wireMockRuntimeInfo) {
3131
private val log = KotlinLogging.logger {}
3232

3333
@Autowired
@@ -61,7 +61,6 @@ class EncoreS3IntegrationTest(wireMockRuntimeInfo: WireMockRuntimeInfo) : Encore
6161
}
6262
}
6363

64-
@Test
6564
fun jobWiths3InputAndOutputIsSuccessful(@TempDir outputDir: File) {
6665
val filename = "test.mp4"
6766
val remoteInput = uploadInputfile(testFileSurround.file.absolutePath, filename)
@@ -104,4 +103,30 @@ class EncoreS3IntegrationTest(wireMockRuntimeInfo: WireMockRuntimeInfo) : Encore
104103

105104
return "s3://$inputBucket/$key"
106105
}
106+
107+
@Nested
108+
@ActiveProfiles(profiles = ["test-local", "test-s3"])
109+
@WireMockTest
110+
class StandardS3Access(wireMockRuntimeInfo: WireMockRuntimeInfo) : EncoreS3IntegrationTest(wireMockRuntimeInfo) {
111+
@Test
112+
fun jobWithS3InputAndOutputIsSuccessful(@TempDir outputDir: File) {
113+
super.jobWiths3InputAndOutputIsSuccessful(outputDir)
114+
}
115+
}
116+
117+
@Nested
118+
@ActiveProfiles(profiles = ["test-local", "test-s3"])
119+
@TestPropertySource(
120+
properties = [
121+
"remote-files.s3.anonymous-access=true",
122+
"remote-files.s3.use-path-style=true", // localstack requires path style access
123+
],
124+
)
125+
@WireMockTest
126+
class AnonymousS3Access(wireMockRuntimeInfo: WireMockRuntimeInfo) : EncoreS3IntegrationTest(wireMockRuntimeInfo) {
127+
@Test
128+
fun jobWithS3InputAndOutputIsSuccessful(@TempDir outputDir: File) {
129+
super.jobWiths3InputAndOutputIsSuccessful(outputDir)
130+
}
131+
}
107132
}

encore-common/src/test/kotlin/se/svt/oss/encore/service/remotefiles/S3UriConverterTest.kt

Lines changed: 0 additions & 70 deletions
This file was deleted.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
2+
//
3+
// SPDX-License-Identifier: EUPL-1.2
4+
5+
package se.svt.oss.encore.service.remotefiles.s3
6+
7+
import org.assertj.core.api.Assertions.assertThat
8+
import org.assertj.core.api.Assertions.assertThatThrownBy
9+
import org.junit.jupiter.api.Nested
10+
import org.junit.jupiter.api.Test
11+
import software.amazon.awssdk.regions.Region
12+
import java.net.URI
13+
14+
class S3UriConverterTest {
15+
16+
private val s3Properties = S3Properties(enabled = true, anonymousAccess = true)
17+
private val region = Region.of("eu-west-1")
18+
private val s3Uri = URI.create("s3://my-bucket/test2/test1_x264_3100.mp4")
19+
private val s3UriConverter = S3UriConverter(s3Properties, region)
20+
21+
@Nested
22+
inner class ToHttp {
23+
24+
@Test
25+
fun returnsCorrectUri() {
26+
val httpUri = s3UriConverter.toHttp(s3Uri)
27+
assertThat(httpUri).isEqualTo("https://my-bucket.s3.eu-west-1.amazonaws.com/test2/test1_x264_3100.mp4")
28+
}
29+
30+
@Test
31+
fun differentRegionReturnsCorrectUri() {
32+
val s3UriConverter = S3UriConverter(s3Properties, Region.of("eu-north-1"))
33+
34+
val httpUri = s3UriConverter.toHttp(s3Uri)
35+
assertThat(httpUri).isEqualTo("https://my-bucket.s3.eu-north-1.amazonaws.com/test2/test1_x264_3100.mp4")
36+
}
37+
38+
@Test
39+
fun customEndpointReturnsCorrectUri() {
40+
val endpoint = "https://some-host:1234"
41+
val s3UriConverter = S3UriConverter(s3Properties.copy(endpoint = endpoint), region)
42+
43+
val httpUri = s3UriConverter.toHttp(s3Uri)
44+
assertThat(httpUri).isEqualTo("https://my-bucket.some-host:1234/test2/test1_x264_3100.mp4")
45+
}
46+
47+
@Test
48+
fun customEndpointWithoutPortReturnsCorrectUri() {
49+
val endpoint = "https://some-host"
50+
val s3UriConverter = S3UriConverter(s3Properties.copy(endpoint = endpoint), region)
51+
52+
val httpUri = s3UriConverter.toHttp(s3Uri)
53+
assertThat(httpUri).isEqualTo("https://my-bucket.some-host/test2/test1_x264_3100.mp4")
54+
}
55+
56+
@Test
57+
fun customEndpointReturnsCorrectPathStyleUri() {
58+
val endpoint = "http://some-host:1234"
59+
val s3UriConverter = S3UriConverter(s3Properties.copy(endpoint = endpoint, usePathStyle = true), region)
60+
61+
val httpUri = s3UriConverter.toHttp(s3Uri)
62+
assertThat(httpUri).isEqualTo("http://some-host:1234/my-bucket/test2/test1_x264_3100.mp4")
63+
}
64+
65+
@Test
66+
fun nonS3UriThrowsException() {
67+
val uri = URI.create("https://my-bucket/test2/test1_x264_3100.mp4")
68+
assertThatThrownBy { s3UriConverter.toHttp(uri) }.isInstanceOf(IllegalArgumentException::class.java)
69+
.hasMessage("Invalid URI: $uri")
70+
}
71+
}
72+
73+
@Nested
74+
inner class GetBucketAndKey {
75+
@Test
76+
fun returnsCorrectValues() {
77+
val (bucket, key) = s3UriConverter.getBucketAndKey(s3Uri)
78+
assertThat(bucket).isEqualTo("my-bucket")
79+
assertThat(key).isEqualTo("test2/test1_x264_3100.mp4")
80+
}
81+
82+
@Test
83+
fun nonS3UriThrowsException() {
84+
val uri = URI.create("https://my-bucket/test2/test1_x264_3100.mp4")
85+
assertThatThrownBy { s3UriConverter.getBucketAndKey(uri) }.isInstanceOf(IllegalArgumentException::class.java)
86+
.hasMessage("Invalid URI: $uri")
87+
}
88+
}
89+
}

encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ class S3StorageExtension : BeforeAllCallback {
2424
localstack.start()
2525

2626
log.info { "localstack endpoint: ${localstack.endpoint}" }
27-
2827
System.setProperty("aws.accessKeyId", localstack.accessKey)
2928
System.setProperty("aws.secretAccessKey", localstack.secretKey)
3029
System.setProperty("remote-files.s3.endpoint", localstack.endpoint.toString())

0 commit comments

Comments
 (0)