Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2278] feat(coordinator): Introduce AccessSupportRssChecker to reject the un-support application earlier #2277

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker;
import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker;
import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker;
import org.apache.uniffle.coordinator.conf.ClientConfParser;
import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
Expand Down Expand Up @@ -91,8 +94,9 @@ public class CoordinatorConf extends RssBaseConf {
.stringType()
.asList()
.defaultValues(
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker",
"org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker")
AccessClusterLoadChecker.class.getCanonicalName(),
AccessQuotaChecker.class.getCanonicalName(),
AccessSupportRssChecker.class.getCanonicalName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you modify the document, too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.withDescription("Access checkers");
public static final ConfigOption<Integer> COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC =
ConfigOptions.key("rss.coordinator.access.candidates.updateIntervalSec")
Expand Down Expand Up @@ -257,6 +261,15 @@ public class CoordinatorConf extends RssBaseConf {
.defaultValues("appHeartbeat", "heartbeat")
.withDescription("Exclude record rpc audit operation list, separated by ','");

public static final ConfigOption<List<String>> COORDINATOR_UNSUPPORTED_CONFIGS =
ConfigOptions.key("rss.coordinator.unsupportedConfigs")
.stringType()
.asList()
.defaultValues("serializer:org.apache.hadoop.io.serializer.JavaSerialization")
.withDescription(
"The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties "
+ "and they are set to be denied access, the client's access will be rejected.");

public CoordinatorConf() {}

public CoordinatorConf(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@
/** Abstract class for checking the access info from the client-side. */
public abstract class AbstractAccessChecker implements AccessChecker {

protected AbstractAccessChecker(AccessManager accessManager) throws Exception {}
private final AccessManager accessManager;

protected AbstractAccessChecker(AccessManager accessManager) throws Exception {
this.accessManager = accessManager;
}

@Override
public void refreshAccessChecker() {}

public AccessManager getAccessManager() {
return accessManager;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.access.checker;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

/**
* AccessSupportRssChecker checks whether the extra properties support rss, for example, the
* serializer is java, rss is not supported.
*/
public class AccessSupportRssChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class);
private final HashMap<String, String> unsupportedConfigMap;

public AccessSupportRssChecker(AccessManager accessManager) throws Exception {
super(accessManager);
List<String> unsupportedConfigs =
accessManager.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_UNSUPPORTED_CONFIGS);
unsupportedConfigMap = new HashMap<>();
if (unsupportedConfigs != null && !unsupportedConfigs.isEmpty()) {
for (String keyValue : unsupportedConfigs) {
String[] pair = keyValue.split(":", 2);
if (pair.length == 2) {
unsupportedConfigMap.put(pair[0], pair[1]);
} else {
LOG.error("Unsupported config {} has wrong format, skip it.", keyValue);
}
}
}
}

@Override
public AccessCheckResult check(AccessInfo accessInfo) {
for (Map.Entry<String, String> entry : unsupportedConfigMap.entrySet()) {
String unsupportedConfKey = entry.getKey();
String unsupportedConfValue = entry.getValue();
String actualConfValue = accessInfo.getExtraProperties().get(unsupportedConfKey);
if (Objects.equals(actualConfValue, unsupportedConfValue)) {
String msg =
String.format(
"Denied by AccessSupportRssChecker, %s is %s, AccessSupportRssChecker does not supported.",
unsupportedConfKey, actualConfValue);
LOG.debug(msg);
CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
}

return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}

@Override
public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CoordinatorMetrics {
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
private static final String TOTAL_SUPPORT_RSS_DENIED_REQUEST = "total_support_rss_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
public static final String APP_NUM_TO_USER = "app_num";
public static final String USER_LABEL = "user_name";
Expand All @@ -57,6 +58,7 @@ public class CoordinatorMetrics {
public static Counter counterTotalCandidatesDeniedRequest;
public static Counter counterTotalQuotaDeniedRequest;
public static Counter counterTotalLoadDeniedRequest;
public static Counter counterTotalSupportRssDeniedRequest;
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();

private static MetricsManager metricsManager;
Expand Down Expand Up @@ -118,5 +120,7 @@ private static void setUpMetrics() {
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
counterTotalSupportRssDeniedRequest =
metricsManager.addCounter(TOTAL_SUPPORT_RSS_DENIED_REQUEST);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.checker;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.ClusterManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.SimpleClusterManager;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

public class AccessSupportRssCheckerTest {

@BeforeEach
public void setUp() {
CoordinatorMetrics.register();
}

@AfterEach
public void clear() {
CoordinatorMetrics.clear();
}

@Test
public void test() throws Exception {
ClusterManager clusterManager = mock(SimpleClusterManager.class);

CoordinatorConf conf = new CoordinatorConf();
conf.set(
COORDINATOR_ACCESS_CHECKERS,
Collections.singletonList(AccessSupportRssChecker.class.getName()));
Map<String, String> properties = new HashMap<>();

/** case1: check success when the serializer config is empty. */
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
AccessManager accessManager =
new AccessManager(
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
AccessSupportRssChecker checker =
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertTrue(checker.check(accessInfo).isSuccess());
}

/** case2: check failed when the serializer config is JavaSerialization. */
properties.put("serializer", JavaSerialization.class.getCanonicalName());
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
AccessManager accessManager =
new AccessManager(
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
AccessSupportRssChecker checker =
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertFalse(checker.check(accessInfo).isSuccess());
}

/** case3: check success when the serializer config is other than JavaSerialization. */
properties.put("serializer", WritableSerialization.class.getCanonicalName());
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
AccessManager accessManager =
new AccessManager(
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
AccessSupportRssChecker checker =
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertTrue(checker.check(accessInfo).isSuccess());
}
}
}
Loading
Loading