[HUDI-8581] Test schema handler in fg reader and some refactoring to prevent bugs in the future #12340

Open · wants to merge 3 commits into master
File: SparkFileFormatInternalRowReaderContext.scala
@@ -165,7 +165,7 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))

//If we need to do position based merging with log files we will leave the row index column at the end
-val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
+val dataProjection = if (getShouldMergeUseRecordPosition) {
getIdentityProjection
} else {
projectRecord(dataRequiredSchema,
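Note: dropping the getHasLogFiles guard here is safe because, after this PR, HoodieFileGroupReader only sets shouldMergeUseRecordPosition to true when log files are actually present (see the HoodieFileGroupReader hunk below). A minimal, self-contained sketch of that invariant, using hypothetical stand-in names rather than the real Hudi classes:

public class RecordPositionFlagSketch {
  public static void main(String[] args) {
    boolean shouldUseRecordPosition = true;
    boolean isSkipMerge = false;
    boolean hasLogFiles = false;

    // Mirrors the new assignment in HoodieFileGroupReader: the flag is derived
    // from hasLogFiles, so a true flag implies log files exist.
    boolean shouldMergeUseRecordPosition =
        shouldUseRecordPosition && !isSkipMerge && hasLogFiles;

    // Prints "false": with no log files, position-based merging is off, so
    // callers such as the Scala projection above need no separate guard.
    System.out.println(shouldMergeUseRecordPosition);
  }
}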
File: HoodieReaderContext.java
@@ -65,6 +65,8 @@ public abstract class HoodieReaderContext<T> implements Closeable {
private Boolean hasLogFiles = null;
private Boolean hasBootstrapBaseFile = null;
private Boolean needsBootstrapMerge = null;

+// should we do position based merging for mor
+private Boolean shouldMergeUseRecordPosition = null;

// for encoding and decoding schemas to the spillable map
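The hunk above only shows the new field; the call sites elsewhere in this diff (setShouldMergeUseRecordPosition in HoodieFileGroupReader, getShouldMergeUseRecordPosition in the schema handlers) imply an accessor pair on HoodieReaderContext. A hedged sketch of what that pair plausibly looks like, assuming the boxed Boolean is there to fail fast on reads before initialization; the real implementation may differ:

public boolean getShouldMergeUseRecordPosition() {
  if (shouldMergeUseRecordPosition == null) {
    // Assumption: reading the flag before it is set is a programming error.
    throw new IllegalStateException("shouldMergeUseRecordPosition accessed before it was set");
  }
  return shouldMergeUseRecordPosition;
}

public void setShouldMergeUseRecordPosition(boolean shouldMergeUseRecordPosition) {
  this.shouldMergeUseRecordPosition = shouldMergeUseRecordPosition;
}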
File: HoodieFileGroupReader.java
@@ -97,7 +97,12 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
this.readerContext = readerContext;
this.storage = storage;
this.hoodieBaseFileOption = fileSlice.getBaseFile();
+readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+readerContext.setHasLogFiles(!this.logFiles.isEmpty());
+if (readerContext.getHasLogFiles() && start != 0) {
+throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
+}
this.props = props;
this.start = start;
this.length = length;
@@ -110,19 +115,14 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
readerContext.setTablePath(tablePath);
readerContext.setLatestCommitTime(latestCommitTime);
boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
-readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge);
-readerContext.setHasLogFiles(!this.logFiles.isEmpty());
-if (readerContext.getHasLogFiles() && start != 0) {
-throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
-}
-readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
+readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge && readerContext.getHasLogFiles());
readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex()
? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
: new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
-tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge,
+tableConfig.getRecordMergeMode(), props, !readerContext.getHasLogFiles(), isSkipMerge,
shouldUseRecordPosition, readStats);
}

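The reordering in this file is the heart of the refactor: setHasBootstrapBaseFile and setHasLogFiles now run before setShouldMergeUseRecordPosition (which reads getHasLogFiles) and before the schema handler is constructed (which reads all of these flags). A self-contained toy, with hypothetical names, of the failure mode this ordering prevents:

class ToyReaderContext {
  private Boolean hasLogFiles; // null until set, like the Boolean flags above

  boolean getHasLogFiles() {
    if (hasLogFiles == null) {
      throw new IllegalStateException("hasLogFiles read before it was set");
    }
    return hasLogFiles;
  }

  void setHasLogFiles(boolean value) {
    this.hasLogFiles = value;
  }
}

class ToySchemaHandler {
  ToySchemaHandler(ToyReaderContext ctx) {
    // The handler consumes flags at construction time, so every flag must be
    // set first; constructing the handler any earlier would throw above.
    System.out.println("mor merge needed: " + ctx.getHasLogFiles());
  }
}

public class InitOrderSketch {
  public static void main(String[] args) {
    ToyReaderContext ctx = new ToyReaderContext();
    ctx.setHasLogFiles(true);  // 1. derive file-layout facts first
    new ToySchemaHandler(ctx); // 2. only then build the consumer
  }
}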
File: HoodieFileGroupReaderSchemaHandler.java
@@ -72,13 +72,6 @@ public class HoodieFileGroupReaderSchemaHandler<T> {

protected final TypedProperties properties;

-protected final Option<HoodieRecordMerger> recordMerger;
-
-protected final boolean hasBootstrapBaseFile;
-protected boolean needsBootstrapMerge;
-
-protected final boolean needsMORMerge;
-
private final AvroSchemaCache avroSchemaCache;

public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
@@ -89,16 +82,12 @@ public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
TypedProperties properties) {
this.properties = properties;
this.readerContext = readerContext;
-this.hasBootstrapBaseFile = readerContext.getHasBootstrapBaseFile();
-this.needsMORMerge = readerContext.getHasLogFiles();
-this.recordMerger = readerContext.getRecordMerger();
this.dataSchema = dataSchema;
this.requestedSchema = requestedSchema;
this.hoodieTableConfig = hoodieTableConfig;
this.requiredSchema = prepareRequiredSchema();
this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
-readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
this.avroSchemaCache = AvroSchemaCache.getInstance();
}

@@ -151,18 +140,18 @@ protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSc

private Schema generateRequiredSchema() {
//might need to change this if other queries than mor have mandatory fields
-if (!needsMORMerge) {
+if (!readerContext.getHasLogFiles()) {
return requestedSchema;
}

if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
-if (!recordMerger.get().isProjectionCompatible()) {
+if (!readerContext.getRecordMerger().get().isProjectionCompatible()) {
return dataSchema;
}
}

List<Schema.Field> addedFields = new ArrayList<>();
-for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
+for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, readerContext.getRecordMerger())) {
if (!findNestedField(requestedSchema, field).isPresent()) {
Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
if (!foundFieldOpt.isPresent()) {
@@ -209,8 +198,9 @@ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, Type
protected Schema prepareRequiredSchema() {
Schema preReorderRequiredSchema = generateRequiredSchema();
Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = getDataAndMetaCols(preReorderRequiredSchema);
-this.needsBootstrapMerge = hasBootstrapBaseFile && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty();
-return needsBootstrapMerge
+readerContext.setNeedsBootstrapMerge(readerContext.getHasBootstrapBaseFile()
+&& !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty());
+return readerContext.getNeedsBootstrapMerge()
? createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), requiredFields.getRight().stream()).collect(Collectors.toList()))
: preReorderRequiredSchema;
}
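Removing the handler's cached copies (recordMerger, hasBootstrapBaseFile, needsMORMerge, needsBootstrapMerge) leaves HoodieReaderContext as the single source of truth, so the handler and the reader can no longer drift apart. A minimal illustration of the read-through pattern, with hypothetical names:

class Owner {
  private boolean needsBootstrapMerge;

  boolean getNeedsBootstrapMerge() {
    return needsBootstrapMerge;
  }

  void setNeedsBootstrapMerge(boolean value) {
    this.needsBootstrapMerge = value;
  }
}

class Consumer {
  private final Owner owner;

  Consumer(Owner owner) {
    this.owner = owner;
  }

  boolean needsBootstrapMerge() {
    // Read through to the owner on every call instead of caching a copy at
    // construction time; updates made after construction stay visible.
    return owner.getNeedsBootstrapMerge();
  }
}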
File: HoodiePositionBasedSchemaHandler.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Types;
@@ -50,10 +51,18 @@ public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties);
}

+private boolean morMergeNeedsPositionCol() {
+return readerContext.supportsParquetRowIndex() && readerContext.getShouldMergeUseRecordPosition();
+}
+
+private boolean bootstrapMergeNeedsPositionCol() {
+return readerContext.supportsParquetRowIndex() && readerContext.getNeedsBootstrapMerge();
+}
+
@Override
protected Schema prepareRequiredSchema() {
Schema preMergeSchema = super.prepareRequiredSchema();
-return readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles()
+return morMergeNeedsPositionCol()
? addPositionalMergeCol(preMergeSchema)
: preMergeSchema;
}
@@ -65,7 +74,7 @@ protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema> int

@Override
protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSchema internalSchema) {
-if (!(readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles())) {
+if (!morMergeNeedsPositionCol()) {
return super.doPruneInternalSchema(requiredSchema, internalSchema);
}

@@ -82,20 +91,24 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem
@Override
public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapRequiredFields() {
Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
-if (readerContext.supportsParquetRowIndex()) {
-if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) {
+if (bootstrapMergeNeedsPositionCol() || morMergeNeedsPositionCol()) {
+if (!dataAndMetaCols.getLeft().isEmpty()) {
dataAndMetaCols.getLeft().add(getPositionalMergeField());
+}
+if (!dataAndMetaCols.getRight().isEmpty()) {
dataAndMetaCols.getRight().add(getPositionalMergeField());
}
}
return dataAndMetaCols;
}

-private static Schema addPositionalMergeCol(Schema input) {
+@VisibleForTesting
+static Schema addPositionalMergeCol(Schema input) {
return appendFieldsToSchemaDedupNested(input, Collections.singletonList(getPositionalMergeField()));
}

-private static Schema.Field getPositionalMergeField() {
+@VisibleForTesting
+static Schema.Field getPositionalMergeField() {
return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME,
Schema.create(Schema.Type.LONG), "", -1L);
}
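Relaxing addPositionalMergeCol and getPositionalMergeField from private to package-private with @VisibleForTesting is what enables the schema-handler tests the PR title refers to. A hedged sketch of such a test, assuming it lives in the same package as HoodiePositionBasedSchemaHandler; the actual tests in the PR may differ:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class PositionalMergeColSketch {
  public static void main(String[] args) {
    Schema base = SchemaBuilder.record("rec").fields()
        .requiredString("id")
        .endRecord();

    // Now callable from a test in the same package.
    Schema withPos = HoodiePositionBasedSchemaHandler.addPositionalMergeCol(base);

    // The appended field should be the temporary row-index column, typed LONG.
    String posName = HoodiePositionBasedSchemaHandler.getPositionalMergeField().name();
    Schema.Field posField = withPos.getField(posName);
    assert posField != null && posField.schema().getType() == Schema.Type.LONG;
  }
}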