
groupWithin prematurely closes resources #3477

Open
Jasper-M opened this issue Sep 20, 2024 · 5 comments

Jasper-M (Contributor) commented Sep 20, 2024

Code:

import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import fs2.Stream
import cats.effect.unsafe.implicits.global

Stream.resource(
  Resource.make(IO.println("start"))(_ => IO.println("stop"))
)
.flatMap(_ => Stream(1,2,3,4).covary[IO].metered(100.millis))
.take(2)
.evalTap(IO.println)
.groupWithin(2, 1.second)
.evalTap(IO.println)
.compile
.drain
.unsafeRunSync()

Result:

start
1
2
stop
Chunk(1, 2)

The resource is already closed before the stage after groupWithin gets to process the chunk, so downstream stages can end up using a resource that has already been released.
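Console output can interleave unpredictably, so to check the ordering it can help to record events in a Ref instead. This is a minimal sketch, assuming cats-effect 3 and fs2 3.x; the object name GroupWithinRepro is hypothetical. Where "stop" lands relative to the chunk depends on scheduling, which is exactly the reported problem, so compare their positions rather than expecting a fixed order.

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object GroupWithinRepro {
  // Runs the reported pipeline and returns every event in the order it was recorded.
  def events(): Vector[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      _ <- Stream
        .resource(Resource.make(record("start"))(_ => record("stop")))
        .flatMap(_ => Stream(1, 2, 3, 4).covary[IO].metered(100.millis))
        .take(2)
        .evalTap(i => record(i.toString))
        .groupWithin(2, 1.second)
        // Downstream of groupWithin: may run after "stop" has already been recorded.
        .evalTap(c => record(s"chunk:${c.toList}"))
        .compile
        .drain
      result <- log.get
    } yield result
    program.unsafeRunSync()
  }
}
```

If `events().indexOf("stop")` is smaller than the index of the first `chunk:` entry, the resource was released before the downstream stage saw the chunk.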

Jasper-M added the bug label Sep 20, 2024
ValdemarGr (Contributor) commented Sep 23, 2024

Some fs2 combinators don't respect resource lifetimes. The cause is that the foreground stream is compiled separately here.

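If possible, structure your stream like this, so that groupWithin and everything after it run inside the flatMap, and therefore inside the resource's scope: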

Stream
  .resource(
    Resource.make(IO.println("start"))(_ => IO.println("stop"))
  )
  .flatMap { _ =>
    Stream(1, 2, 3, 4)
      .covary[IO]
      .metered(100.millis)
      .take(2)
      .evalTap(IO.println)
      .groupWithin(2, 1.second)
      .evalTap(IO.println)
  }
  .compile
  .drain
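The same Ref-based log can confirm the workaround: with groupWithin and the second evalTap inside the flatMap, the finalizer cannot run until the inner stream, including its last evalTap, has completed. A sketch under the same assumptions (cats-effect 3, fs2 3.x); GroupWithinWorkaround is a hypothetical name.

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object GroupWithinWorkaround {
  // Everything downstream of the resource lives inside flatMap, so "stop"
  // can only be recorded after the final evalTap has run.
  def events(): Vector[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      _ <- Stream
        .resource(Resource.make(record("start"))(_ => record("stop")))
        .flatMap { _ =>
          Stream(1, 2, 3, 4)
            .covary[IO]
            .metered(100.millis)
            .take(2)
            .evalTap(i => record(i.toString))
            .groupWithin(2, 1.second)
            .evalTap(c => record(s"chunk:${c.toList}"))
        }
        .compile
        .drain
      result <- log.get
    } yield result
    program.unsafeRunSync()
  }
}
```

The first event should be "start" and the last "stop", independent of how groupWithin happens to split the elements.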

I think a resource-safe version of most combinators could be implemented with a more expressive Pull API, something like the following:

object Pull2 {
  // Hypothetical API (not part of fs2):
  // Option   = is there anything to lease right now?
  // Resource = try to take a lease
  // Boolean  = is the resource still available, i.e. was the scope's resource NOT closed
  //            between evaluating the Pull and acquiring the lease?
  def lease[F[_]]: Pull[F, Nothing, Option[Resource[F, Boolean]]] = ???
}

This would allow resource ownership to be transferred across separately compiled streams.

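import cats.effect.Async
import cats.effect.std.Supervisor
import cats.syntax.all._
import fs2.{Chunk, Pull, Stream}
import fs2.concurrent.Channel

// very contrived example which shows that ownership can cross through a channel
// (Pull2.lease is the hypothetical API sketched above; it does not exist in fs2)
def leaseChunks[F[_], A](implicit
  F: Async[F]
): fs2.Pipe[F, A, (Chunk[A], F[Unit])] = stream =>
  Stream.resource(Supervisor[F]).flatMap { sup =>
    Stream
      .eval {
        Channel.bounded[F, (Chunk[A], F[Unit])](1)
      }
      .flatMap { chan =>
        // Lease the current scope's resources and send the chunk together with a release action.
        def send(c: Chunk[A]): Pull[F, Nothing, Unit] = for {
          r <- Pull2.lease[F]
          _ <- Pull.eval {
            r match {
              case None => F.unit // nothing to lease right now
              case Some(r) =>
                F.uncancelable { poll =>
                  poll(r.allocated).flatMap {
                    // The scope closed before the lease was taken: give the lease back immediately.
                    case (false, release) => release
                    case (true, release) =>
                      // Hold the lease in a supervised fiber; cancelling the fiber runs `release`.
                      sup.supervise(F.never[Unit].onCancel(release)).flatMap { fib =>
                        poll(chan.send((c, fib.cancel))).void
                      }
                  }
                }
            }
          }
        } yield ()

        val bg = stream.chunks.flatMap(c => send(c).stream).onFinalize(chan.close.void)

        chan.stream.concurrently(bg)
      }
  }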
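Pull2.lease does not exist in fs2, so the example above cannot run as-is. As a loose, non-fs2 analogue of ownership crossing a channel, cats-effect's Resource#allocated already separates a value from its release action, and both can be passed through a queue to whichever side should own them. A sketch assuming cats-effect 3; OwnershipHandOff and the "conn" value are hypothetical.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

object OwnershipHandOff {
  def events(): Vector[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      queue <- Queue.bounded[IO, (String, IO[Unit])](1)
      res = Resource.make(record("acquire").as("conn"))(_ => record("release"))
      // Producer: allocate outside any scope, then hand value + release action over.
      producer = IO.uncancelable(_ => res.allocated.flatMap(queue.offer))
      // Consumer: now owns the resource, so it is responsible for releasing it.
      consumer = queue.take.flatMap { case (conn, release) =>
        record(s"use $conn").guarantee(release)
      }
      _ <- producer.both(consumer)
      result <- log.get
    } yield result
    program.unsafeRunSync()
  }
}
```

The trade-off is the usual one for allocated: nothing releases the resource automatically, so the receiving side must guarantee that it calls release.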

I have an issue open here regarding this.

Jasper-M (Contributor, Author)

In this simple example, which I adapted a little, using Pull.extendScopeTo does seem to keep the resource open until the result stream stops. But when I try to apply the same transformation to less trivial code like groupWithin, I always get the "Scope lookup failure" error, no matter whether I use F.start or stream.concurrently, or which stream I compile where.
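The adapted example behind the link is not reproduced in this thread. As a hedged illustration of the general idea, assuming fs2 3.x where Pull.extendScopeTo(s) leases the currently open resources until s completes: the resource below is acquired by one compiled stream, and its finalizer only runs after a second, separately compiled stream finishes. ExtendScopeDemo is a hypothetical name.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.{Pull, Stream}

object ExtendScopeDemo {
  def events(): Vector[String] = {
    val program = for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      record = (s: String) => log.update(_ :+ s)
      // Work we want to run outside the stream that opened the resource.
      work = Stream(1, 2).covary[IO].evalTap(i => record(i.toString))
      // Open the resource, extend its scope to `work`, emit `work` and finish.
      leased = Stream
        .resource(Resource.make(record("start"))(_ => record("stop")))
        .flatMap(_ => Pull.extendScopeTo(work).flatMap(Pull.output1).stream)
      // The outer scope closes here, but the lease defers the "stop" finalizer.
      inner <- leased.compile.lastOrError
      // Compiled separately; finishing it cancels the lease and runs the finalizer.
      _ <- inner.compile.drain
      result <- log.get
    } yield result
    program.unsafeRunSync()
  }
}
```

Without the extendScopeTo call, "stop" would be recorded as soon as the outer stream finished, before "1" and "2".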

Jasper-M (Contributor, Author)

Actually no, it does seem to work on groupWithin as well! It's just that if your input stream uses metered (or probably anything else built on a zip-like operation), you run into the scope lookup errors.
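The remark about zip-like operations suggests metered is built on zipping with a timer. Assuming that (treat it as an assumption and check the fs2 version you use), a rough equivalent pairs each element with a tick from Stream.fixedRate and keeps only the element via zipRight. MeteredAsZip is a hypothetical name.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object MeteredAsZip {
  // Each element waits for a timer tick; zipRight discards the tick and keeps the element.
  // The zip stops as soon as the finite right-hand stream ends.
  val viaZip: List[Int] =
    Stream
      .fixedRate[IO](50.millis)
      .zipRight(Stream(1, 2, 3).covary[IO])
      .compile
      .toList
      .unsafeRunSync()

  // The built-in combinator, for comparison.
  val viaMetered: List[Int] =
    Stream(1, 2, 3).covary[IO].metered(50.millis).compile.toList.unsafeRunSync()
}
```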

See, I only added metered here to my previous example, and then it blows up.

I've actually run into this issue before: #3081 (comment)

ValdemarGr (Contributor)

Resources and interruption are implemented via Scope, which acts as a tree of resources.

concurrently compiles the stream you pass to it as a separate program:

val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase {

which introduces a new Scope tree:
Resource
.makeCase(Scope.newRoot[F](this))((scope, ec) => scope.close(ec).rethrow)
.use(scope => Pull.compile[F, O, Out](p, scope, false, init)(foldChunk))

When the lookup function, which traverses the scope tree, is evaluated in the separately compiled stream, the link between the sub-stream's scopes and their original parent scopes is broken, so the lookup fails:

def shiftScope(scopeId: Unique.Token, context: => String): F[Scope[F]] =

There might be an implementation of concurrently that shares the root scope, however I am unsure of the implications.

All in all, the rule is: if a stream originates as a substream, it might hold weak references to state in its origin stream through scope IDs, so you must not compile that stream separately.
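The rule can be pictured with a toy model. It shares only the shape of the design described above: scopes form a tree, a substream remembers a scope only by its id, and lookup searches the tree the code is currently running in, starting from its root. ToyScope and ScopeLookupDemo are hypothetical and much simpler than fs2's Scope.

```scala
// Toy model only: not fs2's Scope, just the tree-plus-id-lookup shape.
final class ToyScope(val id: Int, val parent: Option[ToyScope]) {
  private var children: List[ToyScope] = Nil

  // Open a child scope, e.g. for a resource or a zip leg.
  def open(childId: Int): ToyScope = {
    val child = new ToyScope(childId, Some(this))
    children = child :: children
    child
  }

  def root: ToyScope = parent.fold(this)(_.root)

  // Depth-first search of this scope's subtree.
  def findDescendant(target: Int): Option[ToyScope] =
    if (id == target) Some(this)
    else children.iterator.map(_.findDescendant(target)).collectFirst { case Some(s) => s }

  // Look up a scope by id, starting from the root of the tree this scope belongs to.
  def lookup(target: Int): Option[ToyScope] = root.findDescendant(target)
}

object ScopeLookupDemo {
  // Original program: root (0) -> resource scope (1) -> leg scope (2).
  private val root = new ToyScope(0, None)
  private val resourceScope = root.open(1)
  private val legScope = resourceScope.open(2)

  // A substream that remembers id 2 can resolve it inside the original tree...
  val foundInOriginalTree: Boolean = resourceScope.lookup(legScope.id).isDefined

  // ...but compiled separately it runs under a fresh root, where that id is unknown.
  private val separateRoot = new ToyScope(100, None)
  private val separateScope = separateRoot.open(101)
  val foundInSeparateTree: Boolean = separateScope.lookup(legScope.id).isDefined
}
```

In the original tree the id resolves; under a fresh root, like the one created when a stream is compiled separately, it cannot be found, which corresponds to the "Scope lookup failure" error.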

Jasper-M (Contributor, Author)

Well, a foolproof and easy API for transferring resource ownership between streams would be great.
