Skip to content

Commit

Permalink
Merge branch 'main' into update/sbt-pekko-build-0.4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Jan 4, 2025
2 parents d7cbfc1 + 0cfde81 commit bae277d
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 7 deletions.
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.32")
addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.1.0")
addSbtPlugin("com.github.pjfanning" % "sbt-source-dist" % "0.1.12")
addSbtPlugin("com.github.pjfanning" % "sbt-pekko-build" % "0.4.2")
addSbtPlugin("com.github.reibitto" % "sbt-welcome" % "0.4.0")
addSbtPlugin("com.github.reibitto" % "sbt-welcome" % "0.5.0")
addSbtPlugin("com.github.sbt" % "sbt-license-report" % "1.7.0")
addSbtPlugin("io.github.roiocam" % "sbt-depend-walker" % "0.1.1")
addSbtPlugin("io.github.siculo" % "sbt-bom" % "0.3.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl;

import com.google.common.collect.Sets;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
Expand Down Expand Up @@ -1663,4 +1664,33 @@ public void mustBeAbleToConvertToJavaInJava() {
org.apache.pekko.stream.scaladsl.Flow.apply();
Flow<Integer, Integer, NotUsed> javaFlow = scalaFlow.asJava();
}

@Test
public void useFlatMapPrefix() {
final List<Integer> resultList =
Source.range(1, 2)
.via(
Flow.of(Integer.class)
.flatMapPrefix(
1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix))))
.runWith(Sink.seq(), system)
.toCompletableFuture()
.join();
Assert.assertEquals(Arrays.asList(1, 2), resultList);
}

@Test
public void useFlatMapPrefixSubSource() {
final Set<Integer> resultSet =
Source.range(1, 2)
.via(
Flow.of(Integer.class)
.groupBy(2, i -> i % 2)
.flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))
.mergeSubstreams())
.runWith(Sink.collect(Collectors.toSet()), system)
.toCompletableFuture()
.join();
Assert.assertEquals(Sets.newHashSet(1, 2), resultSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl;

import com.google.common.collect.Sets;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
Expand Down Expand Up @@ -1504,4 +1505,28 @@ public void flattenOptionalOptional() throws Exception {
.get(3, TimeUnit.SECONDS);
Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList);
}

@Test
public void useFlatMapPrefix() {
final List<Integer> resultList =
Source.range(1, 2)
.flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))
.runWith(Sink.seq(), system)
.toCompletableFuture()
.join();
Assert.assertEquals(Arrays.asList(1, 2), resultList);
}

@Test
public void useFlatMapPrefixSubSource() {
final Set<Integer> resultSet =
Source.range(1, 2)
.groupBy(2, i -> i % 2)
.flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))
.mergeSubstreams()
.runWith(Sink.collect(Collectors.toSet()), system)
.toCompletableFuture()
.join();
Assert.assertEquals(Sets.newHashSet(1, 2), resultSet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Make flatMapPrefix javadsl using java.util.List
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Flow.flatMapPrefix")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Flow.flatMapPrefixMat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Source.flatMapPrefix")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Source.flatMapPrefixMat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubFlow.flatMapPrefix")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubSource.flatMapPrefix")
Original file line number Diff line number Diff line change
Expand Up @@ -2500,7 +2500,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = {
f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.Flow(newDelegate)
}
Expand All @@ -2511,7 +2511,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]],
matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Flow[In, Out2, Mat3] = {
val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
matF(m1, fm2.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3980,7 +3980,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = {
f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.Source(newDelegate)
}
Expand All @@ -3991,7 +3991,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
def flatMapPrefixMat[Out2, Mat2, Mat3](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]],
f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]],
matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Source[Out2, Mat3] = {
val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) =>
matF(m1, fm2.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ class SubFlow[In, Out, Mat](
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = {
f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.SubFlow(newDelegate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ class SubSource[Out, Mat](
*/
def flatMapPrefix[Out2, Mat2](
n: Int,
f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = {
f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = {
val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala)
new javadsl.SubSource(newDelegate)
}
Expand Down

0 comments on commit bae277d

Please sign in to comment.