diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java index cdfcd0413d8..bd11a9ca3a3 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java @@ -67,7 +67,8 @@ private FuseableStreamMessage(StreamMessage source, @Nullable MapperFunction function, @Nullable Function errorFunction) { requireNonNull(source, "source"); - assert function != null || errorFunction != null; + assert (function != null && errorFunction == null) || (function == null && errorFunction != null) + : "function and errorFunction should be mutually exclusive"; source = peel(source); if (source instanceof FuseableStreamMessage) { @@ -106,7 +107,6 @@ private StreamMessage peel(StreamMessage source) { } do { - //noinspection unchecked source = ((NonOverridableStreamMessageWrapper) source).delegate(); } while (source instanceof NonOverridableStreamMessageWrapper); @@ -159,7 +159,15 @@ public CompletableFuture> collect(EventExecutor executor, SubscriptionOp } try { - U result = function.apply(obj); + U result; + if (function == null) { + // no transformation function is set. Use the object as is. + //noinspection unchecked + result = (U) obj; + } else { + result = function.apply(obj); + } + if (result != null) { result = StreamMessageUtil.touchOrCopyAndClose(result, withPooledObjects); builder.add(result); diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/FuseableStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/FuseableStreamMessageTest.java index 6881b06bfad..78955420292 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/FuseableStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/FuseableStreamMessageTest.java @@ -241,6 +241,22 @@ void mapError() { .verify(); } + @Test + void mapErrorWithNoError() { + StreamMessage fixed = StreamMessage.of(1, 2, 3, 4); + StreamMessage mapped = fixed.mapError(IllegalStateException::new); + // Test subscribe() + StepVerifier.create(mapped) + .expectNext(1, 2, 3, 4) + .expectComplete() + .verify(); + + fixed = StreamMessage.of(1, 2, 3, 4, 5); + mapped = fixed.mapError(IllegalStateException::new); + // Test collect() + assertThat(mapped.collect().join()).containsExactly(1, 2, 3, 4, 5); + } + @CsvSource({ "true", "false" }) @ParameterizedTest void mapWithPooledObjects_collect(boolean withPooledObjects) {