limitations: stream upload (for small files) #1137

Open
daddykotex opened this issue Dec 19, 2023 · 0 comments

daddykotex commented Dec 19, 2023

Hey, thanks for the library, it's great.

As we've discussed earlier, I decided to build my own S3 integration rather than using your library. This issue explains the problem I have and why I chose not to use your library.

We've got a service that handles file uploads from a client: it takes the stream of data from the client and writes it to an object in S3. We recently introduced parallel uploads in the client and saw some reliability issues in our service. Instead of scaling up the machine, I decided to see whether we could stream the bytes from the client through the service all the way into the S3 bucket. We did it, but not without issues.

The original implementation did something along the lines of your implementation: load the bytes into memory and then send that Array[Byte] to S3.
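
For reference, the in-memory version looked roughly like this (a sketch only, reusing the ToUpload case class and S3AsyncClient setup from the full example further down, not our exact code):

// Sketch of the in-memory approach: collect the whole stream into a byte
// array, then hand it to the SDK, which is then free to retry the request
// on transient failures.
def uploadInMemory(s3: S3AsyncClient, toUpload: ToUpload): IO[Unit] = {
  val putOb = PutObjectRequest
    .builder()
    .bucket(toUpload.bucket)
    .key(toUpload.nameInBucket)
    .build()

  toUpload.data.compile.to(fs2.Chunk).map(_.toArray).flatMap { bytes =>
    IO.fromCompletableFuture(
      IO.delay(s3.putObject(putOb, AsyncRequestBody.fromBytes(bytes)))
    ).void
  }
}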

There are two downsides to streaming directly to S3:

  1. you have to know the size ahead of time, so it does not work for unbounded streams of bytes
  2. the implementation we came up with can't be retried automatically by the S3 client (unlike the original one, and unlike yours)

For 1, we have files written to disk on the client side, so we know the size.

For 2, we decided to return a 503 from the service if the upload to S3 fails, so the client can retry. This is heavier than retrying locally (in the service), but it only happens occasionally, so we're okay with that.
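
On our side the wiring is roughly this (a sketch; UploadFailed is a hypothetical error type that our HTTP layer maps to a 503, and streamToS3 is the method from the full example below):

// Sketch: wrap the streaming upload so a failure surfaces as a domain error
// that the HTTP layer turns into a 503, prompting the client to retry the
// whole upload. `UploadFailed` is a hypothetical type, not part of any library.
final case class UploadFailed(cause: Throwable)
    extends RuntimeException("upload to S3 failed, client should retry", cause)

def uploadOr503(s3: S3AsyncClient, toUpload: ToUpload): IO[Unit] =
  streamToS3(s3, toUpload)
    .handleErrorWith(e => IO.raiseError(UploadFailed(e)))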

Here is a full implementation:

//> using lib "software.amazon.awssdk:s3:2.22.1"
//> using lib "org.typelevel::toolkit:0.1.20"

import cats.effect.IO
import cats.effect.IOApp
import cats.effect.kernel.Resource
import fs2.io.file.Files
import fs2.io.file.Path
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.core.exception.NonRetryableException
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.PutObjectRequest

import java.io.OutputStream

final case class ToUpload(
    bucket: String,
    nameInBucket: String,
    size: Long,
    data: fs2.Stream[IO, Byte]
)

/** Downsides:
  *   - you have to know the file size in advance
  *   - using `forBlockingOutputStream` implies you can't rely on automatic S3
  *     client retries
  *   - suboptimal for large files as a failure means you need to re-upload
  *     everything
  */
object Main extends IOApp.Simple {
  val path = Path("./s3-upload.scala")
  val run = upload(path, "bucket", "pathInBucket")

  def upload(p: Path, bucket: String, nameInBucket: String): IO[Unit] = prepareS3().use { s3 =>
    for {
      toUpload <- prepare(p, bucket, nameInBucket)
      _ <- streamToS3(s3, toUpload)
    } yield ()
  }

  /** This will fail with a [[NonRetryableException]] if the upload to S3
    * fails.
    */
  def streamToS3(s3: S3AsyncClient, toUpload: ToUpload): IO[Unit] = {
    val putOb = PutObjectRequest
      .builder()
      .bucket(toUpload.bucket)
      .key(toUpload.nameInBucket)
      .build()

    // forBlockingOutputStream gives an AsyncRequestBody backed by an
    // OutputStream; because the SDK can't replay those bytes, it can't
    // retry this request on its own.
    IO.delay(AsyncRequestBody.forBlockingOutputStream(toUpload.size)).flatMap {
      arb =>
        val out =
          IO.blocking(arb.outputStream()).map(x => (x: OutputStream))
        val pipe = fs2.io.writeOutputStream(out, closeAfterUse = true)
        val writeRes = toUpload.data.through(pipe).compile.drain.background

        writeRes.use { outcome =>
          // Run the PUT while the background fiber keeps writing into the
          // stream, then surface any error from the write (embedError
          // rethrows a failed outcome).
          val sendReq = IO
            .fromCompletableFuture(IO.delay(s3.putObject(putOb, arb)))
            .void
          sendReq *> outcome.flatMap(_.embedError)
        }
    }

  }

  def prepareS3(): Resource[IO, S3AsyncClient] = {
    Resource.fromAutoCloseable(
      IO.blocking(
        S3AsyncClient.builder().region(Region.US_EAST_1).build()
      )
    )
  }

  def prepare(p: Path, bucket: String, nameInBucket: String): IO[ToUpload] = {
    Files[IO].size(p).map { size =>
      ToUpload(bucket, nameInBucket, size, Files[IO].readAll(p))
    }
  }
}

I've created this so that you can play with it yourself and decide whether this is something you'd like to see in your library.
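
If you want to try it: save it as s3-upload.scala (that's also the file it uploads) and, assuming AWS credentials are available through the SDK's default credentials provider chain (the region is hardcoded to US_EAST_1 in prepareS3), something like scala-cli run s3-upload.scala should run it once you adjust the bucket and key in Main.run.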
