Skip to content

Commit

Permalink
Fix build after merging #13646 (#14072)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Sep 24, 2024
1 parent 90e7562 commit 49896eb
Showing 1 changed file with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2353,9 +2353,9 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
// Assign a list of segments in batch mode
public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
for (String segmentName: segmentNames) {
String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
segmentName);
for (String segmentName : segmentNames) {
String segmentZKMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
}
// Assign instances for the segment and add it into IdealState
Expand All @@ -2370,11 +2370,12 @@ public void assignTableSegments(String tableNameWithType, List<String> segmentNa
if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
for (String segmentName: segmentNames) {
for (String segmentName : segmentNames) {
// Update segment tier to support direct assignment for multiple data directories
updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
tableNameWithType, segmentName, sortedTiers, _helixZkManager);
InstancePartitions tierInstancePartitions =
TierConfigUtils.getTieredInstancePartitionsForSegment(tableNameWithType, segmentName, sortedTiers,
_helixZkManager);
if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
// Override instance partitions for offline table
LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
Expand All @@ -2386,35 +2387,33 @@ public void assignTableSegments(String tableNameWithType, List<String> segmentNa

SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
long segmentAssignmentStartTs = System.currentTimeMillis();
Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
assert idealState != null;
for (String segmentName: segmentNames) {
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
if (currentAssignment.containsKey(segmentName)) {
LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
tableNameWithType);
} else {
List<String> assignedInstances =
segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
tableNameWithType);
currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
}
long segmentAssignmentStartMs = System.currentTimeMillis();
Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
assert idealState != null;
for (String segmentName : segmentNames) {
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
if (currentAssignment.containsKey(segmentName)) {
LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
tableNameWithType);
} else {
List<String> assignedInstances =
segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
tableNameWithType);
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
return idealState;
});
LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames,
tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartTs);
}
}
return idealState;
});
LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames,
tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartMs);
} catch (Exception e) {
LOGGER.error(
"Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata",
segmentNames, tableNameWithType, e);
for (Map.Entry<String, String> segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) {
for (Map.Entry<String, String> segmentZKMetadataPathEntry : segmentZKMetadataPathMap.entrySet()) {
String segmentName = segmentZKMetadataPathEntry.getKey();
String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue();
if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {
Expand Down

0 comments on commit 49896eb

Please sign in to comment.