From 67319fc94c0d4094ab159a17dfa18725e14285bb Mon Sep 17 00:00:00 2001 From: gaoxin Date: Tue, 27 Jun 2023 20:39:41 +0800 Subject: [PATCH] [opt](hudi) use spark bundle to read hudi data #21260 --- be/src/vec/exec/scan/hudi_jni_reader.cpp | 6 +- be/src/vec/exec/scan/hudi_jni_reader.h | 2 +- bin/start_be.sh | 12 +- .../apache/doris/avro/AvroColumnValue.java | 5 + .../org/apache/doris/avro/AvroJNIScanner.java | 1 + fe/be-java-extensions/hudi-scanner/pom.xml | 202 ++++- .../apache/doris/hudi/HudiColumnValue.java | 141 ++-- .../org/apache/doris/hudi/HudiJniScanner.java | 194 ++--- .../org/apache/doris/hudi/HudiScanParam.java | 194 ----- .../org/apache/doris/hudi/HudiScanUtils.java | 62 -- .../java/org/apache/doris/hudi/Utils.java | 21 + .../apache/doris/hudi/BaseSplitReader.scala | 702 ++++++++++++++++++ .../doris/hudi/HoodieRecordIterator.scala | 142 ++++ .../doris/hudi/MORSnapshotSplitReader.scala | 183 +++++ .../apache/doris/common/jni/JniScanner.java | 9 +- .../doris/common/jni/MockJniScanner.java | 11 +- .../doris/common/jni/vec/ColumnValue.java | 2 + .../common/jni/vec/NativeColumnValue.java | 48 ++ .../doris/common/jni/vec/ScanPredicate.java | 5 + .../doris/common/jni/vec/VectorColumn.java | 35 +- .../doris/common/jni/vec/VectorTable.java | 5 + .../maxcompute/MaxComputeColumnValue.java | 10 +- .../maxcompute/MaxComputeJniScanner.java | 7 - .../doris/paimon/PaimonColumnValue.java | 7 +- .../apache/doris/paimon/PaimonJniScanner.java | 7 - .../org/apache/doris/catalog/HudiUtils.java | 3 + 26 files changed, 1500 insertions(+), 516 deletions(-) delete mode 100644 fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java delete mode 100644 fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java create mode 100644 fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala create mode 100644 fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala create mode 100644 fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala create mode 100644 fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java diff --git a/be/src/vec/exec/scan/hudi_jni_reader.cpp b/be/src/vec/exec/scan/hudi_jni_reader.cpp index 1e02b8ee796b48..029135ac670bd2 100644 --- a/be/src/vec/exec/scan/hudi_jni_reader.cpp +++ b/be/src/vec/exec/scan/hudi_jni_reader.cpp @@ -21,6 +21,7 @@ #include #include "runtime/descriptors.h" +#include "runtime/runtime_state.h" #include "runtime/types.h" #include "vec/core/types.h" @@ -35,7 +36,7 @@ class Block; namespace doris::vectorized { -const std::string HudiJniReader::HADOOP_FS_PREFIX = "hadoop_fs."; +const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf."; HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params, @@ -52,6 +53,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, } std::map params = { + {"query_id", print_id(_state->query_id())}, {"base_path", _hudi_params.base_path}, {"data_file_path", _hudi_params.data_file_path}, {"data_file_length", std::to_string(_hudi_params.data_file_length)}, @@ -65,7 +67,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, // Use compatible hadoop client to read data for (auto& kv : _scan_params.properties) { - params[HADOOP_FS_PREFIX + kv.first] = kv.second; + params[HADOOP_CONF_PREFIX + kv.first] = kv.second; } 
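Note on the renamed prefix: the BE now forwards every catalog property to the JNI scanner with a "hadoop_conf." key prefix (previously "hadoop_fs."), and the Java/Scala side strips that prefix again when it rebuilds a Hadoop Configuration (see HoodieSplit.hadoopProperties later in this patch). A self-contained sketch of the stripping side, assuming nothing beyond a plain params map; it is an illustration, not the patched code itself:

import org.apache.hadoop.conf.Configuration;
import java.util.Map;

final class HadoopConfParams {
    private static final String HADOOP_CONF_PREFIX = "hadoop_conf.";

    // Rebuild a Hadoop Configuration from JNI params whose keys were
    // prefixed with "hadoop_conf." on the BE side.
    static Configuration toHadoopConf(Map<String, String> params) {
        Configuration conf = new Configuration();
        for (Map.Entry<String, String> kv : params.entrySet()) {
            if (kv.getKey().startsWith(HADOOP_CONF_PREFIX)) {
                // Drop the prefix so the key matches the real Hadoop property name.
                conf.set(kv.getKey().substring(HADOOP_CONF_PREFIX.length()), kv.getValue());
            }
        }
        return conf;
    }
}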
_jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", params, diff --git a/be/src/vec/exec/scan/hudi_jni_reader.h b/be/src/vec/exec/scan/hudi_jni_reader.h index c3bd68056dea78..bf2dab943d8d80 100644 --- a/be/src/vec/exec/scan/hudi_jni_reader.h +++ b/be/src/vec/exec/scan/hudi_jni_reader.h @@ -46,7 +46,7 @@ class HudiJniReader : public GenericReader { ENABLE_FACTORY_CREATOR(HudiJniReader); public: - static const std::string HADOOP_FS_PREFIX; + static const std::string HADOOP_CONF_PREFIX; HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params, const std::vector& file_slot_descs, RuntimeState* state, diff --git a/bin/start_be.sh b/bin/start_be.sh index abcaedf682e0be..c603239a8566b6 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -82,23 +82,23 @@ for f in "${DORIS_HOME}/lib/java_extensions"/*.jar; do if [[ -z "${DORIS_CLASSPATH}" ]]; then export DORIS_CLASSPATH="${f}" else - export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + export DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}" fi done if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then # add hadoop libs for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do - DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}" done for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do - DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}" done for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do - DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}" done for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do - DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}" done fi @@ -107,7 +107,7 @@ fi if command -v hadoop >/dev/null 2>&1; then HADOOP_SYSTEM_CLASSPATH="$(hadoop classpath --glob)" fi -export CLASSPATH="${HADOOP_SYSTEM_CLASSPATH}:${DORIS_HOME}/conf/:${DORIS_CLASSPATH}" +export CLASSPATH="${DORIS_CLASSPATH}:${HADOOP_SYSTEM_CLASSPATH}:${DORIS_HOME}/conf/" # DORIS_CLASSPATH is for self-managed jni export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}" diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java index 3c796f1fc7d191..dd72c9aad5010d 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java @@ -105,6 +105,11 @@ public String getString() { return inspectObject().toString(); } + @Override + public byte[] getStringAsBytes() { + throw new UnsupportedOperationException(); + } + @Override public LocalDate getDate() { // avro has no date type diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 72b9e414169ec7..d3e1cad579a559 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -192,6 +192,7 @@ protected int getNext() throws IOException { return numRows; } + @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { Schema schema = avroReader.getSchema(); List schemaFields = schema.getFields(); diff --git 
a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml index 098073504ead91..1b19da98870007 100644 --- a/fe/be-java-extensions/hudi-scanner/pom.xml +++ b/fe/be-java-extensions/hudi-scanner/pom.xml @@ -30,83 +30,217 @@ under the License. ${basedir}/../../ 1 + 2.12.15 + 2.12 + 3.2.0 + 3.2 + 0.13.0 + 3.0.16 + 2.14.3 - org.apache.doris - java-common - ${project.version} + org.scala-lang + scala-library + ${scala.version} + compile + + + org.apache.hudi + hudi-spark-common_${scala.binary.version} + ${hudi.version} + + + org.apache.hudi + hudi-spark-client + ${hudi.version} + + + org.apache.hudi + hudi-spark3-common + ${hudi.version} + + + org.apache.hudi + hudi-spark3.2.x_${scala.binary.version} + ${hudi.version} - fe-common - org.apache.doris + json4s-ast_2.11 + org.json4s + + + json4s-core_2.11 + org.json4s + + + json4s-jackson_2.11 + org.json4s + + + json4s-scalap_2.11 + org.json4s - - org.apache.hudi - hudi-hadoop-mr-bundle - ${hudi.version} + org.apache.parquet + parquet-avro + 1.10.1 + compile + + + org.apache.spark + spark-core_${scala.binary.version} - com.google.protobuf - protobuf-java + javax.servlet + * - commons-lang - commons-lang + jackson-module-scala_2.12 + com.fasterxml.jackson.module - org.apache.hudi - hudi-common + hadoop-client-api + org.apache.hadoop - org.apache.parquet - parquet-avro + hadoop-client-runtime + org.apache.hadoop + + ${spark.version} + compile + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + compile + - org.apache.avro - avro + org.codehaus.janino + janino - hudi-hadoop-mr - org.apache.hudi + org.codehaus.janino + commons-compiler - com.facebook.presto.hive - hive-apache - ${presto.hive.version} + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + compile + + + org.apache.spark + spark-launcher_${scala.binary.version} + ${spark.version} + compile + + + + org.codehaus.janino + janino + ${janino.version} - org.slf4j - slf4j-log4j12 + org.codehaus.janino + commons-compiler - org.apache.hadoop - hadoop-client + org.codehaus.janino + commons-compiler + ${janino.version} - org.apache.hadoop - hadoop-common + + com.fasterxml.jackson.module + jackson-module-scala_${scala.binary.version} + ${fasterxml.jackson.version} + + + com.google.guava + guava + + + - org.apache.hadoop - hadoop-hdfs + org.apache.doris + java-common + ${project.version} - commons-io - commons-io + org.apache.hadoop + hadoop-common hudi-scanner + src/main/java + src/test/java + + + src/main/resources + + + + + src/test/resources + + + + + net.alchim31.maven + scala-maven-plugin + 4.7.2 + + + + compile + testCompile + + + + + ${scala.version} + + -unchecked + -deprecation + -feature + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + default-compile + none + + + default-testCompile + none + + + java-compile + + compile + testCompile + + compile + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java index 4a7ea36e44fa29..7d402e84292df7 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java @@ -18,51 +18,56 @@ package org.apache.doris.hudi; +import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ColumnValue; +import 
org.apache.doris.common.jni.vec.NativeColumnValue; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import java.math.BigDecimal; import java.math.BigInteger; -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.List; -public class HudiColumnValue implements ColumnValue { - public enum TimeUnit { - MILLIS, MICROS - } +public class HudiColumnValue implements ColumnValue, NativeColumnValue { + private boolean isUnsafe; + private InternalRow internalRow; + private int ordinal; + private int precision; + private int scale; - private final Object fieldData; - private final ObjectInspector fieldInspector; - private final TimeUnit timeUnit; + HudiColumnValue() { + } - HudiColumnValue(ObjectInspector fieldInspector, Object fieldData) { - this(fieldInspector, fieldData, TimeUnit.MICROS); + HudiColumnValue(InternalRow internalRow, int ordinal, int precision, int scale) { + this.isUnsafe = internalRow instanceof UnsafeRow; + this.internalRow = internalRow; + this.ordinal = ordinal; + this.precision = precision; + this.scale = scale; } - HudiColumnValue(ObjectInspector fieldInspector, Object fieldData, int timePrecision) { - this(fieldInspector, fieldData, timePrecision == 3 ? 
TimeUnit.MILLIS : TimeUnit.MICROS); + public void reset(InternalRow internalRow, int ordinal, int precision, int scale) { + this.isUnsafe = internalRow instanceof UnsafeRow; + this.internalRow = internalRow; + this.ordinal = ordinal; + this.precision = precision; + this.scale = scale; } - HudiColumnValue(ObjectInspector fieldInspector, Object fieldData, TimeUnit timeUnit) { - this.fieldInspector = fieldInspector; - this.fieldData = fieldData; - this.timeUnit = timeUnit; + public void reset(int ordinal, int precision, int scale) { + this.ordinal = ordinal; + this.precision = precision; + this.scale = scale; } - private Object inspectObject() { - return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); + public void reset(InternalRow internalRow) { + this.isUnsafe = internalRow instanceof UnsafeRow; + this.internalRow = internalRow; } @Override @@ -72,96 +77,89 @@ public boolean canGetStringAsBytes() { @Override public boolean isNull() { - return false; + return internalRow.isNullAt(ordinal); } @Override public boolean getBoolean() { - return (boolean) inspectObject(); + return internalRow.getBoolean(ordinal); } @Override public byte getByte() { - return 0; + return internalRow.getByte(ordinal); } @Override public short getShort() { - return (short) inspectObject(); + return internalRow.getShort(ordinal); } @Override public int getInt() { - return (int) inspectObject(); + return internalRow.getInt(ordinal); } @Override public float getFloat() { - return (float) inspectObject(); + return internalRow.getFloat(ordinal); } @Override public long getLong() { - return (long) inspectObject(); + return internalRow.getLong(ordinal); } @Override public double getDouble() { - return (double) inspectObject(); + return internalRow.getDouble(ordinal); } @Override public BigInteger getBigInteger() { - throw new UnsupportedOperationException("Hudi type does not support largeint"); + throw new UnsupportedOperationException("Hoodie type does not support largeint"); } @Override public BigDecimal getDecimal() { - return ((HiveDecimalObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData).bigDecimalValue(); + return internalRow.getDecimal(ordinal, precision, scale).toJavaBigDecimal(); } @Override public String getString() { - return inspectObject().toString(); + return internalRow.getUTF8String(ordinal).toString(); + } + + @Override + public byte[] getStringAsBytes() { + return internalRow.getUTF8String(ordinal).getBytes(); } @Override public LocalDate getDate() { - return ((DateObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData).toLocalDate(); + return LocalDate.ofEpochDay(internalRow.getInt(ordinal)); } @Override public LocalDateTime getDateTime() { - if (fieldData instanceof LongWritable) { - long datetime = ((LongWritable) fieldData).get(); - long seconds; - long nanoseconds; - if (timeUnit == TimeUnit.MILLIS) { - seconds = datetime / 1000; - nanoseconds = (datetime % 1000) * 1000000; - } else { - seconds = datetime / 1000000; - nanoseconds = (datetime % 1000000) * 1000; - } - return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), ZoneId.systemDefault()); + long datetime = internalRow.getLong(ordinal); + long seconds; + long nanoseconds; + if (precision == 3) { + seconds = datetime / 1000; + nanoseconds = (datetime % 1000) * 1000000; + } else if (precision == 6) { + seconds = datetime / 1000000; + nanoseconds = (datetime % 1000000) * 1000; } else { - return ((TimestampObjectInspector) 
fieldInspector).getPrimitiveJavaObject(fieldData).toLocalDateTime(); + throw new RuntimeException("Hoodie timestamp only support milliseconds and microseconds"); } + return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), ZoneId.systemDefault()); } @Override public byte[] getBytes() { - // Get bytes directly if fieldData is BytesWritable or Text to avoid decoding&encoding - if (fieldData instanceof BytesWritable) { - return ((BytesWritable) fieldData).getBytes(); - } - if (fieldData instanceof Text) { - return ((Text) fieldData).getBytes(); - } - if (fieldData instanceof String) { - return ((String) fieldData).getBytes(StandardCharsets.UTF_8); - } - return (byte[]) inspectObject(); + return internalRow.getBinary(ordinal); } @Override @@ -179,4 +177,23 @@ public void unpackStruct(List structFieldIndex, List value } + @Override + public NativeValue getNativeValue(ColumnType.Type type) { + if (isUnsafe) { + UnsafeRow unsafeRow = (UnsafeRow) internalRow; + switch (type) { + case CHAR: + case VARCHAR: + case BINARY: + case STRING: + long offsetAndSize = unsafeRow.getLong(ordinal); + int offset = (int) (offsetAndSize >> 32); + int size = (int) offsetAndSize; + return new NativeValue(unsafeRow.getBaseObject(), offset, size); + default: + return null; + } + } + return null; + } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index bdc4960ec153b1..d067493a79dc79 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -19,33 +19,21 @@ import org.apache.doris.common.jni.JniScanner; -import org.apache.doris.common.jni.vec.ColumnValue; -import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ScanPredicate; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.apache.log4j.Logger; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.Filter; +import scala.collection.Iterator; +import java.io.Closeable; import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -57,118 +45,44 @@ */ public class HudiJniScanner extends JniScanner { private static final Logger LOG = Logger.getLogger(HudiJniScanner.class); - private final HudiScanParam hudiScanParam; - UserGroupInformation ugi = null; - private 
RecordReader reader; - private StructObjectInspector rowInspector; - private Deserializer deserializer; + private final int fetchSize; + private final HoodieSplit split; + private final ScanPredicate[] predicates; private final ClassLoader classLoader; + private final UserGroupInformation ugi; private long getRecordReaderTimeNs = 0; + private Iterator recordIterator; public HudiJniScanner(int fetchSize, Map params) { if (LOG.isDebugEnabled()) { LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue()) .collect(Collectors.joining("\n"))); } - this.hudiScanParam = new HudiScanParam(fetchSize, params); this.classLoader = this.getClass().getClassLoader(); - } - - - @Override - public void open() throws IOException { - try { - initTableInfo(hudiScanParam.getRequiredTypes(), hudiScanParam.getRequiredFields(), null, - hudiScanParam.getFetchSize()); - Properties properties = hudiScanParam.createProperties(); - JobConf jobConf = HudiScanUtils.createJobConf(properties); - ugi = Utils.getUserGroupInformation(jobConf); - init(jobConf, properties); - } catch (Exception e) { - close(); - throw new IOException("Failed to open the hudi reader.", e); - } - } - - @Override - public void close() throws IOException { - try { - if (reader != null) { - reader.close(); - } - } catch (IOException e) { - LOG.error("Failed to close the hudi reader.", e); - throw e; - } - } - - @Override - public int getNext() throws IOException { - try { - NullWritable key = reader.createKey(); - ArrayWritable value = reader.createValue(); - - int readRowNumbers = 0; - while (readRowNumbers < getBatchSize()) { - if (!reader.next(key, value)) { - break; - } - Object rowData = deserializer.deserialize(value); - - for (int i = 0; i < hudiScanParam.getRequiredFields().length; i++) { - Object fieldData = rowInspector.getStructFieldData(rowData, hudiScanParam.getStructFields()[i]); - if (fieldData == null) { - appendData(i, null); - } else { - ColumnValue fieldValue = new HudiColumnValue(hudiScanParam.getFieldInspectors()[i], fieldData, - hudiScanParam.getRequiredTypes()[i].getPrecision()); - appendData(i, fieldValue); - } - } - readRowNumbers++; + String predicatesAddressString = params.remove("push_down_predicates"); + this.fetchSize = fetchSize; + this.split = new HoodieSplit(params); + if (predicatesAddressString == null) { + predicates = new ScanPredicate[0]; + } else { + long predicatesAddress = Long.parseLong(predicatesAddressString); + if (predicatesAddress != 0) { + predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes()); + LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); + } else { + predicates = new ScanPredicate[0]; } - return readRowNumbers; - } catch (Exception e) { - close(); - throw new IOException("Failed to get the next batch of hudi.", e); } + ugi = Utils.getUserGroupInformation(split.hadoopConf()); } @Override - protected TableSchema parseTableSchema() throws UnsupportedOperationException { - // do nothing - return null; - } - - private void init(JobConf jobConf, Properties properties) throws Exception { - String basePath = hudiScanParam.getBasePath(); - String dataFilePath = hudiScanParam.getDataFilePath(); - long dataFileLength = hudiScanParam.getDataFileLength(); - String[] deltaFilePaths = hudiScanParam.getDeltaFilePaths(); - String[] requiredFields = hudiScanParam.getRequiredFields(); - - String realtimePath = dataFilePath.isEmpty() ? 
deltaFilePaths[0] : dataFilePath; - long realtimeLength = dataFileLength > 0 ? dataFileLength : 0; - - Path path = new Path(realtimePath); - - FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, (String[]) null); - List logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new) - .collect(Collectors.toList()); - - FileSplit hudiSplit = - new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, hudiScanParam.getInstantTime(), false, - Option.empty()); - - InputFormat inputFormatClass = HudiScanUtils.createInputFormat(jobConf, hudiScanParam.getInputFormat()); - - // org.apache.hudi.common.util.SerializationUtils$KryoInstantiator.newKryo - // throws error like `java.lang.IllegalArgumentException: classLoader cannot be null`. - // Set the default class loader + public void open() throws IOException { Thread.currentThread().setContextClassLoader(classLoader); - + initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize); + long startTime = System.nanoTime(); // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck, // so use another process to kill this stuck process. // TODO(gaoxin): better way to solve the stuck process? @@ -190,31 +104,49 @@ private void init(JobConf jobConf, Properties properties) throws Exception { } } }, 100, 1000, TimeUnit.MILLISECONDS); - - long startTime = System.nanoTime(); if (ugi != null) { - reader = ugi.doAs((PrivilegedExceptionAction>) () -> { - RecordReader ugiReader - = (RecordReader) inputFormatClass.getRecordReader(hudiSplit, - jobConf, Reporter.NULL); - return ugiReader; - }); + try { + recordIterator = ugi.doAs( + (PrivilegedExceptionAction>) () -> new MORSnapshotSplitReader( + split).buildScanIterator(split.requiredFields(), new Filter[0])); + } catch (InterruptedException e) { + throw new IOException(e); + } } else { - reader = (RecordReader) inputFormatClass - .getRecordReader(hudiSplit, jobConf, Reporter.NULL); + recordIterator = new MORSnapshotSplitReader(split) + .buildScanIterator(split.requiredFields(), new Filter[0]); } - getRecordReaderTimeNs += System.nanoTime() - startTime; isKilled.set(true); executorService.shutdownNow(); + getRecordReaderTimeNs += System.nanoTime() - startTime; + } - deserializer = HudiScanUtils.getDeserializer(jobConf, properties, hudiScanParam.getSerde()); - - rowInspector = (StructObjectInspector) deserializer.getObjectInspector(); + @Override + public void close() throws IOException { + if (recordIterator instanceof Closeable) { + ((Closeable) recordIterator).close(); + } + } - for (int i = 0; i < requiredFields.length; i++) { - StructField field = rowInspector.getStructFieldRef(requiredFields[i]); - hudiScanParam.getStructFields()[i] = field; - hudiScanParam.getFieldInspectors()[i] = field.getFieldObjectInspector(); + @Override + public int getNext() throws IOException { + try { + int readRowNumbers = 0; + HudiColumnValue columnValue = new HudiColumnValue(); + int numFields = split.requiredFields().length; + ColumnType[] columnTypes = split.requiredTypes(); + while (readRowNumbers < fetchSize && recordIterator.hasNext()) { + columnValue.reset(recordIterator.next()); + for (int i = 0; i < numFields; i++) { + columnValue.reset(i, columnTypes[i].getPrecision(), columnTypes[i].getScale()); + appendData(i, columnValue); + } + readRowNumbers++; + } + return readRowNumbers; + } catch (Exception e) { + close(); + throw new IOException("Failed to get the next batch of hudi.", e); } } diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java deleted file mode 100644 index 4343dd3f492c80..00000000000000 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java +++ /dev/null @@ -1,194 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.hudi; - - -import org.apache.doris.common.jni.vec.ColumnType; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - -/** - * The hudi scan param - */ -public class HudiScanParam { - public static String HADOOP_FS_PREFIX = "hadoop_fs."; - - private final int fetchSize; - private final String basePath; - private final String dataFilePath; - private final long dataFileLength; - private final String[] deltaFilePaths; - private final String hudiColumnNames; - private final String[] hudiColumnTypes; - private final String[] requiredFields; - private int[] requiredColumnIds; - private ColumnType[] requiredTypes; - private final String[] nestedFields; - private final String instantTime; - private final String serde; - private final String inputFormat; - private final ObjectInspector[] fieldInspectors; - private final StructField[] structFields; - private final Map hadoopConf; - - public HudiScanParam(int fetchSize, Map params) { - this.fetchSize = fetchSize; - this.basePath = params.get("base_path"); - this.dataFilePath = params.get("data_file_path"); - this.dataFileLength = Long.parseLong(params.get("data_file_length")); - String deltaFilePaths = params.get("delta_file_paths"); - - if (StringUtils.isEmpty(deltaFilePaths)) { - this.deltaFilePaths = new String[0]; - } else { - this.deltaFilePaths = deltaFilePaths.split(","); - } - - this.hudiColumnNames = params.get("hudi_column_names"); - this.hudiColumnTypes = params.get("hudi_column_types").split("#"); - this.requiredFields = params.get("required_fields").split(","); - this.nestedFields = params.getOrDefault("nested_fields", "").split(","); - this.instantTime = params.get("instant_time"); - this.serde = params.get("serde"); - this.inputFormat = params.get("input_format"); - this.fieldInspectors = new ObjectInspector[requiredFields.length]; - this.structFields = new StructField[requiredFields.length]; - - hadoopConf = new HashMap<>(); - for (Map.Entry kv : 
params.entrySet()) { - if (kv.getKey().startsWith(HADOOP_FS_PREFIX)) { - hadoopConf.put(kv.getKey().substring(HADOOP_FS_PREFIX.length()), kv.getValue()); - } - } - - parseRequiredColumns(); - } - - private void parseRequiredColumns() { - String[] hiveColumnNames = this.hudiColumnNames.split(","); - Map hiveColumnNameToIndex = new HashMap<>(); - Map hiveColumnNameToType = new HashMap<>(); - for (int i = 0; i < hiveColumnNames.length; i++) { - hiveColumnNameToIndex.put(hiveColumnNames[i], i); - hiveColumnNameToType.put(hiveColumnNames[i], this.hudiColumnTypes[i]); - } - requiredTypes = new ColumnType[requiredFields.length]; - requiredColumnIds = new int[requiredFields.length]; - for (int i = 0; i < requiredFields.length; i++) { - requiredColumnIds[i] = hiveColumnNameToIndex.get(requiredFields[i]); - String type = hiveColumnNameToType.get(requiredFields[i]); - requiredTypes[i] = ColumnType.parseType(requiredFields[i], type); - } - } - - public Properties createProperties() { - Properties properties = new Properties(); - - properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, - Arrays.stream(requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(","))); - properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", requiredFields)); - properties.setProperty(HudiScanUtils.COLUMNS, hudiColumnNames); - // recover INT64 based timestamp mark to hive type, timestamp(3)/timestamp(6) => timestamp - properties.setProperty(HudiScanUtils.COLUMNS_TYPES, - Arrays.stream(hudiColumnTypes).map(type -> type.startsWith("timestamp(") ? "timestamp" : type).collect( - Collectors.joining(","))); - properties.setProperty(serdeConstants.SERIALIZATION_LIB, serde); - - for (Map.Entry kv : hadoopConf.entrySet()) { - properties.setProperty(kv.getKey(), kv.getValue()); - } - - return properties; - } - - - public int getFetchSize() { - return fetchSize; - } - - public String getBasePath() { - return basePath; - } - - public String getDataFilePath() { - return dataFilePath; - } - - public long getDataFileLength() { - return dataFileLength; - } - - public String[] getDeltaFilePaths() { - return deltaFilePaths; - } - - public String getHudiColumnNames() { - return hudiColumnNames; - } - - public String[] getHudiColumnTypes() { - return hudiColumnTypes; - } - - public String[] getRequiredFields() { - return requiredFields; - } - - public int[] getRequiredColumnIds() { - return requiredColumnIds; - } - - public ColumnType[] getRequiredTypes() { - return requiredTypes; - } - - public String[] getNestedFields() { - return nestedFields; - } - - public String getInstantTime() { - return instantTime; - } - - public String getSerde() { - return serde; - } - - public String getInputFormat() { - return inputFormat; - } - - public ObjectInspector[] getFieldInspectors() { - return fieldInspectors; - } - - public StructField[] getStructFields() { - return structFields; - } - -} diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java deleted file mode 100644 index 78fff5229383f5..00000000000000 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.hudi; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.ReflectionUtils; - -import java.util.Properties; - - -/** - * The hudi scan utils - */ -public class HudiScanUtils { - - public static final String COLUMNS = "columns"; - public static final String COLUMNS_TYPES = "columns.types"; - - public static JobConf createJobConf(Properties properties) { - JobConf jobConf = new JobConf(new Configuration()); - jobConf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name))); - return jobConf; - } - - public static InputFormat createInputFormat(Configuration conf, String inputFormat) throws Exception { - Class clazz = conf.getClassByName(inputFormat); - Class> cls = - (Class>) clazz.asSubclass(InputFormat.class); - return ReflectionUtils.newInstance(cls, conf); - } - - public static Deserializer getDeserializer(Configuration configuration, Properties properties, String name) - throws Exception { - Class deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader()) - .asSubclass(Deserializer.class); - Deserializer deserializer = deserializerClass.getConstructor().newInstance(); - deserializer.initialize(configuration, properties); - return deserializer; - } - -} diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java index c7f3ecf9dab289..9dcfacebb8c087 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java @@ -20,6 +20,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hudi.common.table.HoodieTableMetaClient; import sun.management.VMManagement; import java.io.BufferedReader; @@ -30,6 +31,7 @@ import java.lang.management.RuntimeMXBean; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; import java.util.LinkedList; import java.util.List; @@ -110,4 +112,23 @@ public static void killProcess(long pid) { throw new RuntimeException("Couldn't kill process PID " + pid, e); } } + + public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) { + UserGroupInformation ugi = getUserGroupInformation(conf); + HoodieTableMetaClient metaClient; + if (ugi != null) { + try { + metaClient = ugi.doAs( + (PrivilegedExceptionAction) () -> HoodieTableMetaClient.builder() + 
.setConf(conf).setBasePath(basePath).build()); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException("Cannot get hudi client.", e); + } + } else { + metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build(); + } + return metaClient; + } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala new file mode 100644 index 00000000000000..92dd6638473cd3 --- /dev/null +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -0,0 +1,702 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.doris.common.jni.vec.ColumnType +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hbase.io.hfile.CacheConfig +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema} +import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, SerializableConfiguration, TypedProperties} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.common.util.{ConfigUtils, StringUtils} +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter +import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} +import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} +import org.apache.hudi.io.storage.HoodieAvroHFileReader +import org.apache.hudi.metadata.HoodieTableMetadataUtil +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, HoodieTableState} +import org.apache.log4j.Logger +import org.apache.spark.sql.adapter.Spark3_2Adapter +import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
PartitioningUtils} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.{SQLContext, SparkSession, SparkSessionExtensions} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{SparkConf, SparkContext} + +import java.lang.reflect.Constructor +import java.net.URI +import java.util.Objects +import java.util.concurrent.TimeUnit +import java.{util => jutil} +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} + +class DorisSparkAdapter extends Spark3_2Adapter { + override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters +} + +class HoodieSplit(private val params: jutil.Map[String, String]) { + val queryId: String = params.remove("query_id") + val basePath: String = params.remove("base_path") + val dataFilePath: String = params.remove("data_file_path") + val dataFileLength: Long = params.remove("data_file_length").toLong + val deltaFilePaths: Array[String] = { + val deltas = params.remove("delta_file_paths") + if (StringUtils.isNullOrEmpty(deltas)) new Array[String](0) else deltas.split(",") + } + + val hudiColumnNames: Array[String] = params.remove("hudi_column_names").split(",") + val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip( + params.remove("hudi_column_types").split("#")).toMap + + val requiredFields: Array[String] = params.remove("required_fields").split(",") + val requiredTypes: Array[ColumnType] = requiredFields.map( + field => ColumnType.parseType(field, hudiColumnTypes(field))) + + val nestedFields: Array[String] = { + val fields = params.remove("nested_fields") + if (StringUtils.isNullOrEmpty(fields)) new Array[String](0) else fields.split(",") + } + val instantTime: String = params.remove("instant_time") + val serde: String = params.remove("serde") + val inputFormat: String = params.remove("input_format") + + val hadoopProperties: Map[String, String] = { + val properties = new jutil.HashMap[String, String] + val iterator = params.entrySet().iterator() + while (iterator.hasNext) { + val kv = iterator.next() + if (kv.getKey.startsWith(BaseSplitReader.HADOOP_CONF_PREFIX)) { + properties.put(kv.getKey.substring(BaseSplitReader.HADOOP_CONF_PREFIX.length), kv.getValue) + iterator.remove() + } + } + properties.asScala.toMap + } + + lazy val hadoopConf: Configuration = { + val conf = new Configuration + hadoopProperties.foreach(kv => conf.set(kv._1, kv._2)) + conf + } + + // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain + // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. 
In that + // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema + // from the table itself + val schemaSpec: Option[StructType] = None + + val optParams: Map[String, String] = params.asScala.toMap + + override def equals(obj: Any): Boolean = { + if (obj == null) { + return false + } + obj match { + case split: HoodieSplit => + hashCode() == split.hashCode() + case _ => false + } + } + + override def hashCode(): Int = { + Objects.hash(queryId, basePath) + } +} + +case class HoodieTableInformation(sparkSession: SparkSession, + metaClient: HoodieTableMetaClient, + timeline: HoodieTimeline, + tableConfig: HoodieTableConfig, + tableAvroSchema: Schema, + internalSchemaOpt: Option[InternalSchema]) + +/** + * Reference to Apache Hudi + * see HoodieBaseRelation + */ +abstract class BaseSplitReader(val split: HoodieSplit) { + + import BaseSplitReader._ + + protected val optParams: Map[String, String] = split.optParams + + protected val tableInformation: HoodieTableInformation = cache.get(split) + + protected val sparkSession: SparkSession = tableInformation.sparkSession + protected val sqlContext: SQLContext = sparkSession.sqlContext + imbueConfigs(sqlContext) + + protected val tableConfig: HoodieTableConfig = tableInformation.tableConfig + + // NOTE: Record key-field is assumed singular here due to the either of + // - In case Hudi's meta fields are enabled: record key will be pre-materialized (stored) as part + // of the record's payload (as part of the Hudi's metadata) + // - In case Hudi's meta fields are disabled (virtual keys): in that case record has to bear _single field_ + // identified as its (unique) primary key w/in its payload (this is a limitation of [[SimpleKeyGenerator]], + // which is the only [[KeyGenerator]] permitted for virtual-keys payloads) + protected lazy val recordKeyField: String = + if (tableConfig.populateMetaFields()) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = tableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + protected lazy val preCombineFieldOpt: Option[String] = + Option(tableConfig.getPreCombineField) + .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match { + // NOTE: This is required to compensate for cases when empty string is used to stub + // property value to avoid it being set with the default value + // TODO(HUDI-3456) cleanup + case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f) + case _ => None + } + + /** + * Columns that relation has to read from the storage to properly execute on its semantic: for ex, + * for Merge-on-Read tables key fields as well and pre-combine field comprise mandatory set of columns, + * meaning that regardless of whether this columns are being requested by the query they will be fetched + * regardless so that relation is able to combine records properly (if necessary) + * + * @VisibleInTests + */ + val mandatoryFields: Seq[String] + + /** + * NOTE: Initialization of teh following members is coupled on purpose to minimize amount of I/O + * required to fetch table's Avro and Internal schemas + */ + protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { + (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt) + } + + protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema) + + protected lazy val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) + + 
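The reader resolves per-table metadata through cache.get(split); the patch imports Guava's CacheBuilder, CacheLoader and LoadingCache for this, and HoodieSplit overrides equals/hashCode on (queryId, basePath) so splits of the same query and table share one cached HoodieTableInformation. A self-contained sketch of that caching pattern follows; the key/value types, expiry and size limits are placeholders, not values from the patch:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

final class TableInfoCacheSketch {
    // Keyed the way HoodieSplit hashes itself: query id plus table base path.
    static final class SplitKey {
        final String queryId;
        final String basePath;

        SplitKey(String queryId, String basePath) {
            this.queryId = queryId;
            this.basePath = basePath;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof SplitKey && hashCode() == o.hashCode();
        }

        @Override
        public int hashCode() {
            return Objects.hash(queryId, basePath);
        }
    }

    // Expiry and size are illustrative only.
    static final LoadingCache<SplitKey, String> CACHE = CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .maximumSize(1024)
            .build(new CacheLoader<SplitKey, String>() {
                @Override
                public String load(SplitKey key) {
                    // In the real reader this resolves the HoodieTableMetaClient,
                    // timeline and table schemas once per (query, table).
                    return "table-information-for:" + key.basePath;
                }
            });
}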
protected lazy val specifiedQueryTimestamp: Option[String] = + optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) + + private def queryTimestamp: Option[String] = + specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) + + lazy val tableState: HoodieTableState = { + val recordMergerImpls = ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList + val recordMergerStrategy = getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY, + Option(tableInformation.metaClient.getTableConfig.getRecordMergerStrategy)) + val configProperties = getConfigProperties(sparkSession, optParams) + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(configProperties) + .enable(configProperties.getBoolean( + HoodieMetadataConfig.ENABLE.key(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS) + && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableInformation.metaClient)) + .build() + + // Subset of the state of table's configuration as of at the time of the query + HoodieTableState( + tablePath = split.basePath, + latestCommitTimestamp = queryTimestamp, + recordKeyField = recordKeyField, + preCombineFieldOpt = preCombineFieldOpt, + usesVirtualKeys = !tableConfig.populateMetaFields(), + recordPayloadClassName = tableConfig.getPayloadClass, + metadataConfig = metadataConfig, + recordMergerImpls = recordMergerImpls, + recordMergerStrategy = recordMergerStrategy + ) + } + + private def getConfigValue(config: ConfigProperty[String], + defaultValueOption: Option[String] = Option.empty): String = { + optParams.getOrElse(config.key(), + sqlContext.getConf(config.key(), defaultValueOption.getOrElse(config.defaultValue()))) + } + + def imbueConfigs(sqlContext: SQLContext): Unit = { + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") + // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") + } + + def buildScanIterator(requiredColumns: Array[String], filters: Array[Filter]): Iterator[InternalRow] = { + // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES + // *Appending* additional columns to the ones requested by the caller is not a problem, as those + // will be eliminated by the caller's projection; + // (!) 
Please note, however, that it's critical to avoid _reordering_ of the requested columns as this + // will break the upstream projection + val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns) + // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro + // schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and + // could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions + // w/ more than 2 types are involved) + val sourceSchema = tableAvroSchema + val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) = + projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns) + + val tableAvroSchemaStr = tableAvroSchema.toString + val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt) + val requiredSchema = HoodieTableSchema( + requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema)) + + composeIterator(tableSchema, requiredSchema, targetColumns, filters) + } + + /** + * Composes iterator provided file split to read from, table and partition schemas, data filters to be applied + * + * @param tableSchema target table's schema + * @param requiredSchema projected schema required by the reader + * @param requestedColumns columns requested by the query + * @param filters data filters to be applied + * @return instance of RDD (holding [[InternalRow]]s) + */ + protected def composeIterator(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + filters: Array[Filter]): Iterator[InternalRow] + + private final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { + // For a nested field in mandatory columns, we should first get the root-level field, and then + // check for any missing column, as the requestedColumns should only contain root-level fields + // We should only append root-level field as well + val missing = mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col)) + .filter(rootField => !requestedColumns.contains(rootField)) + requestedColumns ++ missing + } + + /** + * Projects provided schema by picking only required (projected) top-level columns from it + * + * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]]) + * @param requiredColumns required top-level columns to be projected + */ + def projectSchema(tableSchema: Either[Schema, InternalSchema], + requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = { + tableSchema match { + case Right(internalSchema) => + checkState(!internalSchema.isEmptySchema) + val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema( + internalSchema, requiredColumns.toList.asJava) + val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema") + val requiredStructSchema = convertAvroSchemaToStructType(requiredAvroSchema) + + (requiredAvroSchema, requiredStructSchema, prunedInternalSchema) + + case Left(avroSchema) => + val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> f).toMap + val requiredFields = requiredColumns.map { col => + val f = fieldMap(col) + // We have to create a new [[Schema.Field]] since Avro schemas can't share field + // instances (and will throw "org.apache.avro.AvroRuntimeException: Field already used") + new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal(), f.order()) + }.toList + val requiredAvroSchema = Schema.createRecord(avroSchema.getName, avroSchema.getDoc, + avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava) + val requiredStructSchema = convertAvroSchemaToStructType(requiredAvroSchema) + + (requiredAvroSchema, requiredStructSchema, InternalSchema.getEmptyInternalSchema) + } + } + + /** + * Converts Avro's [[Schema]] to Catalyst's [[StructType]] + */ + protected def convertAvroSchemaToStructType(avroSchema: Schema): StructType = { + val schemaConverters = sparkAdapter.getAvroSchemaConverters + schemaConverters.toSqlType(avroSchema) match { + case (dataType, _) => dataType.asInstanceOf[StructType] + } + } + + protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { + // Since schema requested by the caller might contain partition columns, we might need to + // prune it, removing all partition columns from it in case these columns are not persisted + // in the data files + // + // NOTE: This partition schema is only relevant to file reader to be able to embed + // values of partition columns (hereafter referred to as partition values) encoded into + // the partition path, and omitted from the data file, back into fetched rows; + // Note that, by default, partition columns are not omitted therefore specifying + // partition schema for reader is not required + if (shouldExtractPartitionValuesFromPartitionPath) { + val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) + val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) + val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema) + + (partitionSchema, + HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString), + HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString)) + } else { + (StructType(Nil), tableSchema, requiredSchema) + } + } + + /** + * Controls whether partition values (ie values of partition columns) should be + *
+ *   1. Extracted from partition path and appended to individual rows read from the data file (we
+ *      delegate this to Spark's [[ParquetFileFormat]])
+ *   2. Read from the data-file as is (by default Hudi persists all columns including partition ones)
+ * + * This flag is only be relevant in conjunction with the usage of [["hoodie.datasource.write.drop.partition.columns"]] + * config, when Hudi will NOT be persisting partition columns in the data file, and therefore values for + * such partition columns (ie "partition values") will have to be parsed from the partition path, and appended + * to every row only in the fetched dataset. + * + * NOTE: Partition values extracted from partition path might be deviating from the values of the original + * partition columns: for ex, if originally as partition column was used column [[ts]] bearing epoch + * timestamp, which was used by [[TimestampBasedKeyGenerator]] to generate partition path of the format + * [["yyyy/mm/dd"]], appended partition value would bear the format verbatim as it was used in the + * partition path, meaning that string value of "2022/01/01" will be appended, and not its original + * representation + */ + protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = { + // Controls whether partition columns (which are the source for the partition path values) should + // be omitted from persistence in the data files. On the read path it affects whether partition values (values + // of partition columns) will be read from the data file or extracted from partition path + + val shouldOmitPartitionColumns = tableInformation.tableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty + val shouldExtractPartitionValueFromPath = + optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, + DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean + shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath + } + + private def prunePartitionColumns(dataStructSchema: StructType): StructType = + StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) + + /** + * For enable hoodie.datasource.write.drop.partition.columns, need to create an InternalRow on partition values + * and pass this reader on parquet file. So that, we can query the partition columns. 
+ */ + protected def getPartitionColumnsAsInternalRow(): InternalRow = { + try { + if (shouldExtractPartitionValuesFromPartitionPath) { + val filePath = new Path(split.dataFilePath) + val relativePath = new URI(split.basePath).relativize(new URI(filePath.getParent.toString)).toString + val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean + if (hiveStylePartitioningEnabled) { + val partitionSpec = PartitioningUtils.parsePathFragment(relativePath) + InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString)) + } else { + if (partitionColumns.length == 1) { + InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath))) + } else { + val parts = relativePath.split("/") + assert(parts.size == partitionColumns.length) + InternalRow.fromSeq(parts.map(UTF8String.fromString)) + } + } + } else { + InternalRow.empty + } + } catch { + case NonFatal(e) => + LOG.warn(s"Failed to get the right partition InternalRow for file: ${split.dataFilePath}", e) + InternalRow.empty + } + } + + /** + * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]], + * when Parquet's Vectorized Reader is used + * + * TODO move to HoodieBaseRelation, make private + */ + private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration, + appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { + val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get + val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( + sparkSession = sparkSession, + dataSchema = dataSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf + ) + + file: PartitionedFile => { + val iter = readParquetFile(file) + iter.flatMap { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + } + } + } + + private def createHFileReader(spark: SparkSession, + dataSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + partitionedFile => { + val reader = new HoodieAvroHFileReader( + hadoopConf, new Path(partitionedFile.filePath), new CacheConfig(hadoopConf)) + + val requiredRowSchema = requiredDataSchema.structTypeSchema + // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable + // to be passed from driver to executor + val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr) + val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter( + requiredAvroSchema, requiredRowSchema) + + reader.getRecordIterator(requiredAvroSchema).asScala + .map(record => { + avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get + }) + } + } + + /** + * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]] + * over [[InternalRow]] + */ + protected def createBaseFileReader(spark: SparkSession, + partitionSchema: StructType, + dataSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): 
BaseFileReader = { + val tableBaseFileFormat = tableConfig.getBaseFileFormat + + // NOTE: PLEASE READ CAREFULLY + // Lambda returned from this method is going to be invoked on the executor, and therefore + // we have to eagerly initialize all of the readers even though only one specific to the type + // of the file being read will be used. This is required to avoid serialization of the whole + // relation (containing file-index for ex) and passing it to the executor + val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) = + tableBaseFileFormat match { + case HoodieFileFormat.PARQUET => + val parquetReader = buildHoodieParquetReader( + sparkSession = spark, + dataSchema = dataSchema.structTypeSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredDataSchema.structTypeSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf, + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath + ) + // Since partition values by default are omitted, and not persisted w/in data-files by Spark, + // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading + // the data. As such, actual full schema produced by such reader is composed of + // a) Data-file schema (projected or not) + // b) Appended partition column values + val readerSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) + + (parquetReader, readerSchema) + + case HoodieFileFormat.HFILE => + val hfileReader = createHFileReader( + spark = spark, + dataSchema = dataSchema, + requiredDataSchema = requiredDataSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf + ) + + (hfileReader, requiredDataSchema.structTypeSchema) + + case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") + } + + BaseFileReader( + read = partitionedFile => { + val extension = FSUtils.getFileExtension(partitionedFile.filePath) + if (tableBaseFileFormat.getFileExtension.equals(extension)) { + read(partitionedFile) + } else { + throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") + } + }, + schema = schema + ) + } + + protected val timeline: HoodieTimeline = tableInformation.timeline + + protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = { + val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema) + val querySchemaString = SerDeHelper.toJson(internalSchema) + if (!StringUtils.isNullOrEmpty(querySchemaString)) { + val validCommits = timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",") + LOG.warn(s"Table valid commits: $validCommits") + + conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) + conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, split.basePath) + conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + } + conf + } +} + +object SparkMockHelper { + private lazy val mockSparkContext = { + val conf = new SparkConf().setMaster("local").setAppName("mock_sc") + .set("spark.ui.enabled", "false") + val sc = new SparkContext(conf) + sc.setLogLevel("WARN") + sc + } + + implicit class MockSparkSession(builder: SparkSession.Builder) 
{ + def createMockSession(split: HoodieSplit): SparkSession = { + val sparkSessionClass = classOf[SparkSession] + val constructor: Constructor[SparkSession] = sparkSessionClass.getDeclaredConstructors + .find(_.getParameterCount == 5).get.asInstanceOf[Constructor[SparkSession]] + constructor.setAccessible(true) + val ss = constructor.newInstance(mockSparkContext, None, None, new SparkSessionExtensions, Map.empty) + split.hadoopProperties.foreach(kv => ss.sessionState.conf.setConfString(kv._1, kv._2)) + ss + } + } +} + +object BaseSplitReader { + + import SparkMockHelper.MockSparkSession + + private val LOG = Logger.getLogger(BaseSplitReader.getClass) + val HADOOP_CONF_PREFIX = "hadoop_conf." + + // Use [[SparkAdapterSupport]] instead ? + private lazy val sparkAdapter = new DorisSparkAdapter + + private lazy val cache: LoadingCache[HoodieSplit, HoodieTableInformation] = { + val loader = new CacheLoader[HoodieSplit, HoodieTableInformation] { + override def load(split: HoodieSplit): HoodieTableInformation = { + // create mock spark session + val sparkSession = SparkSession.builder().createMockSession(split) + val metaClient = Utils.getMetaClient(split.hadoopConf, split.basePath) + // NOTE: We're including compaction here since it's not considering a "commit" operation + val timeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants + + val specifiedQueryTimestamp: Option[String] = + split.optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) + val schemaResolver = new TableSchemaResolver(metaClient) + val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(split.optParams, sparkSession)) { + None + } else { + Try { + specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata) + .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata) + } match { + case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt) + case Failure(_) => + None + } + } + val avroSchema: Schema = internalSchemaOpt.map { is => + AvroInternalSchemaConverter.convert(is, "schema") + } orElse { + specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema) + } orElse { + split.schemaSpec.map(convertToAvroSchema) + } getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + throw new HoodieSchemaException("Failed to fetch schema from the table", e) + } + } + + HoodieTableInformation(sparkSession, + metaClient, + timeline, + metaClient.getTableConfig, + avroSchema, + internalSchemaOpt) + } + } + CacheBuilder.newBuilder() + .expireAfterAccess(10, TimeUnit.MINUTES) + .maximumSize(4096) + .build(loader) + } + + private def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = { + // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as + // t/h Spark Session configuration (for ex, for Spark SQL) + optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || + sparkSession.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + } + + private def getConfigProperties(spark: SparkSession, options: Map[String, String]) = { + val sqlConf: SQLConf = spark.sessionState.conf + val properties = new TypedProperties() + properties.putAll(options.filter(p => p._2 != 
null).asJava) + + // TODO(HUDI-5361) clean up properties carry-over + + // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users + // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. + val isMetadataTableEnabled = HoodieSparkConfUtils.getConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key, null) + if (isMetadataTableEnabled != null) { + properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataTableEnabled)) + } + + val listingModeOverride = HoodieSparkConfUtils.getConfigValue(options, sqlConf, + DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, null) + if (listingModeOverride != null) { + properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride) + } + + properties + } +} diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala new file mode 100644 index 00000000000000..c5645655355708 --- /dev/null +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} +import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible +import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator} +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.catalyst.InternalRow + +import java.io.Closeable + +/** + * Class holding base-file readers for 3 different use-cases: + * + *
+ *   1. Full-schema reader: used when the whole row has to be read to perform merging correctly.
+ *      This could occur when no optimizations can be applied and we have to fall back to reading the whole
+ *      row from the base file and the corresponding delta-log file to merge them correctly
+ *
+ *   2. Required-schema reader: used when it's fine to read only the row's projected columns.
+ *      This could occur when the row can be merged with the corresponding delta-log record while leveraging
+ *      only the projected columns
+ *
+ *   3. Required-schema reader (skip-merging): used when no merging will be performed (skip-merged).
+ *      This could occur when the file-group has no delta-log files
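(For illustration only, not part of the patch: a self-contained Scala sketch of how these three readers map onto split shapes and merge modes, mirroring the selection performed by HoodieMORRecordIterator below. The types here are simplified stand-ins, not the actual Hudi/Spark classes.)

object ReaderSelectionSketch {
  sealed trait Reader
  case object FullSchemaReader extends Reader
  case object RequiredSchemaReader extends Reader
  case object RequiredSchemaReaderSkipMerging extends Reader

  // Simplified stand-in for HoodieMergeOnReadFileSplit: a base file and zero or more delta-log files.
  final case class Split(hasBaseFile: Boolean, hasLogFiles: Boolean)

  def pickBaseReader(split: Split, skipMerge: Boolean, projectionCompatible: Boolean): Option[Reader] =
    split match {
      case Split(true, false)        => Some(RequiredSchemaReaderSkipMerging) // base file only: nothing to merge
      case Split(false, true)        => None                                  // log files only: no base-file reader is used
      case _ if skipMerge            => Some(RequiredSchemaReaderSkipMerging) // concatenate base and log records, no key-based merge
      case _ if projectionCompatible => Some(RequiredSchemaReader)            // payload can merge on projected columns only
      case _                         => Some(FullSchemaReader)                // merging needs the full row
    }
}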
+ */ +private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: BaseFileReader, + requiredSchemaReader: BaseFileReader, + requiredSchemaReaderSkipMerging: BaseFileReader) + +/** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in + * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not + * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) + */ +private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, + baseFileReader: BaseFileReader, + dataSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + config: Configuration) + extends LogFileIterator(split, dataSchema, requiredSchema, tableState, config) { + + private val requiredSchemaProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) + + private val baseFileIterator = baseFileReader(split.dataFile.get) + + override def doHasNext: Boolean = { + if (baseFileIterator.hasNext) { + // No merge is required, simply load current row and project into required schema + nextRecord = requiredSchemaProjection(baseFileIterator.next()) + true + } else { + super[LogFileIterator].doHasNext + } + } +} + +/** + * Reference to Apache Hudi + * see HoodieMergeOnReadRDD + */ +class HoodieMORRecordIterator(config: Configuration, + fileReaders: HoodieMergeOnReadBaseFileReaders, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + mergeType: String, + fileSplit: HoodieMergeOnReadFileSplit) extends Iterator[InternalRow] with Closeable { + protected val maxCompactionMemoryInBytes: Long = config.getLongBytes( + "hoodie.compaction.memory", 512 * 1024 * 1024) + + protected val recordIterator: Iterator[InternalRow] = fileSplit match { + case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => + val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema) + projectedReader(dataFileOnlySplit.dataFile.get) + + case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => + new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, config) + + case split => mergeType match { + case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL => + val reader = fileReaders.requiredSchemaReaderSkipMerging + new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) + + case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL => + val reader = pickBaseFileReader() + new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, config) + + case _ => throw new UnsupportedOperationException(s"Not supported merge type ($mergeType)") + } + } + + private def pickBaseFileReader(): BaseFileReader = { + // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum + // of the stored data possible, while still properly executing corresponding relation's semantic + // and meet the query's requirements. 
+ // + // Here we assume that iff queried table does use one of the standard (and whitelisted) + // Record Payload classes then we can avoid reading and parsing the records w/ _full_ schema, + // and instead only rely on projected one, nevertheless being able to perform merging correctly + if (isProjectionCompatible(tableState)) { + fileReaders.requiredSchemaReader + } else { + fileReaders.fullSchemaReader + } + } + + override def hasNext: Boolean = { + recordIterator.hasNext + } + + override def next(): InternalRow = { + recordIterator.next() + } + + override def close(): Unit = { + recordIterator match { + case closeable: Closeable => + closeable.close() + case _ => + } + } +} diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala new file mode 100644 index 00000000000000..8ea9e1fcef16cb --- /dev/null +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi + +import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieLogFile +import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, HoodieTableSchema} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +/** + * Reference to Apache Hudi + * see MergeOnReadSnapshotRelation + */ +class MORSnapshotSplitReader(override val split: HoodieSplit) extends BaseSplitReader(split) { + /** + * NOTE: These are the fields that are required to properly fulfil Merge-on-Read (MOR) + * semantic: + * + *
+ *   1. Primary key is required to make sure we're able to correlate records from the base
+ *      file with the updated records from the delta-log file
+ *
+ *   2. Pre-combine key is required to properly perform the combining (or merging) of the
+ *      existing and updated records
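(For illustration only, not part of the patch: a dependency-free Scala sketch of how the mandatory merge fields interact with the query's projection, mirroring mandatoryFieldsForMerging and the schema pruning done in createBaseFileReaders below. The column names in main are made up for the example.)

object MandatoryFieldsSketch {
  // Fields needed to correlate and combine base-file rows with delta-log records.
  def mandatoryFieldsForMerging(recordKeyField: String, preCombineFieldOpt: Option[String]): Seq[String] =
    Seq(recordKeyField) ++ preCombineFieldOpt.toList

  // Schema handed to the merging readers: the query's columns plus any mandatory fields it omitted.
  // The skip-merging reader can drop those extra fields again and read only what was requested.
  def mergingColumns(requested: Seq[String], mandatory: Seq[String]): Seq[String] =
    requested ++ mandatory.filterNot(requested.contains)

  def main(args: Array[String]): Unit = {
    val mandatory = mandatoryFieldsForMerging("uuid", Some("ts"))
    println(mergingColumns(Seq("uuid", "price"), mandatory)) // List(uuid, price, ts)
  }
}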
+ * + * However, in cases when merging is NOT performed (for ex, if file-group only contains base + * files but no delta-log files, or if the query-type is equal to [["skip_merge"]]) neither + * of primary-key or pre-combine-key are required to be fetched from storage (unless requested + * by the query), therefore saving on throughput + */ + protected lazy val mandatoryFieldsForMerging: Seq[String] = + Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) + + override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging + + protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key, + DataSourceReadOptions.REALTIME_MERGE.defaultValue) + + override protected def composeIterator(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + filters: Array[Filter]): Iterator[InternalRow] = { + // todo: push down predicates about key field + val requiredFilters = Seq.empty + val optionalFilters = filters + val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + + new HoodieMORRecordIterator(split.hadoopConf, + readers, + tableSchema, + requiredSchema, + tableState, + mergeType, + getFileSplit()) + } + + private def getFileSplit(): HoodieMergeOnReadFileSplit = { + val logFiles = split.deltaFilePaths.map(new HoodieLogFile(_)) + .sorted(Ordering.comparatorToOrdering(HoodieLogFile.getLogFileComparator)).toList + val partitionedBaseFile = if (split.dataFilePath.isEmpty) { + None + } else { + Some(PartitionedFile(getPartitionColumnsAsInternalRow(), split.dataFilePath, 0, split.dataFileLength)) + } + HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles) + } + + override def imbueConfigs(sqlContext: SQLContext): Unit = { + super.imbueConfigs(sqlContext) + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true") + // there's no thread local TaskContext, so the parquet reader will still use on heap memory even setting true + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.columnVector.offheap.enabled", "true") + } + + protected def createBaseFileReaders(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + requiredFilters: Seq[Filter], + optionalFilters: Seq[Filter] = Seq.empty): HoodieMergeOnReadBaseFileReaders = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + + val fullSchemaReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = dataSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding + // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(split.hadoopConf, internalSchemaOpt) + ) + + val requiredSchemaReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = requiredDataSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding + // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(split.hadoopConf, requiredDataSchema.internalSchema) + ) + + // Check whether fields required for merging were also requested to be fetched + // by the query: + // - In case they were, there's no optimization we could apply here (we will have + // to fetch such fields) + // - In case they were not, we will provide 2 separate file-readers + // a) One which would be applied to file-groups w/ delta-logs (merging) + // b) One which would be applied to file-groups w/ no delta-logs or + // in case query-mode is skipping merging + val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName) + if (mandatoryColumns.forall(requestedColumns.contains)) { + HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReader + ) + } else { + val prunedRequiredSchema = { + val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) + val prunedStructSchema = + StructType(requiredDataSchema.structTypeSchema.fields + .filterNot(f => unusedMandatoryColumnNames.contains(f.name))) + + HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString) + } + + val requiredSchemaReaderSkipMerging = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = prunedRequiredSchema, + // This file-reader is only used in cases when no merging is performed, therefore it's safe to push + // down these filters to the base file readers + filters = requiredFilters ++ optionalFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(split.hadoopConf, requiredDataSchema.internalSchema) + ) + + HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging + ) + } + } +} diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java index c45b2ac8e5973b..5031a0182671ae 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java @@ -20,6 +20,7 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ColumnValue; +import org.apache.doris.common.jni.vec.NativeColumnValue; import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; import org.apache.doris.common.jni.vec.VectorTable; @@ -45,7 +46,9 @@ public abstract class JniScanner { protected abstract int getNext() throws IOException; // parse table schema - protected abstract TableSchema parseTableSchema() throws UnsupportedOperationException; + protected TableSchema parseTableSchema() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates, int batchSize) { @@ -55,6 +58,10 @@ protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields this.batchSize = batchSize; } + protected void appendNativeData(int index, NativeColumnValue value) { + vectorTable.appendNativeData(index, value); + } + protected void appendData(int index, ColumnValue value) { vectorTable.appendData(index, value); } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java index fc2928f8ed362a..3557b3b9032073 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java @@ -21,7 +21,6 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ColumnValue; import org.apache.doris.common.jni.vec.ScanPredicate; -import org.apache.doris.common.jni.vec.TableSchema; import org.apache.log4j.Logger; @@ -111,6 +110,11 @@ public String getString() { return "row-" + i + "-column-" + j; } + @Override + public byte[] getStringAsBytes() { + throw new UnsupportedOperationException(); + } + @Override public LocalDate getDate() { return LocalDate.now(); @@ -196,9 +200,4 @@ protected int getNext() throws IOException { readRows += rows; return rows; } - - @Override - protected TableSchema parseTableSchema() throws UnsupportedOperationException { - return null; - } } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java index fa2e268366681a..0d1c522f9cbf06 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java @@ -55,6 +55,8 @@ public interface ColumnValue { String getString(); + byte[] getStringAsBytes(); + LocalDate getDate(); LocalDateTime getDateTime(); diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java new file mode 100644 index 00000000000000..8a0b4d2244cddd --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jni.vec; + +/** + * Native types of data that can be directly copied. + */ +public interface NativeColumnValue { + public static class NativeValue { + public final Object baseObject; + public final long offset; + public final int length; + + public NativeValue(Object baseObject, long offset) { + this.baseObject = baseObject; + this.offset = offset; + this.length = -1; + } + + public NativeValue(Object baseObject, long offset, int length) { + this.baseObject = baseObject; + this.offset = offset; + this.length = length; + } + } + + boolean isNull(); + + /** + * Return null if the type can't be copied directly + */ + NativeValue getNativeValue(ColumnType.Type type); +} diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java index 4553d29e18172d..e82f05c7d0a367 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java @@ -183,6 +183,11 @@ public String getString() { return toString(); } + @Override + public byte[] getStringAsBytes() { + throw new UnsupportedOperationException(); + } + @Override public LocalDate getDate() { return LocalDate.now(); diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java index f65ea8fae76a0e..3998a1a3270aff 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java @@ -21,6 +21,7 @@ import org.apache.doris.common.jni.utils.OffHeap; import org.apache.doris.common.jni.utils.TypeNativeBytes; import org.apache.doris.common.jni.vec.ColumnType.Type; +import org.apache.doris.common.jni.vec.NativeColumnValue.NativeValue; import java.math.BigDecimal; import java.math.BigInteger; @@ -550,6 +551,38 @@ public void updateMeta(VectorColumn meta) { } } + public void appendNativeValue(NativeColumnValue o) { + ColumnType.Type typeValue = columnType.getType(); + if (o == null || o.isNull()) { + appendNull(typeValue); + return; + } + 
NativeValue nativeValue = o.getNativeValue(typeValue); + if (nativeValue == null) { + // can't get native value, fall back to materialized value + appendValue((ColumnValue) o); + return; + } + if (nativeValue.length == -1) { + // java origin types + long typeSize = typeValue.size; + reserve(appendIndex + 1); + OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset, + null, data + typeSize * appendIndex, typeSize); + appendIndex++; + } else { + int byteLength = nativeValue.length; + VectorColumn bytesColumn = childColumns[0]; + int startOffset = bytesColumn.appendIndex; + bytesColumn.reserve(startOffset + byteLength); + OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset, + null, bytesColumn.data + startOffset, byteLength); + bytesColumn.appendIndex += byteLength; + OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + byteLength); + appendIndex++; + } + } + public void appendValue(ColumnValue o) { ColumnType.Type typeValue = columnType.getType(); if (o == null || o.isNull()) { @@ -598,7 +631,7 @@ public void appendValue(ColumnValue o) { case VARCHAR: case STRING: if (o.canGetStringAsBytes()) { - appendBytesAndOffset(o.getBytes()); + appendBytesAndOffset(o.getStringAsBytes()); } else { appendStringAndOffset(o.getString()); } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java index e70d8f683f6b08..63b6f1ac2a9038 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java @@ -68,6 +68,11 @@ public VectorTable(ColumnType[] types, String[] fields, long metaAddress) { this.isRestoreTable = true; } + public void appendNativeData(int fieldId, NativeColumnValue o) { + assert (!isRestoreTable); + columns[fieldId].appendNativeValue(o); + } + public void appendData(int fieldId, ColumnValue o) { assert (!isRestoreTable); columns[fieldId].appendValue(o); diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java index 9fa9eea9f65bff..5dfd5a0bcf697b 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java @@ -60,7 +60,7 @@ public void reset(FieldVector column) { @Override public boolean canGetStringAsBytes() { - return false; + return true; } @Override @@ -152,6 +152,14 @@ public String getString() { return v == null ? new String(new byte[0]) : v; } + @Override + public byte[] getStringAsBytes() { + skippedIfNull(); + VarCharVector varcharCol = (VarCharVector) column; + byte[] v = varcharCol.getObject(idx++).getBytes(); + return v == null ? 
new byte[0] : v; + } + @Override public LocalDate getDate() { skippedIfNull(); diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 6a3d519670d51c..8f9b903afdc716 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -20,7 +20,6 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; -import org.apache.doris.common.jni.vec.TableSchema; import com.aliyun.odps.Column; import com.aliyun.odps.OdpsType; @@ -239,12 +238,6 @@ protected int getNext() throws IOException { return realRows; } - @Override - protected TableSchema parseTableSchema() throws UnsupportedOperationException { - // do nothing - return null; - } - private int readVectors(int expectedRows) throws IOException { VectorSchemaRoot batch; int curReadRows = 0; diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java index d10c876de0862f..43999ac3a230b0 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java @@ -46,7 +46,7 @@ public void setOffsetRow(ColumnarRow record) { @Override public boolean canGetStringAsBytes() { - return false; + return true; } @Override @@ -99,6 +99,11 @@ public String getString() { return record.getString(idx).toString(); } + @Override + public byte[] getStringAsBytes() { + return record.getString(idx).toBytes(); + } + @Override public LocalDate getDate() { return Instant.ofEpochMilli(record.getTimestamp(idx, 3) diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 005b08b3a86117..190bdb829a4de9 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -21,7 +21,6 @@ import org.apache.doris.common.jni.utils.OffHeap; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; -import org.apache.doris.common.jni.vec.TableSchema; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.log4j.Logger; @@ -141,12 +140,6 @@ protected int getNext() throws IOException { return rows; } - @Override - protected TableSchema parseTableSchema() throws UnsupportedOperationException { - // do nothing - return null; - } - private Catalog create(CatalogContext context) throws IOException { Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE)); FileIO fileIO; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java index 1b57641efe052f..0799f7b137be2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java @@ -65,6 +65,9 @@ public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) { int scale = ((LogicalTypes.Decimal) logicalType).getScale(); return String.format("decimal(%s,%s)", precision, scale); } else { + if (columnType == Schema.Type.BYTES) { + return "binary"; + } return "string"; } case ARRAY: