[DNM] Enabling mdt on read. triaging test getting hung #12345

Open · wants to merge 9 commits into master
@@ -84,7 +84,7 @@ public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, fi
this.lockInfo = new LockInfo();
this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
this.storage = HoodieStorageUtils.getStorage(this.lockFile.toString(), configuration);
List<String> customSupportedFSs = lockConfiguration.getConfig().getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<>());
List<String> customSupportedFSs = lockConfiguration.getConfig().getStringList(HoodieCommonConfig.FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<>());
if (!customSupportedFSs.contains(this.storage.getScheme()) && !StorageSchemes.isAtomicCreationSupported(this.storage.getScheme())) {
throw new HoodieLockException("Unsupported scheme :" + this.storage.getScheme() + ", since this fs can not support atomic creation");
}
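For context on the renamed config above: a minimal sketch (not part of this change; the scheme list is illustrative) of how a job could whitelist additional filesystem schemes for the file-system based lock provider through the hoodie.fs.atomic_creation.support key.

import org.apache.hudi.common.config.TypedProperties;

public class LockSchemeWhitelist {
  public static void main(String[] args) {
    // Comma-separated schemes that should be treated as supporting atomic file creation,
    // even though StorageSchemes does not list them; "oss" here is purely illustrative.
    TypedProperties lockProps = new TypedProperties();
    lockProps.setProperty("hoodie.fs.atomic_creation.support", "s3a,oss");
    System.out.println(lockProps.getProperty("hoodie.fs.atomic_creation.support"));
  }
}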
@@ -506,7 +506,7 @@ private String generateUniqueInstantTime(String initializationTime) {

private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) throws IOException {
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
Option.empty());
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}
@@ -113,7 +113,7 @@ public class HoodieCommonConfig extends HoodieConfig {
+ " instead of the instant's commit time."
);

public static final ConfigProperty<String> HOODIE_FS_ATOMIC_CREATION_SUPPORT = ConfigProperty
public static final ConfigProperty<String> FS_ATOMIC_CREATION_SUPPORT = ConfigProperty
.key("hoodie.fs.atomic_creation.support")
.defaultValue("")
.markAdvanced()
@@ -62,7 +62,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Enable the internal metadata table which serves table metadata like level file listings");

public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false;
public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;

// Enable metrics for internal Metadata Table
public static final ConfigProperty<Boolean> METRICS_ENABLE = ConfigProperty
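Since DEFAULT_METADATA_ENABLE_FOR_READERS now defaults to true, a minimal sketch (assumption, not part of this change; the table path is a placeholder) of how a Spark reader can still opt out explicitly through the existing hoodie.metadata.enable option.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataReadOptOut {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("mdt-read-opt-out").getOrCreate();
    String basePath = "/tmp/hudi_table";  // placeholder table location
    Dataset<Row> df = spark.read().format("hudi")
        // Readers now pick up the metadata table by default; this turns it back off.
        .option("hoodie.metadata.enable", "false")
        .load(basePath);
    df.show(10, false);
  }
}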
@@ -219,6 +219,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
+ "metadata table which are never added before. This config determines how to handle "
+ "such spurious deletes");

// FIXME-vc: should this be turned on?
public static final ConfigProperty<Boolean> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
.key(METADATA_PREFIX + OPTIMIZED_LOG_BLOCKS_SCAN)
.defaultValue(false)
@@ -316,6 +317,9 @@
.withDocumentation("Initializes the metadata table by reading from the file system when the table is first created. Enabled by default. "
+ "Warning: This should only be disabled when manually constructing the metadata table outside of typical Hudi writer flows.");

// FIXME-vc: does this need to be turned on for basic date-partitioned workloads to work?
// Why do we need a config for this? We can't enable a functional index via spark-ds or by other means; the only way to enable a functional index is via SQL.
// So, I don't see a necessity for this config.
public static final ConfigProperty<Boolean> FUNCTIONAL_INDEX_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".index.functional.enable")
.defaultValue(false)
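To make the FIXME above concrete, a hypothetical datasource write (not part of this change; table name, fields, and path are placeholders): setting the flag through writer options does not by itself define a functional index, which, per the comment, can only be created through SQL.

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class FunctionalIndexFlagOnly {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("functional-index-flag").getOrCreate();
    // One toy record; "uuid" and "ts" mirror the usual quickstart fields and are placeholders.
    Dataset<Row> df = spark.read().json(
        spark.createDataset(Collections.singletonList("{\"uuid\": \"id1\", \"ts\": 1}"), Encoders.STRING()));
    df.write().format("hudi")
        .option("hoodie.table.name", "flag_only_tbl")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.precombine.field", "ts")
        // Full key assumes METADATA_PREFIX resolves to "hoodie.metadata"; per the FIXME,
        // this flag alone does not create a functional index -- that requires SQL (CREATE INDEX).
        .option("hoodie.metadata.index.functional.enable", "true")
        .mode(SaveMode.Overwrite)
        .save("/tmp/flag_only_tbl");
  }
}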
@@ -364,6 +368,7 @@
.sinceVersion("1.0.0")
.withDocumentation("Enable secondary index within the Metadata Table.");

// FIXME-vc: does this need to be a list of columns? Do we throw an error if different columns are specified on each write?
public static final ConfigProperty<String> SECONDARY_INDEX_COLUMN = ConfigProperty
.key(METADATA_PREFIX + ".index.secondary.column")
.noDefaultValue()
@@ -1423,11 +1423,7 @@ private static Comparable<?> coerceToComparable(Schema schema, Object val) {
}

private static boolean canCompare(Schema schema, HoodieRecordType recordType) {
// if recordType is SPARK then we cannot compare RECORD and ARRAY types in addition to MAP type
if (recordType == HoodieRecordType.SPARK) {
return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
}
return schema.getType() != Schema.Type.MAP;
return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
}

public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
@@ -2291,9 +2287,11 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
.filter(stats -> fileNames.contains(stats.getFileName()))
.map(HoodieColumnRangeMetadata::fromColumnStats)
.collectAsList();
// incase of shouldScanColStatsForTightBound = true, we compute stats for the partition of interest for all files from getLatestFileSlice() excluding current commit here
// already fileColumnMetadata contains stats for files from the current infliht commit. so, we are adding both together and sending it to collectAndProcessColumnMetadata
fileColumnMetadata.add(partitionColumnMetadata);
if (!partitionColumnMetadata.isEmpty()) {
// In case shouldScanColStatsForTightBound = true, we compute stats here for the partition of interest for all files from getLatestFileSlice(), excluding the current commit.
// fileColumnMetadata already contains stats for files from the current inflight commit, so we add both together and send them to collectAndProcessColumnMetadata.
fileColumnMetadata.add(partitionColumnMetadata);
}
}

return collectAndProcessColumnMetadata(fileColumnMetadata, partitionName, shouldScanColStatsForTightBound).iterator();
@@ -104,7 +104,7 @@ private static void configureCheckpointing(StreamExecutionEnvironment env) {
private static Map<String, String> createHudiOptions(String basePath) {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), "s3a");
options.put(HoodieCommonConfig.FS_ATOMIC_CREATION_SUPPORT.key(), "s3a");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
@@ -113,7 +113,7 @@ object HoodieCLIUtils extends Logging {
}

def getLockOptions(tablePath: String, schema: String, lockConfig: TypedProperties): Map[String, String] = {
val customSupportedFSs = lockConfig.getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key, ",", new ArrayList[String])
val customSupportedFSs = lockConfig.getStringList(HoodieCommonConfig.FS_ATOMIC_CREATION_SUPPORT.key, ",", new ArrayList[String])
if (schema == null || customSupportedFSs.contains(schema) || StorageSchemes.isAtomicCreationSupported(schema)) {
logInfo("Auto config filesystem lock provider for metadata table")
val props = FileSystemBasedLockProvider.getLockConfig(tablePath)
@@ -241,7 +241,7 @@ case class HoodieFileIndex(spark: SparkSession,
hasPushedDownPartitionPredicates = true

// If there are no data filters, return all the file slices.
// If isPartitionPurge is true, this fun is trigger by HoodiePruneFileSourcePartitions, don't look up candidate files
// If isPartitionPruned is true, this function is triggered by HoodiePruneFileSourcePartitions; don't look up candidate files
// If there are no file slices, return empty list.
if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty || isPartitionPruned ) {
prunedPartitionsAndFileSlices
@@ -263,7 +263,7 @@ case class HoodieFileIndex(spark: SparkSession,
}
}

logDebug(s"Overlapping candidate files from Column Stats or Record Level Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
logDebug(s"Overlapping candidate files from indexes: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")

var totalFileSliceSize = 0
var candidateFileSliceSize = 0
@@ -338,7 +338,10 @@
} catch {
// If the partition values cannot be parsed by [[convertToPartitionPath]],
// fall back to listing all partitions
case _: HoodieException => (false, listMatchingPartitionPaths(Seq.empty))
case e: HoodieException => {
logInfo(">>> Cannot use partition stats index for pruning partitions, fall back to listing all partitions", e)
(false, listMatchingPartitionPaths(Seq.empty))
}
}
} else {
// Cannot use partition stats index (not available) for pruning partitions,
@@ -406,13 +409,17 @@
if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) {
val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns,
prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)

logInfo(s">>> Found ${prunedFileNames} candidate files after data skipping, indexSupport: ${indexSupport.getIndexName}")

if (prunedFileNames.nonEmpty) {
return Try(prunedFileNames)
}
}
}
}
validateConfig()
logInfo(s">>> No candidate files found after data skipping")
Option.empty
}

@@ -97,11 +97,12 @@ class PartitionStatsIndexSupport(spark: SparkSession,
// filter does not prune any partition.
val indexSchema = transposedPartitionStatsDF.schema
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
Some(transposedPartitionStatsDF.where(new Column(indexFilter))
val s = Some(transposedPartitionStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet)
s
} else {
// PARTITION_STATS index does not exist for any column in the filters, skip the pruning
Option.empty
@@ -1997,6 +1997,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}

/*
test("Test Bulk Insert Into Consistent Hashing Bucket Index Table") {
withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
Seq("false", "true").foreach { bulkInsertAsRow =>
@@ -2108,7 +2109,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
}
}*/

/**
* When neither of strict mode nor sql.write.operation is set, sql write operation is deduced as UPSERT
@@ -2236,7 +2237,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
}

test("Test sql write operation with INSERT_INTO No explicit configs No Precombine") {
/*test("Test sql write operation with INSERT_INTO No explicit configs No Precombine") {
spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
@@ -2248,7 +2249,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}
}
}*/

var listenerCallCount: Int = 0
var countDownLatch: CountDownLatch = _
@@ -604,7 +604,7 @@ public static HoodieTableMetaClient createMetaClient(

public static void addLockOptions(String basePath, String schema, TypedProperties props) {
if (!props.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
List<String> customSupportedFSs = props.getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<>());
List<String> customSupportedFSs = props.getStringList(HoodieCommonConfig.FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<>());
if (schema == null || customSupportedFSs.contains(schema) || StorageSchemes.isAtomicCreationSupported(schema)) {
props.putAll(FileSystemBasedLockProvider.getLockConfig(basePath));
}