[DRAFT] upgrade to Hudi 0.15.0 #629

Draft. Wants to merge 3 commits into base: main.
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -62,7 +62,7 @@
<lombok.version>1.18.36</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.1</hadoop.version>
<hudi.version>0.14.0</hudi.version>
<hudi.version>0.15.0</hudi.version>
<aws.version>2.29.40</aws.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
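Note: this one-line version bump is what drives every source change below. Hudi 0.15.0 moves path and configuration handling onto its engine-agnostic storage layer, so the Hadoop-coupled types used with 0.14.0 are swapped for the org.apache.hudi.storage equivalents. A minimal sketch of the mapping, with an illustrative helper class that is not part of this PR:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

/**
 * Illustration only: the 0.15.0 storage types that replace org.apache.hadoop.fs.Path,
 * org.apache.hudi.hadoop.CachingPath, and raw Hadoop Configuration in the diffs below.
 */
public class StorageTypesSketch {

  /** StoragePath is constructed directly from a path string, as CachingPath was. */
  static StoragePath toStoragePath(String physicalPath) {
    return new StoragePath(physicalPath);
  }

  /** A Hadoop Configuration is wrapped wherever a StorageConfiguration is now expected. */
  static StorageConfiguration<?> wrap(Configuration hadoopConf) {
    return new HadoopStorageConfiguration(hadoopConf);
  }
}
```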
BaseFileUpdatesExtractor.java
@@ -36,8 +36,6 @@
import lombok.NonNull;
import lombok.Value;

import org.apache.hadoop.fs.Path;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -48,8 +46,8 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.ExternalFilePathUtil;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.storage.StoragePath;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.InternalType;
@@ -64,7 +62,7 @@ public class BaseFileUpdatesExtractor {
Pattern.compile(
"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-[0-9]_[0-9a-fA-F-]+_[0-9]+\\.");
private final HoodieEngineContext engineContext;
private final Path tableBasePath;
private final StoragePath tableBasePath;

/**
* Extracts the changes between the snapshot files and the base files in the Hudi table currently.
@@ -91,7 +89,10 @@ ReplaceMetadata extractSnapshotChanges(
Set<String> partitionPathsToDrop =
new HashSet<>(
FSUtils.getAllPartitionPaths(
engineContext, metadataConfig, metaClient.getBasePathV2().toString()));
engineContext,
metaClient.getStorage(),
metadataConfig,
metaClient.getBasePathV2().toString()));
ReplaceMetadata replaceMetadata =
partitionedDataFiles.stream()
.map(
@@ -173,7 +174,7 @@ ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull Strin
// For all removed files, group by partition and extract the file id
Map<String, List<String>> partitionToReplacedFileIds =
dataFilesDiff.getFilesRemoved().stream()
.map(file -> new CachingPath(file.getPhysicalPath()))
.map(file -> new StoragePath(file.getPhysicalPath()))
.collect(
Collectors.groupingBy(
path -> HudiPathUtils.getPartitionPath(tableBasePath, path),
@@ -186,7 +187,7 @@ ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull Strin
return ReplaceMetadata.of(partitionToReplacedFileIds, writeStatuses);
}

private String getFileId(Path filePath) {
private String getFileId(StoragePath filePath) {
String fileName = filePath.getName();
// if file was created by Hudi use original fileId, otherwise use the file name as IDs
if (isFileCreatedByHudiWriter(fileName)) {
@@ -207,12 +208,12 @@ private boolean isFileCreatedByHudiWriter(String fileName) {
}

private WriteStatus toWriteStatus(
Path tableBasePath,
StoragePath tableBasePath,
String commitTime,
InternalDataFile file,
Optional<String> partitionPathOptional) {
WriteStatus writeStatus = new WriteStatus();
Path path = new CachingPath(file.getPhysicalPath());
StoragePath path = new StoragePath(file.getPhysicalPath());
String partitionPath =
partitionPathOptional.orElseGet(() -> HudiPathUtils.getPartitionPath(tableBasePath, path));
String fileId = getFileId(path);
@@ -273,8 +274,8 @@ private ReplaceMetadata combine(ReplaceMetadata other) {
}
}

private String getPartitionPath(Path tableBasePath, List<InternalDataFile> files) {
private String getPartitionPath(StoragePath tableBasePath, List<InternalDataFile> files) {
return HudiPathUtils.getPartitionPath(
tableBasePath, new CachingPath(files.get(0).getPhysicalPath()));
tableBasePath, new StoragePath(files.get(0).getPhysicalPath()));
}
}
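The extractSnapshotChanges change above follows the 0.15.0 partition-listing signature, which takes the table's storage handle as an extra argument. A self-contained sketch of the call shape, with illustrative class and method names:

```java
import java.util.List;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class PartitionListingSketch {
  // Hudi 0.15.0 adds the table's HoodieStorage as the second argument to getAllPartitionPaths.
  static List<String> listAllPartitionPaths(
      HoodieEngineContext engineContext,
      HoodieTableMetaClient metaClient,
      HoodieMetadataConfig metadataConfig) {
    return FSUtils.getAllPartitionPaths(
        engineContext,
        metaClient.getStorage(),
        metadataConfig,
        metaClient.getBasePathV2().toString());
  }
}
```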
HudiConversionSourceProvider.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;
@@ -35,7 +36,7 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<Hoodi
public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder()
.setConf(hadoopConf)
.setConf(new HadoopStorageConfiguration(hadoopConf))
.setBasePath(sourceTable.getBasePath())
.setLoadActiveTimelineOnLoad(true)
.build();
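The only functional change here is the wrapped configuration: HoodieTableMetaClient.Builder#setConf now expects a StorageConfiguration rather than a Hadoop Configuration. A sketch of the 0.15.0 builder call, with an illustrative wrapper class:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

public class MetaClientSketch {
  // The Hadoop Configuration is wrapped in HadoopStorageConfiguration before being
  // handed to the meta client builder.
  static HoodieTableMetaClient loadMetaClient(Configuration hadoopConf, String basePath) {
    return HoodieTableMetaClient.builder()
        .setConf(new HadoopStorageConfiguration(hadoopConf))
        .setBasePath(basePath)
        .setLoadActiveTimelineOnLoad(true)
        .build();
  }
}
```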
HudiConversionTarget.java
@@ -70,8 +70,10 @@
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieJavaTable;
import org.apache.hudi.table.action.clean.CleanPlanner;

@@ -109,14 +111,14 @@ public HudiConversionTarget() {}
@VisibleForTesting
HudiConversionTarget(
TargetTable targetTable,
Configuration configuration,
StorageConfiguration<?> configuration,
int maxNumDeltaCommitsBeforeCompaction) {
this(
targetTable.getBasePath(),
(int) targetTable.getMetadataRetention().toHours(),
maxNumDeltaCommitsBeforeCompaction,
BaseFileUpdatesExtractor.of(
new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
new HoodieJavaEngineContext(configuration), new StoragePath(targetTable.getBasePath())),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
CommitState::new);
@@ -168,9 +170,10 @@ public void init(TargetTable targetTable, Configuration configuration) {
(int) targetTable.getMetadataRetention().toHours(),
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
BaseFileUpdatesExtractor.of(
new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
new HoodieJavaEngineContext(new HadoopStorageConfiguration(configuration)),
new StoragePath(targetTable.getBasePath())),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
HudiTableManager.of(new HadoopStorageConfiguration(configuration)),
CommitState::new);
}

@@ -369,7 +372,7 @@ public void commit() {
getNumInstantsToRetain(),
maxNumDeltaCommitsBeforeCompaction,
timelineRetentionInHours);
HoodieEngineContext engineContext = new HoodieJavaEngineContext(metaClient.getHadoopConf());
HoodieEngineContext engineContext = new HoodieJavaEngineContext(metaClient.getStorageConf());
try (HoodieJavaWriteClient<?> writeClient =
new HoodieJavaWriteClient<>(engineContext, writeConfig)) {
writeClient.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
@@ -494,7 +497,8 @@ private void markInstantsAsCleaned(
Collections.emptyMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
cleanInfoPerPartition,
Collections.emptyList());
Collections.emptyList(),
Collections.emptyMap());
// create a clean instant and mark it as requested with the clean plan
HoodieInstant requestedCleanInstant =
new HoodieInstant(
@@ -524,7 +528,8 @@ private void markInstantsAsCleaned(
})
.collect(Collectors.toList());
HoodieCleanMetadata cleanMetadata =
CleanerUtils.convertCleanMetadata(cleanTime, Option.empty(), cleanStats);
CleanerUtils.convertCleanMetadata(
cleanTime, Option.empty(), cleanStats, Collections.emptyMap());
// update the metadata table with the clean metadata so the files' metadata are marked for
// deletion
hoodieTableMetadataWriter.performTableServices(Option.empty());
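Two 0.15.0 signature changes appear in this file: the Java engine context is now built from metaClient.getStorageConf(), and the clean-plan and clean-metadata helpers take an additional map argument, passed as an empty map here. A hedged sketch of the convertCleanMetadata call; the wrapper class and method name are assumptions, and only the call shape is taken from the diff:

```java
import java.util.Collections;
import java.util.List;

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;

public class CleanMetadataSketch {
  // Mirrors the call above: the trailing Collections.emptyMap() is the new 0.15.0 argument;
  // its exact parameter name is not visible in this diff.
  static HoodieCleanMetadata toCleanMetadata(String cleanTime, List<HoodieCleanStat> cleanStats) {
    return CleanerUtils.convertCleanMetadata(
        cleanTime, Option.empty(), cleanStats, Collections.emptyMap());
  }
}
```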
HudiDataFileExtractor.java
@@ -57,6 +57,7 @@
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.exception.NotSupportedException;
@@ -79,26 +80,26 @@ public class HudiDataFileExtractor implements AutoCloseable {
private final HudiFileStatsExtractor fileStatsExtractor;
private final HoodieMetadataConfig metadataConfig;
private final FileSystemViewManager fileSystemViewManager;
private final Path basePath;
private final StoragePath basePath;

public HudiDataFileExtractor(
HoodieTableMetaClient metaClient,
HudiPartitionValuesExtractor hudiPartitionValuesExtractor,
HudiFileStatsExtractor hudiFileStatsExtractor) {
this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
this.engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
metadataConfig =
HoodieMetadataConfig.newBuilder()
.enable(metaClient.getTableConfig().isMetadataTableAvailable())
.build();
this.basePath = metaClient.getBasePathV2();
this.tableMetadata =
metadataConfig.enabled()
? HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), true)
metadataConfig.isEnabled()
? HoodieTableMetadata.create(
engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true)
: null;
this.fileSystemViewManager =
FileSystemViewManager.createViewManager(
engineContext,
metadataConfig,
FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.build(),
@@ -114,7 +115,8 @@ public List<PartitionFileGroup> getFilesCurrentState(InternalTable table) {
List<String> allPartitionPaths =
tableMetadata != null
? tableMetadata.getAllPartitionPaths()
: FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath.toString());
: FSUtils.getAllPartitionPaths(
engineContext, metaClient.getStorage(), metadataConfig, basePath.toString());
return getInternalDataFilesForPartitions(allPartitionPaths, table);
} catch (IOException ex) {
throw new ReadException(
@@ -402,9 +404,9 @@ private InternalDataFile buildFileWithoutStats(
.recordCount(rowCount)
.columnStats(Collections.emptyList())
.lastModified(
hoodieBaseFile.getFileStatus() == null
hoodieBaseFile.getPathInfo() == null
? 0L
: hoodieBaseFile.getFileStatus().getModificationTime())
: hoodieBaseFile.getPathInfo().getModificationTime())
.build();
}

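The last hunk reflects HoodieBaseFile exposing a StoragePathInfo instead of a Hadoop FileStatus in 0.15.0. A minimal sketch of the fallback used above, with an illustrative helper class:

```java
import org.apache.hudi.common.model.HoodieBaseFile;

public class BaseFileTimestampSketch {
  // getPathInfo() replaces getFileStatus(); it may be null when the listing carried no
  // file metadata, in which case the code above falls back to 0L.
  static long lastModifiedMillis(HoodieBaseFile baseFile) {
    return baseFile.getPathInfo() == null ? 0L : baseFile.getPathInfo().getModificationTime();
  }
}
```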
HudiFileStatsExtractor.java
@@ -35,7 +35,6 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;

import org.apache.hudi.avro.HoodieAvroUtils;
@@ -44,9 +43,9 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.InternalField;
@@ -107,7 +106,7 @@ private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
return files.map(
file -> {
HudiFileStats fileStats =
computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
computeColumnStatsForFile(new StoragePath(file.getPhysicalPath()), nameFieldMap);
return file.toBuilder()
.columnStats(fileStats.getColumnStats())
.recordCount(fileStats.getRowCount())
@@ -116,7 +115,7 @@ private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
}

private Pair<String, String> getPartitionAndFileName(String path) {
Path filePath = new CachingPath(path);
StoragePath filePath = new StoragePath(path);
String partitionPath = HudiPathUtils.getPartitionPath(metaClient.getBasePathV2(), filePath);
return Pair.of(partitionPath, filePath.getName());
}
@@ -176,10 +175,10 @@ private Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) {
}

private HudiFileStats computeColumnStatsForFile(
Path filePath, Map<String, InternalField> nameFieldMap) {
StoragePath filePath, Map<String, InternalField> nameFieldMap) {
List<HoodieColumnRangeMetadata<Comparable>> columnRanges =
UTILS.readRangeFromParquetMetadata(
metaClient.getHadoopConf(), filePath, new ArrayList<>(nameFieldMap.keySet()));
UTILS.readColumnStatsFromMetadata(
metaClient.getStorage(), filePath, new ArrayList<>(nameFieldMap.keySet()));
List<ColumnStat> columnStats =
columnRanges.stream()
.map(
@@ -188,7 +187,7 @@ private HudiFileStats computeColumnStatsForFile(
.collect(CustomCollectors.toList(columnRanges.size()));
Long rowCount = getMaxFromColumnStats(columnStats).orElse(null);
if (rowCount == null) {
rowCount = UTILS.getRowCount(metaClient.getHadoopConf(), filePath);
rowCount = UTILS.getRowCount(metaClient.getStorage(), filePath);
}
return new HudiFileStats(columnStats, rowCount);
}
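In 0.15.0 the ParquetUtils stats helpers read through the table's storage handle instead of a Hadoop configuration: readColumnStatsFromMetadata replaces readRangeFromParquetMetadata, and getRowCount takes the storage handle as well. A sketch of both calls, with an illustrative wrapper class:

```java
import java.util.List;

import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.storage.StoragePath;

public class ParquetStatsSketch {
  private static final ParquetUtils UTILS = new ParquetUtils();

  // Column ranges are read from the Parquet footer via the HoodieStorage handle.
  static List<HoodieColumnRangeMetadata<Comparable>> readColumnStats(
      HoodieTableMetaClient metaClient, StoragePath filePath, List<String> columns) {
    return UTILS.readColumnStatsFromMetadata(metaClient.getStorage(), filePath, columns);
  }

  // getRowCount likewise takes the storage handle instead of a Hadoop conf.
  static long rowCount(HoodieTableMetaClient metaClient, StoragePath filePath) {
    return UTILS.getRowCount(metaClient.getStorage(), filePath);
  }
}
```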
HudiPathUtils.java
@@ -18,10 +18,10 @@

package org.apache.xtable.hudi;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.storage.StoragePath;

public class HudiPathUtils {
public static String getPartitionPath(Path tableBasePath, Path filePath) {
public static String getPartitionPath(StoragePath tableBasePath, StoragePath filePath) {
String fileName = filePath.getName();
String pathStr = filePath.toUri().getPath();
int startIndex = tableBasePath.toUri().getPath().length() + 1;
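HudiPathUtils now takes StoragePath for both parameters. A minimal illustration of deriving a partition path relative to the table base path; this sketches the idea using only the StoragePath methods visible in the diff (getName, toUri) and is not necessarily the exact method body:

```java
import org.apache.hudi.storage.StoragePath;

public class PartitionPathSketch {
  // Returns the partition path of filePath relative to tableBasePath,
  // or an empty string for an unpartitioned table.
  static String partitionPath(StoragePath tableBasePath, StoragePath filePath) {
    String pathStr = filePath.toUri().getPath();
    int startIndex = tableBasePath.toUri().getPath().length() + 1;
    int endIndex = pathStr.length() - filePath.getName().length() - 1;
    return endIndex <= startIndex ? "" : pathStr.substring(startIndex, endIndex);
  }
}
```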
HudiTableManager.java
@@ -26,14 +26,13 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.xtable.exception.UpdateException;
import org.apache.xtable.model.InternalTable;
@@ -53,7 +52,7 @@ class HudiTableManager {
private static final String COMPLEX_KEY_GENERATOR = "org.apache.hudi.keygen.ComplexKeyGenerator";
private static final String SIMPLE_KEY_GENERATOR = "org.apache.hudi.keygen.SimpleKeyGenerator";

private final Configuration configuration;
private final StorageConfiguration<?> configuration;

/**
* Loads the meta client for the table at the base path if it exists
(test file)
@@ -527,7 +527,7 @@ private static Stream<Arguments> provideArgsForPartitionTesting() {
HUDI,
Arrays.asList(ICEBERG, DELTA),
"timestamp_micros_nullable_field:TIMESTAMP,level:SIMPLE",
"timestamp_micros_nullable_field:DAY:yyyy/MM/dd,level:VALUE",
"timestamp_micros_nullable_field:DAY:yyyy-MM-dd,level:VALUE",
timestampAndLevelFilter)));
}
