From 09c3593169c9fff0645134542bc7ee8f6fb66205 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 3 Oct 2023 21:55:35 -0700 Subject: [PATCH] Add Publisher.onCompleteError Motivation: There are some scenarios where a stream terminating with onComplete is not expected, and translating to an error simplifies recovery. --- .../api/OnCompleteErrorPublisher.java | 77 +++++++++++++++++++ .../servicetalk/concurrent/api/Publisher.java | 22 ++++++ .../api/OnCompleteErrorPublisherTest.java | 65 ++++++++++++++++ .../tck/PublisherOnCompleteErrorTckTest.java | 28 +++++++ 4 files changed, 192 insertions(+) create mode 100644 servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisher.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisherTest.java create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherOnCompleteErrorTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisher.java new file mode 100644 index 0000000000..7943874172 --- /dev/null +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisher.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import java.util.function.Supplier; +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +final class OnCompleteErrorPublisher extends AbstractSynchronousPublisherOperator { + private final Supplier errorSupplier; + + OnCompleteErrorPublisher(final Publisher original, final Supplier errorSupplier) { + super(original); + this.errorSupplier = requireNonNull(errorSupplier); + } + + @Override + public Subscriber apply(final Subscriber subscriber) { + return new OnCompleteErrorSubscriber<>(subscriber, errorSupplier); + } + + private static final class OnCompleteErrorSubscriber implements Subscriber { + private final Subscriber subscriber; + private final Supplier errorSupplier; + + private OnCompleteErrorSubscriber(final Subscriber subscriber, + final Supplier errorSupplier) { + this.subscriber = subscriber; + this.errorSupplier = errorSupplier; + } + + @Override + public void onSubscribe(final Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(@Nullable final T t) { + subscriber.onNext(t); + } + + @Override + public void onError(final Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + final Throwable cause; + try { + cause = errorSupplier.get(); + } catch (Throwable cause2) { + subscriber.onError(cause2); + return; + } + if (cause == null) { + subscriber.onComplete(); + } else { + subscriber.onError(cause); + } + } + } +} diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 26d3519370..f8cbb05870 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -561,6 +561,28 @@ public final Publisher onErrorReturn( return onErrorReturn(type::isInstance, rawSupplier); } + /** + * Transform this {@link Publisher}s {@link Subscriber#onComplete()} signal into + * {@link Subscriber#onError(Throwable)} signal (unless {@code null} error returned from {@code errorSupplier}). + *

+ * This method provides a data transformation in sequential programming similar to: + *

{@code
+     *     List results = resultOfThisPublisher();
+     *     terminalOfThisPublisher();
+     *     Throwable cause = errorSupplier.get()
+     *     if (cause != null) {
+     *       throw cause;
+     *     }
+     * }
+ * @param errorSupplier returns the error to emit to {@link Subscriber#onError(Throwable)}. if the return value + * is {@code null} then complete with {@link Subscriber#onComplete()}. + * @return A {@link Publisher} which transform this {@link Publisher}s {@link Subscriber#onComplete()} signal into + * {@link Subscriber#onError(Throwable)} signal (unless {@code null} error returned from {@code errorSupplier}). + */ + public final Publisher onCompleteError(final Supplier errorSupplier) { + return new OnCompleteErrorPublisher<>(this, errorSupplier); + } + /** * Transform errors emitted on this {@link Publisher} which match {@code predicate} into * {@link Subscriber#onNext(Object)} then {@link Subscriber#onComplete()} signals (e.g. swallows the error). diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisherTest.java new file mode 100644 index 0000000000..625288cb29 --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/OnCompleteErrorPublisherTest.java @@ -0,0 +1,65 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +final class OnCompleteErrorPublisherTest { + private final TestPublisherSubscriber subscriber = new TestPublisherSubscriber<>(); + + @Test + void errorPassThrough() { + toSource(Publisher.failed(DELIBERATE_EXCEPTION) + .onCompleteError(() -> new IllegalStateException("shouldn't get here")) + ).subscribe(subscriber); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + + @Test + void nullCompletes() { + toSource(Publisher.empty() + .onCompleteError(() -> null) + ).subscribe(subscriber); + subscriber.awaitOnComplete(); + } + + @ParameterizedTest(name = "{displayName} [{index}] shouldThrow={0}") + @ValueSource(booleans = {false, true}) + void completeToError(boolean shouldThrow) { + toSource(from(1) + .onCompleteError(() -> { + if (shouldThrow) { + throw DELIBERATE_EXCEPTION; + } + return DELIBERATE_EXCEPTION; + }) + ).subscribe(subscriber); + subscriber.awaitSubscription().request(1); + assertThat(subscriber.takeOnNext(), equalTo(1)); + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherOnCompleteErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherOnCompleteErrorTckTest.java new file mode 100644 index 0000000000..4d91e970f6 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherOnCompleteErrorTckTest.java @@ -0,0 +1,28 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +@Test +public class PublisherOnCompleteErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher composePublisher(final Publisher publisher, final int elements) { + return publisher.onCompleteError(() -> null); + } +}