Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 27, 2024
1 parent eefbdd2 commit d009bac
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.xtable.conversion;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -30,6 +31,7 @@
import org.apache.xtable.model.sync.SyncMode;

@Value
@Builder
public class ConversionConfig {
// The source of the sync
@NonNull SourceTable sourceTable;
Expand All @@ -49,10 +51,10 @@ public class ConversionConfig {
SyncMode syncMode) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
this.targetCatalogs = targetCatalogs;
Preconditions.checkArgument(
targetTables != null && !targetTables.isEmpty(),
"Please provide at-least one format to sync");
this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : targetCatalogs;
this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class ExternalCatalogConfig {
String catalogConversionSourceImpl;

/**
* The properties for each catalog, used for providing any custom behaviour during catalog sync
* The properties for this catalog, used for providing any custom behaviour during catalog sync
*/
@NonNull @Builder.Default Map<String, String> catalogProperties = Collections.emptyMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ public Map<String, SyncResult> syncTableAcrossCatalogs(
}
}

/**
* Synchronizes the given source table format metadata in ConversionConfig to multiple target
* formats.
*
* @param config A per table level config containing tableBasePath, partitionFieldSpecConfig,
* targetTableFormats and syncMode.
* @param source An extractor class for {@link ConversionSource} and allows fetching current
* snapshot or incremental table changes.
* @param syncMode sync mode is either FULL or INCREMENTAL.
* @return Returns a map containing the table format, and it's sync result.
*/
private <COMMIT> Map<String, SyncResult> syncTableFormats(
ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode syncMode) {
Map<String, ConversionTarget> conversionTargetByFormat =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import org.apache.xtable.catalog.CatalogConversionFactory;
import org.apache.xtable.catalog.ExternalCatalogConfigFactory;
import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
Expand Down Expand Up @@ -135,6 +137,7 @@ public static void main(String[] args) throws Exception {

Map<String, ExternalCatalogConfig> catalogsById =
datasetConfig.getTargetCatalogs().stream()
.map(RunCatalogSync::populateCatalogImplementations)
.collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity()));
CatalogConversionSource catalogConversionSource =
CatalogConversionFactory.createCatalogConversionSource(
Expand Down Expand Up @@ -234,6 +237,10 @@ static Map<String, ConversionSourceProvider> getConversionSourceProviders(
return CONVERSION_SOURCE_PROVIDERS;
}

/**
* Returns an implementation class for {@link CatalogTableIdentifier} based on the tableIdentifier
* provided by user.
*/
static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier tableIdentifier) {
if (tableIdentifier.getHierarchicalId() != null) {
return ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
Expand All @@ -242,6 +249,20 @@ static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier tableIde
throw new IllegalArgumentException("Invalid tableIdentifier configuration provided");
}

/**
* If user provides catalogType, we try to populate the implementation class if it exists in the
* class path.
*/
static ExternalCatalogConfig populateCatalogImplementations(ExternalCatalogConfig catalogConfig) {
if (!StringUtils.isEmpty(catalogConfig.getCatalogType())) {
return ExternalCatalogConfigFactory.fromCatalogType(
catalogConfig.getCatalogType(),
catalogConfig.getCatalogId(),
catalogConfig.getCatalogProperties());
}
return catalogConfig;
}

@Data
public static class DatasetConfig {
/**
Expand Down

0 comments on commit d009bac

Please sign in to comment.