Skip to content

Commit

Permalink
chore: Add FutureOps which with an await style.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 2, 2025
1 parent 2b6d2dc commit ef0b126
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }

import scala.annotation.{ nowarn, tailrec }
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.NonFatal

Expand Down Expand Up @@ -178,7 +178,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
}
}

Await.result(stop(), getShutdownTimeout).foreach {
import pekko.util.Helpers._
stop().await(getShutdownTimeout).foreach {
case task: Scheduler.TaskRunOnClose =>
runTask(task)
case holder: TaskHolder => // don't run
Expand Down
18 changes: 10 additions & 8 deletions actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
import java.util.function.BiFunction
import java.util.function.Consumer
import scala.annotation.nowarn
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
Expand Down Expand Up @@ -446,13 +446,15 @@ class CircuitBreaker(
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
Await.result(
withCircuitBreaker(
try Future.successful(body)
catch { case NonFatal(t) => Future.failed(t) },
defineFailureFn),
callTimeout)
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T = {
import pekko.util.Helpers._
withCircuitBreaker(
try Future.successful(body)
catch {
case NonFatal(t) => Future.failed(t)
},
defineFailureFn).await(callTimeout)
}

/**
* Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ package org.apache.pekko.serialization

import java.util.concurrent.CompletionStage

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.Future

import org.apache.pekko
import pekko.actor.ExtendedActorSystem
Expand Down Expand Up @@ -59,14 +58,16 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem)
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for pekko persistence plugins that support them. Class: {}",
o.getClass)
Await.result(toBinaryAsync(o), Duration.Inf)
import pekko.util.Helpers._
toBinaryAsync(o).await()
}

final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for Pekko persistence plugins that support them. Manifest: [{}]",
manifest)
Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf)
import pekko.util.Helpers._
fromBinaryAsync(bytes, manifest).await()
}
}

Expand Down
16 changes: 16 additions & 0 deletions actor/src/main/scala/org/apache/pekko/util/Helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, FiniteDuration }
import com.typesafe.config.{ Config, ConfigRenderOptions }

import scala.concurrent.{ Await, Future }

object Helpers {

def toRootLowerCase(s: String): String = s.toLowerCase(Locale.ROOT)
Expand Down Expand Up @@ -195,4 +197,18 @@ object Helpers {
Duration(config.getDuration(path, unit), unit)
}

/**
* INTERNAL API
*/
private[pekko] final implicit class FutureOps[T](val future: Future[T]) extends AnyVal {

/**
* Wait for the future to complete and return the result, or throw an exception if the future failed.
* Optimize for the case when the future is already completed.
*/
def await(atMost: Duration = Duration.Inf): T = future.value match {
case Some(value) => value.get
case None => Await.result(future, atMost)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.apache.pekko.stream

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Promise

import org.apache.pekko
Expand Down Expand Up @@ -77,7 +76,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
private[pekko] def createAdditionalSystemMaterializer(): Materializer = {
val started =
(materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
import pekko.util.Helpers._
started.await(materializerTimeout.duration).materializer
}

/**
Expand All @@ -91,12 +91,14 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
val started =
(materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings))
.mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
import pekko.util.Helpers._
started.await(materializerTimeout.duration).materializer
}

val materializer: Materializer = {
// block on async creation to make it effectively final
Await.result(systemMaterializerPromise.future, materializerTimeout.duration)
import pekko.util.Helpers._
systemMaterializerPromise.future.await(materializerTimeout.duration)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.apache.pekko.stream.impl.io
import java.io.{ IOException, OutputStream }
import java.util.concurrent.{ Semaphore, TimeUnit }

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

Expand Down Expand Up @@ -87,7 +86,8 @@ private[pekko] class OutputStreamAdapter(
}

try {
Await.result(sendToStage.invokeWithFeedback(Send(data)), writeTimeout)
import pekko.util.Helpers._
sendToStage.invokeWithFeedback(Send(data)).await(writeTimeout)
} catch {
case NonFatal(e) => throw new IOException(e)
}
Expand Down Expand Up @@ -115,7 +115,8 @@ private[pekko] class OutputStreamAdapter(
@scala.throws(classOf[IOException])
override def close(): Unit = {
try {
Await.result(sendToStage.invokeWithFeedback(Close), writeTimeout)
import pekko.util.Helpers._
sendToStage.invokeWithFeedback(Close).await(writeTimeout)
} catch {
case NonFatal(e) => throw new IOException(e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import java.io.{ InputStream, OutputStream }
import java.util.Spliterators
import java.util.stream.{ Collector, StreamSupport }

import scala.concurrent.{ Await, Future }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.duration.Duration._

import org.apache.pekko
import pekko.NotUsed
Expand Down Expand Up @@ -197,7 +196,8 @@ object StreamConverters {
var nextElement: Option[T] = _

override def hasNext: Boolean = {
nextElement = Await.result(nextElementFuture, Inf)
import pekko.util.Helpers._
nextElement = nextElementFuture.await()
nextElement.isDefined
}

Expand Down

0 comments on commit ef0b126

Please sign in to comment.