From 1d99d1787548975dd5a382843af469be52f6074e Mon Sep 17 00:00:00 2001
From: Yinghao Lin
Date: Fri, 8 Dec 2023 14:03:48 +0800
Subject: [PATCH] Add PartitionHelper

---
 sql/hive/pom.xml                                   | 33 ++++++++++++-------
 .../apache/spark/sql/hive/TableReader.scala        |  5 +--
 .../sql/hive/client/HiveClientImpl.scala           |  8 ++---
 3 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index bab0274c461ac..624a3b711afd1 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -96,6 +96,7 @@
       <version>${protobuf.version}</version>
     </dependency>
     -->
+
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-common</artifactId>
@@ -126,18 +127,6 @@
       <artifactId>hive-shims</artifactId>
       <scope>${hive.shims.scope}</scope>
     </dependency>
-    <dependency>
-      <groupId>io.transwarp.inceptor</groupId>
-      <artifactId>inceptor-shaded</artifactId>
-      <version>1.0-SNAPSHOT</version>
-      <scope>${hive.serde.scope}</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>inceptor-serde</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-llap-common</artifactId>
@@ -153,6 +142,26 @@
       <artifactId>hive-contrib</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.transwarp.inceptor</groupId>
+      <artifactId>inceptor-shaded</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <scope>${hive.serde.scope}</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>inceptor-serde</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>inceptor-exec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>inceptor-metastore</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 728e29865e3c2..113cf2322128f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import io.kyligence.compact.inceptor.PartitionHelper
 import io.transwarp.hive.shaded.serde2.objectinspector.ObjectInspectorConverters
 import io.transwarp.hive.shaded.serde2.objectinspector.primitive.DateObjectInspector
 import org.apache.hadoop.conf.Configuration
@@ -215,7 +216,7 @@ class HadoopTableReader(
 
     val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
       .map { case (partition, partDeserializer) =>
-      val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
+      val partDesc = Utilities.getPartitionDesc(partition)
       var partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
       val skipHeaderLineCount =
@@ -223,7 +224,7 @@ class HadoopTableReader(
       val isTextInputFormatTable =
         classOf[TextInputFormat].isAssignableFrom(partDesc.getInputFileFormatClass)
       // Get partition field info
-      val partSpec = partDesc.getPartSpec
+      val partSpec = PartitionHelper.getPartSpec(partDesc)
       val partProps = partDesc.getProperties
 
       val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c227555766236..d263d55faaa45 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -17,17 +17,17 @@
 
 package org.apache.spark.sql.hive.client
 
+import io.kyligence.compact.inceptor.PartitionHelper
+
 import java.io.PrintStream
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.security.UserGroupInformation
-
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -1174,7 +1173,8 @@ private[hive] object HiveClientImpl extends Logging {
       Map.empty
     }
     CatalogTablePartition(
-      spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
+      spec = Option(PartitionHelper.getSpec(hp.getTable, hp.getTPartition))
+        .map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
         locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
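
Reviewer note (not part of the patch): the call sites above assume a
io.kyligence.compact.inceptor.PartitionHelper facade with two static entry
points, getPartSpec(PartitionDesc) and getSpec(Table, TPartition). The real
class ships in the inceptor-shaded artifact added to sql/hive/pom.xml; the
sketch below is only a guess at its shape so the patch can be reviewed in
isolation. The method bodies and the use of the stock (unshaded) Hive types
are assumptions, not the shipped implementation.

package io.kyligence.compact.inceptor

import java.util.{LinkedHashMap => JLinkedHashMap}

import org.apache.hadoop.hive.metastore.api.{Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.plan.PartitionDesc

// Illustrative sketch only: centralizes the partition-spec lookups that the
// patched call sites in TableReader and HiveClientImpl now delegate to, so
// Spark code no longer calls PartitionDesc.getPartSpec / Partition.getSpec
// directly (those accessors differ between stock Hive and the Inceptor
// shaded jars).
object PartitionHelper {

  // Stands in for the former partDesc.getPartSpec call in TableReader.
  def getPartSpec(partDesc: PartitionDesc): JLinkedHashMap[String, String] =
    partDesc.getPartSpec

  // Stands in for the former hp.getSpec call in HiveClientImpl: rebuild the
  // spec by zipping the table's partition keys with the partition's values.
  def getSpec(table: Table, tPartition: TPartition): JLinkedHashMap[String, String] = {
    val spec = new JLinkedHashMap[String, String]()
    val keys = table.getPartitionKeys   // java.util.List[FieldSchema]
    val values = tPartition.getValues   // java.util.List[String]
    var i = 0
    while (i < keys.size() && i < values.size()) {
      spec.put(keys.get(i).getName, values.get(i))
      i += 1
    }
    spec
  }
}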