Skip to content

Commit

Permalink
Simplify fsm factory by just storing class names instead of constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Nov 28, 2024
1 parent 3f0f6be commit 3b13978
Showing 1 changed file with 24 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
Expand All @@ -33,20 +32,13 @@ private SegmentCompletionFSMFactory() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
private static final Map<String, Constructor<? extends SegmentCompletionFSM>> FSM_CONSTRUCTOR_MAP = new HashMap<>();
private static final Map<String, Class<? extends SegmentCompletionFSM>> FSM_CLASS_MAP = new HashMap<>();

// Static block to register the default FSM
static {
try {
Class<?> clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM");
Constructor<?> constructor = clazz.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
);
register("BlockingSegmentCompletionFSMFactory", (Constructor<? extends SegmentCompletionFSM>) constructor);
register("BlockingSegmentCompletionFSMFactory", (Class<? extends SegmentCompletionFSM>) clazz);
LOGGER.info("Registered default BlockingSegmentCompletionFSM");
} catch (Exception e) {
LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e);
Expand All @@ -55,19 +47,19 @@ private SegmentCompletionFSMFactory() {
}

/**
* Registers an FSM constructor with a specific scheme/type.
* Registers an FSM class with a specific scheme/type.
*
* @param scheme The scheme or type key.
* @param constructor The constructor for the FSM.
* @param fsmClass The class for the FSM.
*/
public static void register(String scheme, Constructor<? extends SegmentCompletionFSM> constructor) {
public static void register(String scheme, Class<? extends SegmentCompletionFSM> fsmClass) {
Preconditions.checkNotNull(scheme, "Scheme cannot be null");
Preconditions.checkNotNull(constructor, "FSM Constructor cannot be null");
if (FSM_CONSTRUCTOR_MAP.containsKey(scheme)) {
LOGGER.warn("Overwriting existing FSM constructor for scheme {}", scheme);
Preconditions.checkNotNull(fsmClass, "FSM Class cannot be null");
if (FSM_CLASS_MAP.containsKey(scheme)) {
LOGGER.warn("Overwriting existing FSM class for scheme {}", scheme);
}
FSM_CONSTRUCTOR_MAP.put(scheme, constructor);
LOGGER.info("Registered SegmentCompletionFSM constructor for scheme {}", scheme);
FSM_CLASS_MAP.put(scheme, fsmClass);
LOGGER.info("Registered SegmentCompletionFSM class for scheme {}", scheme);
}

/**
Expand All @@ -83,16 +75,9 @@ public static void init(SegmentCompletionConfig fsmFactoryConfig) {
try {
LOGGER.info("Initializing SegmentCompletionFSM for scheme {}, classname {}", scheme, className);
Class<?> clazz = Class.forName(className);
Constructor<?> constructor = clazz.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
);
register(scheme, (Constructor<? extends SegmentCompletionFSM>) constructor);
register(scheme, (Class<? extends SegmentCompletionFSM>) clazz);
} catch (Exception e) {
LOGGER.error("Could not register FSM constructor for class {} with scheme {}", className, scheme, e);
LOGGER.error("Could not register FSM class for class {} with scheme {}", className, scheme, e);
throw new RuntimeException(e);
}
}
Expand All @@ -115,10 +100,16 @@ public static SegmentCompletionFSM createFSM(String scheme,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata,
String msgType) {
Constructor<? extends SegmentCompletionFSM> constructor = FSM_CONSTRUCTOR_MAP.get(scheme);
Preconditions.checkState(constructor != null, "No FSM registered for scheme: " + scheme);
Class<? extends SegmentCompletionFSM> fsmClass = FSM_CLASS_MAP.get(scheme);
Preconditions.checkState(fsmClass != null, "No FSM registered for scheme: " + scheme);
try {
return constructor.newInstance(segmentManager, manager, llcSegmentName, segmentMetadata, msgType);
return fsmClass.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata, msgType);
} catch (Exception e) {
LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
throw new RuntimeException(e);
Expand All @@ -132,13 +123,13 @@ public static SegmentCompletionFSM createFSM(String scheme,
* @return True if supported, false otherwise.
*/
public static boolean isFactoryTypeSupported(String factoryType) {
return FSM_CONSTRUCTOR_MAP.containsKey(factoryType);
return FSM_CLASS_MAP.containsKey(factoryType);
}

/**
* Clears all registered FSM constructors.
* Clears all registered FSM classes.
*/
public static void shutdown() {
FSM_CONSTRUCTOR_MAP.clear();
FSM_CLASS_MAP.clear();
}
}

0 comments on commit 3b13978

Please sign in to comment.