Skip to content

Commit

Permalink
Fix schemes
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Dec 12, 2024
1 parent 7c1e04f commit 57c5685
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@
*/
public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
public static final Logger LOGGER = LoggerFactory.getLogger(BlockingSegmentCompletionFSM.class);

public enum BlockingSegmentCompletionFSMState {
PARTIAL_CONSUMING, // Indicates that at least one replica has reported that it has stopped consuming.
HOLDING, // the segment has started finalizing.
COMMITTER_DECIDED, // We know who the committer will be, we will let them know next time they call segmentConsumed()
COMMITTER_NOTIFIED, // we notified the committer to commit.
COMMITTER_UPLOADING, // committer is uploading.
COMMITTING, // we are in the process of committing to zk
COMMITTED, // We already committed a segment.
ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
}

// We will have some variation between hosts, so we add 10% to the max hold time to pick a winner.
// If there is more than 10% variation, then it is handled as an error case (i.e. the first few to
// come in will have a winner, and the later ones will just download the segment)
Expand Down Expand Up @@ -97,10 +109,9 @@ public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
private long _maxTimeAllowedToCommitMs;
private final String _controllerVipUrl;

// Ctor that starts the FSM in HOLDING state
public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, SegmentZKMetadata segmentMetadata,
String msgType) {
SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName,
SegmentZKMetadata segmentMetadata) {
_segmentName = segmentName;
_rawTableName = _segmentName.getTableName();
_realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
Expand Down Expand Up @@ -139,9 +150,24 @@ public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManage
_state = BlockingSegmentCompletionFSMState.COMMITTED;
_winningOffset = endOffset;
_winner = "UNKNOWN";
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
}
}

@Override
/**
* A new method that sets the initial FSM state based on the incoming message type.
*/
public void transitionToInitialState(String msgType) {
if (_state == BlockingSegmentCompletionFSMState.COMMITTED) {
// Already set; no need to do anything here.
return;
}

// If we receive a STOPPED_CONSUMING message before any others, switch to PARTIAL_CONSUMING
if (SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING.equals(msgType)) {
_state = BlockingSegmentCompletionFSMState.PARTIAL_CONSUMING;
}
// Otherwise, we remain in HOLDING
}

@Override
Expand Down Expand Up @@ -324,7 +350,8 @@ public SegmentCompletionProtocol.Response extendBuildTime(final String instanceI
* the _winner.
*/
@Override
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, CommittingSegmentDescriptor committingSegmentDescriptor) {
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
CommittingSegmentDescriptor committingSegmentDescriptor) {
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset =
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class SegmentCompletionConfig {
public static final String FSM_SCHEME = "pinot.controller.segment.completion.fsm.scheme.";
public static final String DEFAULT_FSM_SCHEME_KEY = "pinot.controller.segment.completion.fsm.scheme.default.";
public static final String DEFAULT_FSM_SCHEME = "BlockingSegmentCompletionFSM";
public static final String DEFAULT_FSM_SCHEME = "default";
private final Map<String, String> _fsmSchemes = new HashMap<>();
private final String _defaultFsmScheme;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,20 @@
public interface SegmentCompletionFSM {

/**
* Checks whether the segment completion process has completed.
*
* The process is considered complete when the segment has been either successfully
* committed or marked as aborted. This method helps determine if the FSM can be
* removed from memory.
*
* @return {@code true} if the FSM has reached a terminal state, {@code false} otherwise.
* Initializes the FSM to its initial state based on the message type.
* @param msgType
*/
void transitionToInitialState(String msgType);

/**
* Checks whether the segment completion process has completed.
*
* The process is considered complete when the segment has been either successfully
* committed or marked as aborted. This method helps determine if the FSM can be
* removed from memory.
*
* @return {@code true} if the FSM has reached a terminal state, {@code false} otherwise.
*/
boolean isDone();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ private SegmentCompletionFSMFactory() {

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
private static final Map<String, Class<? extends SegmentCompletionFSM>> FSM_CLASS_MAP = new HashMap<>();
public static final String DEFAULT = "default";

// Static block to register the default FSM
static {
try {
Class<?> clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM");
register("BlockingSegmentCompletionFSM", (Class<? extends SegmentCompletionFSM>) clazz);
Class<?> clazz = Class.forName(BlockingSegmentCompletionFSM.class.getCanonicalName());
register(DEFAULT, (Class<? extends SegmentCompletionFSM>) clazz);
LOGGER.info("Registered default BlockingSegmentCompletionFSM");
} catch (Exception e) {
LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e);
Expand Down Expand Up @@ -91,25 +92,22 @@ public static void init(SegmentCompletionConfig fsmFactoryConfig) {
* @param segmentManager The PinotLLCRealtimeSegmentManager instance.
* @param llcSegmentName The segment name.
* @param segmentMetadata The segment metadata.
* @param msgType The message type.
* @return An instance of SegmentCompletionFSM.
*/
public static SegmentCompletionFSM createFSM(String scheme,
SegmentCompletionManager manager,
PinotLLCRealtimeSegmentManager segmentManager,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata,
String msgType) {
SegmentZKMetadata segmentMetadata) {
Class<? extends SegmentCompletionFSM> fsmClass = FSM_CLASS_MAP.get(scheme);
Preconditions.checkState(fsmClass != null, "No FSM registered for scheme: " + scheme);
try {
return fsmClass.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata, msgType);
SegmentZKMetadata.class
).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata);
} catch (Exception e) {
LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msg
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
"No FSM registered for name: " + factoryName);

fsm = SegmentCompletionFSMFactory.createFSM(factoryName, this, _segmentManager, llcSegmentName, segmentMetadata,
msgType);
fsm = SegmentCompletionFSMFactory.createFSM(factoryName, this, _segmentManager, llcSegmentName, segmentMetadata);
fsm.transitionToInitialState(msgType);

LOGGER.info("Created FSM {}", fsm);
return fsm;
Expand Down

0 comments on commit 57c5685

Please sign in to comment.