optimize multiple partial update code
jerryyue authored and XuQianJin-Stars committed Oct 15, 2022
1 parent 2ebd741 commit 3d9e1da
Showing 25 changed files with 846 additions and 421 deletions.

@@ -43,7 +43,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
     try {
       // De-dupe/merge if needed
       I dedupedRecords =
-          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config.getSchema());
+          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

       Instant lookupBegin = Instant.now();
       I taggedRecords = dedupedRecords;
@@ -69,8 +69,8 @@ protected abstract I tag(
       I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table);

   public I combineOnCondition(
-      boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table, String schemaString) {
-    return condition ? deduplicateRecords(records, table, parallelism, schemaString) : records;
+      boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
+    return condition ? deduplicateRecords(records, table, parallelism) : records;
   }

   /**
@@ -81,8 +81,8 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(
-      I records, HoodieTable<T, I, K, O> table, int parallelism, String schemaString) {
-    return deduplicateRecords(records, table.getIndex(), parallelism, schemaString);
+      I records, HoodieTable<T, I, K, O> table, int parallelism) {
+    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
   }

   public abstract I deduplicateRecords(

@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;

 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -55,16 +56,15 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaString) {
     boolean isIndexingGlobal = index.isGlobal();
-    Properties properties = new Properties();
-    properties.put("schema", schemaString);
+    final SerializableSchema schema = new SerializableSchema(schemaString);
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
       Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties);
+      T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

       return new HoodieAvroRecord<>(reducedKey, reducedData);
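
Note: the deduplicateRecords override above — like the list-based one in the next file — keys each incoming record (record key only when the index is global, the full HoodieKey otherwise) and then reduces records that share a key with the payload's preCombine. Below is a standalone sketch of that reduce-by-key pattern in plain Java collections, for orientation only; it does not use the HoodieData API and every name in it is invented.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class DedupSketch {

  // Group records by key, then fold each group with a preCombine-style merge so that
  // only one record per key survives. With a global index the key would be just the
  // record key; otherwise the partition path would be part of the key as well.
  static <K, R> Map<K, R> dedup(List<R> records, Function<R, K> keyFn, BinaryOperator<R> preCombine) {
    Map<K, R> byKey = new HashMap<>();
    for (R record : records) {
      byKey.merge(keyFn.apply(record), record, preCombine);
    }
    return byKey;
  }
}

In the Spark-backed helper above, mapToPair/reduceByKey plays the role of this in-memory map.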

@@ -18,6 +18,7 @@

 package org.apache.hudi.table.action.commit;

+import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -98,9 +99,8 @@ public List<HoodieRecord<T>> deduplicateRecords(
       final T data1 = rec1.getData();
       final T data2 = rec2.getData();

-      Properties properties = new Properties();
-      properties.put("schema", schemaString);
-      @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties);
+      final Schema schema = new Schema.Parser().parse(schemaString);
+      @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties());
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.

This file was deleted.

@@ -103,7 +103,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,

     if (performDedupe) {
       dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
-          parallelism, table, config.getSchema());
+          parallelism, table);
     }

     final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);

@@ -100,7 +100,7 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecor

     if (performDedupe) {
       dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
-          parallelism, table, config.getSchema());
+          parallelism, table);
     }

     // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463

38 changes: 12 additions & 26 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,18 +18,6 @@

 package org.apache.hudi.avro;

-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.MultipleOrderingVal2ColsInfo;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.SchemaCompatibilityException;
-
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
@@ -54,6 +42,16 @@
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -70,13 +68,12 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.Objects;
 import java.util.TimeZone;
 import java.util.stream.Collectors;

@@ -497,17 +494,6 @@ public static Object getNestedFieldValAsString(GenericRecord record, String fiel
     return obj == null ? "" : StringUtils.objToString(obj);
   }

-  public static Object getMultipleNestedFieldVals(GenericRecord record, String fieldMappings, boolean consistentLogicalTimestampEnabled) {
-    MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(fieldMappings);
-    multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> {
-      Object val = getNestedFieldVal(record, orderingVal2ColsInfo.getOrderingField(), true, consistentLogicalTimestampEnabled);
-      if (Objects.nonNull(val)) {
-        orderingVal2ColsInfo.setOrderingValue(val.toString());
-      }
-    });
-    return multipleOrderingVal2ColsInfo.generateOrderingText();
-  }
-
   /**
    * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
    */

@@ -38,6 +38,10 @@ public SerializableSchema() {
   public SerializableSchema(Schema schema) {
     this.schema = newCopy(schema);
   }

+  public SerializableSchema(String schemaStr) {
+    this.schema = new Schema.Parser().parse(schemaStr);
+  }
+
   public SerializableSchema(SerializableSchema serializableSchema) {
     this(serializableSchema.schema);
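
Note: a small usage sketch of the new String-based constructor. The schema literal and class name below are invented; only SerializableSchema and its get() accessor appear in this commit. The wrapper lets the write schema be parsed once and then carried into serialized closures, which is how HoodieWriteHelper uses it above.

import org.apache.avro.Schema;
import org.apache.hudi.common.config.SerializableSchema;

public class SerializableSchemaSketch {
  public static void main(String[] args) {
    // Made-up demo schema; in the write path this would be the table's write schema string.
    String schemaString = "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"ts\",\"type\":\"long\"}]}";

    // Parse once, keep the serializable wrapper around.
    SerializableSchema wrapped = new SerializableSchema(schemaString);

    // get() hands back the underlying Avro Schema where it is actually needed,
    // e.g. inside the reduce function shown earlier.
    Schema schema = wrapped.get();
    System.out.println(schema.getField("ts").schema().getType()); // LONG
  }
}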

@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
     return preCombine(oldValue);
   }

+  /**
+   * When more than one HoodieRecord has the same HoodieKey in the incoming batch, this method combines them before attempting to insert/upsert, taking the table schema as an additional input.
+   * Implementations can leverage the schema to decide how to preCombine.
+   *
+   * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
+   * @param schema payload-related schema, e.g. used to overwrite fields of the old instance whose incoming values do not equal the default value.
+   * @param properties payload-related properties, e.g. the name of the ordering field(s) to extract from the value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  default T preCombine(T oldValue, Schema schema, Properties properties) {
+    return preCombine(oldValue, properties);
+  }
+
   /**
    * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
    */
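
Note: the schema-aware preCombine hook is what lets a payload merge partial updates of the same key before the write. Below is a hypothetical, self-contained illustration of that idea on plain Avro GenericRecords — it is not a HoodieRecordPayload implementation, and every name in it is invented; a real payload could instead compare incoming values against the schema's field defaults, as the javadoc above suggests.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PartialMergeSketch {

  // Fields the newer record leaves unset (null here; a fuller implementation could use the
  // schema's declared defaults instead) are taken from the older record, so two partial
  // updates of the same key collapse into one row before it is written.
  static GenericRecord mergePartial(GenericRecord older, GenericRecord newer, Schema schema) {
    GenericRecord merged = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
      Object newVal = newer.get(field.name());
      merged.put(field.name(), newVal != null ? newVal : older.get(field.name()));
    }
    return merged;
  }

  public static void main(String[] args) {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"price\",\"type\":[\"null\",\"double\"],\"default\":null}]}");

    GenericRecord first = new GenericData.Record(schema);
    first.put("id", "k1");
    first.put("name", "widget");   // first partial update sets only name

    GenericRecord second = new GenericData.Record(schema);
    second.put("id", "k1");
    second.put("price", 9.99);     // second partial update sets only price

    GenericRecord merged = mergePartial(first, second, schema);
    System.out.println(merged);    // {"id": "k1", "name": "widget", "price": 9.99}
  }
}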