Skip to content

Commit

Permalink
Fix the NPE from IS update metrics (apache#13313)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Jun 4, 2024
1 parent 164cd81 commit 970d9b9
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.common.utils.helix;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -29,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -102,20 +102,21 @@ public static IdealState cloneIdealState(IdealState idealState) {
* @param updater A function that returns an updated ideal state given an input ideal state
* @return updated ideal state if successful, null if not
*/
public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy, final boolean noChangeOk) {
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy policy, boolean noChangeOk) {
// NOTE: ControllerMetrics could be null because this method might be invoked by Broker.
ControllerMetrics controllerMetrics = ControllerMetrics.get();
try {
long startTimeMs = System.currentTimeMillis();
IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
int retries = policy.attempt(new Callable<Boolean>() {
int retries = policy.attempt(new Callable<>() {
@Override
public Boolean call() {
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the the idealState above to pass it to the updater
// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = cloneIdealState(idealState);
Expand Down Expand Up @@ -197,20 +198,21 @@ private boolean shouldCompress(IdealState is) {
numChars += entry.getValue().length();
}
numChars *= is.getNumPartitions();
if (_minNumCharsInISToTurnOnCompression > 0
&& numChars > _minNumCharsInISToTurnOnCompression) {
return true;
}
return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression;
}
return false;
}
});
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
}
}
Expand Down

0 comments on commit 970d9b9

Please sign in to comment.