From ba09de8f5e63eed6679ba46701ed46c712fb93a2 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Fri, 3 Jan 2025 00:43:42 +0800 Subject: [PATCH] chore: Add FutureOps which with an await style. --- .../actor/LightArrayRevolverScheduler.scala | 5 +++-- .../apache/pekko/pattern/CircuitBreaker.scala | 18 ++++++++++-------- .../pekko/serialization/AsyncSerializer.scala | 9 +++++---- .../scala/org/apache/pekko/util/Helpers.scala | 17 +++++++++++++++++ .../pekko/stream/SystemMaterializer.scala | 10 ++++++---- .../impl/io/OutputStreamSourceStage.scala | 7 ++++--- .../stream/scaladsl/StreamConverters.scala | 6 +++--- 7 files changed, 48 insertions(+), 24 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala b/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala index b012cf979d2..0162b7c055a 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } import scala.annotation.{ nowarn, tailrec } import scala.collection.immutable -import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.concurrent.duration._ import scala.util.control.NonFatal @@ -178,7 +178,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac } } - Await.result(stop(), getShutdownTimeout).foreach { + import pekko.util.Helpers._ + stop().await(getShutdownTimeout).foreach { case task: Scheduler.TaskRunOnClose => runTask(task) case holder: TaskHolder => // don't run diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index 38bf58f2739..f6c8dc25ecb 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.BiFunction import java.util.function.Consumer import scala.annotation.nowarn -import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.concurrent.TimeoutException import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } @@ -446,13 +446,15 @@ class CircuitBreaker( * @param defineFailureFn function that define what should be consider failure and thus increase failure count * @return The result of the call */ - def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T = - Await.result( - withCircuitBreaker( - try Future.successful(body) - catch { case NonFatal(t) => Future.failed(t) }, - defineFailureFn), - callTimeout) + def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T = { + import pekko.util.Helpers._ + withCircuitBreaker( + try Future.successful(body) + catch { + case NonFatal(t) => Future.failed(t) + }, + defineFailureFn).await(callTimeout) + } /** * Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. diff --git a/actor/src/main/scala/org/apache/pekko/serialization/AsyncSerializer.scala b/actor/src/main/scala/org/apache/pekko/serialization/AsyncSerializer.scala index d0df72cca77..5d010ab2b85 100644 --- a/actor/src/main/scala/org/apache/pekko/serialization/AsyncSerializer.scala +++ b/actor/src/main/scala/org/apache/pekko/serialization/AsyncSerializer.scala @@ -15,8 +15,7 @@ package org.apache.pekko.serialization import java.util.concurrent.CompletionStage -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration.Duration +import scala.concurrent.Future import org.apache.pekko import pekko.actor.ExtendedActorSystem @@ -59,14 +58,16 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) log.warning( "Async serializer called synchronously. This will block. Async serializers should only be used for pekko persistence plugins that support them. Class: {}", o.getClass) - Await.result(toBinaryAsync(o), Duration.Inf) + import pekko.util.Helpers._ + toBinaryAsync(o).await() } final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { log.warning( "Async serializer called synchronously. This will block. Async serializers should only be used for Pekko persistence plugins that support them. Manifest: [{}]", manifest) - Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf) + import pekko.util.Helpers._ + fromBinaryAsync(bytes, manifest).await() } } diff --git a/actor/src/main/scala/org/apache/pekko/util/Helpers.scala b/actor/src/main/scala/org/apache/pekko/util/Helpers.scala index 8b93060302d..1ff701a62c9 100644 --- a/actor/src/main/scala/org/apache/pekko/util/Helpers.scala +++ b/actor/src/main/scala/org/apache/pekko/util/Helpers.scala @@ -35,6 +35,8 @@ import scala.annotation.tailrec import scala.concurrent.duration.{ Duration, FiniteDuration } import com.typesafe.config.{ Config, ConfigRenderOptions } +import scala.concurrent.{ Await, Future } + object Helpers { def toRootLowerCase(s: String): String = s.toLowerCase(Locale.ROOT) @@ -195,4 +197,19 @@ object Helpers { Duration(config.getDuration(path, unit), unit) } + /** + * INTERNAL API + */ + private[pekko] final implicit class FutureOps[T](val future: Future[T]) extends AnyVal { + + /** + * Wait for the future to complete and return the result, or throw an exception if the future failed. + * Optimize for the case when the future is already completed. + * @since 1.2.0 + */ + def await(atMost: Duration = Duration.Inf): T = future.value match { + case Some(value) => value.get + case None => Await.result(future, atMost) + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/SystemMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/SystemMaterializer.scala index efd829d47d8..5868cf98ef1 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/SystemMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/SystemMaterializer.scala @@ -14,7 +14,6 @@ package org.apache.pekko.stream import scala.annotation.nowarn -import scala.concurrent.Await import scala.concurrent.Promise import org.apache.pekko @@ -77,7 +76,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension { private[pekko] def createAdditionalSystemMaterializer(): Materializer = { val started = (materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted] - Await.result(started, materializerTimeout.duration).materializer + import pekko.util.Helpers._ + started.await(materializerTimeout.duration).materializer } /** @@ -91,12 +91,14 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension { val started = (materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings)) .mapTo[MaterializerGuardian.MaterializerStarted] - Await.result(started, materializerTimeout.duration).materializer + import pekko.util.Helpers._ + started.await(materializerTimeout.duration).materializer } val materializer: Materializer = { // block on async creation to make it effectively final - Await.result(systemMaterializerPromise.future, materializerTimeout.duration) + import pekko.util.Helpers._ + systemMaterializerPromise.future.await(materializerTimeout.duration) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/OutputStreamSourceStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/OutputStreamSourceStage.scala index d0dbb81c27e..29be6d2eae6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/OutputStreamSourceStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/OutputStreamSourceStage.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream.impl.io import java.io.{ IOException, OutputStream } import java.util.concurrent.{ Semaphore, TimeUnit } -import scala.concurrent.Await import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal @@ -87,7 +86,8 @@ private[pekko] class OutputStreamAdapter( } try { - Await.result(sendToStage.invokeWithFeedback(Send(data)), writeTimeout) + import pekko.util.Helpers._ + sendToStage.invokeWithFeedback(Send(data)).await(writeTimeout) } catch { case NonFatal(e) => throw new IOException(e) } @@ -115,7 +115,8 @@ private[pekko] class OutputStreamAdapter( @scala.throws(classOf[IOException]) override def close(): Unit = { try { - Await.result(sendToStage.invokeWithFeedback(Close), writeTimeout) + import pekko.util.Helpers._ + sendToStage.invokeWithFeedback(Close).await(writeTimeout) } catch { case NonFatal(e) => throw new IOException(e) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala index 7b1a743889a..32b1eabe2f9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala @@ -17,9 +17,8 @@ import java.io.{ InputStream, OutputStream } import java.util.Spliterators import java.util.stream.{ Collector, StreamSupport } -import scala.concurrent.{ Await, Future } +import scala.concurrent.Future import scala.concurrent.duration._ -import scala.concurrent.duration.Duration._ import org.apache.pekko import pekko.NotUsed @@ -197,7 +196,8 @@ object StreamConverters { var nextElement: Option[T] = _ override def hasNext: Boolean = { - nextElement = Await.result(nextElementFuture, Inf) + import pekko.util.Helpers._ + nextElement = nextElementFuture.await() nextElement.isDefined }