Skip to content

Commit

Permalink
bug fix for cache selection
Browse files Browse the repository at this point in the history
  • Loading branch information
ctbrennan committed Sep 26, 2024
1 parent 8376c2c commit d682c96
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(long warehouseId,

private final Set<Long> selectedWorkerIds;

private boolean allowGetAnyWorker = false;

@VisibleForTesting
public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
Expand Down Expand Up @@ -162,9 +163,33 @@ public Collection<ComputeNode> getAllWorkers() {
return availableID2ComputeNode.values();
}

// Functionality: If allowGetAnyWorker is set, then getWorkerById will use global state to get a ComputeNode
// reference instead of its available node map.
// Where it's used: As of writing this, this is only used for CACHE SELECT statements,
// Why: This is necessary because FragmentAssignmentStrategy classes, Coordinator, and CoordinatorPreprocessors use
// the WorkerProvider interface to get ComputeNode references. During a CACHE SELECT execution (which is the only
// type of statement that should be selecting workers from an un-matching resource isolation group), they will need
// to get the ComputeNode addresses for nodes outside the leader group.
// Alternative considered: Another way of achieving this would be to allow any caller to get a ComputeNode reference
// for any compute node, but that would leave space for bugs during the execution of other types of statements which
// really shouldn't be getting ComputeNode references for un-matching resource isolation groups or unhealthy
// ComputeNodes. Instead of changing a bunch of code which uses the WorkerProvider in a specific way, this way
// limits scope to only change behavior when the user of the WorkerProvider sets this very specific option.
public void setAllowGetAnyWorker(boolean allowGetAnyWorker) {
this.allowGetAnyWorker = allowGetAnyWorker;
}

@Override
public ComputeNode getWorkerById(long workerId) {
return availableID2ComputeNode.get(workerId);
ComputeNode cn = availableID2ComputeNode.get(workerId);
if (cn == null && allowGetAnyWorker) {
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
cn = systemInfoService.getBackendOrComputeNode(workerId);
if (cn == null) {
LOG.warn(String.format("could not get worker by id: %s", workerId));
}
}
return cn;
}

@Override
Expand Down Expand Up @@ -313,7 +338,8 @@ private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableM
ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
for (Map.Entry<Long, ComputeNode> entry : workers.entrySet()) {
if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey()) &&
resourceIsolationGroupMatches(resourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
resourceIsolationGroupMatches(resourceIsolationGroup,
entry.getValue().getResourceIsolationGroup())) {
builder.put(entry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@
import com.starrocks.system.TabletComputeNodeMapper;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TScanRangeParams;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.qe.scheduler.Utils.getOptionalTabletId;

// This class should only be used in shared data mode.
public class CacheSelectBackendSelector implements BackendSelector {
private static final Logger LOG = LogManager.getLogger(CacheSelectBackendSelector.class);

// Inputs
private final ScanNode scanNode;
private final List<TScanRangeLocations> locations;
Expand All @@ -57,14 +62,14 @@ public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> l
}

private Set<Long> assignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId,
String resourceIsolationGroupId) throws UserException {
String resourceIsolationGroupId) throws UserException {
TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper();
List<Long> cnIdsOrderedByPreference =
mapper.computeNodesForTablet(tabletId, props.numReplicasDesired, resourceIsolationGroupId);
if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) {
throw new DdlException(String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s",
props.numReplicasDesired, resourceIsolationGroupId));
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s, tabletId: %d",
props.numReplicasDesired, resourceIsolationGroupId, tabletId));
}
return new HashSet<>(cnIdsOrderedByPreference);
}
Expand All @@ -79,7 +84,7 @@ private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceI
while (selectedCn.size() < props.numReplicasDesired) {
if (selectedCn.contains(targetBackendId) || !workerProvider.isDataNodeAvailable(targetBackendId)) {
targetBackendId = workerProvider.selectBackupWorker(targetBackendId, Optional.empty());
if (selectedCn.contains(targetBackendId)) {
if (targetBackendId < 0 || selectedCn.contains(targetBackendId)) {
workerProvider.reportDataNodeNotFoundException();
throw new DdlException(String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s",
Expand Down Expand Up @@ -125,6 +130,13 @@ public void computeScanRangeAssignment() throws UserException {
assignedCnByBackupWorker(scanRangeLocations.getLocations().get(0).getBackend_id(),
resourceIsolationGroupId);
}
LOG.debug(String.format(
"done doing assignment for resource isolation group %s, tablet %d, location %s: CN chosen are %s",
resourceIsolationGroupId,
tabletId.orElse(-1L),
scanRangeLocations.getLocations().get(0),
selectedCn.stream().map(String::valueOf).collect(Collectors.joining(","))));

for (Long cnId : selectedCn) {
assignment.put(cnId, scanNode.getId().asInt(), scanRangeParams);
allSelectedWorkerIds.add(cnId);
Expand All @@ -137,5 +149,8 @@ public void computeScanRangeAssignment() throws UserException {
for (long workerId : allSelectedWorkerIds) {
callerWorkerProvider.selectWorkerUnchecked(workerId);
}
// Also, caller upstream will use the workerProvider to get ComputeNode references corresponding to the compute
// nodes chosen in this function, so we must enable getting any worker regardless of availability.
callerWorkerProvider.setAllowGetAnyWorker(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ WorkerProvider captureAvailableWorkers(SystemInfoService systemInfoService,

Collection<ComputeNode> getAllWorkers();

// Makes it so getWorkerById is not restricted by which nodes are "available"/of the same resource isolation group.
default void setAllowGetAnyWorker(boolean allowGetAnyWorker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("setAllowGetAnyWorker is not supported.");
}

ComputeNode getWorkerById(long workerId);

boolean isDataNodeAvailable(long dataNodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.assertj.core.util.Sets;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -740,4 +741,30 @@ public void testCollocationBackendSelectorWithSharedDataWorkerProvider() {
Assert.assertThrows(NonRecoverableException.class, selector::computeScanRangeAssignment);
}
}

@Test
public void testGetAnyWorker(@Mocked SystemInfoService systemInfoService) {
long nodeId1 = 1L;
long nodeId2 = 2L;
ComputeNode availNode = id2AllNodes.get(nodeId1);
WorkerProvider provider = new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes),
ImmutableMap.of(availNode.getId(), availNode));

Assert.assertNotNull(provider.getWorkerById(nodeId1));
Assert.assertNull(provider.getWorkerById(nodeId2));

new Expectations() {
{
systemInfoService.getBackendOrComputeNode(nodeId2);
times = 1;
result = new ComputeNode(nodeId2, "whatever", 100);
}
};

provider.setAllowGetAnyWorker(true);
Assert.assertNotNull(provider.getWorkerById(nodeId2));

provider.setAllowGetAnyWorker(false);
Assert.assertNull(provider.getWorkerById(nodeId2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.stream.Stream;

import static com.starrocks.server.WarehouseManager.DEFAULT_WAREHOUSE_ID;
import static org.junit.Assert.assertThrows;

public class CacheSelectBackendSelectorTest {

Expand Down Expand Up @@ -180,6 +181,9 @@ public void testSelectBackendKnownTabletIdAndInternalMapping(@Mocked SystemInfoS
callerWorkerProvider.selectWorkerUnchecked(cnId);
minTimes = 1;
}

callerWorkerProvider.setAllowGetAnyWorker(true);
times = 1;
}
};

Expand Down Expand Up @@ -253,6 +257,9 @@ public void testSelectBackendKnownTabletIdNoInternalTabletMapper(@Mocked Warehou
callerWorkerProvider.selectWorkerUnchecked(cnId);
minTimes = 1;
}

callerWorkerProvider.setAllowGetAnyWorker(true);
times = 1;
}
};

Expand Down Expand Up @@ -325,6 +332,9 @@ public void testSelectBackendUnknownTabletId(@Mocked WarehouseManager warehouseM
callerWorkerProvider.selectWorkerUnchecked(cnId);
minTimes = 1;
}

callerWorkerProvider.setAllowGetAnyWorker(true);
times = 1;
}
};

Expand All @@ -345,4 +355,67 @@ public void testSelectBackendUnknownTabletId(@Mocked WarehouseManager warehouseM
Assert.assertEquals(expectedAssignment, assignment);
}
}

@Test
public void testInsufficientWorkerCountThrowsExceptionNoTablet(@Mocked WarehouseManager warehouseManager,
@Mocked SystemInfoService systemInfoService,
@Mocked WorkerProvider callerWorkerProvider) {
ScanNode scanNode = newOlapScanNode(1, 1);
Map<Long, ComputeNode> nodes = new HashMap<>();
List<Long> nodeIds = new ArrayList<>();
{
ComputeNode cn = new ComputeNode(1L, "whatever", 100);
cn.setAlive(true);
cn.setResourceIsolationGroup("group1");
nodes.put(cn.getId(), cn);
nodeIds.add(cn.getId());
}
{
ComputeNode cn = new ComputeNode(2L, "whatever", 100);
cn.setAlive(true);
cn.setResourceIsolationGroup("group1");
nodes.put(cn.getId(), cn);
nodeIds.add(cn.getId());
}
{
ComputeNode cn = new ComputeNode(3L, "whatever", 100);
cn.setAlive(true);
cn.setResourceIsolationGroup("group2");
nodes.put(cn.getId(), cn);
nodeIds.add(cn.getId());
}
for (ComputeNode cn : nodes.values()) {
new Expectations() {
{
systemInfoService.getBackendOrComputeNode(cn.getId());
result = cn;
}
};
}

// Non-internal scans don't have tabletIds and should therefore use the workerProviders to get backups.
List<TScanRangeLocations> locations = generateScanRangeLocations(nodes, 1, 1, false);
// Confirm our assumption that the TScanRangeLocations we used in the test has the 1st CN assigned.
Assert.assertEquals(1, locations.get(0).locations.get(0).backend_id);

FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
CacheSelectComputeNodeSelectionProperties props =
new CacheSelectComputeNodeSelectionProperties(List.of("group1", "group2"), 2);
CacheSelectBackendSelector selector =
new CacheSelectBackendSelector(scanNode, locations, assignment, callerWorkerProvider, props,
DEFAULT_WAREHOUSE_ID);
new Expectations() {
{
warehouseManager.getAllComputeNodeIds(DEFAULT_WAREHOUSE_ID);
result = nodeIds;
}
};

UserException exception = assertThrows(UserException.class, selector::computeScanRangeAssignment);
Assert.assertEquals("Compute node not found. Check if any compute node is down." +
" nodeId: -1 compute node: [whatever alive: true, available: false, inBlacklist: false]" +
" [whatever alive: true, available: false, inBlacklist: false]" +
" [whatever alive: true, available: true, inBlacklist: false] ", exception.getMessage());

}
}

0 comments on commit d682c96

Please sign in to comment.