Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize ResourceLogOps.log to support arbitrary callbacks #237

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import cats.implicits.*
import cats.{Applicative, ApplicativeError, MonadError}

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.{Either, Try}

Expand Down Expand Up @@ -146,21 +147,34 @@ object CatsHelper {
}
}

implicit final class ResourceLogOps[F[_], A](val self: Resource[F, A]) extends AnyVal {
def log(name: String)(implicit F: Sync[F], log: Log[F], md: MeasureDuration[F]): Resource[F, A] = Resource {
implicit final class ResourceObservabilityOps[F[_], A](val self: Resource[F, A]) extends AnyVal {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not backward compatible change :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is just how diff is displayed. I didn't rename ResourceLogOps, ResourceLogOps is still there.


/**
* Add callbacks to a resource. Use for things like metrics and logs.
* It's very important that these callbacks shouldn't fail,
* otherwise there can be a resource leak.
*/
def observe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure that this is useful? :)

could you please describe your use case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to add metrics. Adding metrics to a resource is very similar to adding logs.

see consumerWithMetrics in another PR:
https://github.com/evolution-gaming/kafka-journal/pull/500/files#diff-fdca83b54e7278ff458b068b2b38db53c143ce7138c3074defce921a321ce425R40

consumerWithMetrics looks pretty ugly, I want to avoid that

onAcquireStart: Option[F[Unit]] = None,
onAcquireSuccess: Option[(A, FiniteDuration) => F[Unit]] = None,
onAcquireFail: Option[(Throwable, FiniteDuration) => F[Unit]] = None,
onReleaseStart: Option[A => F[Unit]] = None,
onReleaseSuccess: Option[FiniteDuration => F[Unit]] = None,
onReleaseFail: Option[(Throwable, FiniteDuration) => F[Unit]] = None,
)(implicit F: Sync[F], md: MeasureDuration[F]): Resource[F, A] = Resource {
for {
timedAcquireAndRelease <- for {
getMeasurement <- MeasureDuration[F].start
_ <- Log[F].info(s"$name acquiring")
_ <- onAcquireStart.sequence_
a <- self.allocated.attemptTap {
case Left(err) => for {
measureResult <- getMeasurement
_ <- Log[F].error(s"$name acquisition failed in ${measureResult.toMillis}ms with $err", err)
_ <- onAcquireFail.traverse_(_(err, measureResult))
} yield ()

case Right(_) => for {
case Right((obj, _)) => for {
measureResult <- getMeasurement
_ <- Log[F].info(s"$name acquired in ${measureResult.toMillis}ms")
_ <- onAcquireSuccess.traverse_(_(obj, measureResult))
} yield ()
}
} yield a
Expand All @@ -169,21 +183,47 @@ object CatsHelper {

timedRelease = for {
getMeasurement <- MeasureDuration[F].start
_ <- Log[F].info(s"$name releasing")
_ <- onReleaseStart.traverse_(_(timedAcquire))
_ <- release.attemptTap {
case Left(err) => for {
measureResult <- getMeasurement
_ <- Log[F].error(s"$name release failed in ${measureResult.toMillis}ms with $err", err)
_ <- onReleaseFail.traverse_(_(err, measureResult))
} yield ()

case Right(a) => for {
measureResult <- getMeasurement
_ <- Log[F].info(s"$name released in ${measureResult.toMillis}ms")
_ <- onReleaseSuccess.traverse_(_(measureResult))
} yield a
}
} yield ()

} yield (timedAcquire, timedRelease)
}
}

implicit final class ResourceLogOps[F[_], A](val self: Resource[F, A]) extends AnyVal {

def log(name: String)(implicit F: Sync[F], log: Log[F], md: MeasureDuration[F]): Resource[F, A] = {
self.observe(
onAcquireStart = Some(
Log[F].info(s"$name acquiring")
),
onAcquireSuccess = Some((_, duration) =>
Log[F].info(s"$name acquired in ${duration.toMillis}ms")
),
onReleaseStart = Some(_ =>
Log[F].info(s"$name releasing")
),
onReleaseSuccess = Some(duration =>
Log[F].info(s"$name released in ${duration.toMillis}ms")
),
onAcquireFail = Some((err, duration) =>
Log[F].error(s"$name acquisition failed in ${duration.toMillis}ms with $err", err)
),
onReleaseFail = Some((err, duration) =>
Log[F].error(s"$name release failed in ${duration.toMillis}ms with $err", err)
),
)
}
}
}