Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add FutureOps which with an await style. #1666

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Jan 2, 2025

Motivation:
I found the Await.result is everywhere.

Modification:
Add an await operator to the future.

Result:
Fluent code.

[info] Benchmark                              (pool)  (recursion)  (threads)   Mode  Cnt        Score       Error   Units
[info] CompleteFutureBenchmark.success           fix         1024          1  thrpt    5  1069166.183 ± 46755.228  ops/ms
[info] CompleteFutureBenchmark.successOption     fix         1024          1  thrpt    5  1383230.259 ± 15758.252  ops/ms
[info] CompleteFutureBenchmark.successUnwrap     fix         1024          1  thrpt    5  1377034.015 ± 46460.695  ops/ms

I did some changes in the scala/scala

@He-Pin He-Pin added the t:stream Pekko Streams label Jan 2, 2025
@He-Pin He-Pin added this to the 1.2.0 milestone Jan 2, 2025
@He-Pin He-Pin force-pushed the awaitable branch 2 times, most recently from ef0b126 to ba09de8 Compare January 2, 2025 17:12
@pjfanning
Copy link
Contributor

This code is internal. Does it matter if it is fluent?

@He-Pin
Copy link
Member Author

He-Pin commented Jan 2, 2025

So does performance, I think which will avoid the waiting if the future is already done.

I was want to make this public, but it seems safer for me to taste it first.

@pjfanning
Copy link
Contributor

If there is a perf issue in Await.result, I would prefer it was fixed in Scala runtime.

The Scala 2 impl seems to try to optimise for the completed case.

    final def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match {
      case f: Future[T] if f.isCompleted => f.result(atMost)(AwaitPermission)
      case _ => blocking(awaitable.result(atMost)(AwaitPermission))
    }

@He-Pin
Copy link
Member Author

He-Pin commented Jan 2, 2025

The code seems need more adjustment, as we don't need to test if it's a future anymore

@He-Pin
Copy link
Member Author

He-Pin commented Jan 2, 2025

??? Why its closed?

@He-Pin He-Pin reopened this Jan 2, 2025
*/
def await(atMost: Duration = Duration.Inf): T = future.value match {
case Some(value) => value.get
case None => blocking(future.result(atMost)(null))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid the double checking now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    @throws(classOf[TimeoutException])
    @throws(classOf[InterruptedException])
    final def resultOption[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match {
      case CompletedFuture(v)  => v.get
      case _ => blocking(awaitable.result(atMost)(AwaitPermission))
    }

    private final object CompletedFuture {
      def unapply[T](f: Future[T]): Option[Try[T]] = f.value
    }

@He-Pin
Copy link
Member Author

He-Pin commented Jan 3, 2025

@pjfanning I have attached the result

@pjfanning
Copy link
Contributor

@pjfanning I have attached the result

I would still like this discussed with the Scala team to understand if the Await.result can be improved. Everyone benefits if you have discovered a perf issue there and that it then gets fixed.

@He-Pin
Copy link
Member Author

He-Pin commented Jan 3, 2025

@pjfanning scala/scala#10972 , submitted to scala

@raboof
Copy link
Member

raboof commented Jan 3, 2025

scala/scala#10972 , submitted to scala

Awesome that you have proposed this improvement upstream, thanks!

I see two motivations for this change: the performance improvement, and the fact that the code 'looks nicer'. I agree that if this performance improvement could make it upstream that would be much better.

For the idea that this change makes the code looks nicer, I'm not sure I agree: it replaces well-known Scala idiom with a custom helper. While the custom helper is perhaps nicer, it's also 'one more thing to learn'. Also, because asynchronicity is at the heart of Pekko, we should use Await sparingly and with caution. For this reason, having to use the arguably-'clumsy' Scala idiom might not be so bad, as it adds some friction to consider whether Await is really needed.

@He-Pin
Copy link
Member Author

He-Pin commented Jan 3, 2025

  1. this is just following the style in gears.
  extension [T](src: Source[scala.util.Try[T]])
    /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
      * @see
      *   [[Source!.awaitResult awaitResult]] for non-unwrapping await.
      */
    inline def await(using Async) = src.awaitResult.get

and then with usage:

  test("Constant returns") {
    Async.blocking:
      for (i <- -5 to 5)
        val f1 = Future { i }
        val f2 = Future.now(Success(i))
        assertEquals(f1.await, i)
        assertEquals(f1.await, f2.await)
  }
  1. The current implementation by @viktorklang will double-getting on java.util.concurrent.atomic.AtomicReference#get, have no idea, why would we do that.
  2. This optimization fix point 2.
  3. I try to avoid the lifting to Option with
  def unwrap: Try[T]

in Future.scala, and then

    @throws(classOf[TimeoutException])
    @throws(classOf[InterruptedException])
    final def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match {
      case f: Future[T] @unchecked =>
        val r = f.unwrap
        if (r ne null) r.get
        else blocking(awaitable.result(atMost)(AwaitPermission))
      case _ => blocking(awaitable.result(atMost)(AwaitPermission))
    }

But the result doesn't have much difference in gc pressure.

@He-Pin He-Pin marked this pull request as draft January 3, 2025 16:54
@He-Pin
Copy link
Member Author

He-Pin commented Jan 3, 2025

One advantage of having this inside is it works on all scala versions.

@He-Pin He-Pin marked this pull request as ready for review January 3, 2025 16:56
@pjfanning
Copy link
Contributor

@He-Pin could you add the CompleteFutureBenchmark to this PR? I think we have some benchmarks in this repo where this could be added.

// jmh:run -i 11 -wi 11 -f1 -t1 org.apache.pekko.util.FutureOpsBenchmark
// [info] Benchmark Mode Cnt Score Error Units
// [info] FutureOpsBenchmark.awaitWithAwaitable thrpt 11 706198.499 ± 8185.983 ops/ms
// [info] FutureOpsBenchmark.awaitWithFutureOps thrpt 11 766901.781 ± 9741.792 ops/ms
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning Attached.

@pjfanning
Copy link
Contributor

I would like us to take a little bit of time over this just to see what the Scala team and other users have to say.
I am not against merging a change to Pekko but I think that change must not negatively impact the awaits for incomplete futures. The existing benchmark only checks complete futures. I know it is harder to measure the impact of a change on incomplete futures because benchmarks are mainly measuring the wait and not the code around getting the result from the awaitable after it completes.

@He-Pin
Copy link
Member Author

He-Pin commented Jan 4, 2025

@pjfanning I think that will not make much difference, the old implementation will always do the AtomicReference.get so does this, one. and when the future is not completed, this will return a None (no allocation), which was returning a boolean.

@He-Pin
Copy link
Member Author

He-Pin commented Jan 4, 2025

And I think there is a limitation of the current Scala implementation, were we can't save the value into a slot to avoid double get, the value is on the stack, which is very cheap,but we can't do it with current scala.

https://contributors.scala-lang.org/t/inline-convertion-in-pattern-matchings-if-guard/6962/12

@He-Pin He-Pin added the performance Related to performance label Jan 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
performance Related to performance t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants