-
Notifications
You must be signed in to change notification settings - Fork 436
RATIS-2329. NettyRpcProxy should support handling netty channel exception to prevent replication stuck #1285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Env: we use celeborn 0.4.1 version, which depends on ratis 2.5.1 version and configure netty mode. |
@szetszwo Can you help review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leixm , thanks a lot for working on this! The change looks good. Just have some minor comments inlined.
private void failOutstandingRequests(Throwable cause) { | ||
replies.forEach(f -> f.completeExceptionally(cause)); | ||
replies.clear(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move it out. Then, it can also be used in close()
.
BTW, could you also change the IOException to AlreadyClosedException?
@@ -153,9 +169,14 @@ public class NettyRpcProxy implements Closeable {
@Override
public synchronized void close() {
client.close();
+ failOutstandingRequests(new AlreadyClosedException("Closing connection to " + peer));
+ }
+
+ private void failOutstandingRequests(Throwable cause) {
if (!replies.isEmpty()) {
- final IOException e = new IOException("Connection to " + peer + " is closed.");
- replies.stream().forEach(f -> f.completeExceptionally(e));
+ LOG.warn("Still have {} requests outstanding from {} connection: {}",
+ replies.size(), peer, cause.toString());
+ replies.forEach(f -> f.completeExceptionally(cause));
replies.clear();
}
}
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | ||
if (!replies.isEmpty()) { | ||
LOG.error( | ||
"Still have {} requests outstanding when caught exception from {} connection", | ||
replies.size(), | ||
peer, | ||
cause); | ||
failOutstandingRequests(cause); | ||
} | ||
client.close(); | ||
} | ||
|
||
@Override | ||
public void channelInactive(ChannelHandlerContext ctx) { | ||
if (!replies.isEmpty()) { | ||
LOG.error( | ||
"Still have {} requests outstanding when connection from {} is closed", | ||
replies.size(), | ||
peer); | ||
failOutstandingRequests(new IOException("Connection to " + peer + " is closed.")); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, these two methods become very short. BTW, let's
- Wrap the cause with IOException in the first method so it could include the peer.
- Use
AlreadyClosedException
for the second method and use a different message.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
failOutstandingRequests(new IOException("Caught an exception for the connection to " + peer, cause));
client.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive."));
}
@szetszwo Thank you very much for your guidance. I have two questions to discuss:
Can we add a new exception type, such as NettyRpcException, to wrap both of the above cases into a NettyRpcException, and then modify IOUtils#shouldReconnect to support reconnecting when a NettyRpcException is encountered? |
That's a good idea. Let's add it. |
return ReflectionUtils.isInstance(e, | ||
SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class, | ||
AlreadyClosedException.class); | ||
AlreadyClosedException.class, NettyRpcException.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leixm , Actually, NettyRpcException
is not needed. Could we just use TimeoutIOException
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have removed NettyRpcException, but we should call failOutstandingRequests(cause);
in exceptionCaught, and we should not wrap it to IOException, because there may have been a SocketTimeoutException that required reconnection, but because we wrapped it into an IOException, it ultimately did not reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's check also the causes?
static boolean shouldReconnect(Throwable e) {
for (; e != null; e = e.getCause()) {
if (ReflectionUtils.isInstance(e,
SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
AlreadyClosedException.class, TimeoutIOException.class)) {
return true;
}
}
return false;
}
@leixm , thanks for the update. There is a test failure. Please take a look. We probably have to update the test. [INFO] Running org.apache.ratis.netty.TestRaftAsyncWithNetty
Error: Tests run: 14, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 83.67 s <<< FAILURE! -- in org.apache.ratis.netty.TestRaftAsyncWithNetty
Error: org.apache.ratis.netty.TestRaftAsyncWithNetty.testRequestAsyncWithRetryFailureAfterInitialMessages -- Time elapsed: 5.590 s <<< FAILURE!
java.lang.AssertionError: The test "retry-failure-0" does not throw anything.
at org.apache.ratis.BaseTest.testFailureCase(BaseTest.java:161)
at org.apache.ratis.BaseTest.testFailureCase(BaseTest.java:169)
at org.apache.ratis.RaftAsyncTests.runTestRequestAsyncWithRetryFailure(RaftAsyncTests.java:195)
at org.apache.ratis.RaftAsyncTests.lambda$runTestRequestAsyncWithRetryFailure$0(RaftAsyncTests.java:134)
at org.apache.ratis.server.impl.MiniRaftCluster$Factory$Get.runWithNewCluster(MiniRaftCluster.java:142)
at org.apache.ratis.server.impl.MiniRaftCluster$Factory$Get.runWithNewCluster(MiniRaftCluster.java:125)
at org.apache.ratis.RaftAsyncTests.runTestRequestAsyncWithRetryFailure(RaftAsyncTests.java:134)
at org.apache.ratis.RaftAsyncTests.testRequestAsyncWithRetryFailureAfterInitialMessages(RaftAsyncTests.java:129)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at java.util.ArrayList.forEach(ArrayList.java:1259) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just left minor issues! I have change the title to make this bug's impact more clear
ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
Outdated
Show resolved
Hide resolved
ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances NettyRpcProxy to handle Netty channel exceptions to prevent replication stuck scenarios. The changes improve error handling by adding proper exception handling for channel failures and modifying the reconnection logic to traverse exception causes.
- Added exception handling for Netty channel failures in NettyRpcProxy
- Enhanced reconnection logic to check exception cause chain
- Improved logging and error reporting for outstanding requests
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
NettyRpcProxy.java | Added exception handlers for channel failures, improved thread-safe queue usage, and enhanced error handling for outstanding requests |
IOUtils.java | Modified shouldReconnect method to traverse exception cause chain and added TimeoutIOException to reconnectable exceptions |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | ||
failOutstandingRequests(new IOException("Caught an exception for the connection to " + peer, cause)); | ||
client.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exceptionCaught method should call super.exceptionCaught(ctx, cause) to ensure proper exception propagation in the Netty pipeline, or at minimum call ctx.close() to properly close the channel context.
client.close(); | |
client.close(); | |
ctx.close(); |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ctx.close() and client.close() are equivalent and do not need to be called repeatedly.
|
||
@Override | ||
public void channelInactive(ChannelHandlerContext ctx) { | ||
failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channelInactive method should call super.channelInactive(ctx) to ensure proper cleanup in the Netty pipeline.
failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive.")); | |
failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive.")); | |
super.channelInactive(ctx); |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot seems to have a legitimate suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leixm , thanks for the update! Please see the comment inlined.
|
||
@Override | ||
public void channelInactive(ChannelHandlerContext ctx) { | ||
failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot seems to have a legitimate suggestion.
What changes were proposed in this pull request?
NettyRpcProxy should support handling netty channel exception
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/RATIS-2329
How was this patch tested?
Manual tests