Skip to content

Commit

Permalink
fix compile issues, update config naming
Browse files Browse the repository at this point in the history
  • Loading branch information
the-other-tim-brown committed Jul 8, 2024
1 parent a7d01c1 commit a0361bc
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,30 @@
import lombok.NonNull;
import lombok.Value;

import com.google.common.base.Preconditions;

import org.apache.xtable.model.sync.SyncMode;

@Value
@Builder(toBuilder = true)
public class TableSyncConfig {
// TODO add docs for all fields
@NonNull SourceTable sourceTable;
List<TargetTable> targetTables;
@Builder.Default SyncMode syncMode = SyncMode.INCREMENTAL;
@Builder.Default Map<String, String> properties = Collections.emptyMap();
SyncMode syncMode;
Map<String, String> properties;

@Builder
TableSyncConfig(
@NonNull SourceTable sourceTable,
List<TargetTable> targetTables,
SyncMode syncMode,
Map<String, String> properties) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
Preconditions.checkArgument(
targetTables != null && !targetTables.isEmpty(),
"Please provide at-least one format to sync");
this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
this.properties = properties == null ? Collections.emptyMap() : properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
@Value
public class HudiSourceConfig {
public static final String PARTITION_SPEC_EXTRACTOR_CLASS =
"onetable.hudi.source.partition_spec_extractor_class";
"xtable.hudi.source.partition_spec_extractor_class";
public static final String PARTITION_FIELD_SPEC_CONFIG =
"onetable.hudi.source.partition_field_spec_config";
"xtable.hudi.source.partition_field_spec_config";

String partitionSpecExtractorClass;
List<PartitionFieldSpec> partitionFieldSpecs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ private static TableSyncConfig getTableSyncConfig(
String tableName,
GenericTable table,
List<String> targetTableFormats,
String oneTablePartitionConfig,
String partitionConfig,
Integer metadataRetentionInHours) {
SourceTable sourceTable =
SourceTable.builder()
Expand All @@ -876,7 +876,7 @@ private static TableSyncConfig getTableSyncConfig(
.sourceTable(sourceTable)
.targetTables(targetTables)
.syncMode(syncMode)
.properties(Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, oneTablePartitionConfig))
.properties(Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, partitionConfig))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.xtable.hudi.sync;

import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -35,9 +38,10 @@
import org.apache.hudi.sync.common.HoodieSyncTool;

import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.conversion.TableSyncConfig;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
import org.apache.xtable.hudi.HudiSourceConfigImpl;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
Expand All @@ -54,7 +58,7 @@ public XTableSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
this.config = new XTableSyncConfig(props);
this.hudiConversionSourceProvider = new HudiConversionSourceProvider();
hudiConversionSourceProvider.init(hadoopConf, Collections.emptyMap());
hudiConversionSourceProvider.init(hadoopConf);
}

@Override
Expand All @@ -65,18 +69,27 @@ public void syncHoodieTable() {
.collect(Collectors.toList());
String basePath = config.getString(HoodieSyncConfig.META_SYNC_BASE_PATH);
String tableName = config.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
SourceTable sourceTable = SourceTable.builder().name(tableName).metadataPath(basePath).build();
List<TargetTable> targetTables =
formatsToSync.stream()
.map(
format ->
TargetTable.builder()
.metadataPath(basePath)
.metadataRetention(
Duration.ofHours(
config.getInt(
XTableSyncConfig.ONE_TABLE_TARGET_METADATA_RETENTION_HOURS)))
.formatName(format)
.build())
.collect(Collectors.toList());
TableSyncConfig tableSyncConfig =
TableSyncConfigImpl.builder()
.tableName(tableName)
.tableBasePath(basePath)
.targetTableFormats(formatsToSync)
.hudiSourceConfig(
HudiSourceConfigImpl.builder()
.partitionFieldSpecConfig(getPartitionSpecConfig())
.build())
TableSyncConfig.builder()
.sourceTable(sourceTable)
.targetTables(targetTables)
.syncMode(SyncMode.INCREMENTAL)
.targetMetadataRetentionInHours(
config.getInt(XTableSyncConfig.ONE_TABLE_TARGET_METADATA_RETENTION_HOURS))
.properties(
Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, getPartitionSpecConfig()))
.build();
Map<String, SyncResult> results =
new ConversionController(hadoopConf).sync(tableSyncConfig, hudiConversionSourceProvider);
Expand All @@ -86,7 +99,7 @@ public void syncHoodieTable() {
entry ->
entry.getValue().getStatus().getStatusCode()
!= SyncResult.SyncStatusCode.SUCCESS)
.map(entry -> entry.getKey().toString())
.map(Map.Entry::getKey)
.collect(Collectors.joining(","));
if (!failingFormats.isEmpty()) {
throw new HoodieException("Unable to sync to InternalTable for formats: " + failingFormats);
Expand Down

0 comments on commit a0361bc

Please sign in to comment.