Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockingStreamingHttpService: drop trailers if users didn't create any #3151

Merged
merged 6 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.ScanMapper;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
Expand Down Expand Up @@ -76,7 +79,7 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
// still be propagated.
final Processor exceptionProcessor = newCompletableProcessor();
final BufferHttpPayloadWriter payloadWriter = new BufferHttpPayloadWriter(
ctx.headersFactory().newTrailers());
() -> ctx.headersFactory().newTrailers());
DefaultBlockingStreamingHttpServerResponse response = null;
try {
final Consumer<DefaultHttpResponseMetaData> sendMeta = (metaData) -> {
Expand All @@ -102,7 +105,7 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
Publisher<Object> messageBody = fromSource(exceptionProcessor)
.merge(payloadWriter.connect());
if (addTrailers) {
messageBody = messageBody.concat(succeeded(payloadWriter.trailers()));
messageBody = messageBody.scanWithMapper(() -> new TrailersMapper(payloadWriter));
}
messageBody = messageBody.beforeSubscription(() -> new Subscription() {
@Override
Expand Down Expand Up @@ -169,10 +172,12 @@ public Completable closeAsyncGracefully() {

private static final class BufferHttpPayloadWriter implements HttpPayloadWriter<Buffer> {
private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter<>();
private final HttpHeaders trailers;
private final Supplier<HttpHeaders> trailersFactory;
@Nullable
private HttpHeaders trailers;

BufferHttpPayloadWriter(final HttpHeaders trailers) {
this.trailers = trailers;
BufferHttpPayloadWriter(final Supplier<HttpHeaders> trailersFactory) {
this.trailersFactory = trailersFactory;
}

@Override
Expand All @@ -197,11 +202,61 @@ public void close(final Throwable cause) throws IOException {

@Override
public HttpHeaders trailers() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A "getter" method that also creates seems off to me. IDK if there is a better naming scheme where trailers0 is basically just trailers and trailers is getOrCreateTrailers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's "lazy get" 😄
We use the same trick in AbstractHttpMetaData.context(). When API is written in stone but we need to defer action.
From users point of view it will be non-visible change.

if (trailers == null) {
trailers = trailersFactory.get();
}
return trailers;
}

@Nullable
HttpHeaders trailers0() {
return trailers;
}

Publisher<Buffer> connect() {
return payloadWriter.connect();
}
}

private static final class TrailersMapper implements ScanMapper<Object, Object>, ScanMapper.MappedTerminal<Object> {
private final BufferHttpPayloadWriter payloadWriter;

TrailersMapper(final BufferHttpPayloadWriter payloadWriter) {
this.payloadWriter = payloadWriter;
}

@Nullable
@Override
public Object mapOnNext(@Nullable final Object next) {
return next;
}

@Nullable
@Override
public MappedTerminal<Object> mapOnError(final Throwable cause) {
return null;
}

@Override
public MappedTerminal<Object> mapOnComplete() {
return this;
}

@Nullable
@Override
public Object onNext() {
return payloadWriter.trailers0();
}

@Override
public boolean onNextValid() {
return payloadWriter.trailers0() != null;
daschl marked this conversation as resolved.
Show resolved Hide resolved
}

@Nullable
@Override
public Throwable terminal() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public <T> HttpRequest payloadBody(final T pojo, final HttpSerializer2<T> serial
public HttpHeaders trailers() {
if (trailers == null) {
trailers = original.payloadHolder().headersFactory().newTrailers();
original.transform(this);
original.transform(this); // Invoke "transform" to set PayloadInfo.mayHaveTrailers() flag
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seemed odd to me and was one thing I had trouble grokking when I was trying to track this down.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree 😞
Unfortunately, we currently don't have a nice way to update PayloadInfo flags without invoking transformation that will do it for us. Maybe we refactor in the future.

}
return trailers;
}
Expand Down Expand Up @@ -277,6 +277,9 @@ public StreamingHttpRequest toStreamingRequest() {
@Nullable
final Publisher<Object> payload;
if (trailers != null) {
// We can not drop empty Trailers here bcz users could do type conversion intermediately, while still
// referencing the original HttpHeaders object from an aggregated type and keep using it to add trailers
// before sending the message or converting it back to an aggregated one.
payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers);
} else {
payload = emptyPayloadBody ? null : from(payloadBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public StreamingHttpResponse toStreamingResponse() {
@Nullable
final Publisher<Object> payload;
if (trailers != null) {
// We can not drop empty Trailers here bcz users could do type conversion intermediately, while still
// referencing the original HttpHeaders object from an aggregated type and keep using it to add trailers
// before sending the message or converting it back to an aggregated one.
payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers);
} else {
payload = emptyPayloadBody ? null : from(payloadBody);
Expand All @@ -127,7 +130,7 @@ public BlockingStreamingHttpResponse toBlockingStreamingResponse() {
public HttpHeaders trailers() {
if (trailers == null) {
trailers = original.payloadHolder().headersFactory().newTrailers();
original.transform(this);
original.transform(this); // Invoke "transform" to set PayloadInfo.mayHaveTrailers() flag
}
return trailers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ static Single<PayloadAndTrailers> aggregatePayloadAndTrailers(final DefaultPaylo
if (isAlwaysEmpty(pair.payload)) {
payloadInfo.setEmpty(true);
}
// We can not drop empty Trailers here bcz users could do type conversion intermediately multiple times,
// while still referencing the original HttpHeaders object from an aggregated type and keep using it to add
// trailers before sending the message or converting it back to an aggregated one.
if (pair.trailers == null) {
payloadInfo.setMayHaveTrailersAndGenericTypeBuffer(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,42 @@
*/
public interface HttpPayloadWriter<T> extends PayloadWriter<T>, TrailersHolder {

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
HttpHeaders trailers();

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> addTrailer(final CharSequence name, final CharSequence value) {
TrailersHolder.super.addTrailer(name, value);
return this;
}

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> addTrailers(final HttpHeaders trailers) {
TrailersHolder.super.addTrailers(trailers);
return this;
}

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> setTrailer(final CharSequence name, final CharSequence value) {
TrailersHolder.super.setTrailer(name, value);
return this;
}

/**
* <b>Note:</b> modifying trailers after the payload writer is {@link #close() closed} is not allowed.
*/
@Override
default HttpPayloadWriter<T> setTrailers(final HttpHeaders trailers) {
TrailersHolder.super.setTrailers(trailers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorExtension;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.oio.api.PayloadWriter;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand Down Expand Up @@ -63,8 +65,10 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.function.Function.identity;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
Expand All @@ -78,7 +82,8 @@ class BlockingStreamingToStreamingServiceTest {
private static final String HELLO_WORLD = "Hello\nWorld\n";

@RegisterExtension
final ExecutorExtension<Executor> executorExtension = ExecutorExtension.withCachedExecutor();
static final ExecutorExtension<Executor> executorExtension = ExecutorExtension.withCachedExecutor()
.setClassLevel(true);

@Mock
private HttpExecutionContext mockExecutionCtx;
Expand All @@ -93,14 +98,26 @@ void setup() {
mockCtx = new TestHttpServiceContext(DefaultHttpHeadersFactory.INSTANCE, reqRespFactory, mockExecutionCtx);
}

@Test
void defaultResponseStatusNoPayload() throws Exception {
BlockingStreamingHttpService syncService = (ctx, request, response) -> response.sendMetaData().close();
@ParameterizedTest(name = "{displayName} [{index}] withEmptyTrailers={0}")
@ValueSource(booleans = {false, true})
void defaultResponseStatusNoPayload(boolean withEmptyTrailers) throws Exception {
BlockingStreamingHttpService syncService = (ctx, request, response) -> {
HttpPayloadWriter<Buffer> writer = response.sendMetaData();
if (withEmptyTrailers) {
writer.trailers(); // accessing trailers before close should preserve trailers in message body
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use-case of empty trailers?

Copy link
Member Author

@idelpivnitskiy idelpivnitskiy Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Looking at it with a fresh morning look.

My original thinking was that if users didn't touch trailers at all, then it's absolutely safe to drop them. But if they did, they could do something like:

HttpHeaders trailers = writer.trailers();
....
trailers.add(name, value);

However, if they did that before close(), then we will see that. If they added smth after close(), it's anyway racy and not guaranteed.

Another thing is how we treat trailers in all other places:

  1. DefaultHttpRequest.toStreamingRequest() or DefaultHttpResponse.toStreamingResponse()

@Nullable
final Publisher<Object> payload;
if (trailers != null) {
payload = emptyPayloadBody ? from(trailers) : from(payloadBody, trailers);
} else {
payload = emptyPayloadBody ? null : from(payloadBody);
}

  1. HttpDataSourceTransformations.aggregatePayloadAndTrailers:

}).map(pair -> {
if (isAlwaysEmpty(pair.payload)) {
payloadInfo.setEmpty(true);
}
if (pair.trailers == null) {
payloadInfo.setMayHaveTrailersAndGenericTypeBuffer(false);
}
return pair;
});

Those account for the case when users can grab a reference to trailers, then convert the response and still use that reference to add trailers. Something like:

HttpResponse aggregatedResponse = ...;
HttpHeaders trailers = aggregatedResponse.trailers();
StreamingHttpResponse streamingResponse = aggregatedResponse.toStreamingResponse();
trailers.add(name, value);
return Single.succeeded(streamingResponse);

However, now I see that BlockingStreamingHttpServerResponse doesn't have conversion methods at all, and this trick won't be possible with HttpPayloadWriter anyway bcz we already sent meta-data.

The only real reason to keep it as a null check instead of "null or empty" is to keep it consistent with the above 2 cases. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think keeping the cases consistent makes sense but if there is a way to simplify I would prefer that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for starting in a consistent way and if we feel different, we can update them all together

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanation, makes sense to me to keep it consistent for now 👍

}
writer.close();
writer.trailers(); // accessing trailers after close should not modify output
};

List<Object> response = invokeService(syncService, reqRespFactory.get("/"));
assertMetaData(OK, response);
assertPayloadBody("", response, false);
assertEmptyTrailers(response);
if (withEmptyTrailers) {
assertEmptyTrailers(response);
} else {
assertNoTrailers(response);
}
}

@Test
Expand All @@ -111,7 +128,7 @@ void customResponseStatusNoPayload() throws Exception {
List<Object> response = invokeService(syncService, reqRespFactory.get("/"));
assertMetaData(NO_CONTENT, response);
assertPayloadBody("", response, false);
assertEmptyTrailers(response);
assertNoTrailers(response);
}

@Test
Expand All @@ -127,24 +144,34 @@ void receivePayloadBody() throws Exception {
.payloadBody(from("Hello\n", "World\n"), appSerializerUtf8FixLen()));
assertMetaData(OK, response);
assertPayloadBody("", response, true);
assertEmptyTrailers(response);
assertNoTrailers(response);

assertThat(receivedPayload.toString(), is(HELLO_WORLD));
}

@Test
void respondWithPayloadBody() throws Exception {
@ParameterizedTest(name = "{displayName} [{index}] withEmptyTrailers={0}")
@ValueSource(booleans = {false, true})
void respondWithPayloadBody(boolean withEmptyTrailers) throws Exception {
BlockingStreamingHttpService syncService = (ctx, request, response) -> {
try (PayloadWriter<Buffer> pw = response.sendMetaData()) {
pw.write(ctx.executionContext().bufferAllocator().fromAscii("Hello\n"));
pw.write(ctx.executionContext().bufferAllocator().fromAscii("World\n"));
BufferAllocator alloc = ctx.executionContext().bufferAllocator();
HttpPayloadWriter<Buffer> writer = response.sendMetaData();
writer.write(alloc.fromAscii("Hello\n"));
if (withEmptyTrailers) {
writer.trailers(); // accessing trailers before close should preserve trailers in message body
}
writer.write(alloc.fromAscii("World\n"));
writer.close();
writer.trailers(); // accessing trailers after close should not modify output
};

List<Object> response = invokeService(syncService, reqRespFactory.get("/"));
assertMetaData(OK, response);
assertPayloadBody(HELLO_WORLD, response, false);
assertEmptyTrailers(response);
if (withEmptyTrailers) {
assertEmptyTrailers(response);
} else {
assertNoTrailers(response);
}
}

@Test
Expand Down Expand Up @@ -533,6 +560,10 @@ private static void assertPayloadBody(String expectedPayloadBody, List<Object> r
assertThat(payloadBody, is(expectedPayloadBody));
}

private static void assertNoTrailers(List<Object> response) {
assertThat(response, not(containsInAnyOrder(instanceOf(HttpHeaders.class))));
}

private static void assertEmptyTrailers(List<Object> response) {
HttpHeaders trailers = (HttpHeaders) response.get(response.size() - 1);
assertThat(trailers, is(notNullValue()));
Expand Down
Loading