Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sbt-pekko-build to 0.3.3 #69

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/generate-doc-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ jobs:
sudo apt-get install graphviz

- name: Compile testClass&docs for all Scala versions
run: sbt ";TestJdk9 / compile ; +compile:doc"
run: sbt ";+TestJdk9 / compile ; +compile:doc"
18 changes: 18 additions & 0 deletions .github/workflows/nightly-builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,21 @@ jobs:
sbt \
-Dpekko.build.scalaVersion=${{ matrix.scalaVersion }} \
"+~ ${{ matrix.scalaVersion }} publishLocal publishM2"

- name: Install scala-cli
if: ${{ contains('11,17,21', matrix.javaVersion) }}
run: |-
curl -sS "https://virtuslab.github.io/scala-cli-packages/KEY.gpg" | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/scala-cli.gpg 2>/dev/null
sudo curl -s --compressed -o /etc/apt/sources.list.d/scala_cli_packages.list "https://virtuslab.github.io/scala-cli-packages/debian/scala_cli_packages.list"
sudo apt update
sudo apt install scala-cli

- name: Use Scala-CLI to verify jdk 9 classes
if: ${{ contains('11,17,21', matrix.javaVersion) }}
run: |-
scala-cli --version
echo "Starting verification with Scala-CLI"
scala-cli stream/target/scala-cli/VerifyJDK9Classes.sc && echo "Verification successful" || (
echo "Error when VerifyJDK9Classes"
exit 1
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.pekko.annotation.InternalApi
*/
@InternalApi private[pekko] object PartialFunction {

inline def fromFunction[A, B](f: A => B): scala.PartialFunction[A, B] =
def fromFunction[A, B](f: A => B): scala.PartialFunction[A, B] =
scala.PartialFunction.fromFunction(f)

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ import pekko.annotation.InternalApi
*/
@InternalApi
private[dispatch] object SameThreadExecutionContext {
inline def apply(): ExecutionContext = ExecutionContext.parasitic
def apply(): ExecutionContext = ExecutionContext.parasitic

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ private[pekko] object FutureConverters {
def asJava[T](f: Future[T]): CompletionStage[T] = javaapi.FutureConverters.asJava(f)

implicit final class FutureOps[T](private val f: Future[T]) extends AnyVal {
inline def asJava: CompletionStage[T] = javaapi.FutureConverters.asJava(f)
def asJava: CompletionStage[T] = javaapi.FutureConverters.asJava(f)
}

// Ideally this should have the Scala 3 inline keyword but then Java sources are
// unable to call this method, see https://github.com/lampepfl/dotty/issues/19346
def asScala[T](cs: CompletionStage[T]): Future[T] = javaapi.FutureConverters.asScala(cs)

implicit final class CompletionStageOps[T](private val cs: CompletionStage[T]) extends AnyVal {
inline def asScala: Future[T] = javaapi.FutureConverters.asScala(cs)
def asScala: Future[T] = javaapi.FutureConverters.asScala(cs)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/

package org.apache.pekko.util

import java.time.{ Duration => JDuration }

import scala.concurrent.duration.{ Duration, FiniteDuration }
Expand All @@ -29,10 +30,10 @@ private[pekko] object JavaDurationConverters {
def asFiniteDuration(duration: JDuration): FiniteDuration = duration.asScala

final implicit class JavaDurationOps(val self: JDuration) extends AnyVal {
inline def asScala: FiniteDuration = Duration.fromNanos(self.toNanos)
def asScala: FiniteDuration = Duration.fromNanos(self.toNanos)
}

final implicit class ScalaDurationOps(val self: Duration) extends AnyVal {
inline def asJava: JDuration = JDuration.ofNanos(self.toNanos)
def asJava: JDuration = JDuration.ofNanos(self.toNanos)
}
}
22 changes: 11 additions & 11 deletions actor/src/main/scala-3/org/apache/pekko/util/OptionConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scala.jdk.OptionShape
/**
* INTERNAL API
*
* Remove this once Scala 2.12 support is dropped since all methods are in Scala 2.13+ stdlib
* Remove this once Scala 2.12 support is dropped since all methods are in Scala 2.13+ stdlib.
*/
@InternalStableApi
private[pekko] object OptionConverters {
Expand All @@ -35,45 +35,45 @@ private[pekko] object OptionConverters {
final def toJava[A](o: Option[A]): Optional[A] = scala.jdk.javaapi.OptionConverters.toJava(o)

implicit final class RichOptional[A](private val o: java.util.Optional[A]) extends AnyVal {
inline def toScala: Option[A] = scala.jdk.OptionConverters.RichOptional(o).toScala
def toScala: Option[A] = scala.jdk.OptionConverters.RichOptional(o).toScala

inline def toJavaPrimitive[O](implicit shape: OptionShape[A, O]): O =
def toJavaPrimitive[O](implicit shape: OptionShape[A, O]): O =
scala.jdk.OptionConverters.RichOptional(o).toJavaPrimitive
}

implicit final class RichOption[A](private val o: Option[A]) extends AnyVal {
inline def toJava: Optional[A] = scala.jdk.OptionConverters.RichOption(o).toJava
def toJava: Optional[A] = scala.jdk.OptionConverters.RichOption(o).toJava

inline def toJavaPrimitive[O](implicit shape: OptionShape[A, O]): O =
def toJavaPrimitive[O](implicit shape: OptionShape[A, O]): O =
scala.jdk.OptionConverters.RichOption(o).toJavaPrimitive
}

implicit class RichOptionalDouble(private val o: OptionalDouble) extends AnyVal {

/** Convert a Java `OptionalDouble` to a Scala `Option` */
inline def toScala: Option[Double] = scala.jdk.OptionConverters.RichOptionalDouble(o).toScala
def toScala: Option[Double] = scala.jdk.OptionConverters.RichOptionalDouble(o).toScala

/** Convert a Java `OptionalDouble` to a generic Java `Optional` */
inline def toJavaGeneric: Optional[Double] = scala.jdk.OptionConverters.RichOptionalDouble(o).toJavaGeneric
def toJavaGeneric: Optional[Double] = scala.jdk.OptionConverters.RichOptionalDouble(o).toJavaGeneric
}

/** Provides conversions from `OptionalInt` to Scala `Option` and the generic `Optional` */
implicit class RichOptionalInt(private val o: OptionalInt) extends AnyVal {

/** Convert a Java `OptionalInt` to a Scala `Option` */
inline def toScala: Option[Int] = scala.jdk.OptionConverters.RichOptionalInt(o).toScala
def toScala: Option[Int] = scala.jdk.OptionConverters.RichOptionalInt(o).toScala

/** Convert a Java `OptionalInt` to a generic Java `Optional` */
inline def toJavaGeneric: Optional[Int] = scala.jdk.OptionConverters.RichOptionalInt(o).toJavaGeneric
def toJavaGeneric: Optional[Int] = scala.jdk.OptionConverters.RichOptionalInt(o).toJavaGeneric
}

/** Provides conversions from `OptionalLong` to Scala `Option` and the generic `Optional` */
implicit class RichOptionalLong(private val o: OptionalLong) extends AnyVal {

/** Convert a Java `OptionalLong` to a Scala `Option` */
inline def toScala: Option[Long] = scala.jdk.OptionConverters.RichOptionalLong(o).toScala
def toScala: Option[Long] = scala.jdk.OptionConverters.RichOptionalLong(o).toScala

/** Convert a Java `OptionalLong` to a generic Java `Optional` */
inline def toJavaGeneric: Optional[Long] = scala.jdk.OptionConverters.RichOptionalLong(o).toJavaGeneric
def toJavaGeneric: Optional[Long] = scala.jdk.OptionConverters.RichOptionalLong(o).toJavaGeneric
}
}
40 changes: 21 additions & 19 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ ThisBuild / versionScheme := Some(VersionScheme.SemVerSpec)
sourceDistName := "apache-pekko"
sourceDistIncubating := true

commands := commands.value.filterNot { command =>
command.nameOption.exists { name =>
name.contains("sonatypeRelease") || name.contains("sonatypeBundleRelease")
}
}

ThisBuild / reproducibleBuildsCheckResolver := Resolver.ApacheMavenStagingRepo

ThisBuild / pekkoCoreProject := true
Expand Down Expand Up @@ -132,7 +126,7 @@ lazy val actor = pekkoModule("actor")
.settings(AddMetaInfLicenseFiles.actorSettings)
.settings(VersionGenerator.settings)
.settings(serialversionRemoverPluginSettings)
.enablePlugins(BoilerplatePlugin)
.enablePlugins(BoilerplatePlugin, SbtOsgi)

lazy val actorTests = pekkoModule("actor-tests")
.dependsOn(testkit % "compile->compile;test->test", actor)
Expand Down Expand Up @@ -167,7 +161,7 @@ lazy val cluster = pekkoModule("cluster")
.settings(OSGi.cluster)
.settings(Protobuf.settings)
.settings(Test / parallelExecution := false)
.enablePlugins(MultiNodeScalaTest)
.enablePlugins(MultiNodeScalaTest, SbtOsgi)

lazy val clusterMetrics = pekkoModule("cluster-metrics")
.dependsOn(
Expand All @@ -180,7 +174,7 @@ lazy val clusterMetrics = pekkoModule("cluster-metrics")
.settings(Protobuf.settings)
.settings(SigarLoader.sigarSettings)
.settings(Test / parallelExecution := false)
.enablePlugins(MultiNodeScalaTest)
.enablePlugins(MultiNodeScalaTest, SbtOsgi)

lazy val clusterSharding = pekkoModule("cluster-sharding")
// TODO pekko-persistence dependency should be provided in pom.xml artifact.
Expand All @@ -197,8 +191,7 @@ lazy val clusterSharding = pekkoModule("cluster-sharding")
.settings(AutomaticModuleName.settings("pekko.cluster.sharding"))
.settings(OSGi.clusterSharding)
.settings(Protobuf.settings)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
.enablePlugins(Jdk9)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams, Jdk9, SbtOsgi)

lazy val clusterTools = pekkoModule("cluster-tools")
.dependsOn(
Expand All @@ -209,7 +202,7 @@ lazy val clusterTools = pekkoModule("cluster-tools")
.settings(AutomaticModuleName.settings("pekko.cluster.tools"))
.settings(OSGi.clusterTools)
.settings(Protobuf.settings)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams, SbtOsgi)

lazy val distributedData = pekkoModule("distributed-data")
.dependsOn(cluster % "compile->compile;test->test;multi-jvm->multi-jvm", jackson % "test->test")
Expand All @@ -218,7 +211,7 @@ lazy val distributedData = pekkoModule("distributed-data")
.settings(AddMetaInfLicenseFiles.distributedDataSettings)
.settings(OSGi.distributedData)
.settings(Protobuf.settings)
.enablePlugins(MultiNodeScalaTest)
.enablePlugins(MultiNodeScalaTest, SbtOsgi)

lazy val docs = pekkoModule("docs")
.configs(Jdk9.TestJdk9)
Expand Down Expand Up @@ -274,7 +267,7 @@ lazy val jackson = pekkoModule("serialization-jackson")
.settings(AutomaticModuleName.settings("pekko.serialization.jackson"))
.settings(OSGi.jackson)
.settings(javacOptions += "-parameters")
.enablePlugins(ScaladocNoVerificationOfDiagrams)
.enablePlugins(ScaladocNoVerificationOfDiagrams, SbtOsgi)

lazy val multiNodeTestkit = pekkoModule("multi-node-testkit")
.dependsOn(remote, testkit)
Expand All @@ -289,6 +282,7 @@ lazy val osgi = pekkoModule("osgi")
.settings(AutomaticModuleName.settings("pekko.osgi"))
.settings(OSGi.osgi)
.settings(Test / parallelExecution := false)
.enablePlugins(SbtOsgi)

lazy val persistence = pekkoModule("persistence")
.dependsOn(actor, stream, testkit % "test->test")
Expand All @@ -297,6 +291,7 @@ lazy val persistence = pekkoModule("persistence")
.settings(OSGi.persistence)
.settings(Protobuf.settings)
.settings(Test / fork := true)
.enablePlugins(SbtOsgi)

lazy val persistenceQuery = pekkoModule("persistence-query")
.dependsOn(
Expand All @@ -312,7 +307,7 @@ lazy val persistenceQuery = pekkoModule("persistence-query")
// To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "remote" / "src" / "main" / "protobuf"))
.settings(Test / fork := true)
.enablePlugins(ScaladocNoVerificationOfDiagrams)
.enablePlugins(ScaladocNoVerificationOfDiagrams, SbtOsgi)

lazy val persistenceShared = pekkoModule("persistence-shared")
.dependsOn(persistence % "test->test", testkit % "test->test", remote % Test)
Expand Down Expand Up @@ -357,7 +352,7 @@ lazy val protobufV3 = pekkoModule("protobuf-v3")
.settings(OSGi.protobufV3)
.settings(AutomaticModuleName.settings("pekko.protobuf.v3"))
.settings(AddMetaInfLicenseFiles.protobufV3Settings)
.enablePlugins(ScaladocNoVerificationOfDiagrams)
.enablePlugins(ScaladocNoVerificationOfDiagrams, SbtOsgi)
.disablePlugins(MimaPlugin)
.settings(
libraryDependencies += Dependencies.Compile.Provided.protobufRuntime,
Expand Down Expand Up @@ -407,7 +402,7 @@ lazy val remote =
.settings(Protobuf.settings)
.settings(Test / parallelExecution := false)
.settings(serialversionRemoverPluginSettings)
.enablePlugins(Jdk9)
.enablePlugins(Jdk9, SbtOsgi)

lazy val remoteTests = pekkoModule("remote-tests")
.dependsOn(
Expand All @@ -427,20 +422,23 @@ lazy val slf4j = pekkoModule("slf4j")
.settings(Dependencies.slf4j)
.settings(AutomaticModuleName.settings("pekko.slf4j"))
.settings(OSGi.slf4j)
.enablePlugins(SbtOsgi)

lazy val stream = pekkoModule("stream")
.dependsOn(actor, protobufV3)
.settings(Dependencies.stream)
.settings(AutomaticModuleName.settings("pekko.stream"))
.settings(OSGi.stream)
.settings(Protobuf.settings)
.enablePlugins(BoilerplatePlugin, Jdk9)
.settings(VerifyJDK9Classes.settings)
.enablePlugins(BoilerplatePlugin, Jdk9, SbtOsgi)

lazy val streamTestkit = pekkoModule("stream-testkit")
.dependsOn(stream, testkit % "compile->compile;test->test")
.settings(Dependencies.streamTestkit)
.settings(AutomaticModuleName.settings("pekko.stream.testkit"))
.settings(OSGi.streamTestkit)
.enablePlugins(SbtOsgi)

lazy val streamTests = pekkoModule("stream-tests")
.configs(Jdk9.TestJdk9)
Expand All @@ -467,6 +465,7 @@ lazy val testkit = pekkoModule("testkit")
.settings(AutomaticModuleName.settings("pekko.actor.testkit"))
.settings(OSGi.testkit)
.settings(initialCommands += "import org.apache.pekko.testkit._")
.enablePlugins(SbtOsgi)

lazy val actorTyped = pekkoModule("actor-typed")
.dependsOn(actor, slf4j)
Expand All @@ -487,7 +486,7 @@ lazy val actorTyped = pekkoModule("actor-typed")

implicit val timeout = Timeout(5 seconds)
""")
.enablePlugins(Jdk9)
.enablePlugins(Jdk9, SbtOsgi)

lazy val persistenceTyped = pekkoModule("persistence-typed")
.dependsOn(
Expand All @@ -508,6 +507,7 @@ lazy val persistenceTyped = pekkoModule("persistence-typed")
// To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "remote" / "src" / "main" / "protobuf"))
.settings(OSGi.persistenceTyped)
.enablePlugins(SbtOsgi)

lazy val clusterTyped = pekkoModule("cluster-typed")
.dependsOn(
Expand Down Expand Up @@ -580,12 +580,14 @@ lazy val discovery = pekkoModule("discovery")
.settings(Dependencies.discovery)
.settings(AutomaticModuleName.settings("pekko.discovery"))
.settings(OSGi.discovery)
.enablePlugins(SbtOsgi)

lazy val coordination = pekkoModule("coordination")
.dependsOn(actor, testkit % "test->test", actorTests % "test->test")
.settings(Dependencies.coordination)
.settings(AutomaticModuleName.settings("pekko.coordination"))
.settings(OSGi.coordination)
.enablePlugins(SbtOsgi)

lazy val billOfMaterials = Project("bill-of-materials", file("bill-of-materials"))
.enablePlugins(BillOfMaterialsPlugin)
Expand Down
47 changes: 47 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/forall.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Sink.forall

A `Sink` that will test the given predicate `p` for every received element and completes with the result.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.forall](Sink$) { scala="#forall[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" }

## Description
forall applies a predicate function to assert each element received, it returns true if all elements satisfy the assertion, otherwise it returns false.

It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.

Notes that if source is empty, it will return true

A `Sink` that will test the given predicate `p` for every received element and

- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for all elements;
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements);
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for any element.

The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.

## Example

This example tests all elements in the stream is `<=` 100.

Scala
: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala) { #forall }

Java
: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java) { #forall }

## Reactive Streams Semantics

@@@div { .callout }

***Completes*** when upstream completes or the predicate `p` returns `false`

**cancels** when predicate `p` returns `false`

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
Expand Down Expand Up @@ -459,6 +460,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [foldAsync](Source-or-Flow/foldAsync.md)
* [foldWhile](Source-or-Flow/foldWhile.md)
* [foldWhile](Sink/foldWhile.md)
* [forall](Sink/forall.md)
* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
* [foreachParallel](Sink/foreachParallel.md)
Expand Down
Loading