Skip to content

Commit

Permalink
Add factory and make all test pass
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Sep 26, 2024
1 parent 125ec96 commit 3e68850
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
import org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
Expand Down Expand Up @@ -478,9 +479,12 @@ private void setUpPinotController() {
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
// TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);

SegmentCompletionConfig segmentCompletionConfig = new SegmentCompletionConfig(_config);

_segmentCompletionManager =
new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds(), segmentCompletionConfig);

_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*
* See https://github.com/linkedin/pinot/wiki/Low-level-kafka-consumers
*/
class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
public static final Logger LOGGER = LoggerFactory.getLogger(BlockingSegmentCompletionFSM.class);
// We will have some variation between hosts, so we add 10% to the max hold time to pick a winner.
// If there is more than 10% variation, then it is handled as an error case (i.e. the first few to
Expand Down Expand Up @@ -83,7 +83,7 @@ class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
private final String _controllerVipUrl;

// Ctor that starts the FSM in HOLDING state
private BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, SegmentZKMetadata segmentMetadata,
String msgType) {
_segmentName = segmentName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package org.apache.pinot.controller.helix.core.realtime;

import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;


public class SegmentCompletionConfig {
private final Map<String, String> fsmSchemes = new HashMap<>();

public SegmentCompletionConfig(Properties properties) {
public SegmentCompletionConfig(PinotConfiguration configuration) {
// Parse properties to extract FSM schemes
// Assuming properties keys are in the format scheme=className
for (String key : properties.stringPropertyNames()) {
for (String key : configuration.getKeys()) {
if (key.startsWith("fsm.scheme.")) {
String scheme = key.substring("fsm.scheme.".length());
String className = properties.getProperty(key);
String className = configuration.getProperty(key);
fsmSchemes.put(scheme, className);
}
}
Expand All @@ -23,4 +24,3 @@ public Map<String, String> getFsmSchemes() {
return fsmSchemes;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ public interface SegmentCompletionFSM {
SegmentCompletionProtocol.Response extendBuildTime(String instanceId, StreamPartitionMsgOffset offset, int extTimeSec);
SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor);
}

Original file line number Diff line number Diff line change
@@ -1,49 +1,55 @@
package org.apache.pinot.controller.helix.core.realtime;

// SegmentCompletionFSMFactory.java
import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;

public class SegmentCompletionFSMFactory {
private SegmentCompletionFSMFactory() {
// Private constructor to prevent instantiation
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
private static final String CLASS = "class";
private static final Map<String, SegmentCompletionFSMCreator> FSM_CREATOR_MAP = new HashMap<>();
private static final Map<String, Constructor<? extends SegmentCompletionFSM>> FSM_CONSTRUCTOR_MAP = new HashMap<>();

/**
* Functional interface for creating FSM instances.
*/
@FunctionalInterface
public interface SegmentCompletionFSMCreator {
SegmentCompletionFSM create(SegmentCompletionManager manager,
PinotLLCRealtimeSegmentManager segmentManager,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata,
String msgType);
// Static block to register the default FSM
static {
try {
Class<?> clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM");
Constructor<?> constructor = clazz.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
);
register("BlockingSegmentCompletionFSMFactory", (Constructor<? extends SegmentCompletionFSM>) constructor);
LOGGER.info("Registered default BlockingSegmentCompletionFSM");
} catch (Exception e) {
LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e);
throw new RuntimeException("Failed to register default BlockingSegmentCompletionFSM", e);
}
}

/**
* Registers an FSM creator with a specific scheme/type.
* Registers an FSM constructor with a specific scheme/type.
*
* @param scheme The scheme or type key.
* @param creator The creator instance.
* @param constructor The constructor for the FSM.
*/
public static void register(String scheme, SegmentCompletionFSMCreator creator) {
public static void register(String scheme, Constructor<? extends SegmentCompletionFSM> constructor) {
Preconditions.checkNotNull(scheme, "Scheme cannot be null");
Preconditions.checkNotNull(creator, "FSM Creator cannot be null");
if (FSM_CREATOR_MAP.containsKey(scheme)) {
LOGGER.warn("Overwriting existing FSM creator for scheme {}", scheme);
Preconditions.checkNotNull(constructor, "FSM Constructor cannot be null");
if (FSM_CONSTRUCTOR_MAP.containsKey(scheme)) {
LOGGER.warn("Overwriting existing FSM constructor for scheme {}", scheme);
}
FSM_CREATOR_MAP.put(scheme, creator);
LOGGER.info("Registered SegmentCompletionFSM creator for scheme {}", scheme);
FSM_CONSTRUCTOR_MAP.put(scheme, constructor);
LOGGER.info("Registered SegmentCompletionFSM constructor for scheme {}", scheme);
}

/**
Expand All @@ -52,18 +58,23 @@ public static void register(String scheme, SegmentCompletionFSMCreator creator)
* @param fsmFactoryConfig The configuration object containing FSM schemes and classes.
*/
public static void init(SegmentCompletionConfig fsmFactoryConfig) {
// Assuming SegmentCompletionConfig is a wrapper around configuration properties
Map<String, String> schemesConfig = fsmFactoryConfig.getFsmSchemes();
for (Map.Entry<String, String> entry : schemesConfig.entrySet()) {
String scheme = entry.getKey();
String className = entry.getValue();
try {
LOGGER.info("Initializing SegmentCompletionFSM for scheme {}, classname {}", scheme, className);
Class<?> clazz = Class.forName(className);
SegmentCompletionFSMCreator creator = (SegmentCompletionFSMCreator) clazz.getDeclaredConstructor().newInstance();
register(scheme, creator);
Constructor<?> constructor = clazz.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
);
register(scheme, (Constructor<? extends SegmentCompletionFSM>) constructor);
} catch (Exception e) {
LOGGER.error("Could not instantiate FSM for class {} with scheme {}", className, scheme, e);
LOGGER.error("Could not register FSM constructor for class {} with scheme {}", className, scheme, e);
throw new RuntimeException(e);
}
}
Expand All @@ -78,17 +89,22 @@ public static void init(SegmentCompletionConfig fsmFactoryConfig) {
* @param llcSegmentName The segment name.
* @param segmentMetadata The segment metadata.
* @param msgType The message type.
* @return An instance of SegmentCompletionFSMInterface.
* @return An instance of SegmentCompletionFSM.
*/
public static SegmentCompletionFSM createFSM(String scheme,
SegmentCompletionManager manager,
PinotLLCRealtimeSegmentManager segmentManager,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata,
String msgType) {
SegmentCompletionFSMCreator creator = FSM_CREATOR_MAP.get(scheme);
Preconditions.checkState(creator != null, "No FSM registered for scheme: " + scheme);
return creator.create(manager, segmentManager, llcSegmentName, segmentMetadata, msgType);
Constructor<? extends SegmentCompletionFSM> constructor = FSM_CONSTRUCTOR_MAP.get(scheme);
Preconditions.checkState(constructor != null, "No FSM registered for scheme: " + scheme);
try {
return constructor.newInstance(segmentManager, manager, llcSegmentName, segmentMetadata, msgType);
} catch (Exception e) {
LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
throw new RuntimeException(e);
}
}

/**
Expand All @@ -98,15 +114,13 @@ public static SegmentCompletionFSM createFSM(String scheme,
* @return True if supported, false otherwise.
*/
public static boolean isFactoryTypeSupported(String factoryType) {
return FSM_CREATOR_MAP.containsKey(factoryType);
return FSM_CONSTRUCTOR_MAP.containsKey(factoryType);
}

/**
* Shuts down all registered FSMs if needed.
* (Implement if FSMs require shutdown logic)
* Clears all registered FSM constructors.
*/
public static void shutdown() {
// Implement shutdown logic if FSMs have resources to release
FSM_CONSTRUCTOR_MAP.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,21 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -112,7 +105,7 @@ protected StreamPartitionMsgOffsetFactory getStreamPartitionMsgOffsetFactory(LLC
return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
}

public long getCommitTime(String tableName) {
public Long getCommitTime(String tableName) {
return _commitTimeMap.get(tableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
Expand Down Expand Up @@ -1421,7 +1422,7 @@ protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealti
boolean isLeader, ControllerMetrics controllerMetrics) {
super(helixManager, segmentManager, controllerMetrics,
new LeadControllerManager("localhost_1234", helixManager, controllerMetrics),
SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds(), new SegmentCompletionConfig(new PinotConfiguration()));
_isLeader = isLeader;
}

Expand Down

0 comments on commit 3e68850

Please sign in to comment.