Skip to content

Commit

Permalink
Add factory and make all test pass
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Nov 19, 2024
1 parent 24765e0 commit a4b24d9
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
import org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
Expand Down Expand Up @@ -489,9 +490,12 @@ private void setUpPinotController() {
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
// TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);

SegmentCompletionConfig segmentCompletionConfig = new SegmentCompletionConfig(_config);

_segmentCompletionManager =
new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds(), segmentCompletionConfig);

_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

import java.util.HashMap;
Expand Down Expand Up @@ -41,7 +59,7 @@
*
* See https://github.com/linkedin/pinot/wiki/Low-level-kafka-consumers
*/
class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
public static final Logger LOGGER = LoggerFactory.getLogger(BlockingSegmentCompletionFSM.class);
// We will have some variation between hosts, so we add 10% to the max hold time to pick a winner.
// If there is more than 10% variation, then it is handled as an error case (i.e. the first few to
Expand Down Expand Up @@ -83,7 +101,7 @@ class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
private final String _controllerVipUrl;

// Ctor that starts the FSM in HOLDING state
private BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, SegmentZKMetadata segmentMetadata,
String msgType) {
_segmentName = segmentName;
Expand Down Expand Up @@ -119,7 +137,8 @@ private BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManag

if (segmentMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
String rawTableName = segmentName.getTableName();
TableConfig tableConfig = _segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
TableConfig tableConfig =
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
StreamConfig streamConfig =
new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
StreamPartitionMsgOffsetFactory factory =
Expand All @@ -131,7 +150,6 @@ private BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManag
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
_state = SegmentCompletionFSMState.PARTIAL_CONSUMING;
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;


public class SegmentCompletionConfig {
private final Map<String, String> fsmSchemes = new HashMap<>();
private final Map<String, String> _fsmSchemes = new HashMap<>();

public SegmentCompletionConfig(Properties properties) {
public SegmentCompletionConfig(PinotConfiguration configuration) {
// Parse properties to extract FSM schemes
// Assuming properties keys are in the format scheme=className
for (String key : properties.stringPropertyNames()) {
for (String key : configuration.getKeys()) {
if (key.startsWith("fsm.scheme.")) {
String scheme = key.substring("fsm.scheme.".length());
String className = properties.getProperty(key);
fsmSchemes.put(scheme, className);
String className = configuration.getProperty(key);
_fsmSchemes.put(scheme, className);
}
}
}

public Map<String, String> getFsmSchemes() {
return fsmSchemes;
return _fsmSchemes;
}
}

Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
Expand All @@ -7,10 +25,18 @@

public interface SegmentCompletionFSM {
boolean isDone();
SegmentCompletionProtocol.Response segmentConsumed(String instanceId, StreamPartitionMsgOffset offset, String stopReason);

SegmentCompletionProtocol.Response segmentConsumed(String instanceId, StreamPartitionMsgOffset offset,
String stopReason);

SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, StreamPartitionMsgOffset offset);
SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, StreamPartitionMsgOffset offset, String reason);
SegmentCompletionProtocol.Response extendBuildTime(String instanceId, StreamPartitionMsgOffset offset, int extTimeSec);
SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor);
}

SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, StreamPartitionMsgOffset offset,
String reason);

SegmentCompletionProtocol.Response extendBuildTime(String instanceId, StreamPartitionMsgOffset offset,
int extTimeSec);

SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor);
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

// SegmentCompletionFSMFactory.java
import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;

public class SegmentCompletionFSMFactory {
private SegmentCompletionFSMFactory() {
// Private constructor to prevent instantiation
}

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
private static final String CLASS = "class";
private static final Map<String, SegmentCompletionFSMCreator> FSM_CREATOR_MAP = new HashMap<>();
private static final Map<String, Constructor<? extends SegmentCompletionFSM>> FSM_CONSTRUCTOR_MAP = new HashMap<>();

/**
* Functional interface for creating FSM instances.
*/
@FunctionalInterface
public interface SegmentCompletionFSMCreator {
SegmentCompletionFSM create(SegmentCompletionManager manager,
PinotLLCRealtimeSegmentManager segmentManager,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata,
String msgType);
// Static block to register the default FSM
static {
try {
Class<?> clazz = Class.forName("org.apache.pinot.controller.helix.core.realtime.BlockingSegmentCompletionFSM");
Constructor<?> constructor = clazz.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
);
register("BlockingSegmentCompletionFSMFactory", (Constructor<? extends SegmentCompletionFSM>) constructor);
LOGGER.info("Registered default BlockingSegmentCompletionFSM");
} catch (Exception e) {
LOGGER.error("Failed to register default BlockingSegmentCompletionFSM", e);
throw new RuntimeException("Failed to register default BlockingSegmentCompletionFSM", e);
}
}

/**
* Registers an FSM creator with a specific scheme/type.
* Registers an FSM constructor with a specific scheme/type.
*
* @param scheme The scheme or type key.
* @param creator The creator instance.
* @param constructor The constructor for the FSM.
*/
public static void register(String scheme, SegmentCompletionFSMCreator creator) {
public static void register(String scheme, Constructor<? extends SegmentCompletionFSM> constructor) {
Preconditions.checkNotNull(scheme, "Scheme cannot be null");
Preconditions.checkNotNull(creator, "FSM Creator cannot be null");
if (FSM_CREATOR_MAP.containsKey(scheme)) {
LOGGER.warn("Overwriting existing FSM creator for scheme {}", scheme);
Preconditions.checkNotNull(constructor, "FSM Constructor cannot be null");
if (FSM_CONSTRUCTOR_MAP.containsKey(scheme)) {
LOGGER.warn("Overwriting existing FSM constructor for scheme {}", scheme);
}
FSM_CREATOR_MAP.put(scheme, creator);
LOGGER.info("Registered SegmentCompletionFSM creator for scheme {}", scheme);
FSM_CONSTRUCTOR_MAP.put(scheme, constructor);
LOGGER.info("Registered SegmentCompletionFSM constructor for scheme {}", scheme);
}

/**
Expand All @@ -52,18 +76,23 @@ public static void register(String scheme, SegmentCompletionFSMCreator creator)
* @param fsmFactoryConfig The configuration object containing FSM schemes and classes.
*/
public static void init(SegmentCompletionConfig fsmFactoryConfig) {
// Assuming SegmentCompletionConfig is a wrapper around configuration properties
Map<String, String> schemesConfig = fsmFactoryConfig.getFsmSchemes();
for (Map.Entry<String, String> entry : schemesConfig.entrySet()) {
String scheme = entry.getKey();
String className = entry.getValue();
try {
LOGGER.info("Initializing SegmentCompletionFSM for scheme {}, classname {}", scheme, className);
Class<?> clazz = Class.forName(className);
SegmentCompletionFSMCreator creator = (SegmentCompletionFSMCreator) clazz.getDeclaredConstructor().newInstance();
register(scheme, creator);
Constructor<?> constructor = clazz.getConstructor(
PinotLLCRealtimeSegmentManager.class,
SegmentCompletionManager.class,
LLCSegmentName.class,
SegmentZKMetadata.class,
String.class
);
register(scheme, (Constructor<? extends SegmentCompletionFSM>) constructor);
} catch (Exception e) {
LOGGER.error("Could not instantiate FSM for class {} with scheme {}", className, scheme, e);
LOGGER.error("Could not register FSM constructor for class {} with scheme {}", className, scheme, e);
throw new RuntimeException(e);
}
}
Expand All @@ -78,17 +107,22 @@ public static void init(SegmentCompletionConfig fsmFactoryConfig) {
* @param llcSegmentName The segment name.
* @param segmentMetadata The segment metadata.
* @param msgType The message type.
* @return An instance of SegmentCompletionFSMInterface.
* @return An instance of SegmentCompletionFSM.
*/
public static SegmentCompletionFSM createFSM(String scheme,
SegmentCompletionManager manager,
PinotLLCRealtimeSegmentManager segmentManager,
LLCSegmentName llcSegmentName,
SegmentZKMetadata segmentMetadata,
String msgType) {
SegmentCompletionFSMCreator creator = FSM_CREATOR_MAP.get(scheme);
Preconditions.checkState(creator != null, "No FSM registered for scheme: " + scheme);
return creator.create(manager, segmentManager, llcSegmentName, segmentMetadata, msgType);
Constructor<? extends SegmentCompletionFSM> constructor = FSM_CONSTRUCTOR_MAP.get(scheme);
Preconditions.checkState(constructor != null, "No FSM registered for scheme: " + scheme);
try {
return constructor.newInstance(segmentManager, manager, llcSegmentName, segmentMetadata, msgType);
} catch (Exception e) {
LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
throw new RuntimeException(e);
}
}

/**
Expand All @@ -98,15 +132,13 @@ public static SegmentCompletionFSM createFSM(String scheme,
* @return True if supported, false otherwise.
*/
public static boolean isFactoryTypeSupported(String factoryType) {
return FSM_CREATOR_MAP.containsKey(factoryType);
return FSM_CONSTRUCTOR_MAP.containsKey(factoryType);
}

/**
* Shuts down all registered FSMs if needed.
* (Implement if FSMs require shutdown logic)
* Clears all registered FSM constructors.
*/
public static void shutdown() {
// Implement shutdown logic if FSMs have resources to release
FSM_CONSTRUCTOR_MAP.clear();
}
}

Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.realtime;

public enum SegmentCompletionFSMState {
Expand Down
Loading

0 comments on commit a4b24d9

Please sign in to comment.