From 3d9e1da263858534bbc254a49bd8a80067e35eea Mon Sep 17 00:00:00 2001 From: jerryyue Date: Sat, 8 Oct 2022 14:54:55 +0800 Subject: [PATCH] optimize multiple partial update code --- .../table/action/commit/BaseWriteHelper.java | 10 +- .../action/commit/HoodieWriteHelper.java | 6 +- .../table/action/commit/FlinkWriteHelper.java | 6 +- .../action/commit/FlinkWriteHelperTest.java | 112 ------- .../action/commit/JavaBulkInsertHelper.java | 2 +- .../action/commit/SparkBulkInsertHelper.java | 2 +- .../org/apache/hudi/avro/HoodieAvroUtils.java | 38 +-- .../common/config/SerializableSchema.java | 4 + .../common/model/HoodieRecordPayload.java | 14 + .../model/MultipleOrderingVal2ColsInfo.java | 97 ------ ...writeNonDefaultsWithLatestAvroPayload.java | 44 ++- .../model/PartialUpdateAvroPayload.java | 287 +++++++++++------- .../update/MultiplePartialUpdateHelper.java | 128 ++++++++ .../update/MultiplePartialUpdateUnit.java | 96 ++++++ .../log/HoodieMergedLogRecordScanner.java | 2 +- .../hudi/common/util/SpillableMapUtils.java | 10 +- .../model/TestPartialUpdateAvroPayload.java | 4 +- .../hudi/sink/utils/PayloadCreation.java | 12 +- .../apache/hudi/table/HoodieTableFactory.java | 5 +- .../hudi/sink/ITTestDataStreamWrite.java | 181 ++++++++++- .../java/org/apache/hudi/utils/TestData.java | 73 ++++- .../java/test_read_schema_partial_update.avsc | 48 +++ .../test_read_schema_partial_update.avsc | 48 +++ .../resources/test_source_partial_update.data | 22 ++ .../apache/hudi/HoodieSparkSqlWriter.scala | 16 +- 25 files changed, 846 insertions(+), 421 deletions(-) delete mode 100644 hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java delete mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateHelper.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateUnit.java create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/test_read_schema_partial_update.avsc create mode 100644 hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_partial_update.avsc create mode 100644 hudi-flink-datasource/hudi-flink/src/test/resources/test_source_partial_update.data diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 9e0c34d62061..63b9009aa0cd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -43,7 +43,7 @@ public HoodieWriteMetadata write(String instantTime, try { // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config.getSchema()); + combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -69,8 +69,8 @@ protected abstract I tag( I dedupedRecords, HoodieEngineContext context, HoodieTable table); public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table, String schemaString) { - return condition ? 
deduplicateRecords(records, table, parallelism, schemaString) : records; + boolean condition, I records, int parallelism, HoodieTable table) { + return condition ? deduplicateRecords(records, table, parallelism) : records; } /** @@ -81,8 +81,8 @@ public I combineOnCondition( * @return Collection of HoodieRecord already be deduplicated */ public I deduplicateRecords( - I records, HoodieTable table, int parallelism, String schemaString) { - return deduplicateRecords(records, table.getIndex(), parallelism, schemaString); + I records, HoodieTable table, int parallelism) { + return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema()); } public abstract I deduplicateRecords( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index 1acde8f2abc4..cc0af5a160a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -55,8 +56,7 @@ protected HoodieData> tag(HoodieData> dedupedRec public HoodieData> deduplicateRecords( HoodieData> records, HoodieIndex index, int parallelism, String schemaString) { boolean isIndexingGlobal = index.isGlobal(); - Properties properties = new Properties(); - properties.put("schema", schemaString); + final SerializableSchema schema = new SerializableSchema(schemaString); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath @@ -64,7 +64,7 @@ public HoodieData> deduplicateRecords( return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties); + T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties()); HoodieKey reducedKey = rec1.getData().equals(reducedData) ? 
rec1.getKey() : rec2.getKey(); return new HoodieAvroRecord<>(reducedKey, reducedData); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index a06230bf03f5..d451130cb9a0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -98,9 +99,8 @@ public List> deduplicateRecords( final T data1 = rec1.getData(); final T data2 = rec2.getData(); - Properties properties = new Properties(); - properties.put("schema", schemaString); - @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties); + final Schema schema = new Schema.Parser().parse(schemaString); + @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java deleted file mode 100644 index 3060611f2b9f..000000000000 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.commit; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieOperation; -import org.apache.hudi.common.model.PartialUpdateAvroPayload; -import org.apache.hudi.index.HoodieIndex; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -/** - * UT for FlinkWriteHelper Test. 
- */ - -public class FlinkWriteHelperTest { - - private transient Schema avroSchema; - - private String preCombineFields = ""; - - public static final String SCHEMA = "{\n" - + " \"type\": \"record\",\n" - + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" - + " \"fields\": [\n" - + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" - + " {\"name\": \"fa\", \"type\": [\"null\", \"string\"]},\n" - + " {\"name\": \"_ts1\", \"type\": [\"null\", \"long\"]},\n" - + " {\"name\": \"fb\", \"type\": [\"null\", \"string\"]},\n" - + " {\"name\": \"_ts2\", \"type\": [\"null\", \"long\"]}\n" - + " ]\n" - + "}"; - - @TempDir - File tempFile; - - @BeforeEach - public void setUp() throws Exception { - this.preCombineFields = "_ts1:fa;_ts2:fb"; - this.avroSchema = new Schema.Parser().parse(SCHEMA); - } - - @Test - void deduplicateRecords() throws IOException, InterruptedException { - List records = data(); - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.avroSchema.toString()); - GenericRecord record = HoodieAvroUtils.bytesToAvro(((PartialUpdateAvroPayload) records.get(0).getData()).recordBytes, this.avroSchema); - System.out.println("======================================================================================"); - System.out.println("last: " + record); - } - - public List data() throws InterruptedException { - AtomicInteger faCnt = new AtomicInteger(1); - AtomicInteger fbCnt = new AtomicInteger(1); - List records = new ArrayList<>(); - for (int i = 1; i <= 100; i++) { - long ts = System.currentTimeMillis(); - GenericRecord row1 = new GenericData.Record(this.avroSchema); - row1.put("id", "jack"); - row1.put("fa", faCnt.getAndIncrement() + ""); - row1.put("_ts1", ts); - GenericRecord row2 = new GenericData.Record(this.avroSchema); - row2.put("id", "jack"); - row2.put("fb", fbCnt.getAndIncrement() + ""); - row2.put("_ts2", ts); - records.add(row1); - records.add(row2); - Thread.sleep(1); - } - - return records.stream().map(genericRowData -> { - try { - String orderingFieldValText = HoodieAvroUtils.getMultipleNestedFieldVals(genericRowData, - preCombineFields, false).toString(); - return new HoodieAvroRecord(new HoodieKey("1", "default"), - new PartialUpdateAvroPayload(genericRowData, orderingFieldValText), HoodieOperation.INSERT); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - } - -} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 3f56c41ee5f9..e126372aa906 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -103,7 +103,7 @@ public List bulkInsert(List> inputRecords, if (performDedupe) { dedupedRecords = (List>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table, config.getSchema()); + parallelism, table); } final List> repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 
28a41f3a9336..1652c35eb63e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -100,7 +100,7 @@ public HoodieData bulkInsert(HoodieData> inputRecor if (performDedupe) { dedupedRecords = (HoodieData>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table, config.getSchema()); + parallelism, table); } // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463 diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 633fa0a205ad..c74e11a2f08a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -18,18 +18,6 @@ package org.apache.hudi.avro; -import org.apache.hudi.common.config.SerializableSchema; -import org.apache.hudi.common.model.HoodieOperation; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.MultipleOrderingVal2ColsInfo; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.SchemaCompatibilityException; - import org.apache.avro.AvroRuntimeException; import org.apache.avro.Conversions; import org.apache.avro.Conversions.DecimalConversion; @@ -54,6 +42,16 @@ import org.apache.avro.io.JsonDecoder; import org.apache.avro.io.JsonEncoder; import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hudi.common.config.SerializableSchema; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.SchemaCompatibilityException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -70,13 +68,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Deque; -import java.util.LinkedList; -import java.util.Objects; import java.util.TimeZone; import java.util.stream.Collectors; @@ -497,17 +494,6 @@ public static Object getNestedFieldValAsString(GenericRecord record, String fiel return obj == null ? 
"" : StringUtils.objToString(obj); } - public static Object getMultipleNestedFieldVals(GenericRecord record, String fieldMappings, boolean consistentLogicalTimestampEnabled) { - MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(fieldMappings); - multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> { - Object val = getNestedFieldVal(record, orderingVal2ColsInfo.getOrderingField(), true, consistentLogicalTimestampEnabled); - if (Objects.nonNull(val)) { - orderingVal2ColsInfo.setOrderingValue(val.toString()); - } - }); - return multipleOrderingVal2ColsInfo.generateOrderingText(); - } - /** * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java index 4f6de8ba5f3c..6e9ff73af9e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -38,6 +38,10 @@ public SerializableSchema() { public SerializableSchema(Schema schema) { this.schema = newCopy(schema); } + + public SerializableSchema(String schemaStr) { + this.schema = new Schema.Parser().parse(schemaStr); + } public SerializableSchema(SerializableSchema serializableSchema) { this(serializableSchema.schema); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 6752607d2f48..d4e61da9bbf6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) { return preCombine(oldValue); } + /** + * When more than one HoodieRecord have the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a schema. + * Implementation can leverage the schema to decide their business logic to do preCombine. + * + * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with. + * @param schema Payload related schema. For example use schema to overwrite old instance for specified fields that doesn't equal to default value. + * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * @return the combined value + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + default T preCombine(T oldValue, Schema schema, Properties properties) { + return preCombine(oldValue, properties); + } + /** * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java deleted file mode 100644 index 9bea5151ffb0..000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.common.model; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -/** - * MultipleOrderingVal2ColsInfo - * _ts1=999:name1,price1;_ts2=111:name2,price2 - * _ts1:name1,price1=999;_ts2:name2,price2=111 - */ -public class MultipleOrderingVal2ColsInfo { - private List orderingVal2ColsInfoList = new ArrayList<>(); - - public MultipleOrderingVal2ColsInfo(String multipleOrderingFieldsWithColsText) { - for (String orderingFieldWithColsText : multipleOrderingFieldsWithColsText.split(";")) { - if (orderingFieldWithColsText == null || orderingFieldWithColsText.isEmpty()) { - continue; - } - OrderingVal2ColsInfo orderingVal2ColsInfo = new OrderingVal2ColsInfo(orderingFieldWithColsText); - orderingVal2ColsInfoList.add(orderingVal2ColsInfo); - } - } - - public List getOrderingVal2ColsInfoList() { - return orderingVal2ColsInfoList; - } - - public String generateOrderingText() { - StringBuilder sb = new StringBuilder(); - orderingVal2ColsInfoList.stream().forEach(orderingVal2ColsInfo -> { - sb.append(orderingVal2ColsInfo.orderingField); - sb.append("="); - if (Objects.nonNull(orderingVal2ColsInfo.orderingValue)) { - sb.append(orderingVal2ColsInfo.orderingValue); - } - sb.append(":"); - sb.append(String.join(",", orderingVal2ColsInfo.getColumnNames())); - sb.append(";"); - }); - sb.deleteCharAt(sb.length() - 1); - - return sb.toString(); - } - - public class OrderingVal2ColsInfo { - private String orderingField; - private String orderingValue = ""; - private List columnNames; - - public OrderingVal2ColsInfo(String orderingFieldWithColsText) { - String[] orderInfo2ColsArr = orderingFieldWithColsText.split(":"); - String[] orderingField2Value = orderInfo2ColsArr[0].split("="); - String[] columnArr = orderInfo2ColsArr[1].split(","); - this.orderingField = orderingField2Value[0]; - if (orderingField2Value.length > 1) { - this.orderingValue = orderingField2Value[1]; - } - this.columnNames = Arrays.asList(columnArr); - } - - public String getOrderingField() { - return orderingField; - } - - public String getOrderingValue() { - return orderingValue; - } - - public void setOrderingValue(String value) { - this.orderingValue = value; - } - - public List getColumnNames() { - return columnNames; - } - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 05d8f24da7b9..64854c754410 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -22,7 +22,6 @@ import org.apache.avro.generic.GenericRecord; import 
org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; - import org.apache.hudi.common.util.Option; import java.io.IOException; @@ -58,26 +57,43 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue GenericRecord insertRecord = (GenericRecord) recordOption.get(); GenericRecord currentRecord = (GenericRecord) currentValue; - return getMergedIndexedRecordOption(schema, insertRecord, currentRecord); + return mergeRecords(schema, insertRecord, currentRecord); } - protected Option getMergedIndexedRecordOption(Schema schema, GenericRecord insertRecord, GenericRecord currentRecord) { - if (isDeleteRecord(insertRecord)) { + /** + * Merges the given records into one. + * The fields in {@code baseRecord} has higher priority: + * it is set up into the merged record if it is not null or equals to the default. + * + * @param schema The record schema + * @param baseRecord The base record to merge with + * @param mergedRecord The record to be merged + * + * @return the merged record option + */ + protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { + if (isDeleteRecord(baseRecord)) { return Option.empty(); } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); List fields = schema.getFields(); - fields.forEach(field -> { - Object value = insertRecord.get(field.name()); - value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; - Object defaultValue = field.defaultVal(); - if (!overwriteField(value, defaultValue)) { - builder.set(field, value); - } else { - builder.set(field, currentRecord.get(field.pos())); - } - }); + fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); return Option.of(builder.build()); } } + + protected void setField( + GenericRecord baseRecord, + GenericRecord mergedRecord, + GenericRecordBuilder builder, + Schema.Field field) { + Object value = baseRecord.get(field.name()); + value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? 
value.toString() : value; + Object defaultValue = field.defaultVal(); + if (!overwriteField(value, defaultValue)) { + builder.set(field, value); + } else { + builder.set(field, mergedRecord.get(field.name())); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 97815e4e3a22..99bdb7c4cff0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -18,32 +18,77 @@ package org.apache.hudi.common.model; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.partial.update.MultiplePartialUpdateHelper; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.util.Option; - import java.io.IOException; -import java.util.Map; -import java.util.Objects; +import java.util.List; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; /** - * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for delta streamer. + * Payload clazz that is used for partial update Hudi Table. + * + *

Simplified partial update Logic: + *

+ *  1. #preCombine
+ *  For records with the same record key in one batch
+ *  or in the delta logs that belong to the same file group,
+ *  checks whether one record's ordering value is larger than the other record's.
+ *  If yes, overwrites the existing record for the specified fields that are not null.
+ *
+ *  2. #combineAndGetUpdateValue
+ *  For every incoming record with an existing record in storage (same record key),
+ *  checks whether the incoming record's ordering value is larger than the existing record's.
+ *  If yes, overwrites the existing record for the specified fields that are not null;
+ *  otherwise overwrites the incoming record with the existing record's values for the specified fields that are not null,
+ *  and returns a merged record.
  *
- *
- *   1. preCombine - Picks the latest delta record for a key, based on an ordering field;
- *   2. combineAndGetUpdateValue/getInsertValue - overwrite storage for specified fields
- * that doesn't equal defaultValue.
- *
+ * Illustration with simple data.
+ * let's say the order field is 'ts' and schema is :
+ * {
+ *   [
+ *     {"name":"id","type":"string"},
+ *     {"name":"ts","type":"long"},
+ *     {"name":"name","type":"string"},
+ *     {"name":"price","type":"string"}
+ *   ]
+ * }
+ *
+ * case 1
+ * Current data:
+ *     id      ts    name    price
+ *     1       1     name_1  price_1
+ * Insert data:
+ *     id      ts    name    price
+ *     1       2     null    price_2
+ *
+ * Result data after #preCombine or #combineAndGetUpdateValue:
+ *     id      ts    name    price
+ *     1       2     name_1  price_2
+ *
+ * case 2
+ * Current data:
+ *     id      ts    name    price
+ *     1       2     name_1  null
+ * Insert data:
+ *     id      ts    name    price
+ *     1       1     null    price_1
+ *
+ * Result data after preCombine or combineAndGetUpdateValue:
+ *     id      ts    name    price
+ *     1       2     name_1  price_1
+ *
*/ -public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { - - public static ConcurrentHashMap schemaRepo = new ConcurrentHashMap<>(); +public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload implements MultiplePartialUpdateHelper { public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); @@ -54,27 +99,27 @@ public PartialUpdateAvroPayload(Option record) { } @Override - public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties) { - String schemaStringIn = properties.getProperty("schema"); - Schema schemaInstance; - if (!schemaRepo.containsKey(schemaStringIn)) { - schemaInstance = new Schema.Parser().parse(schemaStringIn); - schemaRepo.put(schemaStringIn, schemaInstance); - } else { - schemaInstance = schemaRepo.get(schemaStringIn); - } + public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) { if (oldValue.recordBytes.length == 0) { // use natural order for delete record return this; } try { - GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schemaInstance).get(); - Option optValue = combineAndGetUpdateValue(indexedOldValue, schemaInstance, this.orderingVal.toString()); - // Rebuild ordering value if required - String newOrderingFieldWithColsText = rebuildWithNewOrderingVal((GenericRecord) optValue.get(), this.orderingVal.toString()); - if (optValue.isPresent()) { - return new PartialUpdateAvroPayload((GenericRecord) optValue.get(), newOrderingFieldWithColsText); + Option oldIndexedValue = oldValue.getInsertValue(schema); + if (isMultipleOrderFields(this.orderingVal.toString())) { + Option incomingRecord = getInsertValue(schema); + Option mergedRecord = preCombineMultiplePartialUpdate(oldIndexedValue, incomingRecord, schema, this.orderingVal, properties); + if (mergedRecord.isPresent()) { + return new PartialUpdateAvroPayload(Option.of((GenericRecord) mergedRecord.get())); + } + } + // pick the payload with greater ordering value as insert record + final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0; + Option mergedRecord = mergeOldRecord(oldIndexedValue.get(), schema, shouldPickOldRecord); + if (mergedRecord.isPresent()) { + return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), + shouldPickOldRecord ? 
oldValue.orderingVal : this.orderingVal); } } catch (Exception ex) { return this; @@ -82,108 +127,114 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal return this; } - public Option combineAndGetUpdateValue( - IndexedRecord currentValue, Schema schema, String multipleOrderingFieldsWithCols) throws IOException { - Option incomingRecord = getInsertValue(schema); - if (!incomingRecord.isPresent()) { - return Option.empty(); - } - - // Perform a deserialization again to prevent resultRecord from sharing the same reference as recordOption - GenericRecord resultRecord = (GenericRecord) getInsertValue(schema).get(); - - Map name2Field = schema.getFields().stream().collect(Collectors.toMap(Schema.Field::name, item -> item)); - // multipleOrderingFieldsWithCols = _ts1:name1,price1=999;_ts2:name2,price2=; - - MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(multipleOrderingFieldsWithCols); - final Boolean[] deleteFlag = new Boolean[1]; - deleteFlag[0] = false; - multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> { - String persistOrderingVal = HoodieAvroUtils.getNestedFieldValAsString( - (GenericRecord) currentValue, orderingVal2ColsInfo.getOrderingField(), true, false); - if (persistOrderingVal == null) { - persistOrderingVal = ""; - } - - // No update required - if (persistOrderingVal.isEmpty() && orderingVal2ColsInfo.getOrderingField().isEmpty()) { - return; - } - - // Pick the payload with greatest ordering value as insert record - boolean needUpdatePersistData = false; - try { - if (persistOrderingVal == null || (orderingVal2ColsInfo.getOrderingValue() != null - && persistOrderingVal.compareTo(orderingVal2ColsInfo.getOrderingValue()) <= 0)) { - needUpdatePersistData = true; - } - } catch (NumberFormatException e) { - if (persistOrderingVal.compareTo(orderingVal2ColsInfo.getOrderingValue()) < 0) { - needUpdatePersistData = true; - } - } - - // Initialise the fields of the sub-tables - GenericRecord insertRecord; - if (!needUpdatePersistData) { - insertRecord = (GenericRecord) currentValue; - // resultRecord is already assigned as recordOption - orderingVal2ColsInfo.getColumnNames().stream() - .filter(name2Field::containsKey) - .forEach(fieldName -> resultRecord.put(fieldName, insertRecord.get(fieldName))); - resultRecord.put(orderingVal2ColsInfo.getOrderingField(), Long.parseLong(persistOrderingVal)); - } else { - insertRecord = (GenericRecord) incomingRecord.get(); - orderingVal2ColsInfo.getColumnNames().stream() - .filter(name2Field::containsKey) - .forEach(fieldName -> resultRecord.put(fieldName, insertRecord.get(fieldName))); - } - - // If any of the sub-table records is flagged for deletion, delete entire row - if (isDeleteRecord(insertRecord)) { - deleteFlag[0] = true; - } - }); - - if (deleteFlag[0]) { - return Option.empty(); - } - return Option.of(resultRecord); - } - @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - return this.combineAndGetUpdateValue(currentValue, schema, this.orderingVal.toString()); + return isMultipleOrderFields(this.orderingVal.toString()) + ? combineAndGetUpdateValue(currentValue, getInsertValue(schema), schema, this.orderingVal) + : this.mergeOldRecord(currentValue, schema, false); } @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { + return isMultipleOrderFields(this.orderingVal.toString()) + ? 
combineAndGetUpdateValue(currentValue, getInsertValue(schema), schema, this.orderingVal, prop) + : mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop)); + } + + /** + * Return true if value equals defaultValue otherwise false. + */ + public Boolean overwriteField(Object value, Object defaultValue) { + return value == null; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private Option mergeOldRecord(IndexedRecord oldRecord, + Schema schema, + boolean isOldRecordNewer) throws IOException { Option recordOption = getInsertValue(schema); + if (!recordOption.isPresent()) { + // use natural order for delete record return Option.empty(); } - String orderingFieldWithColsText = rebuildWithNewOrderingVal( - (GenericRecord) recordOption.get(), this.orderingVal.toString()); - return combineAndGetUpdateValue(currentValue, schema, orderingFieldWithColsText); + + if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) { + // handling disorder, should use the metadata fields of the updating record + return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else if (isOldRecordNewer) { + return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else { + return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord); + } } - private static String rebuildWithNewOrderingVal(GenericRecord record, String orderingFieldWithColsText) { - MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(orderingFieldWithColsText); - multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> { - Object orderingVal = record.get(orderingVal2ColsInfo.getOrderingField()); - if (Objects.nonNull(orderingVal)) { - orderingVal2ColsInfo.setOrderingValue(orderingVal.toString()); - } else { - orderingVal2ColsInfo.setOrderingValue("-1"); - } - }); - return multipleOrderingVal2ColsInfo.generateOrderingText(); + /**StreamWriteFunction + * Merges the given disorder records with metadata. + * + * @param schema The record schema + * @param oldRecord The current record from file + * @param updatingRecord The incoming record + * + * @return the merged record option + */ + protected Option mergeDisorderRecordsWithMetadata( + Schema schema, + GenericRecord oldRecord, + GenericRecord updatingRecord) { + if (isDeleteRecord(oldRecord)) { + return Option.empty(); + } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); + List fields = schema.getFields(); + fields.forEach(field -> { + final GenericRecord baseRecord; + final GenericRecord mergedRecord; + if (HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(field.name())) { + // this is a metadata field + baseRecord = updatingRecord; + mergedRecord = oldRecord; + } else { + baseRecord = oldRecord; + mergedRecord = updatingRecord; + } + setField(baseRecord, mergedRecord, builder, field); + }); + return Option.of(builder.build()); + } } /** - * Return true if value equals defaultValue otherwise false. + * Returns whether the given record is newer than the record of this payload. 
+ * + * @param orderingVal + * @param record The record + * @param prop The payload properties + * + * @return true if the given record is newer */ - public Boolean overwriteField(Object value, Object defaultValue) { - return value == null; + private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) { + String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); + if (!StringUtils.isNullOrEmpty(orderingField)) { + boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + + Comparable oldOrderingVal = + (Comparable) HoodieAvroUtils.getNestedFieldVal( + (GenericRecord) record, + orderingField, + true, + consistentLogicalTimestampEnabled); + + // pick the payload with greater ordering value as insert record + return oldOrderingVal != null + && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) + && oldOrderingVal.compareTo(orderingVal) > 0; + } + return false; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateHelper.java new file mode 100644 index 000000000000..00965d7d959d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateHelper.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model.partial.update; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for delta streamer. + * + *
+ *   1. preCombine - Picks the latest delta record for a key, based on an ordering field;
+ *   2. combineAndGetUpdateValue/getInsertValue - overwrite storage for specified fields
+ *      that don't equal defaultValue.
+ *
+ */ +public interface MultiplePartialUpdateHelper { + + default Option preCombineMultiplePartialUpdate(Option oldValue,Option incomingRecord, Schema schema, Comparable orderVal, Properties properties) + throws IOException { + return combineAndGetUpdateValue(oldValue.get(), incomingRecord, schema,orderVal, properties); + } + + default Option combineAndGetUpdateValue( + IndexedRecord currentValue, Option incomingRecord, Schema schema, MultiplePartialUpdateUnit multiplePartialUpdateUnit) throws IOException { + if (!incomingRecord.isPresent()) { + return Option.empty(); + } + + // Perform a deserialization again to prevent resultRecord from sharing the same reference as recordOption + GenericRecord resultRecord = (GenericRecord) incomingRecord.get(); + + Map name2Field = schema.getFields().stream().collect(Collectors.toMap(Schema.Field::name, item -> item)); + // multipleOrderingFieldsWithCols = _ts1:name1,price1=999;_ts2:name2,price2=; + + final Boolean[] deleteFlag = new Boolean[1]; + deleteFlag[0] = false; + multiplePartialUpdateUnit.getMultiplePartialUpdateUnits().forEach(partialUpdateUnit -> { + + // Initialise the fields of the sub-tables + GenericRecord insertRecord = resultRecord; + boolean needUseOldRecordToUpdate = needUseOldRecordToUpdate(resultRecord, (GenericRecord) currentValue, partialUpdateUnit); + if (needUseOldRecordToUpdate) { + insertRecord = (GenericRecord) currentValue; + // resultRecord is already assigned as recordOption + GenericRecord finalInsertRecord = insertRecord; + partialUpdateUnit.getColumnNames().stream() + .filter(name2Field::containsKey) + .forEach(fieldName -> { + Object value = finalInsertRecord.get(fieldName); + if (value != null) { + resultRecord.put(fieldName, value); + } + }); + Object oldOrderingVal = HoodieAvroUtils.getNestedFieldVal(finalInsertRecord, partialUpdateUnit.getOrderingField(), true, false); + resultRecord.put(partialUpdateUnit.getOrderingField(), oldOrderingVal); + } + // If any of the sub-table records is flagged for deletion, delete entire row + if (isDelete(insertRecord)) { + deleteFlag[0] = true; + } + }); + + if (deleteFlag[0]) { + return Option.empty(); + } + return Option.of(resultRecord); + } + + default Option combineAndGetUpdateValue(IndexedRecord currentValue,Option incomingRecord, Schema schema, Comparable orderingVal) throws IOException { + return combineAndGetUpdateValue(currentValue, incomingRecord, schema, new MultiplePartialUpdateUnit(orderingVal.toString())); + } + + default boolean needUseOldRecordToUpdate(GenericRecord incomingRecord, GenericRecord currentRecord, MultiplePartialUpdateUnit.PartialUpdateUnit partialUpdateUnit) { + String orderingField = partialUpdateUnit.getOrderingField(); + Comparable currentOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(currentRecord, orderingField, true, false); + Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(incomingRecord, orderingField, true, false); + return Objects.isNull(incomingOrderingVal) && Objects.nonNull(currentOrderingVal) || Objects.nonNull(currentOrderingVal) && currentOrderingVal.compareTo(incomingOrderingVal) > 0; + } + + default Option combineAndGetUpdateValue(IndexedRecord currentValue, Option incomingRecord, Schema schema, Comparable orderingVal, Properties prop) + throws IOException { + return combineAndGetUpdateValue(currentValue, incomingRecord, schema, orderingVal); + } + + default boolean isMultipleOrderFields(String preCombineField) { + return !StringUtils.isNullOrEmpty(preCombineField) && 
preCombineField.split(":").length > 1; + } + + default boolean isDelete(GenericRecord genericRecord) { + final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED; + // Modify to be compatible with new version Avro. + // The new version Avro throws for GenericRecord.get if the field name + // does not exist in the schema. + if (genericRecord.getSchema().getField(isDeleteKey) == null) { + return false; + } + Object deleteMarker = genericRecord.get(isDeleteKey); + return (deleteMarker instanceof Boolean && (boolean) deleteMarker); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateUnit.java b/hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateUnit.java new file mode 100644 index 000000000000..5d8af72f2efc --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/partial/update/MultiplePartialUpdateUnit.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model.partial.update; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * MultipleOrderingVal2ColsInfo + * _ts1=999:name1,price1;_ts2=111:name2,price2 + * _ts1:name1,price1=999;_ts2:name2,price2=111 + */ +public class MultiplePartialUpdateUnit { + private List multiplePartialUpdateUnits = Collections.EMPTY_LIST; + + public MultiplePartialUpdateUnit(String multipleUpdateUnitText) { + this.multiplePartialUpdateUnits = + Arrays.stream(multipleUpdateUnitText.split(";")).filter(Objects::nonNull) + .map(PartialUpdateUnit::new).collect(Collectors.toList()); + } + + public List getAllColumns() { + List allColumns = new ArrayList<>(); + this.getMultiplePartialUpdateUnits().forEach(partialUpdateUnit -> { + allColumns.add(partialUpdateUnit.getOrderingField()); + allColumns.addAll(partialUpdateUnit.getColumnNames()); + }); + return allColumns; + } + + public List getMultiplePartialUpdateUnits() { + return multiplePartialUpdateUnits; + } + + public class PartialUpdateUnit { + private String orderingField; + private String orderingValue = ""; + private List columnNames; + + public PartialUpdateUnit(String partialUpdateUnitText) { + List partialUpdateList = Arrays.asList(partialUpdateUnitText.split(":|,")); + this.orderingField = partialUpdateList.get(0); + this.columnNames = partialUpdateList.subList(1, partialUpdateList.size()); + } + + public String getOrderingField() { + return orderingField; + } + + public String getOrderingValue() { + return orderingValue; + } + + public void setOrderingValue(String value) { + this.orderingValue = value; + } + + public List getColumnNames() { + return columnNames; + 
} + + @Override + public String toString() { + return String.format("%s=%s:%s", this.orderingField, this.orderingValue, String.join(",", this.columnNames)); + } + } + + @Override + public String toString() { + int len = multiplePartialUpdateUnits.size(); + return len > 1 ? this.multiplePartialUpdateUnits + .stream() + .map(PartialUpdateUnit::toString) + .collect(Collectors.joining(";")) : multiplePartialUpdateUnits.get(0).toString(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 28f9481876fa..88455bac27bf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -155,7 +155,7 @@ protected void processNextRecord(HoodieRecord hoo HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue, properties); + HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue,readerSchema, properties); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 1952305aa0ec..d4bafd9c9fee 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.util; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.SizeAwareDataOutputStream; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; @@ -155,13 +154,8 @@ private static Object getPreCombineVal(GenericRecord rec, String preCombineField if (preCombineField == null) { return 0; } - // Handle multiple ordering/preCombineFields - if (preCombineField.contains(";")) { - return HoodieAvroUtils.getMultipleNestedFieldVals(rec, preCombineField, false); - } else { - Schema.Field field = rec.getSchema().getField(preCombineField); - return field == null ? 0 : rec.get(field.pos()); - } + Schema.Field field = rec.getSchema().getField(preCombineField); + return field == null ? 
0 : rec.get(field.pos()); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 76062925d51f..5aec5780270b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -137,8 +137,8 @@ public void testActiveRecords() throws IOException { payload1 = new PartialUpdateAvroPayload(record1, "_ts1=5:name1,price1;_ts2=4:name2,price2"); payload2 = new PartialUpdateAvroPayload(record2, "_ts1=4:name1,price1;_ts2=5:name2,price2"); - PartialUpdateAvroPayload preCombineRes1 = payload1.preCombine(payload2, properties); - PartialUpdateAvroPayload preCombineRes2 = payload2.preCombine(payload1, properties); + PartialUpdateAvroPayload preCombineRes1 = payload1.preCombine(payload2,schema, properties); + PartialUpdateAvroPayload preCombineRes2 = payload2.preCombine(payload1,schema, properties); String expOrderingVal = "_ts1=5:name1,price1;_ts2=5:name2,price2"; PartialUpdateAvroPayload expPrecombineRes = diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index 7df339489261..bb0f262d4a4f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -76,15 +76,9 @@ public static PayloadCreation instance(Configuration conf) throws Exception { public HoodieRecordPayload createPayload(GenericRecord record) throws Exception { if (shouldCombine) { ValidationUtils.checkState(preCombineField != null); - Comparable orderingVal; - if (preCombineField.contains(";")) { - // Multi ordering field support - orderingVal = (Comparable) HoodieAvroUtils.getMultipleNestedFieldVals(record, preCombineField, false); - } else { - orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, - preCombineField, false, false); - } - return (HoodieRecordPayload) constructor.newInstance(record, orderingVal); + Comparable orderingVal = preCombineField.split(":").length > 1 ? 
preCombineField : (Comparable) HoodieAvroUtils.getNestedFieldVal(record, + preCombineField, false, false); + return (HoodieRecordPayload) constructor.newInstance(record, orderingVal); } else { return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 077d8dc312c0..81278d1a8835 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.model.partial.update.MultiplePartialUpdateUnit; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; @@ -154,8 +155,8 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) { conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); } else if (preCombineField.contains(";")) { // pre_combine key is in multi_ordering format (e.g. _ts1:name1,price1;_ts2:name2,price2) - Arrays.stream(preCombineField.split(";")) - .filter(f -> !fields.contains(f.split(":")[0])) + new MultiplePartialUpdateUnit(preCombineField) + .getAllColumns().stream().filter(field -> !fields.contains(field)) .findAny() .ifPresent(f -> { throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index 680c4d02e238..0810de0ce2a3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -18,13 +18,23 @@ package org.apache.hudi.sink; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.sink.utils.Pipelines; +import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.HoodiePipeline; import org.apache.hudi.util.StreamerUtil; @@ -52,12 +62,14 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLogger; +import org.apache.parquet.Strings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -67,6 +79,8 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; + /** * Integration test for Flink Hoodie stream sink. */ @@ -103,7 +117,7 @@ public void testWriteCopyOnWrite(String indexType) throws Exception { conf.setString(FlinkOptions.INDEX_TYPE, indexType); conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1); conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id"); - conf.setBoolean(FlinkOptions.PRE_COMBINE,true); + conf.setBoolean(FlinkOptions.PRE_COMBINE, true); testWriteToHoodie(conf, "cow_write", 2, EXPECTED); } @@ -390,7 +404,7 @@ public void testHoodiePipelineBuilderSink() throws Exception { // Read from file source RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) - .getLogicalType(); + .getLogicalType(); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, @@ -414,7 +428,6 @@ public void testHoodiePipelineBuilderSink() throws Exception { .setParallelism(1); - //sink to hoodie table use low-level sink api. 
HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink") .column("uuid string not null") @@ -431,4 +444,166 @@ public void testHoodiePipelineBuilderSink() throws Exception { execute(execEnv, true, "Api_Sink_Test"); TestData.checkWrittenDataCOW(tempFile, EXPECTED); } + + @Test + public void testMultiplePartialUpdate() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + Map<String, String> options = new HashMap<>(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString()); + options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema_partial_update.avsc")).toString()); + Configuration conf = Configuration.fromMap(options); + // Read from file source + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) + .getLogicalType(); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_source_partial_update.data")).toString(); + + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + format.setCharsetName("UTF-8"); + + DataStream<RowData> dataStream = execEnv + // use continuous file source to trigger checkpoint + .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 1)) + .name("continuous_file_source") + .setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4); + + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4"); + options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "uuid"); + options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"); + options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), PartialUpdateAvroPayload.class.getName()); + options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:fa;_ts2:fb"); + options.put(FlinkOptions.PRE_COMBINE.key(), "true"); + options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); + + + // sink to the hoodie table using the low-level sink API.
+ HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink") + .column("uuid string not null") + .column("fa string") // partial update column + .column("_ts1 bigint") + .column("fb string") // partial update column + //.column("fc string") // partial update column + .column("_ts2 bigint") + //.column("fd string") // partial update column; the orderingField corresponding to fb must be set + .pk("uuid") + .options(options); + + + builder.sink(dataStream, false); + execute(execEnv, true, "Api_Sink_Test"); + Map<String, List<String>> expected = new HashMap<>(); + + expected.put("", Arrays.asList("id1,7,7,7,7", "id2,8,8,8,8", "id3,6,6,6,6")); + + TestData.checkWrittenFullDataFunction(tempFile, expected, genericRecord -> { + List<String> fields = new ArrayList<>(); + fields.add(genericRecord.get("_hoodie_record_key").toString()); + fields.add(genericRecord.get("fa").toString()); + fields.add(genericRecord.get("_ts1").toString()); + fields.add(genericRecord.get("fb").toString()); + fields.add(genericRecord.get("_ts2").toString()); + return Strings.join(fields, ","); + }); + } + + @Test + void deduplicateRecords() throws IOException { + final String SCHEMA = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"fa\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"fb\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_ts\", \"type\": [\"null\", \"long\"]}\n" + + " ]\n" + + "}"; + + String preCombineFields = "_ts"; + List<HoodieRecord> records = new ArrayList<>(); + Schema avroSchema = new Schema.Parser().parse(SCHEMA); + for (int i = 1; i <= 100; i++) { + long ts = System.currentTimeMillis(); + GenericRecord row1 = new GenericData.Record(avroSchema); + row1.put("id", "jack"); + row1.put("fa", i + ""); + row1.put("_ts", ts); + Comparable orderingVal1 = (Comparable) HoodieAvroUtils.getNestedFieldVal(row1, + preCombineFields, false, false); + records.add(new HoodieAvroRecord(new HoodieKey("1", "default"), + new PartialUpdateAvroPayload(row1, orderingVal1), HoodieOperation.INSERT)); + ts = System.currentTimeMillis(); + GenericRecord row2 = new GenericData.Record(avroSchema); + row2.put("id", "jack"); + row2.put("fb", i + ""); + row2.put("_ts", ts); + Comparable orderingVal2 = (Comparable) HoodieAvroUtils.getNestedFieldVal(row2, + preCombineFields, false, false); + records.add(new HoodieAvroRecord(new HoodieKey("1", "default"), + new PartialUpdateAvroPayload(row2, orderingVal2), HoodieOperation.INSERT)); + } + + List<HoodieRecord> deduplicateRecords = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, avroSchema.toString()); + GenericRecord record = HoodieAvroUtils.bytesToAvro(((PartialUpdateAvroPayload) deduplicateRecords.get(0).getData()).recordBytes, avroSchema); + assertEquals(1, deduplicateRecords.size()); + assertEquals("100", record.get(1).toString()); + assertEquals(record.get(1), record.get(2)); + } + + @Test + void deduplicateRecordsWithMultipleOrderingFields() throws IOException { + String schema = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"fa\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_ts1\", \"type\": [\"null\", \"long\"]},\n" + + " {\"name\": \"fb\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_ts2\", \"type\": [\"null\", \"long\"]}\n" + + " ]\n" + + "}"; + String preCombineFields = "_ts1:fa;_ts2:fb"; + List<HoodieRecord>
records = new ArrayList<>(); + Schema avroSchema = new Schema.Parser().parse(schema); + for (int i = 1; i <= 1000; i++) { + long ts = System.currentTimeMillis(); + GenericRecord row1 = new GenericData.Record(avroSchema); + row1.put("id", "jack"); + row1.put("fa", i + ""); + row1.put("_ts1", ts); + records.add(new HoodieAvroRecord(new HoodieKey("1", "default"), + new PartialUpdateAvroPayload(row1, preCombineFields), HoodieOperation.INSERT)); + ts = System.currentTimeMillis(); + GenericRecord row2 = new GenericData.Record(avroSchema); + row2.put("id", "jack"); + row2.put("fb", i + ""); + row2.put("_ts2", ts); + records.add(new HoodieAvroRecord(new HoodieKey("1", "default"), + new PartialUpdateAvroPayload(row2, preCombineFields), HoodieOperation.INSERT)); + } + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, schema); + GenericRecord record = HoodieAvroUtils.bytesToAvro(((PartialUpdateAvroPayload) records.get(0).getData()).recordBytes, avroSchema); + assertEquals("1000", record.get(1).toString()); + assertEquals(record.get(1), record.get(3)); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index fbcd3d6a8676..e3a9157dcb2a 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -68,6 +68,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.function.Function; import java.util.function.Predicate; import java.util.HashSet; import java.util.List; @@ -554,6 +555,65 @@ public static void assertRowDataEquals(List<RowData> rows, List<RowData> expecte assertThat(rowsString, is(rowDataToString(expected))); } + /** + * Checks the source data are written as expected. + * + *
<p>
Note: Replace it with the Flink reader when it is supported. + * + * @param basePath The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + */ + public static void checkWrittenFullData( + File basePath, + Map<String, List<String>> expected) throws IOException { + checkWrittenFullDataFunction(basePath, expected, FILTER_OUT_VARIABLES_FUNCTION); + } + + + /** + * Checks the source data are written as expected. + * + *
<p>
Note: Replace it with the Flink reader when it is supported. + * + * @param basePath The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + */ + public static void checkWrittenFullDataFunction( + File basePath, + Map<String, List<String>> expected, + Function<GenericRecord, String> creator) throws IOException { + + // 1. init flink table + HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath()); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build(); + HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient); + + // 2. check each partition data + expected.forEach((partition, partitionDataSet) -> { + + List<String> readBuffer = new ArrayList<>(); + table.getBaseFileOnlyView().getLatestBaseFiles(partition) + .forEach(baseFile -> { + String path = baseFile.getPath(); + try { + ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build(); + GenericRecord nextRecord = reader.read(); + while (nextRecord != null) { + String value = creator.apply(nextRecord); + readBuffer.add(value); + nextRecord = reader.read(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + assertTrue(partitionDataSet.size() == readBuffer.size() && partitionDataSet.containsAll(readBuffer)); + + }); + + } + /** * Checks the source data set are written as expected. * @@ -711,8 +771,8 @@ public static void checkWrittenDataMOR( assert hoodiePropertiesFile.exists(); // 1. init flink table HoodieWriteConfig config = HoodieWriteConfig.newBuilder() - .fromFile(hoodiePropertiesFile) - .withPath(basePath).build(); + .fromFile(hoodiePropertiesFile) + .withPath(basePath).build(); HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, config.getProps()); HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient); Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); @@ -768,8 +828,8 @@ public static void checkWrittenDataMOR( for (String curKey : scanner.getRecords().keySet()) { if (!keyToSkip.contains(curKey)) { Option<IndexedRecord> record = (Option<IndexedRecord>) scanner.getRecords() - .get(curKey).getData() - .getInsertValue(schema, config.getProps()); + .get(curKey).getData() + .getInsertValue(schema, config.getProps()); if (record.isPresent()) { readBuffer.add(filterOutVariables(record.get())); } @@ -808,6 +868,9 @@ private static HoodieMergedLogRecordScanner getScanner( .build(); } + private static Function<GenericRecord, String> FILTER_OUT_VARIABLES_FUNCTION = + genericRecord -> filterOutVariables(genericRecord); + /** * Filter out the variables like file name. */ @@ -820,7 +883,7 @@ private static String filterOutVariables(GenericRecord genericRecord) { fields.add(genericRecord.get("age").toString()); fields.add(genericRecord.get("ts").toString()); fields.add(genericRecord.get("partition").toString()); - return String.join(",",fields); + return String.join(",", fields); } public static BinaryRowData insertRow(Object... fields) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/test_read_schema_partial_update.avsc b/hudi-flink-datasource/hudi-flink/src/test/java/test_read_schema_partial_update.avsc new file mode 100644 index 000000000000..5b5618e95d24 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/test_read_schema_partial_update.avsc @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "record", + "fields" : [ { + "name" : "uuid", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "fa", + "type" : [ "null", "string" ], + "default" : null + },{ + "name" : "_ts1", + "type" : [ "null", { + "type" : "long", + "logicalType" : "long" + } ], + "default" : null + } , { + "name" : "fb", + "type" : [ "null", "string" ], + "default" : null + },{ + "name" : "_ts2", + "type" : [ "null", { + "type" : "long", + "logicalType" : "long" + } ], + "default" : null + }] +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_partial_update.avsc b/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_partial_update.avsc new file mode 100644 index 000000000000..5b5618e95d24 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/resources/test_read_schema_partial_update.avsc @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "type" : "record", + "name" : "record", + "fields" : [ { + "name" : "uuid", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "fa", + "type" : [ "null", "string" ], + "default" : null + },{ + "name" : "_ts1", + "type" : [ "null", { + "type" : "long", + "logicalType" : "long" + } ], + "default" : null + } , { + "name" : "fb", + "type" : [ "null", "string" ], + "default" : null + },{ + "name" : "_ts2", + "type" : [ "null", { + "type" : "long", + "logicalType" : "long" + } ], + "default" : null + }] +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/resources/test_source_partial_update.data b/hudi-flink-datasource/hudi-flink/src/test/resources/test_source_partial_update.data new file mode 100644 index 000000000000..182e43786253 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/resources/test_source_partial_update.data @@ -0,0 +1,22 @@ +{"uuid": "id1", "fa": "1", "_ts1": 1} +{"uuid": "id2", "fa": "2", "_ts1": 2} +{"uuid": "id3", "fa": "3", "_ts1": 3} +{"uuid": "id1", "fa": "4", "_ts1": 4} +{"uuid": "id2", "fa": "5", "_ts1": 5} +{"uuid": "id2", "fb": "3","_ts2": 3} +{"uuid": "id3", "fb": "2","_ts2": 2} +{"uuid": "id1", "fb": "4","_ts2": 4} +{"uuid": "id3", "fa": "6", "_ts1": 6} +{"uuid": "id1", "fa": "7", "_ts1": 7} +{"uuid": "id2", "fb": "3","_ts2": 3} +{"uuid": "id3", "fb": "2","_ts2": 2} +{"uuid": "id1", "fb": "4","_ts2": 4} +{"uuid": "id3", "fa": "3", "_ts1": 3} +{"uuid": "id1", "fa": "4", "_ts1": 4} +{"uuid": "id2", "fa": "5", "_ts1": 5} +{"uuid": "id2", "fb": "5","_ts2": 5} +{"uuid": "id3", "fb": "6","_ts2": 6} +{"uuid": "id1", "fb": "7","_ts2": 7} +{"uuid": "id2", "fa": "8", "_ts1": 8} +{"uuid": "id1", "fb": "1", "_ts2": 1} +{"uuid": "id2", "fb": "8","_ts2": 8} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 726aad9a53aa..1ca86e0b3990 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -291,17 +291,11 @@ object HoodieSparkSqlWriter { val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns) val hoodieRecord = if (shouldCombine) { val preCombineField = hoodieConfig.getString(PRECOMBINE_FIELD) - val orderingVal = if (preCombineField.contains(";")) { - HoodieAvroUtils.getMultipleNestedFieldVals(gr, preCombineField, parameters.getOrElse( - DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean) - .asInstanceOf[Comparable[_]] - } else { - HoodieAvroUtils.getNestedFieldVal(gr, preCombineField, false, parameters.getOrElse( - DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean) - .asInstanceOf[Comparable[_]] - } + val orderingVal = if (preCombineField.split(":").length > 1) preCombineField + else HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse( + DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean) + .asInstanceOf[Comparable[_]] 
DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, keyGenerator.getKey(gr),