Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make segment completion FSM pluggable #14088

Merged
merged 12 commits into from
Dec 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
import org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
Expand Down Expand Up @@ -489,9 +490,12 @@ private void setUpPinotController() {
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
// TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);

SegmentCompletionConfig segmentCompletionConfig = new SegmentCompletionConfig(_config);

_segmentCompletionManager =
new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds(), segmentCompletionConfig);

_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;


public class SegmentCompletionConfig {
public static final String FSM_SCHEME =
CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION + ".fsm.scheme.";
public static final String DEFAULT_FSM_SCHEME_KEY =
CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION + ".fsm.scheme.default";
public static final String DEFAULT_FSM_SCHEME = "default";
private final Map<String, String> _fsmSchemes = new HashMap<>();
private final String _defaultFsmScheme;

public SegmentCompletionConfig(PinotConfiguration configuration) {
KKcorps marked this conversation as resolved.
Show resolved Hide resolved
// Parse properties to extract FSM schemes
// Assuming properties keys are in the format scheme=className
for (String key : configuration.getKeys()) {
if (key.startsWith(FSM_SCHEME)) {
String scheme = key.substring(FSM_SCHEME.length());
String className = configuration.getProperty(key);
_fsmSchemes.put(scheme, className);
}
}

// Get the default FSM scheme
_defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_SCHEME);
}

public Map<String, String> getFsmSchemes() {
return _fsmSchemes;
}

public String getDefaultFsmScheme() {
return _defaultFsmScheme;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;


/**
* Interface for managing the state machine transitions related to segment completion
* in a real-time table within Apache Pinot.
*
* The segment completion process is crucial for handling real-time ingestion, ensuring
* that segments are correctly built, committed, or discarded based on the server's state
* and the protocol defined for low-level consumer (LLC) tables. This interface abstracts
* the controller-side logic for responding to server events during the lifecycle of a
* segment's completion.
*/
public interface SegmentCompletionFSM {
KKcorps marked this conversation as resolved.
Show resolved Hide resolved

/**
* Initializes the FSM to its initial state based on the message type.
* @param msgType The message type that triggered the FSM initialization.
* This is sent by the server when triggering the commit.
* The current supported values are segmentConsumed, segmentCommit
* ,segmentCommitStart,segmentUpload,segmentCommitEnd,segmentCommitEndWithMetadata
* ,segmentStoppedConsuming,extendBuildTime
*
*/
void transitionToInitialState(String msgType);
KKcorps marked this conversation as resolved.
Show resolved Hide resolved

/**
* Checks whether the segment completion process has completed.
*
* The process is considered complete when the segment has been either successfully
* committed or marked as aborted. This method helps determine if the FSM can be
* removed from memory.
*
* @return {@code true} if the FSM has reached a terminal state, {@code false} otherwise.
*/
boolean isDone();

/**
* Processes the event where a server indicates it has consumed up to a specified offset.
*
* This is typically triggered when a server finishes consuming data for a segment due
* to reaching a row limit, an end-of-partition signal, or another stopping condition.
* The FSM evaluates the reported offset and determines the next state or action for
* the server, such as holding, catching up, or committing the segment.
*
* @param instanceId The ID of the server instance reporting consumption.
* @param offset The offset up to which the server has consumed.
* @param stopReason The reason the server stopped consuming (e.g., row limit or end of partition).
* @return A response indicating the next action for the server (e.g., HOLD, CATCHUP, or COMMIT).
*/
SegmentCompletionProtocol.Response segmentConsumed(String instanceId, StreamPartitionMsgOffset offset,
String stopReason);

/**
* Processes the start of a segment commit from a server.
*
* This occurs when a server signals its intention to commit a segment it has built.
* The FSM verifies whether the server is eligible to commit based on its previous
* state and the reported offset, and transitions to a committing state if appropriate.
*
* @param instanceId The ID of the server instance attempting to commit.
* @param offset The offset being committed by the server.
* @return A response indicating the next action for the server (e.g., CONTINUE or FAILED).
*/
SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, StreamPartitionMsgOffset offset);

/**
* Handles the event where a server indicates it has stopped consuming.
*
* This is triggered when a server cannot continue consuming for a segment, potentially
* due to resource constraints, errors, or a manual stop. The FSM updates its state
* and determines whether the server can participate in subsequent actions for the segment.
*
* @param instanceId The ID of the server instance reporting the stopped consumption.
* @param offset The offset at which the server stopped consuming.
* @param reason The reason for stopping consumption (e.g., resource constraints or errors).
* @return A response indicating the next action for the server (e.g., PROCESSED or FAILED).
*/
SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, StreamPartitionMsgOffset offset,
String reason);

/**
* Handles a request to extend the time allowed for segment building.
*
* If a server requires more time to build a segment, it can request an extension.
* The FSM evaluates the request in the context of the current state and the protocol's
* constraints, and either grants or denies the extension.
*
* @param instanceId The ID of the server instance requesting an extension.
* @param offset The offset at which the server is currently consuming.
* @param extTimeSec The additional time (in seconds) requested for segment building.
* @return A response indicating whether the extension was accepted or denied.
*/
SegmentCompletionProtocol.Response extendBuildTime(String instanceId, StreamPartitionMsgOffset offset,
int extTimeSec);

/**
* Processes the end of a segment commit from a server.
*
* This method is triggered when a server has completed uploading the segment and
* signals the end of the commit process. The FSM validates the commit, updates metadata,
* and finalizes the segment's state. Depending on the outcome, the segment is either
* successfully committed or the FSM transitions to an error state.
*
* @param reqParams The parameters of the commit request.
* @param committingSegmentDescriptor Metadata about the segment being committed.
* @return A response indicating whether the commit was successful or failed.
*/
SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
CommittingSegmentDescriptor committingSegmentDescriptor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentCompletionFSMFactory {
private SegmentCompletionFSMFactory() {
// Private constructor to prevent instantiation
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
private static final Map<String, Class<? extends SegmentCompletionFSM>> FSM_CLASS_MAP = new HashMap<>();

// Static block to register the default FSM
static {
register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME, BlockingSegmentCompletionFSM.class);
}

/**
* Registers an FSM class with a specific scheme/type.
*
* @param scheme The scheme or type key.
* @param fsmClass The class for the FSM.
*/
public static void register(String scheme, Class<? extends SegmentCompletionFSM> fsmClass) {
Preconditions.checkNotNull(scheme, "Scheme cannot be null");
Preconditions.checkNotNull(fsmClass, "FSM Class cannot be null");
Preconditions.checkState(FSM_CLASS_MAP.put(scheme, fsmClass) == null,
"FSM class already registered for scheme: " + scheme);
LOGGER.info("Registered SegmentCompletionFSM class {} for scheme {}", fsmClass, scheme);
}

/**
* Initializes the factory with configurations.
*
* @param fsmFactoryConfig The configuration object containing FSM schemes and classes.
*/
public static void init(SegmentCompletionConfig fsmFactoryConfig) {
Map<String, String> schemesConfig = fsmFactoryConfig.getFsmSchemes();
for (Map.Entry<String, String> entry : schemesConfig.entrySet()) {
String scheme = entry.getKey();
String className = entry.getValue();
try {
LOGGER.info("Initializing SegmentCompletionFSM for scheme {}, classname {}", scheme, className);
Class<?> clazz = Class.forName(className);
register(scheme, (Class<? extends SegmentCompletionFSM>) clazz);
} catch (Exception e) {
LOGGER.error("Could not register FSM class for class {} with scheme {}", className, scheme, e);
throw new RuntimeException(e);
}
}
}

/**
* Creates an FSM instance based on the scheme/type.
*
* @param scheme The scheme or type key.
* @param manager The SegmentCompletionManager instance.
* @param segmentManager The PinotLLCRealtimeSegmentManager instance.
* @param llcSegmentName The segment name.
* @param segmentMetadata The segment metadata.
* @return An instance of SegmentCompletionFSM.
*/
public static SegmentCompletionFSM createFSM(String scheme,
SegmentCompletionManager manager,
PinotLLCRealtimeSegmentManager segmentManager,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata) {
Class<? extends SegmentCompletionFSM> fsmClass = FSM_CLASS_MAP.get(scheme);
Preconditions.checkState(fsmClass != null, "No FSM registered for scheme: " + scheme);
try {
return fsmClass.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class
).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata);
} catch (Exception e) {
LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
throw new RuntimeException(e);
}
}

/**
* Checks if a scheme is supported.
*
* @param factoryType The scheme to check.
* @return True if supported, false otherwise.
*/
public static boolean isFactoryTypeSupported(String factoryType) {
return FSM_CLASS_MAP.containsKey(factoryType);
}

/**
* Clears all registered FSM classes.
*/
public static void shutdown() {
FSM_CLASS_MAP.clear();
}
}
Loading
Loading