Skip to content

Commit

Permalink
BlockingStreamingHttpService: drop trailers if users didn't create any
Browse files Browse the repository at this point in the history
  • Loading branch information
idelpivnitskiy committed Dec 19, 2024
1 parent e4d4418 commit 39658f9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.ScanMapper;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
Expand Down Expand Up @@ -76,7 +80,7 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
// still be propagated.
final Processor exceptionProcessor = newCompletableProcessor();
final BufferHttpPayloadWriter payloadWriter = new BufferHttpPayloadWriter(
ctx.headersFactory().newTrailers());
() -> ctx.headersFactory().newTrailers());
DefaultBlockingStreamingHttpServerResponse response = null;
try {
final Consumer<DefaultHttpResponseMetaData> sendMeta = (metaData) -> {
Expand All @@ -102,7 +106,7 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
Publisher<Object> messageBody = fromSource(exceptionProcessor)
.merge(payloadWriter.connect());
if (addTrailers) {
messageBody = messageBody.concat(succeeded(payloadWriter.trailers()));
messageBody = messageBody.scanWithMapper(() -> new TrailersMapper(payloadWriter));
}
messageBody = messageBody.beforeSubscription(() -> new Subscription() {
@Override
Expand Down Expand Up @@ -169,10 +173,12 @@ public Completable closeAsyncGracefully() {

private static final class BufferHttpPayloadWriter implements HttpPayloadWriter<Buffer> {
private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter<>();
private final HttpHeaders trailers;
private final Supplier<HttpHeaders> trailersFactory;
@Nullable
private HttpHeaders trailers;

BufferHttpPayloadWriter(final HttpHeaders trailers) {
this.trailers = trailers;
BufferHttpPayloadWriter(final Supplier<HttpHeaders> trailersFactory) {
this.trailersFactory = trailersFactory;
}

@Override
Expand All @@ -197,11 +203,61 @@ public void close(final Throwable cause) throws IOException {

@Override
public HttpHeaders trailers() {
if (trailers == null) {
trailers = trailersFactory.get();
}
return trailers;
}

@Nullable
HttpHeaders trailers0() {
return trailers;
}

Publisher<Buffer> connect() {
return payloadWriter.connect();
}
}

private static final class TrailersMapper implements ScanMapper<Object, Object>, ScanMapper.MappedTerminal<Object> {
private final BufferHttpPayloadWriter payloadWriter;

TrailersMapper(final BufferHttpPayloadWriter payloadWriter) {
this.payloadWriter = payloadWriter;
}

@Nullable
@Override
public Object mapOnNext(@Nullable final Object next) {
return next;
}

@Nullable
@Override
public MappedTerminal<Object> mapOnError(final Throwable cause) {
return null;
}

@Override
public MappedTerminal<Object> mapOnComplete() {
return this;
}

@Nullable
@Override
public Object onNext() {
return payloadWriter.trailers0();
}

@Override
public boolean onNextValid() {
return payloadWriter.trailers0() != null;
}

@Nullable
@Override
public Throwable terminal() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public interface HttpPayloadWriter<T> extends PayloadWriter<T>, TrailersHolder {

// TODO: clarify in javadoc that trailers can not be modified after the writer is closed

@Override
default HttpPayloadWriter<T> addTrailer(final CharSequence name, final CharSequence value) {
TrailersHolder.super.addTrailer(name, value);
Expand Down

0 comments on commit 39658f9

Please sign in to comment.