Skip to content

Commit

Permalink
chore: Add FutureOps which with an await style.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 4, 2025
1 parent ec5e33f commit 83ffb6b
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }

import scala.annotation.{ nowarn, tailrec }
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.NonFatal

Expand Down Expand Up @@ -178,7 +178,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
}
}

Await.result(stop(), getShutdownTimeout).foreach {
import pekko.util.Helpers._
stop().await(getShutdownTimeout).foreach {
case task: Scheduler.TaskRunOnClose =>
runTask(task)
case holder: TaskHolder => // don't run
Expand Down
18 changes: 10 additions & 8 deletions actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
import java.util.function.BiFunction
import java.util.function.Consumer
import scala.annotation.nowarn
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
Expand Down Expand Up @@ -446,13 +446,15 @@ class CircuitBreaker(
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
Await.result(
withCircuitBreaker(
try Future.successful(body)
catch { case NonFatal(t) => Future.failed(t) },
defineFailureFn),
callTimeout)
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T = {
import pekko.util.Helpers._
withCircuitBreaker(
try Future.successful(body)
catch {
case NonFatal(t) => Future.failed(t)
},
defineFailureFn).await(callTimeout)
}

/**
* Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ package org.apache.pekko.serialization

import java.util.concurrent.CompletionStage

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.Future

import org.apache.pekko
import pekko.actor.ExtendedActorSystem
Expand Down Expand Up @@ -59,14 +58,16 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem)
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for pekko persistence plugins that support them. Class: {}",
o.getClass)
Await.result(toBinaryAsync(o), Duration.Inf)
import pekko.util.Helpers._
toBinaryAsync(o).await()
}

final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for Pekko persistence plugins that support them. Manifest: [{}]",
manifest)
Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf)
import pekko.util.Helpers._
fromBinaryAsync(bytes, manifest).await()
}
}

Expand Down
19 changes: 19 additions & 0 deletions actor/src/main/scala/org/apache/pekko/util/Helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ import java.time.format.DateTimeFormatter
import java.util.{ Comparator, Locale }
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, FiniteDuration }

import com.typesafe.config.{ Config, ConfigRenderOptions }

import scala.concurrent.{ blocking, Future }

object Helpers {

def toRootLowerCase(s: String): String = s.toLowerCase(Locale.ROOT)
Expand Down Expand Up @@ -195,4 +199,19 @@ object Helpers {
Duration(config.getDuration(path, unit), unit)
}

/**
* INTERNAL API
*/
private[pekko] final implicit class FutureOps[T](val future: Future[T]) extends AnyVal {

/**
* Wait for the future to complete and return the result, or throw an exception if the future failed.
* Optimize for the case when the future is already completed.
* @since 1.2.0
*/
def await(atMost: Duration = Duration.Inf): T = future.value match {
case Some(value) => value.get
case None => blocking(future.result(atMost)(null))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.pekko.util

import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Measurement, Mode, OutputTimeUnit, Scope, State, Warmup }

import java.util.concurrent.TimeUnit
import scala.concurrent.Future

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 1000)
@Measurement(iterations = 10000)
class FutureOpsBenchmark {
private val completedFuture: Future[Int] = Future.successful(1)
// jmh:run -i 11 -wi 11 -f1 -t1 org.apache.pekko.util.FutureOpsBenchmark
// [info] Benchmark Mode Cnt Score Error Units
// [info] FutureOpsBenchmark.awaitWithAwaitable thrpt 11 706198.499 ± 8185.983 ops/ms
// [info] FutureOpsBenchmark.awaitWithFutureOps thrpt 11 766901.781 ± 9741.792 ops/ms

@Benchmark
def awaitWithFutureOps(): Unit = {
import scala.concurrent.duration._
import org.apache.pekko.util.Helpers._
completedFuture.await(Duration.Inf)
}

@Benchmark
def awaitWithAwaitable(): Unit = {
import scala.concurrent.duration._
import scala.concurrent.Await
Await.result(completedFuture, Duration.Inf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.apache.pekko.stream

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise

import org.apache.pekko
Expand Down Expand Up @@ -77,7 +76,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
private[pekko] def createAdditionalSystemMaterializer(): Materializer = {
val started =
(materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
import pekko.util.Helpers._
started.await(materializerTimeout.duration).materializer
}

/**
Expand All @@ -91,12 +91,14 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
val started =
(materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings))
.mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
import pekko.util.Helpers._
started.await(materializerTimeout.duration).materializer
}

val materializer: Materializer = {
// block on async creation to make it effectively final
Await.result(systemMaterializerPromise.future, materializerTimeout.duration)
import pekko.util.Helpers._
systemMaterializerPromise.future.await(materializerTimeout.duration)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.apache.pekko.stream.impl.io
import java.io.{ IOException, OutputStream }
import java.util.concurrent.{ Semaphore, TimeUnit }

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

Expand Down Expand Up @@ -87,7 +86,8 @@ private[pekko] class OutputStreamAdapter(
}

try {
Await.result(sendToStage.invokeWithFeedback(Send(data)), writeTimeout)
import pekko.util.Helpers._
sendToStage.invokeWithFeedback(Send(data)).await(writeTimeout)
} catch {
case NonFatal(e) => throw new IOException(e)
}
Expand Down Expand Up @@ -115,7 +115,8 @@ private[pekko] class OutputStreamAdapter(
@scala.throws(classOf[IOException])
override def close(): Unit = {
try {
Await.result(sendToStage.invokeWithFeedback(Close), writeTimeout)
import pekko.util.Helpers._
sendToStage.invokeWithFeedback(Close).await(writeTimeout)
} catch {
case NonFatal(e) => throw new IOException(e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import java.io.{ InputStream, OutputStream }
import java.util.Spliterators
import java.util.stream.{ Collector, StreamSupport }

import scala.concurrent.{ Await, Future }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.duration.Duration._

import org.apache.pekko
import pekko.NotUsed
Expand Down Expand Up @@ -197,7 +196,8 @@ object StreamConverters {
var nextElement: Option[T] = _

override def hasNext: Boolean = {
nextElement = Await.result(nextElementFuture, Inf)
import pekko.util.Helpers._
nextElement = nextElementFuture.await()
nextElement.isDefined
}

Expand Down

0 comments on commit 83ffb6b

Please sign in to comment.