Skip to content

Commit

Permalink
Add multi stream ingestion support (#13790)
Browse files Browse the repository at this point in the history
* Add multi stream ingestion support

* Fix UT

* Fix issues, rebase and resolve comments

* Resolve comments

* Fix style

* Ensure transient exceptions do not prevent creating new consuming segments

Summary:
Ensure transient exceptions do not prevent creating new consuming segments. If some exception is hit, attempt to reconcile any successful fetches with partition group metadata.

This ensures consuming partitions are not dropped, and attempts to add any new partitions that were discovered successfully.

Test Plan:
After deployment, despite some remaining `TransientConsumerException` occurrences, no new missing consuming segments appear
{F1002071843}

{F1002071523}

Reviewers: gaoxin, tingchen

Reviewed By: gaoxin

JIRA Issues: EVA-8951

Differential Revision: https://code.uberinternal.com/D15748639

* Resolve comments for optimizing java doc

* Edit doc/comment

* Remove unrelated files

* Rebase and resolve conflicts

* Take the metadata fetch time change from the HEAD

* Resolve conflicts

---------

Co-authored-by: Christopher Peck <[email protected]>
  • Loading branch information
lnbest0707-uber and itschrispeck authored Dec 19, 2024
1 parent 6614bd8 commit 73abb21
Show file tree
Hide file tree
Showing 20 changed files with 367 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void init(PinotConfiguration pinotConfiguration)
// This executor service is used to do async tasks from multiget util or table rebalancing.
_executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
_tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
"tenant-rebalance-thread-%d");
"tenant-rebalance-thread-%d");
_tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService);
}

Expand All @@ -272,7 +272,7 @@ public void init(PinotConfiguration pinotConfiguration)
private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory)
: Executors.newFixedThreadPool(numThreadPool, threadFactory);
: Executors.newFixedThreadPool(numThreadPool, threadFactory);
}

private void inferHostnameIfNeeded(ControllerConf config) {
Expand Down Expand Up @@ -577,10 +577,12 @@ protected void configure() {
_helixResourceManager.getAllRealtimeTables().forEach(rt -> {
TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
if (tableConfig != null) {
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
try {
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
streamConfigMap);
for (Map<String, String> streamConfigMap : streamConfigMaps) {
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
streamConfigMap);
}
} catch (Exception e) {
existingHlcTables.add(rt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
Expand Down Expand Up @@ -419,10 +420,11 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
numInvalidEndTime);

if (tableType == TableType.REALTIME && tableConfig != null) {
StreamConfig streamConfig =
new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig)
).collect(Collectors.toList());
new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
streamConfig).findAndEmitMetrics(idealState);
streamConfigs).findAndEmitMetrics(idealState);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
/**
* Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
* with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
* In particular, this method can also be used to fetch from multiple stream topics.
*
* Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
*
Expand All @@ -79,23 +80,24 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
* the collection of shards in partition group 1, should remain unchanged in the response,
* whereas shards 3,4 can be added to new partition groups if needed.
*
* @param streamConfigs the list of stream configs from the tableConfig
* @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current
* partition groups.
* The size of this list is equal to the number of partition groups,
* and is created using the latest segment zk metadata.
*/
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList);
new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList);
try {
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
} catch (Exception e) {
Exception fetcherException = partitionGroupMetadataFetcher.getException();
LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(),
streamConfig.getTableNameWithType(), fetcherException);
LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}",
streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
streamConfigs.get(0).getTableNameWithType(), fetcherException);
throw new RuntimeException(fetcherException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
Expand Down Expand Up @@ -65,25 +67,29 @@ public class MissingConsumingSegmentFinder {
private ControllerMetrics _controllerMetrics;

/**
 * Builds a finder for the given realtime table, seeding a partition-group-id -> largest stream
 * offset map from the latest metadata of every configured stream topic.
 *
 * @param realtimeTableName realtime table name with type suffix
 * @param propertyStore     Helix property store used to fetch segment ZK metadata
 * @param controllerMetrics metrics registry for emitting findings
 * @param streamConfigs     all stream configs of the table; assumed non-empty and sharing one
 *                          consumer factory type — TODO confirm multi-topic tables guarantee this
 */
public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
    ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
  _realtimeTableName = realtimeTableName;
  _controllerMetrics = controllerMetrics;
  _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
  // The first stream config is representative for creating the offset factory.
  _streamPartitionMsgOffsetFactory =
      StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

  // create partition group id to largest stream offset map
  _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
  // Use forEach (a terminal operation) to apply the mutation. The previous
  // streamConfigs.stream().map(...) pipeline had no terminal operation, so the lazy map()
  // never ran and the LARGEST offset criteria was never actually set.
  streamConfigs.forEach(streamConfig -> streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA));
  try {
    PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList())
        .forEach(metadata -> {
          _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
        });
  } catch (Exception e) {
    // Best-effort: fall back to ideal-state-only detection when stream metadata is unavailable.
    LOGGER.warn("Problem encountered in fetching stream metadata for topics: {} of table: {}. "
            + "Continue finding missing consuming segment only with ideal state information.",
        streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()),
        streamConfigs.get(0).getTableNameWithType());
  }
}
Expand Down
Loading

0 comments on commit 73abb21

Please sign in to comment.