-
Notifications
You must be signed in to change notification settings - Fork 13.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36876] Support external eventLoopGroup for RestClient #25788
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,7 @@ | |
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; | ||
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; | ||
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory; | ||
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; | ||
import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory; | ||
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; | ||
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; | ||
|
@@ -143,30 +144,42 @@ public class RestClient implements AutoCloseableAsync { | |
ConcurrentHashMap.newKeySet(); | ||
|
||
private final List<OutboundChannelHandlerFactory> outboundChannelHandlerFactories; | ||
private final Boolean useInternalEventLoopGroup; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: why not boolean, I assume we do not want this ever to be null |
||
|
||
/** | ||
* Creates a new RestClient for the provided root URL. If the protocol of the URL is "https", | ||
* then SSL is automatically enabled for the REST client. | ||
*/ | ||
public static RestClient forUrl(Configuration configuration, Executor executor, URL rootUrl) | ||
throws ConfigurationException { | ||
return forUrl(configuration, executor, rootUrl, null); | ||
} | ||
|
||
public static RestClient forUrl( | ||
Configuration configuration, Executor executor, URL rootUrl, EventLoopGroup group) | ||
throws ConfigurationException { | ||
Preconditions.checkNotNull(configuration); | ||
Preconditions.checkNotNull(rootUrl); | ||
if ("https".equals(rootUrl.getProtocol())) { | ||
configuration = configuration.clone(); | ||
configuration.set(SSL_REST_ENABLED, true); | ||
} | ||
return new RestClient(configuration, executor, rootUrl.getHost(), rootUrl.getPort()); | ||
return new RestClient(configuration, executor, rootUrl.getHost(), rootUrl.getPort(), group); | ||
} | ||
|
||
public RestClient(Configuration configuration, Executor executor) | ||
throws ConfigurationException { | ||
this(configuration, executor, null, -1); | ||
this(configuration, executor, null, -1, null); | ||
} | ||
|
||
public RestClient(Configuration configuration, Executor executor, String host, int port) | ||
public RestClient( | ||
Configuration configuration, | ||
Executor executor, | ||
String host, | ||
int port, | ||
EventLoopGroup group) | ||
throws ConfigurationException { | ||
this(configuration, executor, host, port, DefaultSelectStrategyFactory.INSTANCE); | ||
this(configuration, executor, host, port, DefaultSelectStrategyFactory.INSTANCE, group); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -175,15 +188,16 @@ public RestClient(Configuration configuration, Executor executor, String host, i | |
Executor executor, | ||
SelectStrategyFactory selectStrategyFactory) | ||
throws ConfigurationException { | ||
this(configuration, executor, null, -1, selectStrategyFactory); | ||
this(configuration, executor, null, -1, selectStrategyFactory, null); | ||
} | ||
|
||
private RestClient( | ||
Configuration configuration, | ||
Executor executor, | ||
String host, | ||
int port, | ||
SelectStrategyFactory selectStrategyFactory) | ||
SelectStrategyFactory selectStrategyFactory, | ||
EventLoopGroup group) | ||
throws ConfigurationException { | ||
Preconditions.checkNotNull(configuration); | ||
this.executor = Preconditions.checkNotNull(executor); | ||
|
@@ -264,15 +278,21 @@ protected void initChannel(SocketChannel socketChannel) { | |
} | ||
}; | ||
|
||
// No NioEventLoopGroup constructor available that allows passing nThreads, threadFactory, | ||
// and selectStrategyFactory without also passing a SelectorProvider, so mimicking its | ||
// default value seen in other constructors | ||
NioEventLoopGroup group = | ||
new NioEventLoopGroup( | ||
1, | ||
new ExecutorThreadFactory("flink-rest-client-netty"), | ||
SelectorProvider.provider(), | ||
selectStrategyFactory); | ||
if (group == null) { | ||
// No NioEventLoopGroup constructor available that allows passing nThreads, | ||
// threadFactory, | ||
// and selectStrategyFactory without also passing a SelectorProvider, so mimicking its | ||
// default value seen in other constructors | ||
group = | ||
new NioEventLoopGroup( | ||
1, | ||
new ExecutorThreadFactory("flink-rest-client-netty"), | ||
SelectorProvider.provider(), | ||
selectStrategyFactory); | ||
useInternalEventLoopGroup = true; | ||
} else { | ||
useInternalEventLoopGroup = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we check that the supplied group is not shutting down or shutdown There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see the channel is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the extenal service doesn't care the type of SocketChannel. As the jira says, it just pass a shared event group to avoid heap memory leak. Maybe it's a better choice to specify the contruct param |
||
} | ||
|
||
bootstrap = new Bootstrap(); | ||
bootstrap | ||
|
@@ -317,7 +337,7 @@ private CompletableFuture<Void> shutdownInternally(Duration timeout) { | |
LOG.debug("Shutting down rest endpoint."); | ||
|
||
if (bootstrap != null) { | ||
if (bootstrap.config().group() != null) { | ||
if (bootstrap.config().group() != null && useInternalEventLoopGroup) { | ||
bootstrap | ||
.config() | ||
.group() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we tag with @nullable?