Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement a presignGetObject method #500

Open
wants to merge 2 commits into
base: series/2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
with:
fetch-depth: 0
- name: Setup Scala and Java
uses: olafurpg/setup-scala@v13
uses: coursier/setup-action@v1
- name: Cache scala dependencies
uses: coursier/cache-action@v6
- name: Lint code
Expand All @@ -35,7 +35,7 @@ jobs:
- name: Checkout current branch
uses: actions/[email protected]
- name: Setup Scala and Java
uses: olafurpg/setup-scala@v13
uses: coursier/setup-action@v1
- name: Cache scala dependencies
uses: coursier/cache-action@v6
- name: Check Document Generation
Expand All @@ -55,7 +55,7 @@ jobs:
with:
fetch-depth: 0
- name: Setup Scala and Java
uses: olafurpg/setup-scala@v13
uses: coursier/setup-action@v1
with:
java-version: ${{ matrix.java }}
- name: Cache scala dependencies
Expand All @@ -72,16 +72,16 @@ jobs:
strategy:
fail-fast: false
matrix:
java: ['17', '21']
java: ['corretto:17.0.11.9.1', 'corretto:21.0.3.9.1']
steps:
- name: Checkout current branch
uses: actions/[email protected]
with:
fetch-depth: 0
- name: Setup Scala and Java
uses: olafurpg/setup-scala@v13
uses: coursier/setup-action@v1
with:
java-version: ${{ matrix.java }}
jvm: ${{ matrix.java }}
- name: Cache scala dependencies
uses: coursier/cache-action@v6
- name: Start containers
Expand All @@ -108,7 +108,7 @@ jobs:
with:
fetch-depth: 0
- name: Setup Scala and Java
uses: olafurpg/setup-scala@v13
uses: coursier/setup-action@v1
- name: Cache scala dependencies
uses: coursier/cache-action@v6
- name: Release artifacts
Expand Down
24 changes: 14 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ inThisBuild(
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "2.0.21"
val awsVersion = "2.25.27"
val zioVersion = "2.0.21"
val awsVersion = "2.25.27"
val sttpVersion = "3.9.7"

lazy val root =
project.in(file(".")).settings(publish / skip := true).aggregate(`zio-s3`, docs)
Expand All @@ -36,14 +37,17 @@ lazy val `zio-s3` = project
.settings(dottySettings)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-nio" % "2.0.2",
"dev.zio" %% "zio-interop-reactivestreams" % "2.0.2",
"software.amazon.awssdk" % "s3" % awsVersion,
"software.amazon.awssdk" % "sts" % awsVersion,
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-nio" % "2.0.2",
"dev.zio" %% "zio-interop-reactivestreams" % "2.0.2",
"software.amazon.awssdk" % "s3" % awsVersion,
"software.amazon.awssdk" % "sts" % awsVersion,
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
"com.dimafeng" %% "testcontainers-scala-minio" % "0.41.4" % Test,
Copy link
Member

@regis-leray regis-leray Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove test container, we already have a minio in place with docker compose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Regis,

Actually we find that handy to not have a launch containers by hand before running tests.
Could you elaborate on why you don't like it, please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For multiple reason: slow down the execution, more code to add in unit-test, more librairies dependencies etc...
Prefer to stick to docker-compose, since it works pretty well.
thank you

"com.softwaremill.sttp.client3" %% "core" % sttpVersion % Test,
"com.softwaremill.sttp.client3" %% "zio" % sttpVersion % Test
),
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
)
Expand Down
76 changes: 59 additions & 17 deletions zio-s3/src/main/scala/zio/s3/Live.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.core.async.{ AsyncRequestBody, AsyncResponseTransformer, SdkPublisher }
import software.amazon.awssdk.core.exception.SdkException
import software.amazon.awssdk.services.s3.model._
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder }
import software.amazon.awssdk.services.s3.presigner.S3Presigner
import software.amazon.awssdk.services.s3.presigner.model.{ GetObjectPresignRequest, PresignedGetObjectRequest }
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder, S3Configuration }
import zio._
import zio.interop.reactivestreams._
import zio.s3.Live.{ StreamAsyncResponseTransformer, StreamResponse }
Expand All @@ -39,7 +41,7 @@ import scala.jdk.CollectionConverters._
*
* @param unsafeClient: Amazon Async S3 Client
*/
final class Live(unsafeClient: S3AsyncClient) extends S3 {
final class Live(unsafeClient: S3AsyncClient, s3Presigner: S3Presigner) extends S3 {

override def createBucket(bucketName: String): IO[S3Exception, Unit] =
execute(_.createBucket(CreateBucketRequest.builder().bucket(bucketName).build())).unit
Expand Down Expand Up @@ -149,6 +151,29 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 {
}
.unit

/**
* This method generates a `PresignedGetObjectRequest` containing an URL that allows the object to be downloaded
* without any credentials. Signature happens locally, it does not issue any network call. It's not a pure
* computation neither as it performs an IO by looking at the current time.
*
* See https://web.archive.org/web/20240621234636/https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html
* for further details regarding signature with S3.
*/
override def presignGetObject(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont think this code should be in the client. since there is nothing related to this. you could move this into the extra utils function

 def presignGetObject[R](
    bucketName: String,
    key: String,
    signatureDuration: Duration
  ): ZIO[S3Presigner, S3Exception, Unit] =   ZIO.serviceWithZIO[S3Presigner]{ s3Presigner =>

   //should wrap this unasync call with ZIO
   s3Presigner.presignGetObject(
        GetObjectPresignRequest
          .builder()
          .signatureDuration(signatureDuration)
          .getObjectRequest(GetObjectRequest.builder().bucket(bucketName).key(key).build())
          .build()
}


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the reason why we chose to put it in the client is that the configuration required is very similiar to the S3 client one.
The other solution is to provide another service with it's own layer definition, but then we should probably provide a layer for the configuration to avoid asking the same configuration twice to the user.
What do you think?

bucketName: String,
key: String,
signatureDuration: Duration
): IO[S3Exception, PresignedGetObjectRequest] =
ZIO.succeed {
s3Presigner.presignGetObject(
GetObjectPresignRequest
.builder()
.signatureDuration(signatureDuration)
.getObjectRequest(GetObjectRequest.builder().bucket(bucketName).key(key).build())
.build()
)
}

def multipartUpload[R](
bucketName: String,
key: String,
Expand Down Expand Up @@ -227,6 +252,7 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 {
case s3: S3Exception => s3
case sdk: SdkException => SdkError(sdk)
}

}

object Live {
Expand All @@ -238,23 +264,39 @@ object Live {
forcePathStyle: Option[Boolean] = None
): ZIO[R with Scope, ConnectionError, S3] =
for {
credentials <- provider.mapError(e => ConnectionError(e.getMessage, e.getCause))
builder <- ZIO.succeed {
val builder = S3AsyncClient
.builder()
.credentialsProvider(credentials)
.region(region.region)
uriEndpoint.foreach(builder.endpointOverride)
forcePathStyle.foreach(builder.forcePathStyle(_))
builder
}
service <- connect(builder)
credentials <- provider.mapError(e => ConnectionError(e.getMessage, e.getCause))
builder <- ZIO.succeed {
val builder = S3AsyncClient
.builder()
.credentialsProvider(credentials)
.region(region.region)
uriEndpoint.foreach(builder.endpointOverride)
forcePathStyle.foreach(builder.forcePathStyle(_))
builder
}
s3PresignerBuilder <- ZIO.succeed {
val builder = S3Presigner
.builder()
.credentialsProvider(credentials)
.region(region.region)
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
uriEndpoint.foreach(builder.endpointOverride)
builder
}
service <- connect(builder, s3PresignerBuilder)
} yield service

def connect[R](builder: S3AsyncClientBuilder): ZIO[R with Scope, ConnectionError, S3] =
ZIO
.fromAutoCloseable(ZIO.attempt(builder.build()))
.mapBoth(e => ConnectionError(e.getMessage, e.getCause), new Live(_))
def connect[R](
builder: S3AsyncClientBuilder,
s3PresignerBuilder: S3Presigner.Builder
): ZIO[R with Scope, ConnectionError, S3] =
for {
s3Client <- ZIO
.fromAutoCloseable(ZIO.attempt(builder.build()))
.mapError(e => ConnectionError(e.getMessage, e.getCause))
s3Presigner <- ZIO.fromAutoCloseable(ZIO.succeed(s3PresignerBuilder.build()))

} yield new Live(s3Client, s3Presigner)

type StreamResponse = ZStream[Any, Throwable, Chunk[Byte]]

Expand Down
8 changes: 8 additions & 0 deletions zio-s3/src/main/scala/zio/s3/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package zio.s3

import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest
import zio.s3.S3Bucket.S3BucketListing
import zio.s3.errors.DecodingException
import zio.stream.{ Stream, ZPipeline, ZStream }
import zio.{ IO, ZIO }

import java.nio.charset.CharacterCodingException
import java.time.Duration
import java.util.concurrent.CompletableFuture

/**
Expand Down Expand Up @@ -177,6 +179,12 @@ trait S3 { self =>
case current => self.getNextObjects(current).map(next => current -> Some(next))
}

def presignGetObject(
bucketName: String,
key: String,
signatureDuration: Duration
): IO[S3Exception, PresignedGetObjectRequest]

/**
* *
* expose safely amazon s3 async client
Expand Down
4 changes: 4 additions & 0 deletions zio-s3/src/main/scala/zio/s3/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package zio.s3

import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest
import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils }
import zio._
import zio.nio.channels.AsynchronousFileChannel
Expand Down Expand Up @@ -240,6 +241,9 @@ object Test {
)
} yield ()
}

override def presignGetObject(bucketName: ContentType, key: ContentType, signatureDuration: Duration)
: IO[S3Exception, PresignedGetObjectRequest] = ZIO.fail(???)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions zio-s3/src/test/resources/console.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
this is a log file
on multi line
87 changes: 87 additions & 0 deletions zio-s3/src/test/scala/zio/s3/S3PresignedTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package zio.s3

import com.dimafeng.testcontainers.MinIOContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.S3Exception
import sttp.client3
import sttp.client3.httpclient.zio.HttpClientZioBackend
import sttp.client3.{ SttpBackend, basicRequest }
import sttp.model.Uri
import zio.stream.ZStream
import zio.test.Assertion._
import zio.test._
import zio.{ Chunk, Scope, Task, ZIO, ZLayer, durationInt, s3 }

import java.net.{ URI, URL }

object S3PresignedTest extends ZIOSpecDefault {

val bucketName = "initial-bucket"
val objectKey = "path/to/object"

val spec = (suite("S3Presigned")(
test("presignGetObject should return an url") {
for {
s3 <- ZIO.service[S3]
url <- s3.presignGetObject(bucketName, objectKey, 1.minute)
} yield assert(url.url.toExternalForm)(
containsString(bucketName) &&
containsString(objectKey) &&
containsString("X-Amz-Expires=60") &&
containsString("X-Amz-Signature")
)
},
test("presignGetObject url should be downloadable") {

for {
s3 <- ZIO.service[S3]
fileContent <- putObject(bucketName, "my-new-key")
url <- s3.presignGetObject(bucketName, "my-new-key", 1.minute)
actual <- getUrlContent(url.url)
expected <- fileContent.runCollect
} yield assert(Chunk.fromArray(actual))(equalTo(expected))
}
) @@ TestAspect.before(s3.createBucket(bucketName) *> putObject(bucketName, objectKey)))
.provideSome[Scope](minio, zioS3Layer, HttpClientZioBackend.layer())

def getUrlContent(url: URL): ZIO[SttpBackend[Task, Any], Throwable, Array[Byte]] =
for {
url <- ZIO.fromEither(Uri.parse(url.toExternalForm)).orDieWith(e => new Throwable(e))
request = basicRequest.response(client3.asByteArrayAlways).get(url)
result <- ZIO.serviceWithZIO[SttpBackend[Task, Any]](_.send(request)).map(_.body)
} yield result

def putObject(bucketName: String, key: String): ZIO[S3, Throwable, ZStream[Any, Throwable, Byte]] = {
val filePath = ClassLoader.getSystemResource("console.log").getFile
val fileContent = ZStream.fromFileName(filePath)
for {
fileLength <- fileContent.runCount
_ <- s3.putObject(bucketName = bucketName, key = key, contentLength = fileLength, content = fileContent)
} yield fileContent
}

def zioS3Layer: ZLayer[MinIOContainer, S3Exception, S3] =
ZLayer {
for {
minio <- ZIO.service[MinIOContainer]
} yield s3.live(
region = Region.US_WEST_1,
credentials = AwsBasicCredentials.create(minio.userName, minio.password),
uriEndpoint = Some(URI.create(minio.s3URL)),
forcePathStyle = Some(true)
)
}.flatten

def minio: ZLayer[Scope, Nothing, MinIOContainer] =
ZLayer {
ZIO
.attemptBlocking(
new MinIOContainer(dockerImageName = DockerImageName.parse("minio/minio:RELEASE.2024-06-22T05-26-45Z"))
)
.tap(minio => ZIO.attemptBlockingIO(minio.start()))
.withFinalizerAuto
}.orDie

}
Loading