Skip to content

Conversation

mwisnicki
Copy link
Contributor

@mwisnicki mwisnicki commented Jun 29, 2025

With this method in place I can fix all my datadog java instrumentation issues by simply activating span on Sync.suspend by providing custom instance like so:

import cats.data.Kleisli
import cats.effect.kernel.Async
import cats.syntax.all.*
import natchez.*
import scala.concurrent.duration.FiniteDuration

object Tracing {
  given asyncForKleisli[F[_]: Async]: Async[Kleisli[F, Span[F], *]] = TracingAsyncKleisli(Async.asyncForKleisli)

  class TracingAsyncKleisli[F[_]](underlying: Async[Kleisli[F, Span[F], *]]) extends Async[Kleisli[F, Span[F], *]] {
    export underlying.{suspend as _, *}

    override def suspend[A](hint: Sync.Type)(thunk: => A): Kleisli[F, Span[F], A] = {
      // `suspend` is a demarcation point for all FFI from cats-effect to Java/impure code.
      // By wrapping thunk with span activation we ensure correct span for instrumentation of Java libs.
      Kleisli { span =>
        underlying.suspend(hint)(span.unsafeRunWithActivatedSpan(thunk)).run(span)
      }
    }

    // the export on this one does not work because of overloading
    override def sleep(time: FiniteDuration): Kleisli[F, Span[F], Unit] = underlying.sleep(time)
  }
}

And then when lifting resources to Kleisli:

import Tracing.asyncForKleisli
given LoggerFactory[Kleisli[F, Span[F], *]] = Slf4jFactory.create // when using log4cats
entrypoint.liftR(routesF)

I suppose I could add this to natchez but supporting scala 2 would require a lot of boilerplate code in natchez.

Ideally paired with #1185.


For ioTrace something like this should work (with tagless final code) although I haven't tested it:

implicit object TracingAsyncIO extends Async[IO] {
    private val underlying = IO.asyncForIO
    export underlying.{suspend as _, *}

    private val currentSpan = IOLocal(Span.noop[IO])

    def withSpan[A](span: Span[IO])(io: IO[A]): IO[A] = Resource
      .make(currentSpan.flatMap(_.getAndSet(span)))(previousSpan => currentSpan.flatMap(_.set(previousSpan)))
      .use(_ => io)

    override def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
      currentSpan.flatMap(_.get).flatMap(span => IO.suspend(hint)(span.unsafeRunWithActivatedSpan(thunk)))

    override def sleep(time: FiniteDuration): IO[Unit] = underlying.sleep(time)
  }

@mwisnicki mwisnicki force-pushed the unsafeRunWithActivatedSpan-pr branch from 943e660 to 6afd135 Compare June 29, 2025 06:59
*
* This should always be wrapped in `Sync.suspend` or equivalent.
*/
def unsafeRunWithActivatedSpan[T](run: => T): T = run
Copy link
Member

@bpholt bpholt Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a surprising API, in my opinion. I think it's very likely to be a footgun for inexperienced developers, despite the note about it needing to be wrapped in Sync.suspend. Is there a way we can do more to enforce that requirement? Why not accept and return an F[T]?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bpholt there's no way to take an F[T] and ensure that the span is activated every time it runs compute tasks (which is a prerequisite to having the desired semantics). You can't even force it to run on one thread with evalOn because it might contain blocking inside.

I have dealt with this exact problem before, and had a custom implementation that made it work... but the only good thing we can do here is return F[T], imo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't call it from inside suspend if I take or return F[T]. This is meant to be used entirely on impure side hence unsafe in the name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coming back to this, I think it might be possible to take an F. With caveats:

  • if your F contains async boundaries, auto-cedes, etc. you'd be calling that unsafeRunWithActivatedSpan every time you go back to "compute" tasks.
  • if there are any evalOn/blockings inside the effect, they wouldn't have the span activated.

Something like this:

def runWithActivatedSpan[T](ft: F[T]): F[T] = {
  val ec = new ExecutionContext {
    def execute(r: Runnable): Unit = unsafeRunWithActivatedSpan(r.run())
    def reportFailure(e: Throwable): Unit = throw e
  }
  ft.evalOn(ec)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is your use case? The libraries that require such integration do not use cats (or scala).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usecase is when you already have an effect that wraps Java code, and you want to give it the scope. This might happen if you're using a library that wraps Java (so you can't modify it to use the underlying unsafe function) or you want to centralize that kind of context handling in a middleware of sorts

Copy link
Contributor Author

@mwisnicki mwisnicki Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An effect wrapping Java code would have to wrap it in blocking (pretty much any code using tracing is blocking) and you just said this code does not work with blocking inside effect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol fair enough, I take that back then 😆

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though not all Java code is blocking... still I don't think it's very useful without support for blocking blocks

@kubukoz
Copy link
Member

kubukoz commented Jun 30, 2025

for the record, I don't like the idea of a custom Async instance, but the low-level unsafe API addition sounds good to me.

@bpholt
Copy link
Member

bpholt commented Jun 30, 2025

I'm not sure what you mean by a custom Async instance. Could something like this work?

def runWithActivatedSpan[T](run: F[T])(implicit F: Sync[F]): F[T]

(edit: maybe you weren't replying to my message, in which case, please disregard 🙂)

@kubukoz
Copy link
Member

kubukoz commented Jun 30, 2025

@bpholt yeah I was referring to the first snippet in the PR's description :)

@mwisnicki
Copy link
Contributor Author

for the record, I don't like the idea of a custom Async instance, but the low-level unsafe API addition sounds good to me.

That's why I didn't add it in this PR but I couldn't find a better way.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants