From d973281a9051f7ea8161442d4d6bf0303dcda37a Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 7 Mar 2023 08:32:53 -0800 Subject: [PATCH 1/4] Add Publisher#mergeWith operators Motivation: Merging Publishers of the same type can be done with operator composition. However it is not obvious what operators need to be used and adding another operator for this operation will aid discovery. --- .../servicetalk/concurrent/api/Publisher.java | 148 +++++++++++++++ .../api/PublisherMergeWithTest.java | 169 ++++++++++++++++++ .../PublisherZipWithDelayErrorTckTest.java | 27 +++ .../tck/PublisherZipWithTckTest.java | 27 +++ 4 files changed, 371 insertions(+) create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 19e5861a30..5b2b8341fc 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -65,6 +65,7 @@ import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource; import static io.servicetalk.utils.internal.DurationUtils.toNanos; import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; /** * An asynchronous computation that produces 0, 1 or more elements and may or may not terminate successfully or with @@ -1521,6 +1522,78 @@ public final Publisher flatMapConcatIterable(Function(this, mapper); } + /** + * Merge two {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfThisPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *         future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param other The {@link Publisher} to merge with. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeWithDelayError(Publisher) + * @see #merge(Publisher[]) + */ + public final Publisher mergeWith(Publisher other) { + return from(this, other).flatMapMerge(identity(), 2); + } + + /** + * Merge two {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If either {@link Publisher} fails the error propagation will be delayed until both terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfThisPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param other The {@link Publisher} to merge with, + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeWith(Publisher) + * @see #mergeDelayError(Publisher[]) + */ + public final Publisher mergeWithDelayError(Publisher other) { + return from(this, other).flatMapMergeDelayError(identity(), 2, 2); + } + /** * Invokes the {@code onSubscribe} {@link Consumer} argument when * {@link Subscriber#onSubscribe(PublisherSource.Subscription)} is called for {@link Subscriber}s of the returned @@ -4014,6 +4087,81 @@ public static Publisher defer(Supplier> return new PublisherDefer<>(publisherSupplier); } + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *        future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeDelayError(Publisher[]) + */ + @SafeVarargs + public static Publisher merge(Publisher... publishers) { + return from(publishers).flatMapMerge(identity(), publishers.length); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If any {@link Publisher} terminates in an error, the error propagation will be delayed until + * all terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #merge(Publisher[]) + */ + @SafeVarargs + public static Publisher mergeDelayError(Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity(), publishers.length, publishers.length); + } + // // Static Utility Methods End // diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java new file mode 100644 index 0000000000..dc2cf5d30c --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java @@ -0,0 +1,169 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; + +import static io.servicetalk.concurrent.api.Publisher.merge; +import static io.servicetalk.concurrent.api.Publisher.mergeDelayError; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.util.Arrays.asList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.sameInstance; + +final class PublisherMergeWithTest { + private TestPublisher first; + private TestPublisher second; + private TestPublisher third; + private TestPublisherSubscriber subscriber; + + @BeforeEach + void setUp() { + first = new TestPublisher<>(); + second = new TestPublisher<>(); + third = new TestPublisher<>(); + subscriber = new TestPublisherSubscriber<>(); + } + + @SuppressWarnings("unused") + private static Iterable completeSource() { + List parameters = new ArrayList<>(); + for (boolean inOrderOnNext : asList(false, true)) { + for (boolean inOrderTerminate : asList(false, true)) { + for (boolean firstOnError : asList(false, true)) { + for (boolean delayError : asList(false, true)) { + parameters.add(Arguments.of(inOrderOnNext, inOrderTerminate, firstOnError, delayError)); + } + } + } + } + return parameters; + } + + @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") + @MethodSource("completeSource") + void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { + toSource(delayError ? first.mergeWithDelayError(second) : first.mergeWith(second)).subscribe(subscriber); + subscriber.awaitSubscription().request(2); + int i = 3; + int j = 4; + if (inOrderOnNext) { + first.onNext(i); + second.onNext(j); + assertThat(subscriber.takeOnNext(2), contains(i, j)); + } else { + second.onNext(j); + first.onNext(i); + assertThat(subscriber.takeOnNext(2), contains(j, i)); + } + + if (inOrderTerminate) { + if (firstOnError) { + first.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + first.onComplete(); + } + + second.onComplete(); + } else { + if (firstOnError) { + second.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + second.onComplete(); + } + first.onComplete(); + } + + if (!firstOnError) { + subscriber.awaitOnComplete(); + } else if (delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } + + @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") + @MethodSource("completeSource") + void allComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { + toSource(delayError ? mergeDelayError(first, second, third) : merge(first, second, third)) + .subscribe(subscriber); + subscriber.awaitSubscription().request(3); + int i = 3; + int j = 4; + int x = 5; + if (inOrderOnNext) { + first.onNext(i); + second.onNext(j); + third.onNext(x); + assertThat(subscriber.takeOnNext(3), contains(i, j, x)); + } else { + second.onNext(j); + third.onNext(x); + first.onNext(i); + assertThat(subscriber.takeOnNext(3), contains(j, x, i)); + } + + if (inOrderTerminate) { + if (firstOnError) { + first.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + first.onComplete(); + } + + second.onComplete(); + third.onComplete(); + } else { + if (firstOnError) { + third.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + third.onComplete(); + } + second.onComplete(); + first.onComplete(); + } + + if (!firstOnError) { + subscriber.awaitOnComplete(); + } else if (delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java new file mode 100644 index 0000000000..ee98615cb5 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherZipWithDelayErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.mergeWithDelayError(empty()); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java new file mode 100644 index 0000000000..b1a525b8b7 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherZipWithTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.mergeWith(empty()); + } +} From 152aff5a182e85dddc418bd6722027850cd0ee41 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 7 Mar 2023 09:13:18 -0800 Subject: [PATCH 2/4] use extends --- .../java/io/servicetalk/concurrent/api/Publisher.java | 8 ++++---- .../servicetalk/http/api/StreamingHttpPayloadHolder.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 5b2b8341fc..e63da75343 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -1551,7 +1551,7 @@ public final Publisher flatMapConcatIterable(Function mergeWith(Publisher other) { + public final Publisher mergeWith(Publisher other) { return from(this, other).flatMapMerge(identity(), 2); } @@ -1590,7 +1590,7 @@ public final Publisher mergeWith(Publisher other) { * @see #mergeWith(Publisher) * @see #mergeDelayError(Publisher[]) */ - public final Publisher mergeWithDelayError(Publisher other) { + public final Publisher mergeWithDelayError(Publisher other) { return from(this, other).flatMapMergeDelayError(identity(), 2, 2); } @@ -4117,7 +4117,7 @@ public static Publisher defer(Supplier> * @see #mergeDelayError(Publisher[]) */ @SafeVarargs - public static Publisher merge(Publisher... publishers) { + public static Publisher merge(Publisher... publishers) { return from(publishers).flatMapMerge(identity(), publishers.length); } @@ -4158,7 +4158,7 @@ public static Publisher merge(Publisher... publishers) { * @see #merge(Publisher[]) */ @SafeVarargs - public static Publisher mergeDelayError(Publisher... publishers) { + public static Publisher mergeDelayError(Publisher... publishers) { return from(publishers).flatMapMergeDelayError(identity(), publishers.length, publishers.length); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java index d4d3cc0591..ce8d98730d 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java @@ -267,7 +267,7 @@ private static void throwOnNextAfterTrailersException(HttpHeaders trailers, @Nul * @param s The {@link Single} to merge. * @return The result of the merge operation. */ - private static Publisher merge(Publisher p, Single s) { + private static Publisher merge(Publisher p, Single s) { // We filter null from the Single in case the publisher completes and we didn't find trailers. return from(p, s.toPublisher().filter(Objects::nonNull)).flatMapMerge(identity(), 2); } From 52f30fc927de90f5d7688f453ef4ed9473f3a3b8 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 7 Mar 2023 11:08:24 -0800 Subject: [PATCH 3/4] review comments --- .../servicetalk/concurrent/api/Publisher.java | 103 ++++++++++++++++-- .../concurrent/api/PublisherFlatMapMerge.java | 5 + .../api/PublisherFlatMapSingle.java | 5 + .../api/PublisherMergeWithTest.java | 8 +- .../tck/PublisherMergeDelayErrorTckTest.java | 54 +++++++++ .../tck/PublisherMergeTckTest.java | 55 ++++++++++ .../PublisherZipWithDelayErrorTckTest.java | 27 ----- .../tck/PublisherZipWithTckTest.java | 27 ----- .../http/api/StreamingHttpPayloadHolder.java | 2 +- 9 files changed, 215 insertions(+), 71 deletions(-) create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java delete mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java delete mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index e63da75343..5b9edb64d4 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -1548,10 +1548,10 @@ public final Publisher flatMapConcatIterable(FunctionReactiveX merge operator - * @see #mergeWithDelayError(Publisher) - * @see #merge(Publisher[]) + * @see #mergeDelayError(Publisher) + * @see #mergeAll(Publisher[]) */ - public final Publisher mergeWith(Publisher other) { + public final Publisher merge(Publisher other) { return from(this, other).flatMapMerge(identity(), 2); } @@ -1587,10 +1587,10 @@ public final Publisher mergeWith(Publisher other) { * @param other The {@link Publisher} to merge with, * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. * @see ReactiveX merge operator - * @see #mergeWith(Publisher) - * @see #mergeDelayError(Publisher[]) + * @see #merge(Publisher) + * @see #mergeAllDelayError(Publisher[]) */ - public final Publisher mergeWithDelayError(Publisher other) { + public final Publisher mergeDelayError(Publisher other) { return from(this, other).flatMapMergeDelayError(identity(), 2, 2); } @@ -4114,11 +4114,90 @@ public static Publisher defer(Supplier> * @param Type of items emitted by the returned {@link Publisher}. * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. * @see ReactiveX merge operator - * @see #mergeDelayError(Publisher[]) + * @see #mergeAll(int, Publisher[]) + * @see #mergeAllDelayError(Publisher[]) */ @SafeVarargs - public static Publisher merge(Publisher... publishers) { - return from(publishers).flatMapMerge(identity(), publishers.length); + public static Publisher mergeAll(Publisher... publishers) { + return from(publishers).flatMapMerge(identity()); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *        future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param maxConcurrency The maximum amount of {@link Publisher}s from {@code publishers} to subscribe to + * concurrently. + * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAllDelayError(int, Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAll(int maxConcurrency, Publisher... publishers) { + return from(publishers).flatMapMerge(identity(), maxConcurrency); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If any {@link Publisher} terminates in an error, the error propagation will be delayed until + * all terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAllDelayError(int, Publisher[]) + * @see #mergeAll(Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAllDelayError(Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity()); } /** @@ -4155,11 +4234,11 @@ public static Publisher merge(Publisher... publishers) { * @param Type of items emitted by the returned {@link Publisher}. * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. * @see ReactiveX merge operator - * @see #merge(Publisher[]) + * @see #mergeAll(Publisher[]) */ @SafeVarargs - public static Publisher mergeDelayError(Publisher... publishers) { - return from(publishers).flatMapMergeDelayError(identity(), publishers.length, publishers.length); + public static Publisher mergeAllDelayError(int maxConcurrency, Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity(), maxConcurrency); } // diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java index 36761c332a..081df88867 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java @@ -39,6 +39,7 @@ import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; @@ -175,6 +176,10 @@ public void request(final long n) { incMappedDemand(n); } else { subscription.request(n); + // If the upstream source has already sent an onComplete signal, it won't be able to send an error. + // We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to + // ensure we see an error. + enqueueAndDrain(error(newExceptionForInvalidRequestN(n))); } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java index cb9987f98e..5fc1dac72f 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java @@ -43,6 +43,7 @@ import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; @@ -161,6 +162,10 @@ public void request(long n) { } } else { subscription.request(n); + // If the upstream source has already sent an onComplete signal, it won't be able to send an error. + // We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to + // ensure we see an error. + enqueueAndDrain(error(newExceptionForInvalidRequestN(n))); } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java index dc2cf5d30c..6789788e3f 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java @@ -25,8 +25,8 @@ import java.util.ArrayList; import java.util.List; -import static io.servicetalk.concurrent.api.Publisher.merge; -import static io.servicetalk.concurrent.api.Publisher.mergeDelayError; +import static io.servicetalk.concurrent.api.Publisher.mergeAll; +import static io.servicetalk.concurrent.api.Publisher.mergeAllDelayError; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static java.util.Arrays.asList; @@ -66,7 +66,7 @@ private static Iterable completeSource() { @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") @MethodSource("completeSource") void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { - toSource(delayError ? first.mergeWithDelayError(second) : first.mergeWith(second)).subscribe(subscriber); + toSource(delayError ? first.mergeDelayError(second) : first.merge(second)).subscribe(subscriber); subscriber.awaitSubscription().request(2); int i = 3; int j = 4; @@ -115,7 +115,7 @@ void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean first @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") @MethodSource("completeSource") void allComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { - toSource(delayError ? mergeDelayError(first, second, third) : merge(first, second, third)) + toSource(delayError ? mergeAllDelayError(first, second, third) : mergeAll(first, second, third)) .subscribe(subscriber); subscriber.awaitSubscription().request(3); int i = 3; diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java new file mode 100644 index 0000000000..46a5d58402 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Ignore; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherMergeDelayErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher createServiceTalkPublisher(long elements) { + int numElements = TckUtils.requestNToInt(elements); + + if (numElements <= 1) { + return TckUtils.newPublisher(numElements).mergeDelayError(empty()); + } + + int halfElements = numElements / 2; + + // Calculate the number of elements that will not be emitted by the first publisher so we create another + // one in composePublisher(...) that will emit these. The sum of both should be == elements. + return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements); + } + + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.mergeDelayError(TckUtils.newPublisher(elements)); + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java new file mode 100644 index 0000000000..cdd012ec33 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Ignore; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherMergeTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher createServiceTalkPublisher(long elements) { + int numElements = TckUtils.requestNToInt(elements); + + if (numElements <= 1) { + return TckUtils.newPublisher(numElements).merge(empty()); + } + + int halfElements = numElements / 2; + + // Calculate the number of elements that will not be emitted by the first publisher so we create another + // one in composePublisher(...) that will emit these. The sum of both should be == elements. + return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements); + } + + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.merge(TckUtils.newPublisher(elements)); + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + // merge operator proactively requests from upstream, and will not deliver errors until demand comes. + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java deleted file mode 100644 index ee98615cb5..0000000000 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithDelayErrorTckTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright © 2023 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.concurrent.reactivestreams.tck; - -import io.servicetalk.concurrent.api.Publisher; - -import static io.servicetalk.concurrent.api.Publisher.empty; - -public class PublisherZipWithDelayErrorTckTest extends AbstractPublisherOperatorTckTest { - @Override - protected Publisher composePublisher(final Publisher publisher, final int elements) { - return publisher.mergeWithDelayError(empty()); - } -} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java deleted file mode 100644 index b1a525b8b7..0000000000 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherZipWithTckTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright © 2023 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.concurrent.reactivestreams.tck; - -import io.servicetalk.concurrent.api.Publisher; - -import static io.servicetalk.concurrent.api.Publisher.empty; - -public class PublisherZipWithTckTest extends AbstractPublisherOperatorTckTest { - @Override - protected Publisher composePublisher(final Publisher publisher, final int elements) { - return publisher.mergeWith(empty()); - } -} diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java index ce8d98730d..d4d3cc0591 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpPayloadHolder.java @@ -267,7 +267,7 @@ private static void throwOnNextAfterTrailersException(HttpHeaders trailers, @Nul * @param s The {@link Single} to merge. * @return The result of the merge operation. */ - private static Publisher merge(Publisher p, Single s) { + private static Publisher merge(Publisher p, Single s) { // We filter null from the Single in case the publisher completes and we didn't find trailers. return from(p, s.toPublisher().filter(Objects::nonNull)).flatMapMerge(identity(), 2); } From ea575ce234a608beaa3a9d37737da88e00c18379 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 7 Mar 2023 12:07:44 -0800 Subject: [PATCH 4/4] javadoc --- .../src/main/java/io/servicetalk/concurrent/api/Publisher.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 5b9edb64d4..757c8c4003 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -4230,6 +4230,8 @@ public static Publisher mergeAllDelayError(Publisher... publ * throwExceptionIfNotEmpty(errors); * return mergedResults; * } + * @param maxConcurrency The maximum amount of {@link Publisher}s from {@code publishers} to subscribe to + * concurrently. * @param publishers The {@link Publisher}s to merge together. * @param Type of items emitted by the returned {@link Publisher}. * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.