diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java index a435e35eadb8..2e5e7e2c9646 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java @@ -29,6 +29,10 @@ public void onTrigger(TableRebalanceObserver.Trigger trigger, Map> targetState) { } + @Override + public void onNoop(String msg) { + } + @Override public void onSuccess(String msg) { } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java index e9c5c299cf2c..6139a26d6572 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java @@ -40,6 +40,8 @@ enum Trigger { void onTrigger(Trigger trigger, Map> currentState, Map> targetState); + void onNoop(String msg); + void onSuccess(String msg); void onError(String errorMsg); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 5c3514fa7eb3..765d57654947 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -194,21 +194,23 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb try { currentIdealState = _helixDataAccessor.getProperty(idealStatePropertyKey); } catch (Exception e) { - LOGGER.warn( - "For rebalanceId: {}, caught exception while fetching IdealState for table: {}, aborting the rebalance", - rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while fetching IdealState for table: %s, aborting the rebalance", + rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching IdealState: " + e, null, null, null); } if (currentIdealState == null) { - LOGGER.warn("For rebalanceId: {}, cannot find the IdealState for table: {}, aborting the rebalance", - rebalanceJobId, tableNameWithType); + onReturnFailure( + String.format("For rebalanceId: %s, cannot find the IdealState for table: %s, aborting the rebalance", + rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", null, null, null); } if (!currentIdealState.isEnabled() && !downtime) { - LOGGER.warn("For rebalanceId: {}, cannot rebalance disabled table: {} without downtime, aborting the rebalance", - rebalanceJobId, tableNameWithType); + onReturnFailure(String.format( + "For rebalanceId: %s, cannot rebalance disabled table: %s without downtime, aborting the rebalance", + rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot rebalance disabled table without downtime", null, null, null); } @@ -224,8 +226,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft(); instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { - LOGGER.warn("For rebalanceId: {}, caught exception while fetching/calculating instance partitions for table: {}, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while fetching/calculating instance partitions for table: %s, " + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching/calculating instance partitions: " + e, null, null, null); } @@ -241,9 +244,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft(); tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { - LOGGER.warn( - "For rebalanceId: {}, caught exception while fetching/calculating tier instance partitions for table: {}, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while fetching/calculating tier instance partitions for table: %s, " + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching/calculating tier instance partitions: " + e, null, null, null); } @@ -258,8 +261,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); } catch (Exception e) { - LOGGER.warn("For rebalanceId: {}, caught exception while calculating target assignment for table: {}, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while calculating target assignment for table: %s, aborting the " + + "rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while calculating target assignment: " + e, instancePartitionsMap, tierToInstancePartitionsMap, null); @@ -273,6 +277,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb if (segmentAssignmentUnchanged) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) { + _tableRebalanceObserver.onNoop( + String.format("For rebalanceId: %s, instance unchanged and table: %s is already balanced", rebalanceJobId, + tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } else { @@ -281,6 +288,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } else { + _tableRebalanceObserver.onSuccess( + String.format("For rebalanceId: %s, instance reassigned but table: %s is already balanced", + rebalanceJobId, tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -309,16 +319,19 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor() .set(idealStatePropertyKey.getPath(), idealStateRecord, idealStateRecord.getVersion(), AccessOption.PERSISTENT), "Failed to update IdealState"); - LOGGER.info("For rebalanceId: {}, finished rebalancing table: {} with downtime in {}ms.", rebalanceJobId, - tableNameWithType, System.currentTimeMillis() - startTimeMs); + String msg = + String.format("For rebalanceId: %s, finished rebalancing table: %s with downtime in %d ms.", rebalanceJobId, + tableNameWithType, System.currentTimeMillis() - startTimeMs); + LOGGER.info(msg); + _tableRebalanceObserver.onSuccess(msg); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not " + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } catch (Exception e) { - LOGGER.warn( - "For rebalanceId: {}, caught exception while updating IdealState for table: {}, aborting the rebalance", - rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while updating IdealState for table: %s, aborting the rebalance", + rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -347,12 +360,10 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb if (minReplicasToKeepUpForNoDowntime >= 0) { // For non-negative value, use it as min available replicas if (minReplicasToKeepUpForNoDowntime >= numReplicas) { - String errorMsg = String.format( + onReturnFailure(String.format( "For rebalanceId: %s, Illegal config for minReplicasToKeepUpForNoDowntime: %d for table: %s, " + "must be less than number of replicas: %d, aborting the rebalance", rebalanceJobId, - minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas); - LOGGER.warn(errorMsg); - _tableRebalanceObserver.onError(errorMsg); + minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Illegal min available replicas config", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -446,11 +457,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); } catch (Exception e) { - String errorMsg = String.format( + onReturnFailure(String.format( "For rebalanceId: %s, caught exception while re-calculating the target assignment for table: %s, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType); - LOGGER.warn(errorMsg, e); - _tableRebalanceObserver.onError(errorMsg); + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -513,11 +522,8 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb LOGGER.info("For rebalanceId: {}, version changed while updating IdealState for table: {}", rebalanceJobId, tableNameWithType); } catch (Exception e) { - String errorMsg = String.format( - "For rebalanceId: %s, caught exception while updating IdealState for table: %s, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType); - LOGGER.warn(errorMsg, e); - _tableRebalanceObserver.onError(errorMsg); + onReturnFailure(String.format("For rebalanceId: %s, caught exception while updating IdealState for table: %s, " + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -525,6 +531,15 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb } } + private void onReturnFailure(String errorMsg, Exception e) { + if (e != null) { + LOGGER.warn(errorMsg, e); + } else { + LOGGER.warn(errorMsg); + } + _tableRebalanceObserver.onError(errorMsg); + } + /** * Gets the instance partitions for instance partition types and also returns a boolean for whether they are unchanged */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 8386544f3c44..b2ea9f8e69cf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -121,6 +121,16 @@ private void updateOnStart(Map> currentState, _tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis()); } + @Override + public void onNoop(String msg) { + _controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0); + long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000L; + _tableRebalanceProgressStats.setCompletionStatusMsg(msg); + _tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds); + _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.NO_OP); + trackStatsInZk(); + } + @Override public void onSuccess(String msg) { Preconditions.checkState(RebalanceResult.Status.DONE != _tableRebalanceProgressStats.getStatus(), @@ -140,7 +150,7 @@ public void onSuccess(String msg) { @Override public void onError(String errorMsg) { _controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0); - long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000; + long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000L; _tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds); _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED); _tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);