diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java index 6f648ad8b055..9e3cdc84fa69 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java @@ -73,7 +73,8 @@ public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM { public final Logger _logger; - SegmentCompletionFSMState _state = SegmentCompletionFSMState.HOLDING; // Typically start off in HOLDING state. + BlockingSegmentCompletionFSMState _state = BlockingSegmentCompletionFSMState.HOLDING; + // Typically start off in HOLDING state. final long _startTimeMs; private final LLCSegmentName _segmentName; private final String _rawTableName; @@ -135,11 +136,11 @@ public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManage StreamPartitionMsgOffsetFactory factory = _segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName); StreamPartitionMsgOffset endOffset = factory.create(segmentMetadata.getEndOffset()); - _state = SegmentCompletionFSMState.COMMITTED; + _state = BlockingSegmentCompletionFSMState.COMMITTED; _winningOffset = endOffset; _winner = "UNKNOWN"; } else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) { - _state = SegmentCompletionFSMState.PARTIAL_CONSUMING; + _state = BlockingSegmentCompletionFSMState.PARTIAL_CONSUMING; } } @@ -151,7 +152,8 @@ public String toString() { // SegmentCompletionManager releases the FSM from the hashtable when it is done. public boolean isDone() { - return _state.equals(SegmentCompletionFSMState.COMMITTED) || _state.equals(SegmentCompletionFSMState.ABORTED); + return _state.equals(BlockingSegmentCompletionFSMState.COMMITTED) || _state.equals( + BlockingSegmentCompletionFSMState.ABORTED); } /* @@ -333,7 +335,7 @@ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProt return abortAndReturnFailed(); } _logger.info("Processing segmentCommitEnd({}, {})", instanceId, offset); - if (!_state.equals(SegmentCompletionFSMState.COMMITTER_UPLOADING) || !instanceId.equals(_winner) + if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING) || !instanceId.equals(_winner) || offset.compareTo(_winningOffset) != 0) { // State changed while we were out of sync. return a failed commit. _logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state, @@ -400,14 +402,14 @@ private SegmentCompletionProtocol.Response hold(String instanceId, StreamPartiti private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, StreamPartitionMsgOffset offset) { - _state = SegmentCompletionFSMState.ABORTED; + _state = BlockingSegmentCompletionFSMState.ABORTED; _segmentCompletionManager.getControllerMetrics() .addMeteredTableValue(_rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return hold(instanceId, offset); } private SegmentCompletionProtocol.Response abortAndReturnFailed() { - _state = SegmentCompletionFSMState.ABORTED; + _state = BlockingSegmentCompletionFSMState.ABORTED; _segmentCompletionManager.getControllerMetrics() .addMeteredTableValue(_rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return SegmentCompletionProtocol.RESP_FAILED; @@ -433,7 +435,7 @@ private SegmentCompletionProtocol.Response partialConsumingConsumed(String insta // This is the first time we are getting segmentConsumed() for this segment. // Some instance thinks we can close this segment, so go to HOLDING state, and process as normal. // We will just be looking for less replicas. - _state = SegmentCompletionFSMState.HOLDING; + _state = BlockingSegmentCompletionFSMState.HOLDING; return holdingConsumed(instanceId, offset, now, stopReason); } @@ -473,11 +475,11 @@ private SegmentCompletionProtocol.Response holdingConsumed(String instanceId, St if (_winner.equals(instanceId)) { _logger.info("{}:Committer notified winner instance={} offset={}", _state, instanceId, offset); response = commit(instanceId, offset); - _state = SegmentCompletionFSMState.COMMITTER_NOTIFIED; + _state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED; } else { _logger.info("{}:Committer decided winner={} offset={}", _state, _winner, _winningOffset); response = catchup(instanceId, offset); - _state = SegmentCompletionFSMState.COMMITTER_DECIDED; + _state = BlockingSegmentCompletionFSMState.COMMITTER_DECIDED; } } else { response = hold(instanceId, offset); @@ -517,7 +519,7 @@ private SegmentCompletionProtocol.Response committerDecidedConsumed(String insta if (_winningOffset.compareTo(offset) == 0) { _logger.info("{}:Notifying winner instance={} offset={}", _state, instanceId, offset); response = commit(instanceId, offset); - _state = SegmentCompletionFSMState.COMMITTER_NOTIFIED; + _state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED; } else { // Winner coming back with a different offset. _logger @@ -578,7 +580,7 @@ private SegmentCompletionProtocol.Response committerNotifiedConsumed(String inst // Something seriously wrong. Abort the FSM response = discard(instanceId, offset); _logger.warn("{}:Aborting for instance={} offset={}", _state, instanceId, offset); - _state = SegmentCompletionFSMState.ABORTED; + _state = BlockingSegmentCompletionFSMState.ABORTED; } } else { // Common case: A different instance is reporting. @@ -608,7 +610,7 @@ private SegmentCompletionProtocol.Response committerNotifiedCommit(String instan return response; } _logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset); - _state = SegmentCompletionFSMState.COMMITTER_UPLOADING; + _state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING; long commitTimeMs = now - _startTimeMs; if (commitTimeMs > _initialCommitTimeMs) { // We assume that the commit time holds for all partitions. It is possible, though, that one partition @@ -754,14 +756,14 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc String instanceId = reqParams.getInstanceId(); StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); - if (!_state.equals(SegmentCompletionFSMState.COMMITTER_UPLOADING)) { + if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING)) { // State changed while we were out of sync. return a failed commit. _logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state, _segmentName.getSegmentName(), _winner, _winningOffset); return SegmentCompletionProtocol.RESP_FAILED; } _logger.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId); - _state = SegmentCompletionFSMState.COMMITTING; + _state = BlockingSegmentCompletionFSMState.COMMITTING; // In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation, // so we need to move the segment file to its permanent location first before committing the metadata. // The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata @@ -791,7 +793,7 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc return SegmentCompletionProtocol.RESP_FAILED; } - _state = SegmentCompletionFSMState.COMMITTED; + _state = BlockingSegmentCompletionFSMState.COMMITTED; _logger.info("Committed segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId); return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMState.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java similarity index 96% rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMState.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java index 1815f2e51c08..57bf5094460b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMState.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.controller.helix.core.realtime; -public enum SegmentCompletionFSMState { +public enum BlockingSegmentCompletionFSMState { PARTIAL_CONSUMING, // Indicates that at least one replica has reported that it has stopped consuming. HOLDING, // the segment has started finalizing. COMMITTER_DECIDED, // We know who the committer will be, we will let them know next time they call segmentConsumed() diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java index 439696865434..5f9242b67a7e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java @@ -25,7 +25,10 @@ public class SegmentCompletionConfig { public static final String FSM_SCHEME = "pinot.controller.segment.completion.fsm.scheme."; + public static final String DEFAULT_FSM_SCHEME_KEY = "pinot.controller.segment.completion.fsm.scheme.default."; + public static final String DEFAULT_FSM_SCHEME = "BlockingSegmentCompletionFSM"; private final Map _fsmSchemes = new HashMap<>(); + private final String _defaultFsmScheme; public SegmentCompletionConfig(PinotConfiguration configuration) { // Parse properties to extract FSM schemes @@ -37,9 +40,16 @@ public SegmentCompletionConfig(PinotConfiguration configuration) { _fsmSchemes.put(scheme, className); } } + + // Get the default FSM scheme + _defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_SCHEME); } public Map getFsmSchemes() { return _fsmSchemes; } + + public String getDefaultFsmScheme() { + return _defaultFsmScheme; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java index 63a753311ae8..cf637cb59b72 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java @@ -38,7 +38,7 @@ private SegmentCompletionFSMFactory() { static { try { Class clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM"); - register("BlockingSegmentCompletionFSMFactory", (Class) clazz); + register("BlockingSegmentCompletionFSM", (Class) clazz); LOGGER.info("Registered default BlockingSegmentCompletionFSM"); } catch (Exception e) { LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 46464b5920da..af40f8d72fe3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -60,7 +60,7 @@ public class SegmentCompletionManager { private final PinotLLCRealtimeSegmentManager _segmentManager; private final ControllerMetrics _controllerMetrics; private final LeadControllerManager _leadControllerManager; - private final SegmentCompletionConfig _fsmFactoryConfig; + private final SegmentCompletionConfig _segmentCompletionConfig; // Half hour max commit time for all segments @@ -75,17 +75,17 @@ public static int getMaxCommitTimeForAllSegmentsSeconds() { public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, - int segmentCommitTimeoutSeconds, SegmentCompletionConfig fsmFactoryConfig) { + int segmentCommitTimeoutSeconds, SegmentCompletionConfig segmentCompletionConfig) { _helixManager = helixManager; _segmentManager = segmentManager; _controllerMetrics = controllerMetrics; _leadControllerManager = leadControllerManager; SegmentCompletionProtocol.setMaxSegmentCommitTimeMs( TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS)); - _fsmFactoryConfig = fsmFactoryConfig; + _segmentCompletionConfig = segmentCompletionConfig; // Initialize the FSM Factory - SegmentCompletionFSMFactory.init(_fsmFactoryConfig); + SegmentCompletionFSMFactory.init(_segmentCompletionConfig); } public String getControllerVipUrl() { @@ -128,7 +128,7 @@ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msg Preconditions.checkState(segmentMetadata != null, "Failed to find ZK metadata for segment: %s", segmentName); SegmentCompletionFSM fsm; - String factoryName = "BlockingSegmentCompletionFSMFactory"; + String factoryName = _segmentCompletionConfig.getDefaultFsmScheme(); Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName), "No FSM registered for name: " + factoryName);