From 57c568500d2088765d26c3422671575cef0db15b Mon Sep 17 00:00:00 2001 From: Kartik Khare Date: Wed, 11 Dec 2024 19:18:54 +0530 Subject: [PATCH] Fix schemes --- .../BlockingSegmentCompletionFSM.java | 37 ++++++++++++++++--- .../BlockingSegmentCompletionFSMState.java | 30 --------------- .../realtime/SegmentCompletionConfig.java | 2 +- .../core/realtime/SegmentCompletionFSM.java | 20 ++++++---- .../realtime/SegmentCompletionFSMFactory.java | 14 +++---- .../realtime/SegmentCompletionManager.java | 4 +- 6 files changed, 54 insertions(+), 53 deletions(-) delete mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java index 3bfbba5eb4c1..98b360074c36 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java @@ -57,6 +57,18 @@ */ public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM { public static final Logger LOGGER = LoggerFactory.getLogger(BlockingSegmentCompletionFSM.class); + + public enum BlockingSegmentCompletionFSMState { + PARTIAL_CONSUMING, // Indicates that at least one replica has reported that it has stopped consuming. + HOLDING, // the segment has started finalizing. + COMMITTER_DECIDED, // We know who the committer will be, we will let them know next time they call segmentConsumed() + COMMITTER_NOTIFIED, // we notified the committer to commit. + COMMITTER_UPLOADING, // committer is uploading. + COMMITTING, // we are in the process of committing to zk + COMMITTED, // We already committed a segment. + ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in. + } + // We will have some variation between hosts, so we add 10% to the max hold time to pick a winner. // If there is more than 10% variation, then it is handled as an error case (i.e. the first few to // come in will have a winner, and the later ones will just download the segment) @@ -97,10 +109,9 @@ public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM { private long _maxTimeAllowedToCommitMs; private final String _controllerVipUrl; - // Ctor that starts the FSM in HOLDING state public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager, - SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, SegmentZKMetadata segmentMetadata, - String msgType) { + SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, + SegmentZKMetadata segmentMetadata) { _segmentName = segmentName; _rawTableName = _segmentName.getTableName(); _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(_rawTableName); @@ -139,9 +150,24 @@ public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManage _state = BlockingSegmentCompletionFSMState.COMMITTED; _winningOffset = endOffset; _winner = "UNKNOWN"; - } else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) { + } + } + + @Override + /** + * A new method that sets the initial FSM state based on the incoming message type. + */ + public void transitionToInitialState(String msgType) { + if (_state == BlockingSegmentCompletionFSMState.COMMITTED) { + // Already set; no need to do anything here. + return; + } + + // If we receive a STOPPED_CONSUMING message before any others, switch to PARTIAL_CONSUMING + if (SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING.equals(msgType)) { _state = BlockingSegmentCompletionFSMState.PARTIAL_CONSUMING; } + // Otherwise, we remain in HOLDING } @Override @@ -324,7 +350,8 @@ public SegmentCompletionProtocol.Response extendBuildTime(final String instanceI * the _winner. */ @Override - public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, CommittingSegmentDescriptor committingSegmentDescriptor) { + public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, + CommittingSegmentDescriptor committingSegmentDescriptor) { String instanceId = reqParams.getInstanceId(); StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java deleted file mode 100644 index 57bf5094460b..000000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSMState.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.realtime; - -public enum BlockingSegmentCompletionFSMState { - PARTIAL_CONSUMING, // Indicates that at least one replica has reported that it has stopped consuming. - HOLDING, // the segment has started finalizing. - COMMITTER_DECIDED, // We know who the committer will be, we will let them know next time they call segmentConsumed() - COMMITTER_NOTIFIED, // we notified the committer to commit. - COMMITTER_UPLOADING, // committer is uploading. - COMMITTING, // we are in the process of committing to zk - COMMITTED, // We already committed a segment. - ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in. -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java index 5f9242b67a7e..29aab8531eac 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java @@ -26,7 +26,7 @@ public class SegmentCompletionConfig { public static final String FSM_SCHEME = "pinot.controller.segment.completion.fsm.scheme."; public static final String DEFAULT_FSM_SCHEME_KEY = "pinot.controller.segment.completion.fsm.scheme.default."; - public static final String DEFAULT_FSM_SCHEME = "BlockingSegmentCompletionFSM"; + public static final String DEFAULT_FSM_SCHEME = "default"; private final Map _fsmSchemes = new HashMap<>(); private final String _defaultFsmScheme; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java index 42caa0c754fc..1f44ae1ade74 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java @@ -36,14 +36,20 @@ public interface SegmentCompletionFSM { /** - * Checks whether the segment completion process has completed. - * - * The process is considered complete when the segment has been either successfully - * committed or marked as aborted. This method helps determine if the FSM can be - * removed from memory. - * - * @return {@code true} if the FSM has reached a terminal state, {@code false} otherwise. + * Initializes the FSM to its initial state based on the message type. + * @param msgType */ + void transitionToInitialState(String msgType); + + /** + * Checks whether the segment completion process has completed. + * + * The process is considered complete when the segment has been either successfully + * committed or marked as aborted. This method helps determine if the FSM can be + * removed from memory. + * + * @return {@code true} if the FSM has reached a terminal state, {@code false} otherwise. + */ boolean isDone(); /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java index cf637cb59b72..239c2052e9c7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java @@ -33,12 +33,13 @@ private SegmentCompletionFSMFactory() { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionFSMFactory.class); private static final Map> FSM_CLASS_MAP = new HashMap<>(); + public static final String DEFAULT = "default"; // Static block to register the default FSM static { try { - Class clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM"); - register("BlockingSegmentCompletionFSM", (Class) clazz); + Class clazz = Class.forName(BlockingSegmentCompletionFSM.class.getCanonicalName()); + register(DEFAULT, (Class) clazz); LOGGER.info("Registered default BlockingSegmentCompletionFSM"); } catch (Exception e) { LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e); @@ -91,15 +92,13 @@ public static void init(SegmentCompletionConfig fsmFactoryConfig) { * @param segmentManager The PinotLLCRealtimeSegmentManager instance. * @param llcSegmentName The segment name. * @param segmentMetadata The segment metadata. - * @param msgType The message type. * @return An instance of SegmentCompletionFSM. */ public static SegmentCompletionFSM createFSM(String scheme, SegmentCompletionManager manager, PinotLLCRealtimeSegmentManager segmentManager, LLCSegmentName llcSegmentName, - SegmentZKMetadata segmentMetadata, - String msgType) { + SegmentZKMetadata segmentMetadata) { Class fsmClass = FSM_CLASS_MAP.get(scheme); Preconditions.checkState(fsmClass != null, "No FSM registered for scheme: " + scheme); try { @@ -107,9 +106,8 @@ public static SegmentCompletionFSM createFSM(String scheme, PinotLLCRealtimeSegmentManager.class, SegmentCompletionManager.class, LLCSegmentName.class, - SegmentZKMetadata.class, - String.class - ).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata, msgType); + SegmentZKMetadata.class + ).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata); } catch (Exception e) { LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e); throw new RuntimeException(e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index af40f8d72fe3..10416eed0597 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -132,8 +132,8 @@ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msg Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName), "No FSM registered for name: " + factoryName); - fsm = SegmentCompletionFSMFactory.createFSM(factoryName, this, _segmentManager, llcSegmentName, segmentMetadata, - msgType); + fsm = SegmentCompletionFSMFactory.createFSM(factoryName, this, _segmentManager, llcSegmentName, segmentMetadata); + fsm.transitionToInitialState(msgType); LOGGER.info("Created FSM {}", fsm); return fsm;