fix rebase error
liuneng1994 committed Mar 18, 2024
1 parent 1abf683 commit 76b7c25
Showing 3 changed files with 160 additions and 73 deletions.
File 1 of 3: GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -18,8 +18,10 @@ package io.glutenproject.execution
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex
+import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -34,8 +36,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
-  override protected val resourcePath: String =
-    "../../../../gluten-core/src/test/resources/tpch-data"
+  override protected val needCopyParquetToTablePath = true
 
   override protected val tablesPath: String = basePath + "/tpch-data"
   override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
@@ -94,7 +95,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
-    createTPCHParquetTables(tablesPath)
+    createNotNullTPCHTablesInParquet(tablesPath)
   }
 
   override protected def beforeEach(): Unit = {
@@ -185,13 +186,13 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     val mergetreeScan = scanExec.head
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isEmpty)
-    assert(fileIndex.table.orderByKeyOption.isEmpty)
-    assert(fileIndex.table.primaryKeyOption.isEmpty)
-    assert(fileIndex.table.partitionColumns.isEmpty)
-    val addFiles = fileIndex.table.listFiles()
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
     assert(addFiles.size == 1)
     assert(addFiles.head.rows == 600572)
   }
@@ -269,13 +270,25 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     val mergetreeScan = scanExec.head
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isEmpty)
-    assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_shipdate,l_orderkey"))
-    assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_shipdate"))
-    assert(fileIndex.table.partitionColumns.isEmpty)
-    val addFiles = fileIndex.table.listFiles()
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .orderByKeyOption
+        .get
+        .mkString(",")
+        .equals("l_shipdate,l_orderkey"))
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .primaryKeyOption
+        .get
+        .mkString(",")
+        .equals("l_shipdate"))
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
     assert(addFiles.size == 1)
     assert(addFiles.head.rows == 600572)
   }
@@ -431,15 +444,33 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
     assert(mergetreeScan.metrics("numFiles").value == 6)
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isEmpty)
-    assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey"))
-    assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_orderkey"))
-    assert(fileIndex.table.partitionColumns.size == 1)
-    assert(fileIndex.table.partitionColumns.head.equals("l_returnflag"))
-    val addFiles = fileIndex.table.listFiles()
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .orderByKeyOption
+        .get
+        .mkString(",")
+        .equals("l_orderkey"))
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .primaryKeyOption
+        .get
+        .mkString(",")
+        .equals("l_orderkey"))
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .partitionColumns(0)
+        .equals("l_returnflag"))
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
     assert(addFiles.size == 6)
     assert(addFiles.map(_.rows).sum == 750735)
   }
 }

@@ -516,18 +547,29 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     val mergetreeScan = scanExec(0)
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isDefined)
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
     if (sparkVersion.equals("3.2")) {
-      assert(fileIndex.table.orderByKeyOption.isEmpty)
+      assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
     } else {
-      assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey"))
+      assert(
+        ClickHouseTableV2
+          .getTable(fileIndex.deltaLog)
+          .orderByKeyOption
+          .get
+          .mkString(",")
+          .equals("l_orderkey"))
     }
-    assert(fileIndex.table.primaryKeyOption.isEmpty)
-    assert(fileIndex.table.partitionColumns.size == 1)
-    assert(fileIndex.table.partitionColumns(0).equals("l_returnflag"))
-    val addFiles = fileIndex.table.listFiles()
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .partitionColumns(0)
+        .equals("l_returnflag"))
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
     assert(addFiles.size == 12)
    assert(addFiles.map(_.rows).sum == 600572)
   }
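Note on the pattern above: every hunk in this suite makes the same substitution. The scan's relation.location is now cast to the generic Delta TahoeFileIndex, MergeTree metadata (clickhouseTableConfigs, bucketOption, order-by and primary keys, partition columns) is looked up via ClickHouseTableV2.getTable(fileIndex.deltaLog) instead of the removed ClickHouseFileIndex.table, and part listing goes through matchingFiles(Nil, Nil) instead of listFiles(). A minimal sketch of the new access path, using only calls that appear in this diff (the two helper names are ours, not part of the commit):

import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts

// Resolve ClickHouse table metadata from the scan's Delta file index.
def tableOf(fileIndex: TahoeFileIndex): ClickHouseTableV2 =
  ClickHouseTableV2.getTable(fileIndex.deltaLog)

// Stand-in for the removed fileIndex.table.listFiles(): match with empty
// partition and data filters to list every part, then downcast each entry.
def mergeTreeParts(fileIndex: TahoeFileIndex): Seq[AddMergeTreeParts] =
  fileIndex.matchingFiles(Nil, Nil).map(_.asInstanceOf[AddMergeTreeParts])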
File 2 of 3: GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -16,11 +16,14 @@
  */
 package io.glutenproject.execution
 
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
 import _root_.org.apache.commons.io.FileUtils
 import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
 import _root_.org.apache.spark.sql.SaveMode
 import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import _root_.org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex
 import io.minio._
 import io.minio.messages.DeleteObject

@@ -34,8 +37,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
-  override protected val resourcePath: String =
-    "../../../../gluten-core/src/test/resources/tpch-data"
+  override protected val needCopyParquetToTablePath = true
 
   override protected val tablesPath: String = basePath + "/tpch-data"
   override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
@@ -113,7 +115,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
-    createTPCHParquetTables(tablesPath)
+    createNotNullTPCHTablesInParquet(tablesPath)
   }
   override protected def beforeEach(): Unit = {
     super.beforeEach()
@@ -218,13 +220,13 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     val mergetreeScan = scanExec.head
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isEmpty)
-    assert(fileIndex.table.orderByKeyOption.isEmpty)
-    assert(fileIndex.table.primaryKeyOption.isEmpty)
-    assert(fileIndex.table.partitionColumns.isEmpty)
-    val addFiles = fileIndex.table.listFiles()
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
     assert(addFiles.size == 1)
     assert(addFiles.head.rows == 600572)
   }
@@ -302,13 +304,25 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     val mergetreeScan = scanExec.head
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isEmpty)
-    assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_shipdate,l_orderkey"))
-    assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_shipdate"))
-    assert(fileIndex.table.partitionColumns.isEmpty)
-    val addFiles = fileIndex.table.listFiles()
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .orderByKeyOption
+        .get
+        .mkString(",")
+        .equals("l_shipdate,l_orderkey"))
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .primaryKeyOption
+        .get
+        .mkString(",")
+        .equals("l_shipdate"))
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
     assert(addFiles.size == 1)
     assert(addFiles.head.rows == 600572)
   }
@@ -464,15 +478,33 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
     assert(mergetreeScan.metrics("numFiles").value == 6)
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isEmpty)
-    assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey"))
-    assert(fileIndex.table.primaryKeyOption.get.mkString(",").equals("l_orderkey"))
-    assert(fileIndex.table.partitionColumns.size == 1)
-    assert(fileIndex.table.partitionColumns.head.equals("l_returnflag"))
-    val addFiles = fileIndex.table.listFiles()
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .orderByKeyOption
+        .get
+        .mkString(",")
+        .equals("l_orderkey"))
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .primaryKeyOption
+        .get
+        .mkString(",")
+        .equals("l_orderkey"))
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .partitionColumns(0)
+        .equals("l_returnflag"))
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
     assert(addFiles.size == 6)
     assert(addFiles.map(_.rows).sum == 750735)
   }
 }

@@ -549,18 +581,29 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     val mergetreeScan = scanExec(0)
     assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
-    val fileIndex = mergetreeScan.relation.location.asInstanceOf[ClickHouseFileIndex]
-    assert(fileIndex.table.clickhouseTableConfigs.nonEmpty)
-    assert(fileIndex.table.bucketOption.isDefined)
+    val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+    assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
     if (sparkVersion.equals("3.2")) {
-      assert(fileIndex.table.orderByKeyOption.isEmpty)
+      assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
     } else {
-      assert(fileIndex.table.orderByKeyOption.get.mkString(",").equals("l_orderkey"))
+      assert(
+        ClickHouseTableV2
+          .getTable(fileIndex.deltaLog)
+          .orderByKeyOption
+          .get
+          .mkString(",")
+          .equals("l_orderkey"))
     }
-    assert(fileIndex.table.primaryKeyOption.isEmpty)
-    assert(fileIndex.table.partitionColumns.size == 1)
-    assert(fileIndex.table.partitionColumns(0).equals("l_returnflag"))
-    val addFiles = fileIndex.table.listFiles()
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+    assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+    assert(
+      ClickHouseTableV2
+        .getTable(fileIndex.deltaLog)
+        .partitionColumns(0)
+        .equals("l_returnflag"))
+    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
     assert(addFiles.size == 12)
     assert(addFiles.map(_.rows).sum == 600572)
   }
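The S3 suite's changes mirror the HDFS suite's hunk for hunk. Since the ClickHouseTableV2.getTable(fileIndex.deltaLog) chain now repeats in every assertion, a shared helper could collapse the duplication; a hypothetical sketch, not part of this commit:

// Hypothetical test helper; the commit itself inlines the chain at each site.
def assertKeyOptions(
    table: ClickHouseTableV2,
    orderByKey: Option[String],
    primaryKey: Option[String]): Unit = {
  assert(table.orderByKeyOption.map(_.mkString(",")) == orderByKey)
  assert(table.primaryKeyOption.map(_.mkString(",")) == primaryKey)
}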
File 3 of 3: ExtensionTableNode.java
@@ -22,6 +22,7 @@
 import com.google.protobuf.StringValue;
 import io.substrait.proto.ReadRel;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -71,10 +72,11 @@ public class ExtensionTableNode implements SplitInfo {
     this.maxPartsNum = maxPartsNum;
     this.database = database;
     this.tableName = tableName;
-    if (relativePath.contains(":/")) { // file:/tmp/xxx => tmp/xxx
-      this.relativePath = relativePath.substring(relativePath.indexOf(":/") + 2);
+    URI table_uri = URI.create(relativePath);
+    if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx
+      this.relativePath = table_uri.getPath().substring(1);
     } else {
-      this.relativePath = relativePath;
+      this.relativePath = table_uri.getPath();
     }
     this.absolutePath = absolutePath;
     this.tableSchemaJson = tableSchemaJson;
