Skip to content

Commit

Permalink
Do not cancel subscription on BlockingIterable#hasNext(long, TimeUnit) (
Browse files Browse the repository at this point in the history
#3128)

* Do not cancel subscription on BlockingIterable#hasNext(long, TimeUnit)

This makes sure that if this method, or in extend next(long, TimeUnit) is
called and times out, the caller is able to retry the operation and also
the upstream source will not be cancelled.

In the context of a Blocking Streaming server, this means that if a timeout
is thrown on the incoming request, the outgoing response can still be modified
since the underlying socket will not be immediately closed.

It also aligns the semantics with Single#toFuture where a blocking get with
a timeout on the future also does not cancel the upstream Single.

A (temporary) system property is introduced which allows to fall back to the
old behavior should incompatibilities be discovered in the wild.

A note for the reader who wonders how to close the subscription now: the close()
method always did cancel the subscription and continues to do so.
  • Loading branch information
daschl authored Dec 16, 2024
1 parent b4be7d1 commit 85fd41b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public BlockingIterator<T> iterator() {
}

private static final class SubscriberAndIterator<T> implements Subscriber<T>, BlockingIterator<T> {
/**
* Allows to re-enable cancelling the subscription on {@link #hasNext(long, TimeUnit)} timeout. This flag
* will be removed after a couple releases and no issues identified with the new behavior.
*/
private static final boolean CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT = Boolean
.getBoolean("io.servicetalk.concurrent.api.cancelSubscriptionOnHasNextTimeout");
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberAndIterator.class);
private static final Object CANCELLED_SIGNAL = new Object();
private static final TerminalNotification COMPLETE_NOTIFICATION = complete();
Expand Down Expand Up @@ -172,7 +178,9 @@ public boolean hasNext(final long timeout, final TimeUnit unit) throws TimeoutEx
next = data.poll(timeout, unit);
if (next == null) {
terminated = true;
subscription.cancel();
if (CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT) {
subscription.cancel();
}
throw new TimeoutException("timed out after: " + timeout + " units: " + unit);
}
requestMoreIfRequired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void errorEmittedIsThrown() {
DeliberateException de = new DeliberateException();
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@Test
Expand All @@ -80,7 +80,7 @@ void doubleHashNextWithError() {
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
assertThat("Second hasNext inconsistent with first.", iterator.hasNext(), is(true));
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@Test
Expand All @@ -93,7 +93,7 @@ void hasNextWithEmpty() {
void nextWithEmpty() {
Iterator<Integer> iterator = Publisher.<Integer>empty().toIterable().iterator();
assertThat("Item not expected but found.", iterator.hasNext(), is(false));
assertThrows(NoSuchElementException.class, () -> iterator.next());
assertThrows(NoSuchElementException.class, iterator::next);
}

@Test
Expand All @@ -109,7 +109,10 @@ void hasNextWithTimeout() throws Exception {

assertThrows(TimeoutException.class, () -> iterator.hasNext(10, MILLISECONDS));
assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
assertTrue(subscription.isCancelled());

assertThat(subscription.isCancelled(), is(false));
iterator.close();
assertThat(subscription.isCancelled(), is(true));
}

@Test
Expand All @@ -124,9 +127,11 @@ void nextWithTimeout() throws Exception {
assertThat("Unexpected item found.", iterator.next(-1, MILLISECONDS), is(2));

assertThrows(TimeoutException.class, () -> iterator.next(10, MILLISECONDS));

assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
assertTrue(subscription.isCancelled());

assertThat(subscription.isCancelled(), is(false));
iterator.close();
assertThat(subscription.isCancelled(), is(true));
}

@Test
Expand Down Expand Up @@ -173,7 +178,7 @@ void nextWithoutHasNextAndTerminal() {
source.onNext(2);
assertThat("Unexpected item found.", iterator.next(), is(2));
source.onComplete();
assertThrows(NoSuchElementException.class, () -> iterator.next());
assertThrows(NoSuchElementException.class, iterator::next);
}

@Test
Expand Down Expand Up @@ -234,7 +239,7 @@ void delayOnNextThenError() {
DeliberateException de = new DeliberateException();
source.onError(de);
assertThat("Item not expected but found.", iterator.hasNext(), is(true));
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
Exception e = assertThrows(DeliberateException.class, iterator::next);
assertThat(e, is(de));
}

Expand Down Expand Up @@ -310,7 +315,7 @@ void queueFullButAccommodatesOnError() {
source.onError(de);
verifyNextIs(iterator, 1);
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
Exception e = assertThrows(DeliberateException.class, iterator::next);
assertThat(e, sameInstance(de));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
@Override
BlockingIterator<T> iterator();

@Override
default void forEach(final Consumer<? super T> action) {
try (BlockingIterator<T> iterator = iterator()) {
while (iterator.hasNext()) {
action.accept(iterator.next());
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/**
* Mimics the behavior of {@link #forEach(Consumer)} but uses the {@code timeoutSupplier} to determine the timeout
* value for interactions with the {@link BlockingIterator}.
Expand All @@ -64,9 +77,14 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, TimeUnit unit)
throws TimeoutException {
requireNonNull(action);
BlockingIterator<T> iterator = iterator();
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
try (BlockingIterator<T> iterator = iterator()) {
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
}
} catch (TimeoutException | RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

Expand Down Expand Up @@ -96,18 +114,24 @@ default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, T
*/
default void forEach(Consumer<? super T> action, long timeout, TimeUnit unit) throws TimeoutException {
requireNonNull(action);
BlockingIterator<T> iterator = iterator();
long remainingTimeoutNanos = unit.toNanos(timeout);
long timeStampANanos = nanoTime();
while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) {
final long timeStampBNanos = nanoTime();
remainingTimeoutNanos -= timeStampBNanos - timeStampANanos;
// We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout of
// <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a fallback value.
action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS));
try (BlockingIterator<T> iterator = iterator()) {
long remainingTimeoutNanos = unit.toNanos(timeout);
long timeStampANanos = nanoTime();
while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) {
final long timeStampBNanos = nanoTime();
remainingTimeoutNanos -= timeStampBNanos - timeStampANanos;
// We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout
// of <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a
// fallback value.
action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS));

timeStampANanos = nanoTime();
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
timeStampANanos = nanoTime();
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
}
} catch (TimeoutException | RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

Expand Down

0 comments on commit 85fd41b

Please sign in to comment.