Skip to content

Commit

Permalink
YARN-11154. Make router support proxy server. (#5946)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengchenyu authored Aug 19, 2023
1 parent ebc32fb commit 1361113
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4401,6 +4401,9 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;

public static final String ROUTER_WEBAPP_PROXY_ENABLE = ROUTER_WEBAPP_PREFIX + "proxy.enable";
public static final boolean DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE = true;

private static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg.";

// The number of threads to use for the GPG scheduled executor service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5264,6 +5264,14 @@
<value>false</value>
</property>

<property>
<description>
Whether to enable proxy service in router. Default is true.
</description>
<name>yarn.router.webapp.proxy.enable</name>
<value>true</value>
</property>

<property>
<description>
The number of threads to use for the GPG scheduled executor service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,6 +45,10 @@
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebApp;
import org.apache.hadoop.yarn.server.webproxy.FedAppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
Expand Down Expand Up @@ -91,6 +96,7 @@ public class Router extends CompositeService {
@VisibleForTesting
protected String webAppAddress;
private static long clusterTimeStamp = System.currentTimeMillis();
private FedAppReportFetcher fetcher = null;

/**
* Priority of the Router shutdown hook.
Expand Down Expand Up @@ -209,9 +215,29 @@ public void startWepApp() {

Builder<Object> builder =
WebApps.$for("cluster", null, null, "ws").with(conf).at(webAppAddress);
if (RouterServerUtil.isRouterWebProxyEnable(conf)) {
fetcher = new FedAppReportFetcher(conf);
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC,
WebAppProxyServlet.class);
builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
String proxyHostAndPort = getProxyHostAndPort(conf);
String[] proxyParts = proxyHostAndPort.split(":");
builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]);
}
webApp = builder.start(new RouterWebApp(this));
}

public static String getProxyHostAndPort(Configuration conf) {
String addr = conf.get(YarnConfiguration.PROXY_ADDRESS);
if(addr == null || addr.isEmpty()) {
InetSocketAddress address = conf.getSocketAddr(YarnConfiguration.ROUTER_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PORT);
addr = WebAppUtils.getResolvedAddress(address);
}
return addr;
}

public static void main(String[] argv) {
Configuration conf = new YarnConfiguration();
Thread
Expand Down Expand Up @@ -267,4 +293,9 @@ private String getHostName(Configuration config)
public static long getClusterTimeStamp() {
return clusterTimeStamp;
}

@VisibleForTesting
public FedAppReportFetcher getFetcher() {
return fetcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -801,4 +801,9 @@ public static ApplicationSubmissionContext getTrimmedAppSubmissionContext(

return trimmedContext;
}

public static boolean isRouterWebProxyEnable(Configuration conf) {
return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.router.clientrm;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
Expand Down Expand Up @@ -104,11 +107,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -138,6 +143,7 @@ public class RouterClientRMService extends AbstractService
// and remove the oldest used ones.
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;

private URL redirectURL;
private RouterDelegationTokenSecretManager routerDTSecretManager;

public RouterClientRMService() {
Expand All @@ -157,6 +163,10 @@ protected void serviceStart() throws Exception {
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);

if (RouterServerUtil.isRouterWebProxyEnable(conf)) {
redirectURL = getRedirectURL();
}

int maxCacheSize =
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
Expand Down Expand Up @@ -318,7 +328,22 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getApplicationReport(request);
GetApplicationReportResponse response = pipeline.getRootInterceptor()
.getApplicationReport(request);
if (RouterServerUtil.isRouterWebProxyEnable(getConfig())) {
// After redirect url, tracking url in application report will
// redirect to embeded proxy server of router
URL url = new URL(response.getApplicationReport().getTrackingUrl());
String redirectUrl = new URL(redirectURL.getProtocol(),
redirectURL.getHost(), redirectURL.getPort(), url.getFile())
.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("The tracking url of application {} is redirect from {} to {}",
response.getApplicationReport().getApplicationId(), url, redirectUrl);
}
response.getApplicationReport().setTrackingUrl(redirectUrl);
}
return response;
}

@Override
Expand Down Expand Up @@ -623,4 +648,20 @@ public void initUserPipelineMap(Configuration conf) {
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
}

private URL getRedirectURL() throws Exception {
Configuration conf = getConfig();
String webAppAddress = WebAppUtils.getWebAppBindURL(conf, YarnConfiguration.ROUTER_BIND_HOST,
WebAppUtils.getRouterWebAppURLWithoutScheme(conf));
String[] hostPort = StringUtils.split(webAppAddress, ':');
if (hostPort.length != 2) {
throw new YarnRuntimeException("Router can't get valid redirect proxy url");
}
String host = hostPort[0];
int port = Integer.parseInt(hostPort[1]);
if (StringUtils.isBlank(host) || host.equals("0.0.0.0")) {
host = InetAddress.getLocalHost().getCanonicalHostName();
}
return new URL(YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http", host, port, "");
}
}
Loading

0 comments on commit 1361113

Please sign in to comment.