From 76b7c25b4b0b862ce563eeb8189ac28c3cf2c103 Mon Sep 17 00:00:00 2001 From: liuneng1994 Date: Mon, 18 Mar 2024 15:47:56 +0800 Subject: [PATCH] fix rebase error --- ...nClickHouseMergeTreeWriteOnHDFSSuite.scala | 112 +++++++++++------ ...tenClickHouseMergeTreeWriteOnS3Suite.scala | 113 ++++++++++++------ .../substrait/rel/ExtensionTableNode.java | 8 +- 3 files changed, 160 insertions(+), 73 deletions(-) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala index 744cf45b6906..aac9fe4cf68d 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -18,8 +18,10 @@ package io.glutenproject.execution import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf} import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 +import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration @@ -34,8 +36,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper { - override protected val resourcePath: String = - "../../../../gluten-core/src/test/resources/tpch-data" + override protected val needCopyParquetToTablePath = true override protected val tablesPath: String = basePath + "/tpch-data" override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" @@ -94,7 +95,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite } override protected def createTPCHNotNullTables(): Unit = { - createTPCHParquetTables(tablesPath) + createNotNullTPCHTablesInParquet(tablesPath) } override protected def beforeEach(): Unit = { @@ -185,13 +186,13 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isEmpty) - assert(fileIndex.table.orderByKeyOption.isEmpty) - assert(fileIndex.table.primaryKeyOption.isEmpty) - assert(fileIndex.table.partitionColumns.isEmpty) - val addFiles = fileIndex.table.listFiles() + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) assert(addFiles.size == 1) assert(addFiles.head.rows == 600572) } @@ -269,13 +270,25 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val mergetreeScan = scanExec.head 
assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isEmpty) - assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_shipdate,l_orderkey")) - assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_shipdate")) - assert(fileIndex.table.partitionColumns.isEmpty) - val addFiles = fileIndex.table.listFiles() + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_shipdate,l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_shipdate")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) assert(addFiles.size == 1) assert(addFiles.head.rows == 600572) } @@ -431,15 +444,33 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) assert(mergetreeScan.metrics("numFiles").value == 6) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isEmpty) - assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey")) - assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_orderkey")) - assert(fileIndex.table.partitionColumns.size == 1) - assert(fileIndex.table.partitionColumns.head.equals("l_returnflag")) - val addFiles = fileIndex.table.listFiles() + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 6) + assert(addFiles.map(_.rows).sum == 750735) } } @@ -516,18 +547,29 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val mergetreeScan = scanExec(0) assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isDefined) + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) if (sparkVersion.equals("3.2")) { - assert(fileIndex.table.orderByKeyOption.isEmpty) + 
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) } else { - assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) } - assert(fileIndex.table.primaryKeyOption.isEmpty) - assert(fileIndex.table.partitionColumns.size == 1) - assert(fileIndex.table.partitionColumns(0).equals("l_returnflag")) - val addFiles = fileIndex.table.listFiles() + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 12) assert(addFiles.map(_.rows).sum == 600572) } diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index e4cc0b8bf74d..905a1a55843d 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -16,11 +16,14 @@ */ package io.glutenproject.execution +import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts + import _root_.org.apache.commons.io.FileUtils import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf} import _root_.org.apache.spark.sql.SaveMode import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import _root_.org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex import io.minio._ import io.minio.messages.DeleteObject @@ -34,8 +37,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper { - override protected val resourcePath: String = - "../../../../gluten-core/src/test/resources/tpch-data" + override protected val needCopyParquetToTablePath = true override protected val tablesPath: String = basePath + "/tpch-data" override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" @@ -113,7 +115,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite } override protected def createTPCHNotNullTables(): Unit = { - createTPCHParquetTables(tablesPath) + createNotNullTPCHTablesInParquet(tablesPath) } override protected def beforeEach(): Unit = { super.beforeEach() @@ -218,13 +220,13 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isEmpty) - assert(fileIndex.table.orderByKeyOption.isEmpty) - assert(fileIndex.table.primaryKeyOption.isEmpty) - assert(fileIndex.table.partitionColumns.isEmpty) - val addFiles = fileIndex.table.listFiles() + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) 
+ assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) assert(addFiles.size == 1) assert(addFiles.head.rows == 600572) } @@ -302,13 +304,25 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isEmpty) - assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_shipdate,l_orderkey")) - assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_shipdate")) - assert(fileIndex.table.partitionColumns.isEmpty) - val addFiles = fileIndex.table.listFiles() + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_shipdate,l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_shipdate")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) assert(addFiles.size == 1) assert(addFiles.head.rows == 600572) } @@ -464,15 +478,33 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) assert(mergetreeScan.metrics("numFiles").value == 6) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isEmpty) - assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey")) - assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_orderkey")) - assert(fileIndex.table.partitionColumns.size == 1) - assert(fileIndex.table.partitionColumns.head.equals("l_returnflag")) - val addFiles = fileIndex.table.listFiles() + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 6) + assert(addFiles.map(_.rows).sum == 750735) } } @@ -549,18 +581,29 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val mergetreeScan = scanExec(0) 
assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex] - assert(fileIndex.table.clickhouseTableConfigs.nonEmpty) - assert(fileIndex.table.bucketOption.isDefined) + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) if (sparkVersion.equals("3.2")) { - assert(fileIndex.table.orderByKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) } else { - assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) } - assert(fileIndex.table.primaryKeyOption.isEmpty) - assert(fileIndex.table.partitionColumns.size == 1) - assert(fileIndex.table.partitionColumns(0).equals("l_returnflag")) - val addFiles = fileIndex.table.listFiles() + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 12) assert(addFiles.map(_.rows).sum == 600572) } diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java index bf942ef267c7..f07e6fccb002 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java @@ -22,6 +22,7 @@ import com.google.protobuf.StringValue; import io.substrait.proto.ReadRel; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -71,10 +72,11 @@ public class ExtensionTableNode implements SplitInfo { this.maxPartsNum = maxPartsNum; this.database = database; this.tableName = tableName; - if (relativePath.contains(":/")) { // file:/tmp/xxx => tmp/xxx - this.relativePath = relativePath.substring(relativePath.indexOf(":/") + 2); + URI table_uri = URI.create(relativePath); + if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx + this.relativePath = table_uri.getPath().substring(1); } else { - this.relativePath = relativePath; + this.relativePath = table_uri.getPath(); } this.absolutePath = absolutePath; this.tableSchemaJson = tableSchemaJson;
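
The suite changes above repeat the ClickHouseTableV2.getTable(fileIndex.deltaLog) chain once per assertion. A minimal sketch of the new access pattern, with the table resolved once and the parts listed through the Delta file index; mergetreeScan is the scan transformer the suites extract, and the expected counts are the ones from the first HDFS test case:

    import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
    import org.apache.spark.sql.delta.files.TahoeFileIndex
    import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts

    // The scan no longer exposes a ClickHouseFileIndex; cast to the Delta index.
    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]

    // Resolve the ClickHouse table once instead of once per assertion.
    val table = ClickHouseTableV2.getTable(fileIndex.deltaLog)
    assert(table.clickhouseTableConfigs.nonEmpty)
    assert(table.bucketOption.isEmpty)
    assert(table.orderByKeyOption.isEmpty)
    assert(table.primaryKeyOption.isEmpty)
    assert(table.partitionColumns.isEmpty)

    // table.listFiles() is gone; enumerate parts via the file index instead.
    // Nil, Nil means no partition filters and no data filters.
    val addFiles = fileIndex
      .matchingFiles(Nil, Nil)
      .map(_.asInstanceOf[AddMergeTreeParts])
    assert(addFiles.size == 1)
    assert(addFiles.head.rows == 600572)

Factoring the lookup into a local val also keeps each assertion on one line, which reads closer to the single-line assertions this patch replaced.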
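
In ExtensionTableNode the patch replaces an indexOf(":/") substring hack with java.net.URI parsing when normalizing relativePath. The substring version only behaves for single-slash file URIs; with an extra slash or an authority component it leaves debris in the "relative" path, which is presumably what broke once the HDFS and S3 suites wrote through hdfs:// and s3a:// locations. A small standalone sketch of both behaviours, written in Scala for consistency with the suites (the sample URIs are illustrative):

    import java.net.URI

    // Old approach: drop everything up to and including the first ":/".
    def stripByIndex(relativePath: String): String =
      if (relativePath.contains(":/"))
        relativePath.substring(relativePath.indexOf(":/") + 2)
      else relativePath

    // New approach, mirroring the patched constructor: let URI isolate the
    // path component, then drop the leading slash if present.
    def stripByUri(relativePath: String): String = {
      val path = URI.create(relativePath).getPath
      if (path.startsWith("/")) path.substring(1) else path
    }

    // stripByIndex("file:/tmp/xxx")            == "tmp/xxx"            (ok)
    // stripByIndex("file:///tmp/xxx")          == "/tmp/xxx"           (leading slash kept)
    // stripByIndex("hdfs://host:9000/tmp/xxx") == "/host:9000/tmp/xxx" (authority leaks in)
    // stripByUri yields "tmp/xxx" for all three, and passes bare
    // relative paths such as "tmp/xxx" through unchanged.
    Seq("file:/tmp/xxx", "file:///tmp/xxx", "hdfs://host:9000/tmp/xxx", "tmp/xxx")
      .foreach(p => println(s"$p -> ${stripByUri(p)}"))

One caveat the sketch inherits from the patch: URI.getPath is null for opaque URIs, and URI.create rejects unencoded characters such as spaces, so this assumes table locations are well-formed hierarchical URIs, as they are in these suites.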