From d009bac9b3a4d51247181efa813b5425611f5807 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Thu, 26 Dec 2024 18:46:08 -0800 Subject: [PATCH] Address comments --- .../xtable/conversion/ConversionConfig.java | 4 +++- .../conversion/ExternalCatalogConfig.java | 2 +- .../conversion/ConversionController.java | 11 ++++++++++ .../xtable/utilities/RunCatalogSync.java | 21 +++++++++++++++++++ 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 8d1e76974..63e9d6733 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -18,6 +18,7 @@ package org.apache.xtable.conversion; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -30,6 +31,7 @@ import org.apache.xtable.model.sync.SyncMode; @Value +@Builder public class ConversionConfig { // The source of the sync @NonNull SourceTable sourceTable; @@ -49,10 +51,10 @@ public class ConversionConfig { SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; - this.targetCatalogs = targetCatalogs; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); + this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : targetCatalogs; this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java index 6cacd9076..b525d8318 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java @@ -59,7 +59,7 @@ public class ExternalCatalogConfig { String catalogConversionSourceImpl; /** - * The properties for each catalog, used for providing any custom behaviour during catalog sync + * The properties for this catalog, used for providing any custom behaviour during catalog sync */ @NonNull @Builder.Default Map catalogProperties = Collections.emptyMap(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index cd79ccb27..1db145ee6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -159,6 +159,17 @@ public Map syncTableAcrossCatalogs( } } + /** + * Synchronizes the given source table format metadata in ConversionConfig to multiple target + * formats. + * + * @param config A per table level config containing tableBasePath, partitionFieldSpecConfig, + * targetTableFormats and syncMode. + * @param source An extractor class for {@link ConversionSource} and allows fetching current + * snapshot or incremental table changes. + * @param syncMode sync mode is either FULL or INCREMENTAL. + * @return Returns a map containing the table format, and it's sync result. + */ private Map syncTableFormats( ConversionConfig config, ExtractFromSource source, SyncMode syncMode) { Map conversionTargetByFormat = diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index 959217dff..2f60f33f6 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -42,6 +42,7 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import com.fasterxml.jackson.databind.ObjectMapper; @@ -49,6 +50,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.apache.xtable.catalog.CatalogConversionFactory; +import org.apache.xtable.catalog.ExternalCatalogConfigFactory; import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; @@ -135,6 +137,7 @@ public static void main(String[] args) throws Exception { Map catalogsById = datasetConfig.getTargetCatalogs().stream() + .map(RunCatalogSync::populateCatalogImplementations) .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity())); CatalogConversionSource catalogConversionSource = CatalogConversionFactory.createCatalogConversionSource( @@ -234,6 +237,10 @@ static Map getConversionSourceProviders( return CONVERSION_SOURCE_PROVIDERS; } + /** + * Returns an implementation class for {@link CatalogTableIdentifier} based on the tableIdentifier + * provided by user. + */ static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier tableIdentifier) { if (tableIdentifier.getHierarchicalId() != null) { return ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( @@ -242,6 +249,20 @@ static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier tableIde throw new IllegalArgumentException("Invalid tableIdentifier configuration provided"); } + /** + * If user provides catalogType, we try to populate the implementation class if it exists in the + * class path. + */ + static ExternalCatalogConfig populateCatalogImplementations(ExternalCatalogConfig catalogConfig) { + if (!StringUtils.isEmpty(catalogConfig.getCatalogType())) { + return ExternalCatalogConfigFactory.fromCatalogType( + catalogConfig.getCatalogType(), + catalogConfig.getCatalogId(), + catalogConfig.getCatalogProperties()); + } + return catalogConfig; + } + @Data public static class DatasetConfig { /**