diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 19e5861a30..757c8c4003 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -65,6 +65,7 @@ import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource; import static io.servicetalk.utils.internal.DurationUtils.toNanos; import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; /** * An asynchronous computation that produces 0, 1 or more elements and may or may not terminate successfully or with @@ -1521,6 +1522,78 @@ public final Publisher flatMapConcatIterable(Function(this, mapper); } + /** + * Merge two {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfThisPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *         future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param other The {@link Publisher} to merge with. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeDelayError(Publisher) + * @see #mergeAll(Publisher[]) + */ + public final Publisher merge(Publisher other) { + return from(this, other).flatMapMerge(identity(), 2); + } + + /** + * Merge two {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If either {@link Publisher} fails the error propagation will be delayed until both terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfThisPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param other The {@link Publisher} to merge with, + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #merge(Publisher) + * @see #mergeAllDelayError(Publisher[]) + */ + public final Publisher mergeDelayError(Publisher other) { + return from(this, other).flatMapMergeDelayError(identity(), 2, 2); + } + /** * Invokes the {@code onSubscribe} {@link Consumer} argument when * {@link Subscriber#onSubscribe(PublisherSource.Subscription)} is called for {@link Subscriber}s of the returned @@ -4014,6 +4087,162 @@ public static Publisher defer(Supplier> return new PublisherDefer<>(publisherSupplier); } + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *        future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAll(int, Publisher[]) + * @see #mergeAllDelayError(Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAll(Publisher... publishers) { + return from(publishers).flatMapMerge(identity()); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (Future future : futures) {
+     *        future.get(); // Throws if the processing for this item failed.
+     *     }
+     *     return mergedResults;
+     * }
+ * @param maxConcurrency The maximum amount of {@link Publisher}s from {@code publishers} to subscribe to + * concurrently. + * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAllDelayError(int, Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAll(int maxConcurrency, Publisher... publishers) { + return from(publishers).flatMapMerge(identity(), maxConcurrency); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If any {@link Publisher} terminates in an error, the error propagation will be delayed until + * all terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAllDelayError(int, Publisher[]) + * @see #mergeAll(Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAllDelayError(Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity()); + } + + /** + * Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned + * {@link Publisher}. If any {@link Publisher} terminates in an error, the error propagation will be delayed until + * all terminate. + *

+ * This method provides similar capabilities as expanding each result into a collection and concatenating each + * collection in sequential programming: + *

{@code
+     *     List mergedResults = ...; // concurrent safe list
+     *     for (T t : resultOfPublisher1()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     for (T t : resultOfOtherPublisher()) {
+     *         futures.add(e.submit(() -> {
+     *             return mergedResults.add(t);
+     *         }));
+     *     }
+     *     List errors = ...;
+     *     for (Future future : futures) {
+     *         try {
+     *           future.get(); // Throws if the processing for this item failed.
+     *         } catch (Throwable cause) {
+     *           errors.add(cause);
+     *         }
+     *     }
+     *     throwExceptionIfNotEmpty(errors);
+     *     return mergedResults;
+     * }
+ * @param maxConcurrency The maximum amount of {@link Publisher}s from {@code publishers} to subscribe to + * concurrently. + * @param publishers The {@link Publisher}s to merge together. + * @param Type of items emitted by the returned {@link Publisher}. + * @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together. + * @see ReactiveX merge operator + * @see #mergeAll(Publisher[]) + */ + @SafeVarargs + public static Publisher mergeAllDelayError(int maxConcurrency, Publisher... publishers) { + return from(publishers).flatMapMergeDelayError(identity(), maxConcurrency); + } + // // Static Utility Methods End // diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java index 36761c332a..081df88867 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapMerge.java @@ -39,6 +39,7 @@ import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; @@ -175,6 +176,10 @@ public void request(final long n) { incMappedDemand(n); } else { subscription.request(n); + // If the upstream source has already sent an onComplete signal, it won't be able to send an error. + // We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to + // ensure we see an error. + enqueueAndDrain(error(newExceptionForInvalidRequestN(n))); } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java index cb9987f98e..5fc1dac72f 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherFlatMapSingle.java @@ -43,6 +43,7 @@ import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock; import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription; import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid; +import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN; import static io.servicetalk.concurrent.internal.TerminalNotification.complete; import static io.servicetalk.concurrent.internal.TerminalNotification.error; import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue; @@ -161,6 +162,10 @@ public void request(long n) { } } else { subscription.request(n); + // If the upstream source has already sent an onComplete signal, it won't be able to send an error. + // We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to + // ensure we see an error. + enqueueAndDrain(error(newExceptionForInvalidRequestN(n))); } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java new file mode 100644 index 0000000000..6789788e3f --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherMergeWithTest.java @@ -0,0 +1,169 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; + +import static io.servicetalk.concurrent.api.Publisher.mergeAll; +import static io.servicetalk.concurrent.api.Publisher.mergeAllDelayError; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.util.Arrays.asList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.sameInstance; + +final class PublisherMergeWithTest { + private TestPublisher first; + private TestPublisher second; + private TestPublisher third; + private TestPublisherSubscriber subscriber; + + @BeforeEach + void setUp() { + first = new TestPublisher<>(); + second = new TestPublisher<>(); + third = new TestPublisher<>(); + subscriber = new TestPublisherSubscriber<>(); + } + + @SuppressWarnings("unused") + private static Iterable completeSource() { + List parameters = new ArrayList<>(); + for (boolean inOrderOnNext : asList(false, true)) { + for (boolean inOrderTerminate : asList(false, true)) { + for (boolean firstOnError : asList(false, true)) { + for (boolean delayError : asList(false, true)) { + parameters.add(Arguments.of(inOrderOnNext, inOrderTerminate, firstOnError, delayError)); + } + } + } + } + return parameters; + } + + @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") + @MethodSource("completeSource") + void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { + toSource(delayError ? first.mergeDelayError(second) : first.merge(second)).subscribe(subscriber); + subscriber.awaitSubscription().request(2); + int i = 3; + int j = 4; + if (inOrderOnNext) { + first.onNext(i); + second.onNext(j); + assertThat(subscriber.takeOnNext(2), contains(i, j)); + } else { + second.onNext(j); + first.onNext(i); + assertThat(subscriber.takeOnNext(2), contains(j, i)); + } + + if (inOrderTerminate) { + if (firstOnError) { + first.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + first.onComplete(); + } + + second.onComplete(); + } else { + if (firstOnError) { + second.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + second.onComplete(); + } + first.onComplete(); + } + + if (!firstOnError) { + subscriber.awaitOnComplete(); + } else if (delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } + + @ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}") + @MethodSource("completeSource") + void allComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) { + toSource(delayError ? mergeAllDelayError(first, second, third) : mergeAll(first, second, third)) + .subscribe(subscriber); + subscriber.awaitSubscription().request(3); + int i = 3; + int j = 4; + int x = 5; + if (inOrderOnNext) { + first.onNext(i); + second.onNext(j); + third.onNext(x); + assertThat(subscriber.takeOnNext(3), contains(i, j, x)); + } else { + second.onNext(j); + third.onNext(x); + first.onNext(i); + assertThat(subscriber.takeOnNext(3), contains(j, x, i)); + } + + if (inOrderTerminate) { + if (firstOnError) { + first.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + first.onComplete(); + } + + second.onComplete(); + third.onComplete(); + } else { + if (firstOnError) { + third.onError(DELIBERATE_EXCEPTION); + + if (!delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } else { + third.onComplete(); + } + second.onComplete(); + first.onComplete(); + } + + if (!firstOnError) { + subscriber.awaitOnComplete(); + } else if (delayError) { + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java new file mode 100644 index 0000000000..46a5d58402 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeDelayErrorTckTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Ignore; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherMergeDelayErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher createServiceTalkPublisher(long elements) { + int numElements = TckUtils.requestNToInt(elements); + + if (numElements <= 1) { + return TckUtils.newPublisher(numElements).mergeDelayError(empty()); + } + + int halfElements = numElements / 2; + + // Calculate the number of elements that will not be emitted by the first publisher so we create another + // one in composePublisher(...) that will emit these. The sum of both should be == elements. + return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements); + } + + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.mergeDelayError(TckUtils.newPublisher(elements)); + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java new file mode 100644 index 0000000000..cdd012ec33 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherMergeTckTest.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Ignore; + +import static io.servicetalk.concurrent.api.Publisher.empty; + +public class PublisherMergeTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher createServiceTalkPublisher(long elements) { + int numElements = TckUtils.requestNToInt(elements); + + if (numElements <= 1) { + return TckUtils.newPublisher(numElements).merge(empty()); + } + + int halfElements = numElements / 2; + + // Calculate the number of elements that will not be emitted by the first publisher so we create another + // one in composePublisher(...) that will emit these. The sum of both should be == elements. + return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements); + } + + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.merge(TckUtils.newPublisher(elements)); + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { + // merge operator proactively requests from upstream, and will not deliver errors until demand comes. + } + + @Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes") + @Override + public void required_spec309_requestZeroMustSignalIllegalArgumentException() { + } +}