Skip to content

Commit

Permalink
Fix NullPointerException in FuseableStreamMessage (#4859)
Browse files Browse the repository at this point in the history
Related issue: #4852

Motivation:

`FuseableStreamMessage.collect()` throws a `NPE` when only `mapError()`
is applied to a normal `StreamMessage`.
```java
StreamMessage.of(1, 2, 3, 4)
             .mapError(IllegalStateException::new)
             // Raises a NullPointerException
             .collect().join();
```

The `function` field in `FuseableStreamMessage` can be null if no
`map()` is applied to `FuseableStreamMessage`.

https://github.com/line/armeria/blob/871d87297e4d051241589cb1ae95641cbc83f880/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java#L162

The problem only happens when `collect()` method is called.
`subscribe()` method already takes the case into account.

Modifications:

- Use the original object as is if `function` is null.

Result:

- You no longer see `NullPointerException` when `.mapError()` is applied
to `StreamMessage`
- Closes #4852
  • Loading branch information
ikhoon authored May 8, 2023
1 parent 1def296 commit 92e7017
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private FuseableStreamMessage(StreamMessage<? extends T> source,
@Nullable MapperFunction<T, U> function,
@Nullable Function<? super Throwable, ? extends Throwable> errorFunction) {
requireNonNull(source, "source");
assert function != null || errorFunction != null;
assert (function != null && errorFunction == null) || (function == null && errorFunction != null)
: "function and errorFunction should be mutually exclusive";

source = peel(source);
if (source instanceof FuseableStreamMessage) {
Expand Down Expand Up @@ -106,7 +107,6 @@ private StreamMessage<? extends T> peel(StreamMessage<? extends T> source) {
}

do {
//noinspection unchecked
source = ((NonOverridableStreamMessageWrapper<? extends T, ?>) source).delegate();
} while (source instanceof NonOverridableStreamMessageWrapper);

Expand Down Expand Up @@ -159,7 +159,15 @@ public CompletableFuture<List<U>> collect(EventExecutor executor, SubscriptionOp
}

try {
U result = function.apply(obj);
U result;
if (function == null) {
// no transformation function is set. Use the object as is.
//noinspection unchecked
result = (U) obj;
} else {
result = function.apply(obj);
}

if (result != null) {
result = StreamMessageUtil.touchOrCopyAndClose(result, withPooledObjects);
builder.add(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,22 @@ void mapError() {
.verify();
}

@Test
void mapErrorWithNoError() {
StreamMessage<Integer> fixed = StreamMessage.of(1, 2, 3, 4);
StreamMessage<Integer> mapped = fixed.mapError(IllegalStateException::new);
// Test subscribe()
StepVerifier.create(mapped)
.expectNext(1, 2, 3, 4)
.expectComplete()
.verify();

fixed = StreamMessage.of(1, 2, 3, 4, 5);
mapped = fixed.mapError(IllegalStateException::new);
// Test collect()
assertThat(mapped.collect().join()).containsExactly(1, 2, 3, 4, 5);
}

@CsvSource({ "true", "false" })
@ParameterizedTest
void mapWithPooledObjects_collect(boolean withPooledObjects) {
Expand Down

0 comments on commit 92e7017

Please sign in to comment.