Skip to content

Commit

Permalink
More refactoring plus removed hardcoded
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Dec 11, 2024
1 parent 705cdc3 commit 25340aa
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {

public final Logger _logger;

SegmentCompletionFSMState _state = SegmentCompletionFSMState.HOLDING; // Typically start off in HOLDING state.
BlockingSegmentCompletionFSMState _state = BlockingSegmentCompletionFSMState.HOLDING;
// Typically start off in HOLDING state.
final long _startTimeMs;
private final LLCSegmentName _segmentName;
private final String _rawTableName;
Expand Down Expand Up @@ -135,11 +136,11 @@ public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManage
StreamPartitionMsgOffsetFactory factory =
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
StreamPartitionMsgOffset endOffset = factory.create(segmentMetadata.getEndOffset());
_state = SegmentCompletionFSMState.COMMITTED;
_state = BlockingSegmentCompletionFSMState.COMMITTED;
_winningOffset = endOffset;
_winner = "UNKNOWN";
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
_state = SegmentCompletionFSMState.PARTIAL_CONSUMING;
_state = BlockingSegmentCompletionFSMState.PARTIAL_CONSUMING;
}
}

Expand All @@ -151,7 +152,8 @@ public String toString() {

// SegmentCompletionManager releases the FSM from the hashtable when it is done.
public boolean isDone() {
return _state.equals(SegmentCompletionFSMState.COMMITTED) || _state.equals(SegmentCompletionFSMState.ABORTED);
return _state.equals(BlockingSegmentCompletionFSMState.COMMITTED) || _state.equals(
BlockingSegmentCompletionFSMState.ABORTED);
}

/*
Expand Down Expand Up @@ -333,7 +335,7 @@ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProt
return abortAndReturnFailed();
}
_logger.info("Processing segmentCommitEnd({}, {})", instanceId, offset);
if (!_state.equals(SegmentCompletionFSMState.COMMITTER_UPLOADING) || !instanceId.equals(_winner)
if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING) || !instanceId.equals(_winner)
|| offset.compareTo(_winningOffset) != 0) {
// State changed while we were out of sync. return a failed commit.
_logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state,
Expand Down Expand Up @@ -400,14 +402,14 @@ private SegmentCompletionProtocol.Response hold(String instanceId, StreamPartiti

private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId,
StreamPartitionMsgOffset offset) {
_state = SegmentCompletionFSMState.ABORTED;
_state = BlockingSegmentCompletionFSMState.ABORTED;
_segmentCompletionManager.getControllerMetrics()
.addMeteredTableValue(_rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
return hold(instanceId, offset);
}

private SegmentCompletionProtocol.Response abortAndReturnFailed() {
_state = SegmentCompletionFSMState.ABORTED;
_state = BlockingSegmentCompletionFSMState.ABORTED;
_segmentCompletionManager.getControllerMetrics()
.addMeteredTableValue(_rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
return SegmentCompletionProtocol.RESP_FAILED;
Expand All @@ -433,7 +435,7 @@ private SegmentCompletionProtocol.Response partialConsumingConsumed(String insta
// This is the first time we are getting segmentConsumed() for this segment.
// Some instance thinks we can close this segment, so go to HOLDING state, and process as normal.
// We will just be looking for less replicas.
_state = SegmentCompletionFSMState.HOLDING;
_state = BlockingSegmentCompletionFSMState.HOLDING;
return holdingConsumed(instanceId, offset, now, stopReason);
}

Expand Down Expand Up @@ -473,11 +475,11 @@ private SegmentCompletionProtocol.Response holdingConsumed(String instanceId, St
if (_winner.equals(instanceId)) {
_logger.info("{}:Committer notified winner instance={} offset={}", _state, instanceId, offset);
response = commit(instanceId, offset);
_state = SegmentCompletionFSMState.COMMITTER_NOTIFIED;
_state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED;
} else {
_logger.info("{}:Committer decided winner={} offset={}", _state, _winner, _winningOffset);
response = catchup(instanceId, offset);
_state = SegmentCompletionFSMState.COMMITTER_DECIDED;
_state = BlockingSegmentCompletionFSMState.COMMITTER_DECIDED;
}
} else {
response = hold(instanceId, offset);
Expand Down Expand Up @@ -517,7 +519,7 @@ private SegmentCompletionProtocol.Response committerDecidedConsumed(String insta
if (_winningOffset.compareTo(offset) == 0) {
_logger.info("{}:Notifying winner instance={} offset={}", _state, instanceId, offset);
response = commit(instanceId, offset);
_state = SegmentCompletionFSMState.COMMITTER_NOTIFIED;
_state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED;
} else {
// Winner coming back with a different offset.
_logger
Expand Down Expand Up @@ -578,7 +580,7 @@ private SegmentCompletionProtocol.Response committerNotifiedConsumed(String inst
// Something seriously wrong. Abort the FSM
response = discard(instanceId, offset);
_logger.warn("{}:Aborting for instance={} offset={}", _state, instanceId, offset);
_state = SegmentCompletionFSMState.ABORTED;
_state = BlockingSegmentCompletionFSMState.ABORTED;
}
} else {
// Common case: A different instance is reporting.
Expand Down Expand Up @@ -608,7 +610,7 @@ private SegmentCompletionProtocol.Response committerNotifiedCommit(String instan
return response;
}
_logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset);
_state = SegmentCompletionFSMState.COMMITTER_UPLOADING;
_state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING;
long commitTimeMs = now - _startTimeMs;
if (commitTimeMs > _initialCommitTimeMs) {
// We assume that the commit time holds for all partitions. It is possible, though, that one partition
Expand Down Expand Up @@ -754,14 +756,14 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset =
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
if (!_state.equals(SegmentCompletionFSMState.COMMITTER_UPLOADING)) {
if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING)) {
// State changed while we were out of sync. return a failed commit.
_logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state,
_segmentName.getSegmentName(), _winner, _winningOffset);
return SegmentCompletionProtocol.RESP_FAILED;
}
_logger.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId);
_state = SegmentCompletionFSMState.COMMITTING;
_state = BlockingSegmentCompletionFSMState.COMMITTING;
// In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation,
// so we need to move the segment file to its permanent location first before committing the metadata.
// The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata
Expand Down Expand Up @@ -791,7 +793,7 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc
return SegmentCompletionProtocol.RESP_FAILED;
}

_state = SegmentCompletionFSMState.COMMITTED;
_state = BlockingSegmentCompletionFSMState.COMMITTED;
_logger.info("Committed segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId);
return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.realtime;

public enum SegmentCompletionFSMState {
public enum BlockingSegmentCompletionFSMState {
PARTIAL_CONSUMING, // Indicates that at least one replica has reported that it has stopped consuming.
HOLDING, // the segment has started finalizing.
COMMITTER_DECIDED, // We know who the committer will be, we will let them know next time they call segmentConsumed()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@

public class SegmentCompletionConfig {
public static final String FSM_SCHEME = "pinot.controller.segment.completion.fsm.scheme.";
public static final String DEFAULT_FSM_SCHEME_KEY = "pinot.controller.segment.completion.fsm.scheme.default.";
public static final String DEFAULT_FSM_SCHEME = "BlockingSegmentCompletionFSM";
private final Map<String, String> _fsmSchemes = new HashMap<>();
private final String _defaultFsmScheme;

public SegmentCompletionConfig(PinotConfiguration configuration) {
// Parse properties to extract FSM schemes
Expand All @@ -37,9 +40,16 @@ public SegmentCompletionConfig(PinotConfiguration configuration) {
_fsmSchemes.put(scheme, className);
}
}

// Get the default FSM scheme
_defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_SCHEME);
}

public Map<String, String> getFsmSchemes() {
return _fsmSchemes;
}

public String getDefaultFsmScheme() {
return _defaultFsmScheme;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private SegmentCompletionFSMFactory() {
static {
try {
Class<?> clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM");
register("BlockingSegmentCompletionFSMFactory", (Class<? extends SegmentCompletionFSM>) clazz);
register("BlockingSegmentCompletionFSM", (Class<? extends SegmentCompletionFSM>) clazz);
LOGGER.info("Registered default BlockingSegmentCompletionFSM");
} catch (Exception e) {
LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class SegmentCompletionManager {
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
private final SegmentCompletionConfig _fsmFactoryConfig;
private final SegmentCompletionConfig _segmentCompletionConfig;


// Half hour max commit time for all segments
Expand All @@ -75,17 +75,17 @@ public static int getMaxCommitTimeForAllSegmentsSeconds() {

public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager,
int segmentCommitTimeoutSeconds, SegmentCompletionConfig fsmFactoryConfig) {
int segmentCommitTimeoutSeconds, SegmentCompletionConfig segmentCompletionConfig) {
_helixManager = helixManager;
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
_fsmFactoryConfig = fsmFactoryConfig;
_segmentCompletionConfig = segmentCompletionConfig;

// Initialize the FSM Factory
SegmentCompletionFSMFactory.init(_fsmFactoryConfig);
SegmentCompletionFSMFactory.init(_segmentCompletionConfig);
}

public String getControllerVipUrl() {
Expand Down Expand Up @@ -128,7 +128,7 @@ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String msg
Preconditions.checkState(segmentMetadata != null, "Failed to find ZK metadata for segment: %s", segmentName);
SegmentCompletionFSM fsm;

String factoryName = "BlockingSegmentCompletionFSMFactory";
String factoryName = _segmentCompletionConfig.getDefaultFsmScheme();
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
"No FSM registered for name: " + factoryName);

Expand Down

0 comments on commit 25340aa

Please sign in to comment.