Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Mar 7, 2023
1 parent 152aff5 commit a5cb584
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1548,10 +1548,10 @@ public final <R> Publisher<R> flatMapConcatIterable(Function<? super T, ? extend
* @param other The {@link Publisher} to merge with.
* @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.
* @see <a href="https://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator</a>
* @see #mergeWithDelayError(Publisher)
* @see #merge(Publisher[])
* @see #mergeDelayError(Publisher)
* @see #mergeAll(Publisher[])
*/
public final Publisher<T> mergeWith(Publisher<? extends T> other) {
public final Publisher<T> merge(Publisher<? extends T> other) {
return from(this, other).flatMapMerge(identity(), 2);
}

Expand Down Expand Up @@ -1587,10 +1587,10 @@ public final Publisher<T> mergeWith(Publisher<? extends T> other) {
* @param other The {@link Publisher} to merge with,
* @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.
* @see <a href="https://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator</a>
* @see #mergeWith(Publisher)
* @see #mergeDelayError(Publisher[])
* @see #merge(Publisher)
* @see #mergeAllDelayError(Publisher[])
*/
public final Publisher<T> mergeWithDelayError(Publisher<? extends T> other) {
public final Publisher<T> mergeDelayError(Publisher<? extends T> other) {
return from(this, other).flatMapMergeDelayError(identity(), 2, 2);
}

Expand Down Expand Up @@ -4114,11 +4114,90 @@ public static <T> Publisher<T> defer(Supplier<? extends Publisher<? extends T>>
* @param <T> Type of items emitted by the returned {@link Publisher}.
* @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.
* @see <a href="https://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator</a>
* @see #mergeDelayError(Publisher[])
* @see #mergeAll(int, Publisher[])
* @see #mergeAllDelayError(Publisher[])
*/
@SafeVarargs
public static <T> Publisher<T> merge(Publisher<? extends T>... publishers) {
return from(publishers).flatMapMerge(identity(), publishers.length);
public static <T> Publisher<T> mergeAll(Publisher<? extends T>... publishers) {
return from(publishers).flatMapMerge(identity());
}

/**
* Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned
* {@link Publisher}.
* <p>
* This method provides similar capabilities as expanding each result into a collection and concatenating each
* collection in sequential programming:
* <pre>{@code
* List<T> mergedResults = ...; // concurrent safe list
* for (T t : resultOfPublisher1()) {
* futures.add(e.submit(() -> {
* return mergedResults.add(t);
* }));
* }
* for (T t : resultOfOtherPublisher()) {
* futures.add(e.submit(() -> {
* return mergedResults.add(t);
* }));
* }
* for (Future<R> future : futures) {
* future.get(); // Throws if the processing for this item failed.
* }
* return mergedResults;
* }</pre>
* @param maxConcurrency The maximum amount of {@link Publisher}s from {@code publishers} to subscribe to
* concurrently.
* @param publishers The {@link Publisher}s to merge together.
* @param <T> Type of items emitted by the returned {@link Publisher}.
* @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.
* @see <a href="https://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator</a>
* @see #mergeAllDelayError(int, Publisher[])
*/
@SafeVarargs
public static <T> Publisher<T> mergeAll(int maxConcurrency, Publisher<? extends T>... publishers) {
return from(publishers).flatMapMerge(identity(), maxConcurrency);
}

/**
* Merge all {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned
* {@link Publisher}. If any {@link Publisher} terminates in an error, the error propagation will be delayed until
* all terminate.
* <p>
* This method provides similar capabilities as expanding each result into a collection and concatenating each
* collection in sequential programming:
* <pre>{@code
* List<T> mergedResults = ...; // concurrent safe list
* for (T t : resultOfPublisher1()) {
* futures.add(e.submit(() -> {
* return mergedResults.add(t);
* }));
* }
* for (T t : resultOfOtherPublisher()) {
* futures.add(e.submit(() -> {
* return mergedResults.add(t);
* }));
* }
* List<Throwable> errors = ...;
* for (Future<R> future : futures) {
* try {
* future.get(); // Throws if the processing for this item failed.
* } catch (Throwable cause) {
* errors.add(cause);
* }
* }
* throwExceptionIfNotEmpty(errors);
* return mergedResults;
* }</pre>
* @param publishers The {@link Publisher}s to merge together.
* @param <T> Type of items emitted by the returned {@link Publisher}.
* @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.
* @see <a href="https://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator</a>
* @see #mergeAllDelayError(int, Publisher[])
* @see #mergeAll(Publisher[])
*/
@SafeVarargs
public static <T> Publisher<T> mergeAllDelayError(Publisher<? extends T>... publishers) {
return from(publishers).flatMapMergeDelayError(identity());
}

/**
Expand Down Expand Up @@ -4155,11 +4234,11 @@ public static <T> Publisher<T> merge(Publisher<? extends T>... publishers) {
* @param <T> Type of items emitted by the returned {@link Publisher}.
* @return A {@link Publisher} which is the result of this {@link Publisher} and {@code other} merged together.
* @see <a href="https://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator</a>
* @see #merge(Publisher[])
* @see #mergeAll(Publisher[])
*/
@SafeVarargs
public static <T> Publisher<T> mergeDelayError(Publisher<? extends T>... publishers) {
return from(publishers).flatMapMergeDelayError(identity(), publishers.length, publishers.length);
public static <T> Publisher<T> mergeAllDelayError(int maxConcurrency, Publisher<? extends T>... publishers) {
return from(publishers).flatMapMergeDelayError(identity(), maxConcurrency);
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock;
import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.concurrent.internal.TerminalNotification.error;
import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue;
Expand Down Expand Up @@ -175,6 +176,10 @@ public void request(final long n) {
incMappedDemand(n);
} else {
subscription.request(n);
// If the upstream source has already sent an onComplete signal, it won't be able to send an error.
// We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to
// ensure we see an error.
enqueueAndDrain(error(newExceptionForInvalidRequestN(n)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock;
import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.concurrent.internal.TerminalNotification.error;
import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedMpscQueue;
Expand Down Expand Up @@ -161,6 +162,10 @@ public void request(long n) {
}
} else {
subscription.request(n);
// If the upstream source has already sent an onComplete signal, it won't be able to send an error.
// We propagate invalid demand upstream to clean-up upstream (if necessary) and force an error here to
// ensure we see an error.
enqueueAndDrain(error(newExceptionForInvalidRequestN(n)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.ArrayList;
import java.util.List;

import static io.servicetalk.concurrent.api.Publisher.merge;
import static io.servicetalk.concurrent.api.Publisher.mergeDelayError;
import static io.servicetalk.concurrent.api.Publisher.mergeAll;
import static io.servicetalk.concurrent.api.Publisher.mergeAllDelayError;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -66,7 +66,7 @@ private static Iterable<Arguments> completeSource() {
@ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}")
@MethodSource("completeSource")
void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) {
toSource(delayError ? first.mergeWithDelayError(second) : first.mergeWith(second)).subscribe(subscriber);
toSource(delayError ? first.mergeDelayError(second) : first.merge(second)).subscribe(subscriber);
subscriber.awaitSubscription().request(2);
int i = 3;
int j = 4;
Expand Down Expand Up @@ -115,7 +115,7 @@ void bothComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean first
@ParameterizedTest(name = "inOrderOnNext={0} inOrderTerminate={1} firstOnError={2} delayError={3}")
@MethodSource("completeSource")
void allComplete(boolean inOrderOnNext, boolean inOrderTerminate, boolean firstOnError, boolean delayError) {
toSource(delayError ? mergeDelayError(first, second, third) : merge(first, second, third))
toSource(delayError ? mergeAllDelayError(first, second, third) : mergeAll(first, second, third))
.subscribe(subscriber);
subscriber.awaitSubscription().request(3);
int i = 3;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;

import org.testng.annotations.Ignore;

import static io.servicetalk.concurrent.api.Publisher.empty;

public class PublisherMergeDelayErrorTckTest extends AbstractPublisherOperatorTckTest<Integer> {
@Override
protected Publisher<Integer> createServiceTalkPublisher(long elements) {
int numElements = TckUtils.requestNToInt(elements);

if (numElements <= 1) {
return TckUtils.newPublisher(numElements).mergeDelayError(empty());
}

int halfElements = numElements / 2;

// Calculate the number of elements that will not be emitted by the first publisher so we create another
// one in composePublisher(...) that will emit these. The sum of both should be == elements.
return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements);
}

@Override
protected Publisher<Integer> composePublisher(final Publisher<Integer> publisher, final int elements) {
return publisher.mergeDelayError(TckUtils.newPublisher(elements));
}

@Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes")
@Override
public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
}

@Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes")
@Override
public void required_spec309_requestZeroMustSignalIllegalArgumentException() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;

import io.servicetalk.concurrent.api.Publisher;

import org.testng.annotations.Ignore;

import static io.servicetalk.concurrent.api.Publisher.empty;

public class PublisherMergeTckTest extends AbstractPublisherOperatorTckTest<Integer> {
@Override
protected Publisher<Integer> createServiceTalkPublisher(long elements) {
int numElements = TckUtils.requestNToInt(elements);

if (numElements <= 1) {
return TckUtils.newPublisher(numElements).merge(empty());
}

int halfElements = numElements / 2;

// Calculate the number of elements that will not be emitted by the first publisher so we create another
// one in composePublisher(...) that will emit these. The sum of both should be == elements.
return composePublisher(TckUtils.newPublisher(halfElements), numElements - halfElements);
}

@Override
protected Publisher<Integer> composePublisher(final Publisher<Integer> publisher, final int elements) {
return publisher.merge(TckUtils.newPublisher(elements));
}

@Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes")
@Override
public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
// merge operator proactively requests from upstream, and will not deliver errors until demand comes.
}

@Ignore("merge operator proactively requests from upstream, and will not deliver errors until demand comes")
@Override
public void required_spec309_requestZeroMustSignalIllegalArgumentException() {
}
}

This file was deleted.

This file was deleted.

0 comments on commit a5cb584

Please sign in to comment.