Skip to content

Commit

Permalink
Merge pull request #12 from pinterest/cbrennan/resource-isolation-3.3…
Browse files Browse the repository at this point in the history
…_cacheselect

Resource Isolation Feature Checkpoint 4: Extend cache warmup to allow multiple resource isolation groups and multiple replicas
  • Loading branch information
ctbrennan authored Oct 11, 2024
2 parents ebd319c + e30a6ef commit bec060d
Show file tree
Hide file tree
Showing 14 changed files with 780 additions and 17 deletions.
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public enum ErrorCode {
ERR_WAREHOUSE_SUSPENDED(10003, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s has been suspended."),
ERR_WAREHOUSE_UNAVAILABLE(10004, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s is not available."),
ERR_NO_NODES_IN_WAREHOUSE(10005, new byte[] {'4', '2', '0', '0', '0'},
"No alive backend or compute node in warehouse %s."),
"No alive backend or compute node in warehouse %s. Also possible that there are no CN of the " +
"resource isolation group matching the FE."),
ERR_INVALID_WAREHOUSE_NAME(10006, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse name can not be null or empty"),

ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE(10007, new byte[] {'4', '2', '0', '0', '0'},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
tmpSessionVariable.setDataCachePriority(statement.getPriority());
tmpSessionVariable.setDatacacheTTLSeconds(statement.getTTLSeconds());
tmpSessionVariable.setEnableCacheSelect(true);
// Note that although setting these values in the SessionVariable is not ideal, it's way more disruptive to pipe
// this information to where it needs to be through the insertStmt.
if (statement.getNumReplicasDesired() > 1) {
// We only set this value if it is larger than the default assumption.
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
}
if (statement.getResourceIsolationGroups() != null && !statement.getResourceIsolationGroups().isEmpty()) {
// We only set this value if it is the non-default.
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
}
connectContext.setSessionVariable(tmpSessionVariable);

InsertStmt insertStmt = statement.getInsertStmt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
int numUsedComputeNodes,
ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy,
long warehouseId) {

String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState().
getNodeMgr().getMySelf().getResourceIsolationGroup();
return captureAvailableWorkers(warehouseId, thisFeResourceIsolationGroup);
}
public DefaultSharedDataWorkerProvider captureAvailableWorkers(long warehouseId, String resourceIsolationGroup) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
ImmutableMap.Builder<Long, ComputeNode> builder = ImmutableMap.builder();
List<Long> computeNodeIds = warehouseManager.getAllComputeNodeIds(warehouseId);
Expand All @@ -86,12 +90,12 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
LOG.debug("idToComputeNode: {}", idToComputeNode);
}

ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode);
ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode,
resourceIsolationGroup);
if (availableComputeNodes.isEmpty()) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}

return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes);
}
}
Expand All @@ -113,10 +117,11 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService

private final Set<Long> selectedWorkerIds;

private boolean allowGetAnyWorker = false;

@VisibleForTesting
public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
ImmutableMap<Long, ComputeNode> availableID2ComputeNode
) {
ImmutableMap<Long, ComputeNode> availableID2ComputeNode) {
this.id2ComputeNode = id2ComputeNode;
this.availableID2ComputeNode = availableID2ComputeNode;
this.selectedWorkerIds = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -158,9 +163,33 @@ public Collection<ComputeNode> getAllWorkers() {
return availableID2ComputeNode.values();
}

// Functionality: If allowGetAnyWorker is set, then getWorkerById will fall back to global state to get a ComputeNode
// reference when the id is not present in its available node map.
// Where it's used: As of writing this, this is only used for CACHE SELECT statements.
// Why: This is necessary because FragmentAssignmentStrategy classes, Coordinator, and CoordinatorPreprocessors use
// the WorkerProvider interface to get ComputeNode references. During a CACHE SELECT execution (which is the only
// type of statement that should be selecting workers from an un-matching resource isolation group), they will need
// to get the ComputeNode addresses for nodes outside the leader group.
// Alternative considered: Another way of achieving this would be to allow any caller to get a ComputeNode reference
// for any compute node, but that would leave space for bugs during the execution of other types of statements which
// really shouldn't be getting ComputeNode references for un-matching resource isolation groups or unhealthy
// ComputeNodes. Instead of changing a bunch of code which uses the WorkerProvider in a specific way, this approach
// limits the scope so that behavior only changes when the user of the WorkerProvider sets this very specific option.
public void setAllowGetAnyWorker(boolean allowGetAnyWorker) {
this.allowGetAnyWorker = allowGetAnyWorker;
}

@Override
public ComputeNode getWorkerById(long workerId) {
return availableID2ComputeNode.get(workerId);
ComputeNode cn = availableID2ComputeNode.get(workerId);
if (cn == null && allowGetAnyWorker) {
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
cn = systemInfoService.getBackendOrComputeNode(workerId);
if (cn == null) {
LOG.warn(String.format("could not get worker by id: %s", workerId));
}
}
return cn;
}

@Override
Expand Down Expand Up @@ -304,13 +333,13 @@ private static ComputeNode getNextWorker(ImmutableMap<Long, ComputeNode> workers
return workers.values().asList().get(index);
}

private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers) {
String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState().
getNodeMgr().getMySelf().getResourceIsolationGroup();
private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers,
String resourceIsolationGroup) {
ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
for (Map.Entry<Long, ComputeNode> entry : workers.entrySet()) {
if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey()) &&
resourceIsolationGroupMatches(thisFeResourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
resourceIsolationGroupMatches(resourceIsolationGroup,
entry.getValue().getResourceIsolationGroup())) {
builder.put(entry);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.lake.qe.scheduler.DefaultSharedDataWorkerProvider;
import com.starrocks.planner.ScanNode;
import com.starrocks.qe.scheduler.WorkerProvider;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.SystemInfoService;
import com.starrocks.system.TabletComputeNodeMapper;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TScanRangeParams;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.qe.scheduler.Utils.getOptionalTabletId;

/**
 * {@link BackendSelector} for CACHE SELECT statements. For every scan range it picks
 * {@code props.numReplicasDesired} compute nodes in each of {@code props.resourceIsolationGroups} and records
 * the scan range against every chosen node in the output {@link FragmentScanRangeAssignment}.
 *
 * <p>This class should only be used in shared data mode.
 */
public class CacheSelectBackendSelector implements BackendSelector {
    private static final Logger LOG = LogManager.getLogger(CacheSelectBackendSelector.class);

    // Inputs
    private final ScanNode scanNode;
    private final List<TScanRangeLocations> locations;
    private final CacheSelectComputeNodeSelectionProperties props;
    private final long warehouseId;

    // Outputs
    private final FragmentScanRangeAssignment assignment;
    // This WorkerProvider is used to provide signal to the caller, but not used to select the compute nodes to use.
    private final WorkerProvider callerWorkerProvider;

    public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> locations,
                                      FragmentScanRangeAssignment assignment, WorkerProvider callerWorkerProvider,
                                      CacheSelectComputeNodeSelectionProperties props, long warehouseId) {
        this.scanNode = scanNode;
        this.locations = locations;
        this.assignment = assignment;
        this.callerWorkerProvider = callerWorkerProvider;
        this.props = props;
        this.warehouseId = warehouseId;
    }

    /**
     * Asks the internal tablet-to-CN mapper for the compute nodes of the given resource isolation group that
     * should hold the tablet.
     *
     * @throws UserException (a {@link DdlException}) if the mapper returns fewer than
     *                       {@code props.numReplicasDesired} compute nodes
     */
    private Set<Long> assignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId,
                                           String resourceIsolationGroupId) throws UserException {
        TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper();
        List<Long> cnIdsOrderedByPreference =
                mapper.computeNodesForTablet(tabletId, props.numReplicasDesired, resourceIsolationGroupId);
        if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) {
            throw new DdlException(String.format("Requesting more replicas than we have available CN" +
                            " for the specified resource group. desiredReplicas: %d, resourceGroup: %s, tabletId: %d",
                    props.numReplicasDesired, resourceIsolationGroupId, tabletId));
        }
        return new HashSet<>(cnIdsOrderedByPreference);
    }

    /**
     * Selects {@code props.numReplicasDesired} distinct compute nodes of the given resource isolation group,
     * starting from {@code mainTargetCnId} and repeatedly asking a freshly captured worker provider for a backup
     * worker whenever the current candidate is unavailable or already chosen.
     *
     * @throws UserException if no further distinct worker can be found before enough replicas are selected
     */
    private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceIsolationGroupId)
            throws UserException {
        Set<Long> selectedCn = new HashSet<>();
        DefaultSharedDataWorkerProvider workerProvider =
                new DefaultSharedDataWorkerProvider.Factory().captureAvailableWorkers(warehouseId,
                        resourceIsolationGroupId);
        long targetBackendId = mainTargetCnId;
        while (selectedCn.size() < props.numReplicasDesired) {
            if (selectedCn.contains(targetBackendId) || !workerProvider.isDataNodeAvailable(targetBackendId)) {
                targetBackendId = workerProvider.selectBackupWorker(targetBackendId, Optional.empty());
                if (targetBackendId < 0 || selectedCn.contains(targetBackendId)) {
                    // NOTE(review): reportDataNodeNotFoundException may itself throw, in which case the
                    // DdlException below is never reached - confirm against the WorkerProvider implementation.
                    workerProvider.reportDataNodeNotFoundException();
                    throw new DdlException(String.format("Requesting more replicas than we have available CN" +
                                    " for the specified resource group. desiredReplicas: %d, resourceGroup: %s",
                            props.numReplicasDesired, resourceIsolationGroupId));
                }
            }
            selectedCn.add(targetBackendId);
        }
        return selectedCn;
    }

    /** Rejects selection properties this selector cannot work with. */
    private void validateProps() throws UserException {
        if (props.resourceIsolationGroups == null || props.resourceIsolationGroups.isEmpty()) {
            throw new UserException("Should not have constructed CacheSelectBackendSelector with no" +
                    " resourceIsolationGroups specified.");
        }
        if (props.numReplicasDesired < 1) {
            throw new UserException("Num replicas desired in cache must be at least 1: " + props.numReplicasDesired);
        }
    }

    /** Chooses the compute nodes of one resource isolation group that should cache one scan range. */
    private Set<Long> selectCnForScanRange(SystemInfoService systemInfoService,
                                           TScanRangeLocations scanRangeLocations, Optional<Long> tabletId,
                                           String resourceIsolationGroupId) throws UserException {
        // If we've been provided the relevant tablet id, and we're using resource isolation groups, which
        // is when we prefer to use the internal mapping, then we populate the datacaches of the CN which
        // are most preferred for the tablet.
        if (tabletId.isPresent() && systemInfoService.shouldUseInternalTabletToCnMapper()) {
            return assignedCnByTabletId(systemInfoService, tabletId.get(), resourceIsolationGroupId);
        }
        if (scanRangeLocations.getLocationsSize() != 1) {
            throw new UserException(
                    "CacheSelectBackendSelector expected to be used in situations where there is exactly" +
                            " one CN to which any given tablet is officially assigned: " +
                            scanRangeLocations);
        }
        return assignedCnByBackupWorker(scanRangeLocations.getLocations().get(0).getBackend_id(),
                resourceIsolationGroupId);
    }

    /**
     * Assigns every scan range to the selected compute nodes of every requested resource isolation group, then
     * reports the selected worker ids to the caller's {@link WorkerProvider} and enables lookups of any worker
     * on it.
     *
     * @throws UserException if the selection properties are invalid or not enough compute nodes can be selected
     */
    @Override
    public void computeScanRangeAssignment() throws UserException {
        validateProps();

        SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
        Set<Long> allSelectedWorkerIds = new HashSet<>();
        for (TScanRangeLocations scanRangeLocations : locations) {
            TScanRangeParams scanRangeParams = new TScanRangeParams(scanRangeLocations.scan_range);
            Optional<Long> tabletId = getOptionalTabletId(scanRangeLocations.scan_range);
            // Try to create assignments for each of the resourceIsolationGroups specified.
            for (String resourceIsolationGroupId : props.resourceIsolationGroups) {
                Set<Long> selectedCn =
                        selectCnForScanRange(systemInfoService, scanRangeLocations, tabletId, resourceIsolationGroupId);

                // Only build the message when debug logging is on: this runs once per scan range per group.
                if (LOG.isDebugEnabled()) {
                    // The tablet-mapper path does not require any location to be present, so the first location
                    // must be read defensively; a diagnostic log line must never fail the statement.
                    Object firstLocation = scanRangeLocations.getLocationsSize() > 0
                            ? scanRangeLocations.getLocations().get(0) : null;
                    LOG.debug(String.format(
                            "done doing assignment for resource isolation group %s, tablet %d, location %s: CN chosen are %s",
                            resourceIsolationGroupId,
                            tabletId.orElse(-1L),
                            firstLocation,
                            selectedCn.stream().map(String::valueOf).collect(Collectors.joining(","))));
                }

                for (Long cnId : selectedCn) {
                    assignment.put(cnId, scanNode.getId().asInt(), scanRangeParams);
                    allSelectedWorkerIds.add(cnId);
                }
            }
        }
        // Note that although we're not using the provided callerWorkerProvider above, the caller assumes that we used
        // it to note the selected backend ids. This is used for things like checking if the worker has died
        // and cancelling queries.
        for (long workerId : allSelectedWorkerIds) {
            callerWorkerProvider.selectWorkerUnchecked(workerId);
        }
        // Also, caller upstream will use the workerProvider to get ComputeNode references corresponding to the compute
        // nodes chosen in this function, so we must enable getting any worker regardless of availability.
        callerWorkerProvider.setAllowGetAnyWorker(true);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.qe;

import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.List;

/**
 * Describes how a CACHE SELECT statement should choose the compute nodes whose data cache is populated.
 *
 * <p>Defaults:
 * <ul>
 *   <li>If no resource isolation groups are specified in the CACHE SELECT statement, we assume the request
 *       intends to populate the data cache for the current FE's resource isolation group.</li>
 *   <li>If the number of replicas is not specified in the CACHE SELECT statement, we assume the request intends
 *       to cache 1 replica.</li>
 * </ul>
 */
public class CacheSelectComputeNodeSelectionProperties {
    public List<String> resourceIsolationGroups;
    public int numReplicasDesired;

    /**
     * @param resourceIsolationGroups groups to warm up; {@code null} or empty selects this FE's own group
     * @param numReplicasDesired      requested replica count; values below 1 are raised to 1
     */
    public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups, int numReplicasDesired) {
        boolean groupsSpecified = resourceIsolationGroups != null && !resourceIsolationGroups.isEmpty();
        if (groupsSpecified) {
            this.resourceIsolationGroups = resourceIsolationGroups;
        } else {
            // Nothing requested explicitly: fall back to the group of the FE handling the statement.
            List<String> ownGroupOnly = new ArrayList<>();
            ownGroupOnly.add(GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf()
                    .getResourceIsolationGroup());
            this.resourceIsolationGroups = ownGroupOnly;
        }
        this.numReplicasDesired = numReplicasDesired < 1 ? 1 : numReplicasDesired;
    }
}
19 changes: 19 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,9 @@ public String getCatalog() {

private boolean enableCacheSelect = false;

private List<String> datacacheSelectResourceGroups = null;
private int numDesiredDatacacheReplicas = -1;

@VariableMgr.VarAttr(name = ENABLE_DYNAMIC_PRUNE_SCAN_RANGE)
private boolean enableDynamicPruneScanRange = true;

Expand Down Expand Up @@ -3978,6 +3981,22 @@ public void setLikePredicateConsolidateMin(int value) {
this.likePredicateConsolidateMin = value;
}

// Returns the resource isolation groups recorded for a CACHE SELECT statement, or null (the field's initial
// value) if none have been set on this SessionVariable.
public List<String> getDatacacheSelectResourceGroups() {
return datacacheSelectResourceGroups;
}

// Records the resource isolation groups whose data caches a CACHE SELECT statement should populate.
// The list reference is stored as given, without copying.
public void setDatacacheSelectResourceGroups(List<String> datacacheSelectResourceGroups) {
this.datacacheSelectResourceGroups = datacacheSelectResourceGroups;
}

// Returns the number of data cache replicas recorded for a CACHE SELECT statement, or -1 (the field's initial
// value) if it has not been set on this SessionVariable.
public int getNumDesiredDatacacheReplicas() {
return numDesiredDatacacheReplicas;
}

// Records how many data cache replicas a CACHE SELECT statement should populate. No validation is done here.
public void setNumDesiredDatacacheReplicas(int numDesiredDatacacheReplicas) {
this.numDesiredDatacacheReplicas = numDesiredDatacacheReplicas;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ WorkerProvider captureAvailableWorkers(SystemInfoService systemInfoService,

Collection<ComputeNode> getAllWorkers();

// Makes it so getWorkerById is not restricted by which nodes are "available"/of the same resource isolation group.
// This default implementation always throws UnsupportedOperationException; implementations that support the
// option must override it.
default void setAllowGetAnyWorker(boolean allowGetAnyWorker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("setAllowGetAnyWorker is not supported.");
}

ComputeNode getWorkerById(long workerId);

boolean isDataNodeAvailable(long dataNodeId);
Expand Down
Loading

0 comments on commit bec060d

Please sign in to comment.