Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Hudi merged view files for partition path updates without compaction #24283

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public class HiveClientConfig
private boolean verboseRuntimeStatsEnabled;
private boolean useRecordPageSourceForCustomSplit = true;
private boolean hudiMetadataEnabled;
private String hudiTablesUseMergedView;

private boolean sizeBasedSplitWeightsEnabled = true;
private double minimumAssignedSplitWeight = 0.05;
Expand Down Expand Up @@ -1647,6 +1648,19 @@ public boolean isHudiMetadataEnabled()
return this.hudiMetadataEnabled;
}

@Config("hive.hudi-tables-use-merged-view")
@ConfigDescription("For Hudi tables prefer to fetch the list of files from the merged file system view")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the description should be more clear about the type of value it's looking for. The type is a string, so for users it should be more explicit what the value should be. When I read this I initially thought it should be a boolean.

Based on looking at the tests it looks like it needs a dot-separated name of schema and table? If so, it should be clear about which tables should be added here

Suggested change
@ConfigDescription("For Hudi tables prefer to fetch the list of files from the merged file system view")
@ConfigDescription("For Hudi tables, a comma-separated list of <schema>.<table> names for which the list of files should be fetched from the merged file system view")

/**
 * Sets the list of tables that should read Hudi data through the merged file
 * system view. The value is a comma-separated list of {@code <schema>.<table>}
 * names: HudiDirectoryLister splits it on commas (trimming entries) and matches
 * each entry against the table's schema-qualified name.
 *
 * @param hudiTablesUseMergedView comma-separated {@code schema.table} names; may be null (no table opted in)
 * @return this config instance, for fluent chaining
 */
public HiveClientConfig setHudiTablesUseMergedView(String hudiTablesUseMergedView)
{
    this.hudiTablesUseMergedView = hudiTablesUseMergedView;
    return this;
}

/**
 * Returns the comma-separated list of {@code schema.table} names configured to
 * use the Hudi merged file system view, or null when the property is unset.
 */
public String getHudiTablesUseMergedView()
{
    return this.hudiTablesUseMergedView;
}

@Config("hive.quick-stats.enabled")
@ConfigDescription("Use quick stats to resolve stats")
public HiveClientConfig setQuickStatsEnabled(boolean quickStatsEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public final class HiveSessionProperties
public static final String MAX_INITIAL_SPLITS = "max_initial_splits";
public static final String FILE_SPLITTABLE = "file_splittable";
private static final String HUDI_METADATA_ENABLED = "hudi_metadata_enabled";
private static final String HUDI_TABLES_USE_MERGED_VIEW = "hudi_tables_use_merged_view";
private static final String READ_TABLE_CONSTRAINTS = "read_table_constraints";
public static final String PARALLEL_PARSING_OF_PARTITION_VALUES_ENABLED = "parallel_parsing_of_partition_values_enabled";
public static final String QUICK_STATS_ENABLED = "quick_stats_enabled";
Expand Down Expand Up @@ -608,6 +609,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"For Hudi tables prefer to fetch the list of file names, sizes and other metadata from the internal metadata table rather than storage",
hiveClientConfig.isHudiMetadataEnabled(),
false),
stringProperty(
HUDI_TABLES_USE_MERGED_VIEW,
"For Hudi tables, use merged view to read data",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the description here as well

hiveClientConfig.getHudiTablesUseMergedView(),
false),
booleanProperty(
PARALLEL_PARSING_OF_PARTITION_VALUES_ENABLED,
"Enables parallel parsing of partition values from partition names using thread pool",
Expand Down Expand Up @@ -1101,6 +1107,12 @@ public static boolean isHudiMetadataEnabled(ConnectorSession session)
return session.getProperty(HUDI_METADATA_ENABLED, Boolean.class);
}

/**
 * Reads the session-level list of Hudi tables that should use the merged file
 * system view. Never returns null: an unset property yields the empty string,
 * so callers can split the value unconditionally.
 */
public static String getHudiTablesUseMergedView(ConnectorSession session)
{
    String tables = session.getProperty(HUDI_TABLES_USE_MERGED_VIEW, String.class);
    if (tables == null) {
        return "";
    }
    return tables;
}

public static boolean isReadTableConstraints(ConnectorSession session)
{
return session.getProperty(READ_TABLE_CONSTRAINTS, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.HiveFileIterator;
import com.facebook.presto.spi.ConnectorSession;
import com.google.common.base.Splitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -30,32 +31,43 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Stream;

import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
import static com.facebook.presto.hive.HiveSessionProperties.getHudiTablesUseMergedView;
import static com.facebook.presto.hive.HiveSessionProperties.isHudiMetadataEnabled;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;

public class HudiDirectoryLister
implements DirectoryLister
{
private static final Logger log = Logger.get(HudiDirectoryLister.class);
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();

private final HoodieTableFileSystemView fileSystemView;
private final HoodieTableMetaClient metaClient;
private final boolean metadataEnabled;
private final String latestInstant;
private final boolean shouldUseMergedView;

public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table table)
{
log.info("Using Hudi Directory Lister.");
this.metadataEnabled = isHudiMetadataEnabled(session);
this.shouldUseMergedView = SPLITTER.splitToList(getHudiTablesUseMergedView(session)).contains(table.getSchemaTableName().toString());
Configuration actualConfig = ((CachingJobConf) conf).getConfig();
/*
WrapperJobConf acts as a wrapper on top of the actual Configuration object. If `hive.copy-on-first-write-configuration-enabled`
Expand All @@ -68,6 +80,12 @@ public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table t
.setConf(actualConfig)
.setBasePath(table.getStorage().getLocation())
.build();
this.latestInstant = metaClient.getActiveTimeline()
.getCommitsAndCompactionTimeline()
.filterCompletedInstants()
.filter(instant -> !HoodieTableType.MERGE_ON_READ.equals(metaClient.getTableType()) || instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
.lastInstant()
.map(HoodieInstant::getTimestamp).orElseThrow(() -> new RuntimeException("No active instant found"));
HoodieEngineContext engineContext = new HoodieLocalEngineContext(actualConfig);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(metadataEnabled)
Expand Down Expand Up @@ -96,7 +114,10 @@ public Iterator<HiveFileInfo> list(
fileSystemView,
metadataEnabled ? Optional.empty() : Optional.of(fileSystem.listStatus(p)),
table.getStorage().getLocation(),
p),
p,
metaClient.getTableType(),
latestInstant,
shouldUseMergedView),
namenodeStats,
hiveDirectoryContext.getNestedDirectoryPolicy(),
hiveDirectoryContext.isSkipEmptyFilesEnabled());
Expand All @@ -111,15 +132,26 @@ public HudiFileInfoIterator(
        HoodieTableFileSystemView fileSystemView,
        Optional<FileStatus[]> fileStatuses,
        String tablePath,
        Path directory,
        HoodieTableType tableType,
        String latestInstant,
        boolean shouldUseMergedView)
{
    // Partition path of the listed directory, relative to the table base path.
    String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
    if (fileStatuses.isPresent()) {
        // The caller already listed the directory from storage (metadata table disabled):
        // feed those statuses into the view, then take the latest base file per file group.
        fileSystemView.addFilesToView(fileStatuses.get());
        this.hoodieBaseFileIterator = fileSystemView.fetchLatestBaseFiles(partition).iterator();
    }
    else {
        if (shouldUseMergedView) {
            // Merged view: resolve file slices as of latestInstant. For MOR tables the merged
            // variant also reflects uncompacted log data merged onto base files; for other
            // table types the plain "before or on" lookup is used (third arg 'false' —
            // presumably excludes slices pending compaction; TODO confirm against Hudi API docs).
            Stream<FileSlice> fileSlices = HoodieTableType.MERGE_ON_READ.equals(tableType) ?
                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partition, latestInstant) :
                    fileSystemView.getLatestFileSlicesBeforeOrOn(partition, latestInstant, false);
            // Only slices that actually carry a base file are surfaced; log-only slices are dropped.
            this.hoodieBaseFileIterator = fileSlices.map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get).iterator();
        }
        else {
            // Default path: latest base files straight from the (possibly metadata-backed) view.
            this.hoodieBaseFileIterator = fileSystemView.getLatestBaseFiles(partition).iterator();
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.security.ConnectorIdentity;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -38,10 +40,13 @@
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.HiveTestUtils.TEST_CLIENT_TAGS;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -107,6 +112,71 @@ private Table getMockTable()
Optional.empty());
}

/**
 * Builds a mock external MERGE_ON_READ table named {@code schema.hudi_mor_part_update}
 * whose storage points at the on-disk test fixture of the same name.
 */
private Table getMockMORTableWithPartition()
{
    HiveBucketProperty bucketProperty = new HiveBucketProperty(
            ImmutableList.of(),
            1,
            ImmutableList.of(),
            HIVE_COMPATIBLE,
            Optional.empty());
    Storage storage = new Storage(
            fromHiveStorageFormat(PARQUET),
            getTableBasePath("hudi_mor_part_update"),
            Optional.of(bucketProperty),
            false,
            ImmutableMap.of(),
            ImmutableMap.of());
    return new Table(
            "schema",
            "hudi_mor_part_update",
            "user",
            EXTERNAL_TABLE,
            storage,
            ImmutableList.of(),
            ImmutableList.of(),
            ImmutableMap.of(),
            Optional.empty(),
            Optional.empty());
}

@Test
public void testDirectoryListerForMORTableWithPartitionUpdates()
        throws IOException
{
    // MOR table whose record moved partitions; without compaction the update is
    // only visible through the merged file system view.
    Table mockTable = getMockMORTableWithPartition();
    Configuration hadoopConf = getHadoopConfWithCopyOnFirstWriteDisabled();
    // Opt this table into the merged view via the session property.
    ConnectorSession session = new TestingConnectorSession(
            getAllSessionProperties(
                    new HiveClientConfig()
                            .setHudiMetadataEnabled(true)
                            .setHudiTablesUseMergedView(mockTable.getSchemaTableName().toString()),
                    new HiveCommonClientConfig()),
            TEST_CLIENT_TAGS);
    HudiDirectoryLister directoryLister = new HudiDirectoryLister(hadoopConf, session, mockTable);
    HoodieTableMetaClient metaClient = directoryLister.getMetaClient();
    assertEquals(metaClient.getBasePath(), mockTable.getStorage().getLocation());
    Path path = new Path(mockTable.getStorage().getLocation());
    ExtendedFileSystem fs = (ExtendedFileSystem) path.getFileSystem(hadoopConf);
    Iterator<HiveFileInfo> fileInfoIterator = directoryLister.list(fs, mockTable, path, Optional.empty(), new NamenodeStats(), new HiveDirectoryContext(
            IGNORED,
            false,
            false,
            new ConnectorIdentity("test", Optional.empty(), Optional.empty()),
            ImmutableMap.of(),
            new RuntimeStats()));
    // NOTE(review): if the iterator yields nothing these assertions pass vacuously —
    // consider asserting at least one file is returned once the fixture layout is confirmed.
    while (fileInfoIterator.hasNext()) {
        HiveFileInfo fileInfo = fileInfoIterator.next();
        String fileName = fileInfo.getPath().getName();
        // Expected: only the latest base file in each of the p1 and p2 partitions.
        assertTrue(fileName.startsWith("37c2b860-eea6-4142-8bda-257b2562e4b4-0_1-338-594") || fileName.startsWith("7483ef07-d1f8-4d44-b9b0-cba6df5cd1b8-0_1-149-341"));
        // The pre-update version of the base file in p1 must not be listed.
        assertFalse(fileName.startsWith("c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0_0-32-192"));
    }
    // The original wrapped this in try/finally only to assign the local 'hadoopConf'
    // to null; nulling a local in finally has no observable effect, so it was removed.
}

@Test
public void testDirectoryListerForHudiTable()
throws IOException
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"partitionToWriteStats" : {
"p1" : [ {
"fileId" : "c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0",
"path" : "p1/c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0_0-32-192_19700101000000000.parquet",
"cdcStats" : null,
"prevCommit" : "null",
"numWrites" : 4,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 4,
"totalWriteBytes" : 435904,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "p1",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 435904,
"minEventTime" : null,
"maxEventTime" : null,
"runtimeStats" : {
"totalScanTime" : 0,
"totalUpsertTime" : 0,
"totalCreateTime" : 1553
}
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{\"type\":\"record\",\"name\":\"PayloadAdaptableRecord\",\"namespace\":\"hoodie\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"pt\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"Op\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"_change_operation_type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"_hoodie_is_deleted\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"_event_seq\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"_event_lsn\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
},
"operationType" : "UPSERT"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"partitionToWriteStats" : {
"p1" : [ {
"fileId" : "",
"path" : null,
"cdcStats" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 4,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null,
"runtimeStats" : null
} ]
},
"compacted" : false,
"extraMetadata" : { },
"operationType" : "UPSERT"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"partitionToWriteStats" : {
"p1" : [ {
"fileId" : "c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0",
"path" : "p1/.c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0_19700101000000000.log.1_0-86-264",
"cdcStats" : null,
"prevCommit" : "19700101000000000",
"numWrites" : 4,
"numDeletes" : 0,
"numUpdateWrites" : 4,
"numInserts" : 0,
"totalWriteBytes" : 1413,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "p1",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 1413,
"minEventTime" : null,
"maxEventTime" : null,
"runtimeStats" : {
"totalScanTime" : 0,
"totalUpsertTime" : 109,
"totalCreateTime" : 0
},
"logVersion" : 1,
"logOffset" : 0,
"baseFile" : "c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0_0-32-192_19700101000000000.parquet",
"logFiles" : [ ".c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0_19700101000000000.log.1_0-86-264" ],
"recordsStats" : {
"val" : null
}
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{\"type\":\"record\",\"name\":\"PayloadAdaptableRecord\",\"namespace\":\"hoodie\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"pt\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"Op\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"_change_operation_type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"_hoodie_is_deleted\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"_event_seq\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"_event_lsn\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
},
"operationType" : "UPSERT"
}
Loading
Loading