Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8335181: Incorrect handling of HTTP/2 GOAWAY frames in HttpClient #1020

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -58,6 +58,10 @@ abstract class ExchangeImpl<T> {

final Exchange<T> exchange;

// this will be set to true only when the peer explicitly states (through a GOAWAY frame or
// a relevant error code in reset frame) that the corresponding stream (id) wasn't processed
private volatile boolean unprocessedByPeer;

ExchangeImpl(Exchange<T> e) {
// e == null means a http/2 pushed stream
this.exchange = e;
Expand Down Expand Up @@ -264,4 +268,13 @@ void upgraded() { }
// Called when server returns non 100 response to
// an Expect-Continue
void expectContinueFailed(int rcode) { }

final boolean isUnprocessedByPeer() {
return this.unprocessedByPeer;
}

// Marks the exchange as unprocessed by the peer
final void markUnprocessedByPeer() {
this.unprocessedByPeer = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -396,6 +397,7 @@ private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPro
private final String key; // for HttpClientImpl.connections map
private final FramesDecoder framesDecoder;
private final FramesEncoder framesEncoder = new FramesEncoder();
private final AtomicLong lastProcessedStreamInGoAway = new AtomicLong(-1);

/**
* Send Window controller for both connection and stream windows.
Expand Down Expand Up @@ -802,7 +804,9 @@ final int maxConcurrentServerInitiatedStreams() {

void close() {
if (markHalfClosedLocal()) {
if (connection.channel().isOpen()) {
// we send a GOAWAY frame only if the remote side hasn't already indicated
// the intention to close the connection by previously sending a GOAWAY of its own
if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) {
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
GoAwayFrame f = new GoAwayFrame(0,
ErrorFrame.NO_ERROR,
Expand Down Expand Up @@ -1354,13 +1358,46 @@ private void handlePing(PingFrame frame)
sendUnorderedFrame(frame);
}

private void handleGoAway(GoAwayFrame frame)
throws IOException
{
if (markHalfClosedLRemote()) {
shutdown(new IOException(
connection.channel().getLocalAddress()
+ ": GOAWAY received"));
private void handleGoAway(final GoAwayFrame frame) {
final long lastProcessedStream = frame.getLastStream();
assert lastProcessedStream >= 0 : "unexpected last stream id: "
+ lastProcessedStream + " in GOAWAY frame";

markHalfClosedRemote();
setFinalStream(); // don't allow any new streams on this connection
if (debug.on()) {
debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s",
lastProcessedStream, frame);
}
// see if this connection has previously received a GOAWAY from the peer and if yes
// then check if this new last processed stream id is lesser than the previous
// known last processed stream id. Only update the last processed stream id if the new
// one is lesser than the previous one.
long prevLastProcessed = lastProcessedStreamInGoAway.get();
while (prevLastProcessed == -1 || lastProcessedStream < prevLastProcessed) {
if (lastProcessedStreamInGoAway.compareAndSet(prevLastProcessed,
lastProcessedStream)) {
break;
}
prevLastProcessed = lastProcessedStreamInGoAway.get();
}
handlePeerUnprocessedStreams(lastProcessedStreamInGoAway.get());
}

private void handlePeerUnprocessedStreams(final long lastProcessedStream) {
final AtomicInteger numClosed = new AtomicInteger(); // atomic merely to allow usage within lambda
streams.forEach((id, exchange) -> {
if (id > lastProcessedStream) {
// any streams with an stream id higher than the last processed stream
// can be retried (on a new connection). we close the exchange as unprocessed
// to facilitate the retrying.
client2.client().theExecutor().ensureExecutedAsync(exchange::closeAsUnprocessed);
numClosed.incrementAndGet();
}
});
if (debug.on()) {
debug.log(numClosed.get() + " stream(s), with id greater than " + lastProcessedStream
+ ", will be closed as unprocessed");
}
}

Expand Down Expand Up @@ -1911,7 +1948,7 @@ private boolean markHalfClosedLocal() {
return markClosedState(HALF_CLOSED_LOCAL);
}

private boolean markHalfClosedLRemote() {
private boolean markHalfClosedRemote() {
return markClosedState(HALF_CLOSED_REMOTE);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -90,8 +90,8 @@ class MultiExchange<T> implements Cancelable {
Exchange<T> exchange; // the current exchange
Exchange<T> previous;
volatile Throwable retryCause;
volatile boolean expiredOnce;
volatile HttpResponse<T> response = null;
volatile boolean retriedOnce;
volatile HttpResponse<T> response;

// Maximum number of times a request will be retried/redirected
// for any reason
Expand Down Expand Up @@ -436,7 +436,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
return exch.ignoreBody().handle((r,t) -> {
previousreq = currentreq;
currentreq = newrequest;
expiredOnce = false;
retriedOnce = false;
setExchange(new Exchange<>(currentreq, this, acc));
return responseAsyncImpl();
}).thenCompose(Function.identity());
Expand All @@ -449,7 +449,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
return completedFuture(response);
}
// all exceptions thrown are handled here
CompletableFuture<Response> errorCF = getExceptionalCF(ex);
CompletableFuture<Response> errorCF = getExceptionalCF(ex, exch.exchImpl);
if (errorCF == null) {
return responseAsyncImpl();
} else {
Expand Down Expand Up @@ -521,34 +521,39 @@ private Throwable retryCause(Throwable t) {
* Takes a Throwable and returns a suitable CompletableFuture that is
* completed exceptionally, or null.
*/
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
private CompletableFuture<Response> getExceptionalCF(Throwable t, ExchangeImpl<?> exchImpl) {
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
if (t.getCause() != null) {
t = t.getCause();
}
}
final boolean retryAsUnprocessed = exchImpl != null && exchImpl.isUnprocessedByPeer();
if (cancelled && !requestCancelled() && t instanceof IOException) {
if (!(t instanceof HttpTimeoutException)) {
t = toTimeoutException((IOException)t);
}
} else if (retryOnFailure(t)) {
} else if (retryAsUnprocessed || retryOnFailure(t)) {
Throwable cause = retryCause(t);

if (!(t instanceof ConnectException)) {
// we may need to start a new connection, and if so
// we want to start with a fresh connect timeout again.
if (connectTimeout != null) connectTimeout.reset();
if (!canRetryRequest(currentreq)) {
return failedFuture(cause); // fails with original cause
if (!retryAsUnprocessed && !canRetryRequest(currentreq)) {
// a (peer) processed request which cannot be retried, fail with
// the original cause
return failedFuture(cause);
}
} // ConnectException: retry, but don't reset the connectTimeout.

// allow the retry mechanism to do its work
retryCause = cause;
if (!expiredOnce) {
if (debug.on())
debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);
expiredOnce = true;
if (!retriedOnce) {
if (debug.on()) {
debug.log(t.getClass().getSimpleName()
+ " (async): retrying " + currentreq + " due to: ", t);
}
retriedOnce = true;
// The connection was abruptly closed.
// We return null to retry the same request a second time.
// The request filters have already been applied to the
Expand All @@ -559,7 +564,7 @@ private CompletableFuture<Response> getExceptionalCF(Throwable t) {
} else {
if (debug.on()) {
debug.log(t.getClass().getSimpleName()
+ " (async): already retried once.", t);
+ " (async): already retried once " + currentreq, t);
}
t = cause;
}
Expand Down
67 changes: 57 additions & 10 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -672,20 +672,39 @@ void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
stateLock.unlock();
}
try {
int error = frame.getErrorCode();
IOException e = new IOException("Received RST_STREAM: "
+ ErrorFrame.stringForCode(error));
if (errorRef.compareAndSet(null, e)) {
if (subscriber != null) {
subscriber.onError(e);
final int error = frame.getErrorCode();
// A REFUSED_STREAM error code implies that the stream wasn't processed by the
// peer and the client is free to retry the request afresh.
if (error == ErrorFrame.REFUSED_STREAM) {
// Here we arrange for the request to be retried. Note that we don't call
// closeAsUnprocessed() method here because the "closed" state is already set
// to true a few lines above and calling close() from within
// closeAsUnprocessed() will end up being a no-op. We instead do the additional
// bookkeeping here.
markUnprocessedByPeer();
errorRef.compareAndSet(null, new IOException("request not processed by peer"));
if (debug.on()) {
debug.log("request unprocessed by peer (REFUSED_STREAM) " + this.request);
}
} else {
final String reason = ErrorFrame.stringForCode(error);
final IOException failureCause = new IOException("Received RST_STREAM: " + reason);
if (debug.on()) {
debug.log(streamid + " received RST_STREAM with code: " + reason);
}
if (errorRef.compareAndSet(null, failureCause)) {
if (subscriber != null) {
subscriber.onError(failureCause);
}
}
}
completeResponseExceptionally(e);
final Throwable failureCause = errorRef.get();
completeResponseExceptionally(failureCause);
if (!requestBodyCF.isDone()) {
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
requestBodyCF.completeExceptionally(failureCause); // we may be sending the body..
}
if (responseBodyCF != null) {
responseBodyCF.completeExceptionally(errorRef.get());
responseBodyCF.completeExceptionally(failureCause);
}
} finally {
connection.decrementStreamsCount(streamid);
Expand Down Expand Up @@ -1698,7 +1717,35 @@ Throwable getCancelCause() {
}

final String dbgString() {
return connection.dbgString() + "/Stream("+streamid+")";
final int id = streamid;
final String sid = id == 0 ? "?" : String.valueOf(id);
return connection.dbgString() + "/Stream(" + sid + ")";
}

/**
* An unprocessed exchange is one that hasn't been processed by a peer. The local end of the
* connection would be notified about such exchanges when it receives a GOAWAY frame with
* a stream id that tells which exchanges have been unprocessed.
* This method is called on such unprocessed exchanges and the implementation of this method
* will arrange for the request, corresponding to this exchange, to be retried afresh on a
* new connection.
*/
void closeAsUnprocessed() {
try {
// We arrange for the request to be retried on a new connection as allowed by the RFC-9113
markUnprocessedByPeer();
this.errorRef.compareAndSet(null, new IOException("request not processed by peer"));
if (debug.on()) {
debug.log("closing " + this.request + " as unprocessed by peer");
}
// close the exchange and complete the response CF exceptionally
close();
completeResponseExceptionally(this.errorRef.get());
} finally {
// decrementStreamsCount isn't really needed but we do it to make sure
// the log messages, where these counts/states get reported, show the accurate state.
connection.decrementStreamsCount(streamid);
}
}

private class HeadersConsumer extends ValidatingHeadersConsumer implements DecodingCallback {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -100,13 +100,16 @@ void removeStream(int streamid) {
controllerLock.lock();
try {
Integer old = streams.remove(streamid);
// Odd stream numbers (client streams) should have been registered.
// A client initiated stream might be closed (as unprocessed, due to a
// GOAWAY received on the connection) even before the stream is
// registered with this WindowController instance (when sending out request headers).
// Thus, for client initiated streams, we don't enforce the presence of the
// stream in the registered "streams" map.

// Even stream numbers (server streams - aka Push Streams) should
// not be registered
final boolean isClientStream = (streamid & 0x1) == 1;
if (old == null && isClientStream) {
throw new InternalError("Expected entry for streamid: " + streamid);
} else if (old != null && !isClientStream) {
if (old != null && !isClientStream) {
throw new InternalError("Unexpected entry for streamid: " + streamid);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -57,7 +57,9 @@ int length() {

@Override
public String toString() {
return super.toString() + " Debugdata: " + new String(debugData, UTF_8);
return super.toString()
+ " lastStreamId=" + lastStream
+ ", Debugdata: " + new String(debugData, UTF_8);
}

public int getLastStream() {
Expand Down
Loading