From 4855a4d7dcc3c35160b531cc5913d8c41ae41972 Mon Sep 17 00:00:00 2001 From: Shuai li Date: Fri, 22 Mar 2024 18:33:16 +0800 Subject: [PATCH] [GLUTEN-4675][CH] Support write mergetree to s3 (#4676) What changes were proposed in this pull request? Support writing MergeTree tables to S3 and HDFS. How was this patch tested? Unit tests (UT). --------- Co-authored-by: liuneng1994 Co-authored-by: liuneng <1398775315@qq.com> --- backends-clickhouse/pom.xml | 28 +- ...nClickHouseMergeTreeWriteOnHDFSSuite.scala | 526 +++++++++++++++++ ...reeWriteOnObjectStorageAbstractSuite.scala | 188 ++++++ ...tenClickHouseMergeTreeWriteOnS3Suite.scala | 543 ++++++++++++++++++ cpp-ch/local-engine/CMakeLists.txt | 6 +- cpp-ch/local-engine/Common/CHUtil.cpp | 25 + cpp-ch/local-engine/Common/CHUtil.h | 1 + cpp-ch/local-engine/Common/MergeTreeTool.cpp | 27 +- cpp-ch/local-engine/Common/MergeTreeTool.h | 9 +- .../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 75 +++ .../Disks/ObjectStorages/GlutenDiskHDFS.h | 63 ++ .../GlutenHDFSObjectStorage.cpp | 42 ++ .../ObjectStorages/GlutenHDFSObjectStorage.h | 53 ++ .../registerGlutenDiskObjectStorage.cpp | 120 ++++ .../Disks/registerGlutenDisks.cpp | 99 ++++ .../local-engine/Disks/registerGlutenDisks.h | 27 + .../Parser/MergeTreeRelParser.cpp | 13 +- .../local-engine/Parser/MergeTreeRelParser.h | 1 - .../Storages/CustomStorageMergeTree.cpp | 3 +- .../Storages/Mergetree/MetaDataHelper.cpp | 95 +++ .../Storages/Mergetree/MetaDataHelper.h | 29 + .../Mergetree/SparkMergeTreeWriter.cpp | 55 +- .../Storages/Mergetree/SparkMergeTreeWriter.h | 6 +- cpp-ch/local-engine/local_engine_jni.cpp | 2 +- .../substrait/rel/ExtensionTableNode.java | 8 +- 25 files changed, 2008 insertions(+), 36 deletions(-) create mode 100644 backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala create mode 100644 backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala create mode 100644 backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala create mode 100644 cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp create mode 100644 cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h create mode 100644 cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp create mode 100644 cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h create mode 100644 cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp create mode 100644 cpp-ch/local-engine/Disks/registerGlutenDisks.cpp create mode 100644 cpp-ch/local-engine/Disks/registerGlutenDisks.h create mode 100644 cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp create mode 100644 cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index e0f96eda1d3d..f173a61c9e05 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -19,6 +19,12 @@ gluten-core ${project.version} compile + + + guava + com.google.guava + + io.glutenproject @@ -50,6 +56,16 @@ org.apache.spark spark-yarn_${scala.binary.version} provided + + + hadoop-client-api + org.apache.hadoop + + + hadoop-client-runtime + org.apache.hadoop + + org.apache.spark @@ -138,7 +154,11 @@ org.apache.hive hive-exec - + + guava + com.google.guava + + org.apache.hadoop @@ -181,6 +201,12 @@ 1.11.901 provided + + io.minio minio 8.5.9 test + + diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala new file mode 100644 index 000000000000..88fc977b7656 --- /dev/null +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem + +import java.io.File + +// Some sqls' line length exceeds 100 +// scalastyle:off line.size.limit + +class GlutenClickHouseMergeTreeWriteOnHDFSSuite + extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite + with AdaptiveSparkPlanHelper { + + override protected val needCopyParquetToTablePath = true + + override protected val tablesPath: String = basePath + "/tpch-data" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + + override protected def beforeEach(): Unit = { + super.beforeEach() + val conf = new Configuration + conf.set("fs.defaultFS", HDFS_URL) + val fs = FileSystem.get(conf) + fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) +// FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH)) + FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) +// FileUtils.forceMkdir(new File(HDFS_CACHE_PATH)) + } + + override protected def afterEach(): Unit = { + super.afterEach() + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) +// FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH)) + } + + test("test mergetree table write") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |LOCATION 
'$HDFS_URL/test/lineitem_mergetree_hdfs' + |TBLPROPERTIES (storage_policy='__hdfs_main') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_hdfs + | select * from lineitem + |""".stripMargin) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_hdfs + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 1) + assert(addFiles.head.rows == 600572) + } + spark.sql("drop table lineitem_mergetree_hdfs") + } + + test("test mergetree write with orderby keys / primary keys") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_hdfs + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (storage_policy='__hdfs_main', + | orderByKey='l_shipdate,l_orderkey', + | primaryKey='l_shipdate') + |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs' + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_orderbykey_hdfs + | select * from lineitem + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_orderbykey_hdfs + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr) { + df 
=> + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_shipdate,l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_shipdate")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 1) + assert(addFiles.head.rows == 600572) + } + spark.sql("drop table lineitem_mergetree_orderbykey_hdfs") + } + + test("test mergetree write with partition") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_hdfs + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_returnflag) + |TBLPROPERTIES (storage_policy='__hdfs_main', + | orderByKey='l_orderkey', + | primaryKey='l_orderkey') + |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs' + |""".stripMargin) + + // dynamic partitions + spark.sql(s""" + | insert into table lineitem_mergetree_partition_hdfs + | select * from lineitem + |""".stripMargin) + + // write with dataframe api + val source = spark.sql(s""" + |select + | l_orderkey , + | l_partkey , + | l_suppkey , + | l_linenumber , + | l_quantity , + | l_extendedprice , + | l_discount , + | l_tax , + | l_returnflag , + | l_linestatus , + | l_shipdate , + | l_commitdate , + | l_receiptdate , + | l_shipinstruct , + | l_shipmode , + | l_comment + | from lineitem + | where l_shipdate BETWEEN date'1993-01-01' AND date'1993-01-10' + |""".stripMargin) + + source.write + .format("clickhouse") + .mode(SaveMode.Append) + .insertInto("lineitem_mergetree_partition_hdfs") + + // static partition + spark.sql(s""" + | insert into lineitem_mergetree_partition_hdfs PARTITION (l_returnflag = 'A') + | (l_shipdate, + | l_orderkey, + | l_partkey, + | l_suppkey, + | l_linenumber, + | l_quantity, + | l_extendedprice, + | l_discount, + | l_tax, + | l_linestatus, + | l_commitdate, + | l_receiptdate, + | l_shipinstruct, + | l_shipmode, + | l_comment) + | select + | l_shipdate, + | l_orderkey, + | l_partkey, + | l_suppkey, + | l_linenumber, + | l_quantity, + | l_extendedprice, + | l_discount, + | l_tax, + | l_linestatus, + | l_commitdate, + | l_receiptdate, + | l_shipinstruct, + | l_shipmode, + | l_comment from lineitem + | where l_returnflag = 'A' + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS 
sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_partition_hdfs + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr, compareResult = false) { + df => + val result = df.collect() + assert(result.length == 4) + assert(result(0).getString(0).equals("A")) + assert(result(0).getString(1).equals("F")) + assert(result(0).getDouble(2) == 7578058.0) + + assert(result(2).getString(0).equals("N")) + assert(result(2).getString(1).equals("O")) + assert(result(2).getDouble(2) == 7454519.0) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + assert(mergetreeScan.metrics("numFiles").value == 6) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + + assert(addFiles.size == 6) + assert(addFiles.map(_.rows).sum == 750735) + } + spark.sql("drop table lineitem_mergetree_partition_hdfs") + } + + test("test mergetree write with bucket table") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_hdfs + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_returnflag) + |CLUSTERED BY (l_orderkey) + |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_orderkey)"} INTO 4 BUCKETS + |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs' + |TBLPROPERTIES (storage_policy='__hdfs_main') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_bucket_hdfs + | select * from lineitem + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | 
lineitem_mergetree_bucket_hdfs + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec(0) + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + if (sparkVersion.equals("3.2")) { + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + } else { + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + } + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + + assert(addFiles.size == 12) + assert(addFiles.map(_.rows).sum == 600572) + } + spark.sql("drop table lineitem_mergetree_bucket_hdfs") + } + +} +// scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala new file mode 100644 index 000000000000..e9b938dbfa2c --- /dev/null +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import io.glutenproject.GlutenConfig + +import org.apache.spark.sql.SparkSession + +import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf} +import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.commons.io.FileUtils + +import java.io.File + +// Some sqls' line length exceeds 100 +// scalastyle:off line.size.limit + +class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite + extends GlutenClickHouseTPCHAbstractSuite + with AdaptiveSparkPlanHelper { + private var _spark: SparkSession = _ + + override protected def spark: SparkSession = _spark + + override protected val needCopyParquetToTablePath = true + + override protected val tablesPath: String = basePath + "/tpch-data" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + + protected val sparkVersion: String = { + val version = SPARK_VERSION_SHORT.split("\\.") + version(0) + "." + version(1) + } + + val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/" + val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/" + val S3_ENDPOINT = "s3://127.0.0.1:9000/" + val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http") + val BUCKET_NAME: String = sparkVersion.replace(".", "-") + val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/" + + val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/" + val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/" + val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020" + val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion" + + val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4" + val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E" + + override protected def initializeSession(): Unit = { + if (_spark == null) { + _spark = SparkSession + .builder() + .appName("Gluten-UT-RemoteHS") + .config(sparkConf) + .getOrCreate() + } + } + + override protected def sparkConf: SparkConf = { + super.sparkConf + .setMaster("local[2]") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.io.compression.codec", "LZ4") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "false") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path", + "/tmp/user_defined") + .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) + .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) + .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT) + .set("spark.hadoop.fs.s3a.path.style.access", "true") + .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type", + "s3_gluten") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint", + WHOLE_PATH) + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id", + S3_ACCESS_KEY) + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key", + S3_SECRET_KEY) + .set( + 
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path", + S3_METADATA_PATH) + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.type", + "cache") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.disk", + "s3") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.path", + S3_CACHE_PATH) + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.max_size", + "10Gi") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes", + "main") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk", + "s3_cache") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type", + "hdfs_gluten") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint", + HDFS_URL_ENDPOINT + "/") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path", + HDFS_METADATA_PATH) + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type", + "cache") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk", + "hdfs") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path", + HDFS_CACHE_PATH) + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size", + "10Gi") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes", + "main") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk", + "hdfs_cache") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit", + "false") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica", "1") + } + override protected def createTPCHNotNullTables(): Unit = { + createNotNullTPCHTablesInParquet(tablesPath) + } + + override protected def afterAll(): Unit = { + try { + super.afterAll() + } finally { + try { + if (_spark != null) { + try { + _spark.sessionState.catalog.reset() + } finally { + _spark.stop() + _spark = null + } + } + } finally { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + } + } + + FileUtils.forceDelete(new File(basePath)) + // init GlutenConfig in the next beforeAll + GlutenConfig.ins = null + } +} +// scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala new file mode 100644 index 000000000000..45eb8625a028 --- /dev/null +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts + +import _root_.org.apache.commons.io.FileUtils +import _root_.org.apache.spark.sql.SaveMode +import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs, MinioClient, RemoveBucketArgs, RemoveObjectsArgs} +import io.minio.messages.DeleteObject + +import java.io.File +import java.util + +// Some sqls' line length exceeds 100 +// scalastyle:off line.size.limit + +class GlutenClickHouseMergeTreeWriteOnS3Suite + extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite + with AdaptiveSparkPlanHelper { + + override protected val needCopyParquetToTablePath = true + + override protected val tablesPath: String = basePath + "/tpch-data" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + + override protected def beforeEach(): Unit = { + super.beforeEach() + val client = MinioClient + .builder() + .endpoint(MINIO_ENDPOINT) + .credentials(S3_ACCESS_KEY, S3_SECRET_KEY) + .build() + if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) { + val results = + client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build()) + val objects = new util.LinkedList[DeleteObject]() + results.forEach( + obj => { + objects.add(new DeleteObject(obj.get().objectName())) + }) + val removeResults = client.removeObjects( + RemoveObjectsArgs.builder().bucket(BUCKET_NAME).objects(objects).build()) + removeResults.forEach(result => result.get().message()) + client.removeBucket(RemoveBucketArgs.builder().bucket(BUCKET_NAME).build()) + } + client.makeBucket(MakeBucketArgs.builder().bucket(BUCKET_NAME).build()) + FileUtils.deleteDirectory(new File(S3_METADATA_PATH)) + FileUtils.deleteDirectory(new File(S3_CACHE_PATH)) + FileUtils.forceMkdir(new File(S3_METADATA_PATH)) + FileUtils.forceMkdir(new File(S3_CACHE_PATH)) + } + + override protected def afterEach(): Unit = { + super.afterEach() + FileUtils.deleteDirectory(new File(S3_METADATA_PATH)) + FileUtils.deleteDirectory(new File(S3_CACHE_PATH)) + } + + test("test mergetree table write") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_s3; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_s3 + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, 
+ | l_comment string + |) + |USING clickhouse + |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_s3' + |TBLPROPERTIES (storage_policy='__s3_main') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_s3 + | select * from lineitem + |""".stripMargin) + FileUtils.deleteDirectory(new File(S3_METADATA_PATH)) + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_s3 + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 1) + assert(addFiles.head.rows == 600572) + } + spark.sql("drop table lineitem_mergetree_s3") // clean up + } + + test("test mergetree write with orderby keys / primary keys") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_s3 + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (storage_policy='__s3_main', + | orderByKey='l_shipdate,l_orderkey', + | primaryKey='l_shipdate') + |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_orderbykey_s3' + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_orderbykey_s3 + | select * from lineitem + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_orderbykey_s3 + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + 
|""".stripMargin + runTPCHQueryBySQL(1, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_shipdate,l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_shipdate")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assert(addFiles.size == 1) + assert(addFiles.head.rows == 600572) + } + spark.sql("drop table lineitem_mergetree_orderbykey_s3") + } + + test("test mergetree write with partition") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_partition_s3; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_s3 + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_returnflag) + |TBLPROPERTIES (storage_policy='__s3_main', + | orderByKey='l_orderkey', + | primaryKey='l_orderkey') + |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_partition_s3' + |""".stripMargin) + + // dynamic partitions + spark.sql(s""" + | insert into table lineitem_mergetree_partition_s3 + | select * from lineitem + |""".stripMargin) + + // write with dataframe api + val source = spark.sql(s""" + |select + | l_orderkey , + | l_partkey , + | l_suppkey , + | l_linenumber , + | l_quantity , + | l_extendedprice , + | l_discount , + | l_tax , + | l_returnflag , + | l_linestatus , + | l_shipdate , + | l_commitdate , + | l_receiptdate , + | l_shipinstruct , + | l_shipmode , + | l_comment + | from lineitem + | where l_shipdate BETWEEN date'1993-01-01' AND date'1993-01-10' + |""".stripMargin) + + source.write + .format("clickhouse") + .mode(SaveMode.Append) + .insertInto("lineitem_mergetree_partition_s3") + + // static partition + spark.sql(s""" + | insert into lineitem_mergetree_partition_s3 PARTITION (l_returnflag = 'A') + | (l_shipdate, + | l_orderkey, + | l_partkey, + | l_suppkey, + | l_linenumber, + | l_quantity, + | l_extendedprice, + | l_discount, + | l_tax, + | l_linestatus, + | l_commitdate, + | l_receiptdate, + | l_shipinstruct, + | l_shipmode, + | l_comment) + | select + | l_shipdate, + | l_orderkey, + | l_partkey, + | l_suppkey, + | l_linenumber, + | l_quantity, + | l_extendedprice, + | l_discount, + | l_tax, + | l_linestatus, + | l_commitdate, + | l_receiptdate, + | l_shipinstruct, + | l_shipmode, + | l_comment from lineitem + | where l_returnflag = 'A' + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | 
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_partition_s3 + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr, compareResult = false) { + df => + val result = df.collect() + assert(result.length == 4) + assert(result(0).getString(0).equals("A")) + assert(result(0).getString(1).equals("F")) + assert(result(0).getDouble(2) == 7578058.0) + + assert(result(2).getString(0).equals("N")) + assert(result(2).getString(1).equals("O")) + assert(result(2).getDouble(2) == 7454519.0) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + assert(mergetreeScan.metrics("numFiles").value == 6) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + + assert(addFiles.size == 6) + assert(addFiles.map(_.rows).sum == 750735) + } + spark.sql("drop table lineitem_mergetree_partition_s3") + + } + + test("test mergetree write with bucket table") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_s3 + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_returnflag) + |CLUSTERED BY (l_orderkey) + |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_orderkey)"} INTO 4 BUCKETS + |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3' + |TBLPROPERTIES (storage_policy='__s3_main') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_bucket_s3 + | select * from lineitem + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + 
|FROM + | lineitem_mergetree_bucket_s3 + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runTPCHQueryBySQL(1, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec(0) + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + if (sparkVersion.equals("3.2")) { + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + } else { + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_orderkey")) + } + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(0) + .equals("l_returnflag")) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + + assert(addFiles.size == 12) + assert(addFiles.map(_.rows).sum == 600572) + } + spark.sql("drop table lineitem_mergetree_bucket_s3") + } + +} +// scalastyle:off line.size.limit diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index ae58aa70cf08..40fe4402f2f0 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -57,6 +57,8 @@ add_headers_and_sources(shuffle Shuffle) add_headers_and_sources(operator Operator) add_headers_and_sources(jni jni) add_headers_and_sources(aggregate_functions AggregateFunctions) +add_headers_and_sources(disks Disks) +add_headers_and_sources(disks Disks/ObjectStorages) include_directories( ${JNI_INCLUDE_DIRS} @@ -89,7 +91,9 @@ add_library(gluten_clickhouse_backend_libs ${shuffle_sources} ${operator_sources} ${aggregate_functions_sources} - ${jni_sources}) + ${jni_sources} + ${disks_sources} +) target_link_libraries(gluten_clickhouse_backend_libs PUBLIC substrait_source diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index b728d7df0351..880f6668d8ba 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -67,6 +67,7 @@ #include #include "CHUtil.h" +#include "Disks/registerGlutenDisks.h" #include #include @@ -677,6 +678,19 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) global_context->setTemporaryStoragePath(config->getString("tmp_path", getDefaultPath()), 0); global_context->setPath(config->getString("path", "/")); + + String mark_cache_policy = config->getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY); + size_t mark_cache_size = config->getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE); + double mark_cache_size_ratio = config->getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO); + if (!mark_cache_size) + LOG_ERROR(&Poco::Logger::get("CHUtil"), "Too low mark cache size will lead to severe performance degradation."); + + global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio); + + String index_mark_cache_policy = config->getString("index_mark_cache_policy", 
DEFAULT_INDEX_MARK_CACHE_POLICY); + size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE); + double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO); + global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio); } } @@ -709,11 +723,22 @@ void registerAllFunctions() auto & factory = AggregateFunctionCombinatorFactory::instance(); registerAggregateFunctionCombinatorPartialMerge(factory); } + +} + +void registerGlutenDisks() +{ registerDisks(true); + +#if USE_AWS_S3 + registerGlutenDisks(true); +#endif } void BackendInitializerUtil::registerAllFactories() { + registerGlutenDisks(); + registerReadBufferBuilders(); registerWriteBufferBuilders(); diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index f6030485bb12..308e22422cdb 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -124,6 +124,7 @@ class QueryPipelineUtil }; void registerAllFunctions(); +void registerGlutenDisks(); class BackendFinalizerUtil; class JNIUtils; diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp b/cpp-ch/local-engine/Common/MergeTreeTool.cpp index 2f6b4602df09..c5122905e01c 100644 --- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp +++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include using namespace DB; @@ -40,7 +42,10 @@ std::shared_ptr buildMetaData(const DB::NamesAndTyp metadata->sorting_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context); if (table.primary_key.empty()) { - metadata->primary_key.expression = std::make_shared(std::make_shared()); + if (table.order_by_key != MergeTreeTable::TUPLE) + metadata->primary_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context); + else + metadata->primary_key.expression = std::make_shared(std::make_shared()); } else { @@ -49,13 +54,12 @@ std::shared_ptr buildMetaData(const DB::NamesAndTyp return metadata; } -std::unique_ptr buildMergeTreeSettings() +std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config) { auto settings = std::make_unique(); -// settings->set("min_bytes_for_wide_part", Field(0)); -// settings->set("min_rows_for_wide_part", Field(0)); settings->set("allow_nullable_key", Field(1)); - // settings->set("storage_policy", Field("s3_main")); + if (!config.storage_policy.empty()) + settings->set("storage_policy", Field(config.storage_policy)); return settings; } @@ -70,6 +74,15 @@ std::unique_ptr buildQueryInfo(NamesAndTypesList & names_and_ty } +void parseTableConfig(MergeTreeTableSettings & settings, String config_json) +{ + rapidjson::Document doc; + doc.Parse(config_json.c_str()); + if (doc.HasMember("storage_policy")) + settings.storage_policy = doc["storage_policy"].GetString(); + +} + MergeTreeTable parseMergeTreeTableString(const std::string & info) { @@ -97,7 +110,9 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info) assertChar('\n', in); readString(table.absolute_path, in); assertChar('\n', in); - readString(table.table_configs_json, in); + String json; + readString(json, in); + parseTableConfig(table.table_configs, json); assertChar('\n', in); while (!in.eof()) { diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h b/cpp-ch/local-engine/Common/MergeTreeTool.h index a6af7ebcad8a..bde632f0d7ad 100644 --- 
a/cpp-ch/local-engine/Common/MergeTreeTool.h +++ b/cpp-ch/local-engine/Common/MergeTreeTool.h @@ -43,6 +43,11 @@ struct MergeTreePart size_t end; }; +struct MergeTreeTableSettings +{ + String storage_policy = ""; +}; + struct MergeTreeTable { inline static const String TUPLE = "tuple()"; @@ -54,7 +59,7 @@ struct MergeTreeTable std::string primary_key = ""; std::string relative_path; std::string absolute_path; - std::string table_configs_json; + MergeTreeTableSettings table_configs; std::vector parts; std::unordered_set getPartNames() const; RangesInDataParts extractRange(DataPartsVector parts_vector) const; @@ -62,7 +67,7 @@ struct MergeTreeTable std::shared_ptr buildMetaData(const DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &); -std::unique_ptr buildMergeTreeSettings(); +std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config); std::unique_ptr buildQueryInfo(NamesAndTypesList & names_and_types_list); diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp new file mode 100644 index 000000000000..bff4108f28a1 --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "GlutenDiskHDFS.h" +#include +#include +#if USE_HDFS + +namespace local_engine +{ +using namespace DB; + +void GlutenDiskHDFS::createDirectory(const String & path) +{ + DiskObjectStorage::createDirectory(path); + hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str()); +} + +String GlutenDiskHDFS::path2AbsPath(const String & path) +{ + return getObjectStorage()->generateObjectKeyForPath(path).serialize(); +} + +void GlutenDiskHDFS::createDirectories(const String & path) +{ + DiskObjectStorage::createDirectories(path); + auto* hdfs = hdfs_object_storage->getHDFSFS(); + fs::path p = path; + std::vector paths_created; + while (hdfsExists(hdfs, p.c_str()) < 0) + { + paths_created.push_back(p); + if (!p.has_parent_path()) + break; + p = p.parent_path(); + } + for (const auto & path_to_create : paths_created | std::views::reverse) + hdfsCreateDirectory(hdfs, path_to_create.c_str()); +} + +void GlutenDiskHDFS::removeDirectory(const String & path) +{ + DiskObjectStorage::removeDirectory(path); + hdfsDelete(hdfs_object_storage->getHDFSFS(), path.c_str(), 1); +} + +DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage() +{ + const auto config_prefix = "storage_configuration.disks." 
+ name; + return std::make_shared( + getName(), + object_key_prefix, + getMetadataStorage(), + getObjectStorage(), + SerializedPlanParser::global_context->getConfigRef(), + config_prefix); +} + + +} +#endif \ No newline at end of file diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h new file mode 100644 index 000000000000..9caedaae8785 --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#if USE_HDFS +#include +#endif + +namespace local_engine +{ +#if USE_HDFS +class GlutenDiskHDFS : public DB::DiskObjectStorage +{ +public: + GlutenDiskHDFS( + const String & name_, + const String & object_key_prefix_, + DB::MetadataStoragePtr metadata_storage_, + DB::ObjectStoragePtr object_storage_, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix) + : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix) + { + chassert(dynamic_cast(object_storage_.get()) != nullptr); + object_key_prefix = object_key_prefix_; + hdfs_object_storage = dynamic_cast(object_storage_.get()); + hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/"); + } + + void createDirectory(const String & path) override; + + void createDirectories(const String & path) override; + + void removeDirectory(const String & path) override; + + DB::DiskObjectStoragePtr createDiskObjectStorage() override; +private: + String path2AbsPath(const String & path); + + GlutenHDFSObjectStorage * hdfs_object_storage; + String object_key_prefix; +}; +#endif +} + diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp new file mode 100644 index 000000000000..3a844a91f804 --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "GlutenHDFSObjectStorage.h" +#if USE_HDFS +#include +using namespace DB; +namespace local_engine +{ +std::unique_ptr GlutenHDFSObjectStorage::readObject( /// NOLINT + const StoredObject & object, + const ReadSettings & read_settings, + std::optional, + std::optional) const +{ + size_t begin_of_path = object.remote_path.find('/', object.remote_path.find("//") + 2); + auto hdfs_path = object.remote_path.substr(begin_of_path); + auto hdfs_uri = object.remote_path.substr(0, begin_of_path); + return std::make_unique(hdfs_uri, hdfs_path, config, HDFSObjectStorage::patchSettings(read_settings)); +} + +DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyForPath(const std::string & path) const +{ + return DB::ObjectStorageKey::createAsAbsolute(hdfs_root_path + path); +} +} +#endif + diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h new file mode 100644 index 000000000000..1efa441c2142 --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "config.h" + +#if USE_HDFS +#include +#endif + +namespace local_engine +{ + +#if USE_HDFS +class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage +{ +public: + GlutenHDFSObjectStorage( + const String & hdfs_root_path_, + SettingsPtr settings_, + const Poco::Util::AbstractConfiguration & config_) + : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_), config(config_) + { + } + std::unique_ptr readObject( /// NOLINT + const DB::StoredObject & object, + const DB::ReadSettings & read_settings = DB::ReadSettings{}, + std::optional read_hint = {}, + std::optional file_size = {}) const override; + DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override; + hdfsFS getHDFSFS() const { return hdfs_fs.get(); } +private: + const Poco::Util::AbstractConfiguration & config; +}; +#endif + +} + + diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp new file mode 100644 index 000000000000..8f20080297c6 --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include +#if USE_AWS_S3 +#include +#include +#include +#endif + +#if USE_HDFS +#include +#endif + +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ +extern const int BAD_ARGUMENTS; +extern const int LOGICAL_ERROR; +} +} + +namespace local_engine +{ +using namespace DB; + +#if USE_AWS_S3 +static S3::URI getS3URI( + const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, + const ContextPtr & context) +{ + String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint")); + S3::URI uri(endpoint); + + /// An empty key remains empty. + if (!uri.key.empty() && !uri.key.ends_with('/')) + uri.key.push_back('/'); + + return uri; +} + +void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory) +{ + static constexpr auto disk_type = "s3_gluten"; + + factory.registerObjectStorageType( + disk_type, + []( + const std::string & name, + const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, + const ContextPtr & context, + bool /*skip_access_check*/) -> ObjectStoragePtr + { + auto uri = getS3URI(config, config_prefix, context); + auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix); + auto settings = getSettings(config, config_prefix, context); + auto client = getClient(config, config_prefix, context, *settings); + auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key); + + auto object_storage = std::make_shared( + std::move(client), + std::move(settings), + uri, + s3_capabilities, + key_generator, + name); + return object_storage; + }); +} + +#endif + +#if USE_HDFS +void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory) +{ + factory.registerObjectStorageType( + "hdfs_gluten", + []( + const std::string & /* name */, + const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, + const ContextPtr & context, + bool /* skip_access_check */) -> ObjectStoragePtr + { + auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint")); + checkHDFSURL(uri); + if (uri.back() != '/') + throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri); + + std::unique_ptr settings = std::make_unique( + config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024), + config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000), + context->getSettingsRef().hdfs_replication + ); + return std::make_unique(uri, std::move(settings), config); + }); +} +#endif +} diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp new file mode 100644 index 000000000000..c7e9c5fd32ba --- /dev/null +++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include +#include +#include +#include +#include + +#if USE_HDFS +#include +#endif + +#include "registerGlutenDisks.h" + +namespace local_engine +{ +#if USE_AWS_S3 +void registerGlutenS3ObjectStorage(DB::ObjectStorageFactory & factory); +#endif + +#if USE_HDFS +void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory); +#endif + +void registerGlutenDisks(bool global_skip_access_check) +{ + auto & factory = DB::DiskFactory::instance(); + auto creator = [global_skip_access_check]( + const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DB::ContextPtr context, + const DB::DisksMap & /* map */, + bool, + bool) -> DB::DiskPtr + { + bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false); + auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check); + auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local"); + + DB::DiskObjectStoragePtr disk = std::make_shared( + name, + object_storage->getCommonKeyPrefix(), + std::move(metadata_storage), + std::move(object_storage), + config, + config_prefix); + + disk->startup(context, skip_access_check); + return disk; + }; + + auto & object_factory = DB::ObjectStorageFactory::instance(); +#if USE_AWS_S3 + registerGlutenS3ObjectStorage(object_factory); + factory.registerDiskType("s3_gluten", creator); /// For compatibility +#endif + +#if USE_HDFS + auto hdfs_creator = [global_skip_access_check]( + const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + DB::ContextPtr context, + const DB::DisksMap & /* map */, + bool, + bool) -> DB::DiskPtr + { + bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false); + auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check); + auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local"); + + DB::DiskObjectStoragePtr disk = std::make_shared( + name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix); + + disk->startup(context, skip_access_check); + return disk; + }; + + registerGlutenHDFSObjectStorage(object_factory); + factory.registerDiskType("hdfs_gluten", hdfs_creator); /// For compatibility +#endif +} +} diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.h b/cpp-ch/local-engine/Disks/registerGlutenDisks.h new file mode 100644 index 000000000000..a0c5d96d2133 --- /dev/null +++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.h @@ -0,0 +1,27 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +namespace local_engine +{ + +/// @param global_skip_access_check - skip access check regardless regardless +/// .skip_access_check config directive (used +/// for clickhouse-disks) +void registerGlutenDisks(bool global_skip_access_check); + +} diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 34746217b2da..82a64b99990b 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "MergeTreeRelParser.h" @@ -61,17 +62,14 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name } CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage( - const substrait::Rel & rel_, const substrait::ReadRel::ExtensionTable & extension_table, ContextMutablePtr context) { - const auto & rel = rel_.read(); google::protobuf::StringValue table; table.ParseFromString(extension_table.detail().value()); auto merge_tree_table = local_engine::parseMergeTreeTableString(table.value()); DB::Block header; - chassert(rel.has_base_schema()); - header = TypeParser::buildBlockFromNamedStruct(rel.base_schema(), merge_tree_table.low_card_key); + header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key); auto names_and_types_list = header.getNamesAndTypesList(); auto storage_factory = StorageMergeTreeFactory::instance(); auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table); @@ -89,8 +87,7 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage( context, "", MergeTreeData::MergingParams(), - buildMergeTreeSettings()); - custom_storage_merge_tree->loadDataParts(false, std::nullopt); + buildMergeTreeSettings(merge_tree_table.table_configs)); return custom_storage_merge_tree; }); return storage; @@ -137,13 +134,13 @@ MergeTreeRelParser::parseReadRel( global_context, "", MergeTreeData::MergingParams(), - buildMergeTreeSettings()); + buildMergeTreeSettings(merge_tree_table.table_configs)); return custom_storage_merge_tree; }); + restoreMetaData(storage, merge_tree_table, context); for (const auto & [name, sizes] : storage->getColumnSizes()) column_sizes[name] = sizes.data_compressed; - query_context.storage_snapshot = std::make_shared(*storage, metadata); query_context.custom_storage_merge_tree = storage; auto names_and_types_list = input.getNamesAndTypesList(); diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h index 5f86a0cb45f6..921f3ac008d5 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h @@ -38,7 +38,6 @@ 
class MergeTreeRelParser : public RelParser { public: static std::shared_ptr parseStorage( - const substrait::Rel & rel_, const substrait::ReadRel::ExtensionTable & extension_table, ContextMutablePtr context); diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp index 9d7a35cb22f9..780d19cc89a8 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp @@ -90,7 +90,8 @@ CustomStorageMergeTree::CustomStorageMergeTree( , writer(*this) , reader(*this) { - initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name); + relative_data_path = relative_data_path_; + format_version = 1; } std::atomic CustomStorageMergeTree::part_num; diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp new file mode 100644 index 000000000000..a7d167385337 --- /dev/null +++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "MetaDataHelper.h" +#include + +#include + +using namespace DB; + +namespace local_engine +{ + +std::unordered_map extractPartMetaData(ReadBuffer & in) +{ + std::unordered_map result; + while (!in.eof()) + { + String name; + readString(name, in); + assertChar('\t', in); + UInt64 size; + readIntText(size, in); + assertChar('\n', in); + String data; + data.resize(size); + in.read(data.data(), size); + result.emplace(name, data); + } + return result; +} + +void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context) +{ + auto data_disk = storage->getStoragePolicy()->getAnyDisk(); + if (!data_disk->isRemote()) + return; + + std::unordered_set not_exists_part; + DB::MetadataStorageFromDisk * metadata_storage = static_cast(data_disk->getMetadataStorage().get()); + auto metadata_disk = metadata_storage->getDisk(); + auto table_path = std::filesystem::path(mergeTreeTable.relative_path); + for (const auto & part : mergeTreeTable.getPartNames()) + { + auto part_path = table_path / part; + if (!metadata_disk->exists(part_path)) + not_exists_part.emplace(part); + } + + if (not_exists_part.empty()) + return; + + if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout)) + { + auto s3 = data_disk->getObjectStorage(); + + if (!metadata_disk->exists(table_path)) + metadata_disk->createDirectories(table_path.generic_string()); + + for (const auto & part : not_exists_part) + { + auto part_path = table_path / part; + auto metadata_file_path = part_path / "metadata.gluten"; + + if (metadata_disk->exists(part_path)) + continue; + else + metadata_disk->createDirectories(part_path); + auto key = s3->generateObjectKeyForPath(metadata_file_path.generic_string()); + StoredObject metadata_object(key.serialize()); + auto part_metadata = extractPartMetaData(*s3->readObject(metadata_object)); + for (const auto & item : part_metadata) + { + auto item_path = part_path / item.first; + auto out = metadata_disk->writeFile(item_path); + out->write(item.second.data(), item.second.size()); + } + } + } +} + +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h new file mode 100644 index 000000000000..47c5d615d757 --- /dev/null +++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +namespace local_engine +{ + +void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context); + +} + diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp index f5c9a1338f14..8df171f999dd 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp @@ -15,8 +15,11 @@ * limitations under the License. */ #include "SparkMergeTreeWriter.h" + #include +#include #include +#include #include using namespace DB; @@ -49,29 +52,59 @@ void SparkMergeTreeWriter::write(DB::Block & block) } auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block), 10, metadata_snapshot, context); - for (auto & item : blocks_with_partition) - { - auto temp_part = writeTempPart(item, metadata_snapshot, context); - temp_part.finalize(); - new_parts.emplace_back(temp_part.part); + for (auto & item : blocks_with_partition) + { + new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part); part_num++; } } void SparkMergeTreeWriter::finalize() { + auto block = squashing_transform->add({}); + if (block.rows()) + { + auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context); + for (auto & item : blocks_with_partition) + new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part); + } +} + +DB::MergeTreeDataWriter::TemporaryPart +SparkMergeTreeWriter::writeTempPartAndFinalize( + DB::BlockWithPartition & block_with_partition, + const DB::StorageMetadataPtr & metadata_snapshot) +{ + auto temp_part = writeTempPart(block_with_partition, metadata_snapshot); + temp_part.finalize(); + saveFileStatus(temp_part); + return temp_part; +} - auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add({}), 10, metadata_snapshot, context); - for (auto & item : blocks_with_partition) +void SparkMergeTreeWriter::saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const +{ + auto & data_part_storage = temp_part.part->getDataPartStorage(); + + const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk(); + if (!disk->isRemote()) return; + if (auto *const disk_metadata = dynamic_cast(disk->getMetadataStorage().get())) { - auto temp_part = writeTempPart(item, metadata_snapshot, context); - temp_part.finalize(); - new_parts.emplace_back(temp_part.part); + const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings()); + for (const auto it = data_part_storage.iterate(); it->isValid(); it->next()) + { + auto content = disk_metadata->readFileToString(it->path()); + writeString(it->name(), *out); + writeChar('\t', *out); + writeIntText(content.length(), *out); + writeChar('\n', *out); + writeString(content, *out); + } + out->finalize(); } } MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( - BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) + BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot) { MergeTreeDataWriter::TemporaryPart temp_part; Block & block = block_with_partition.block; diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h 
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h index 5c63d1fef347..d316f208ebf9 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h @@ -82,7 +82,11 @@ class SparkMergeTreeWriter private: DB::MergeTreeDataWriter::TemporaryPart - writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot, DB::ContextPtr context); + writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); + DB::MergeTreeDataWriter::TemporaryPart + writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); + void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const; + String uuid; String partition_dir; String bucket_dir; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index d1a675f4916e..7008c66f78bd 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1020,7 +1020,7 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW local_engine::SerializedPlanParser::parseExtensionTable(split_info_str); auto storage = local_engine::MergeTreeRelParser::parseStorage( - plan_ptr->relations()[0].root().input(), extension_table, local_engine::SerializedPlanParser::global_context); + extension_table, local_engine::SerializedPlanParser::global_context); auto uuid = uuid_str + "_" + task_id; auto * writer = new local_engine::SparkMergeTreeWriter( *storage, storage->getInMemoryMetadataPtr(), local_engine::SerializedPlanParser::global_context, uuid, partition_dir, bucket_dir); diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java index bf942ef267c7..f07e6fccb002 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java @@ -22,6 +22,7 @@ import com.google.protobuf.StringValue; import io.substrait.proto.ReadRel; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -71,10 +72,11 @@ public class ExtensionTableNode implements SplitInfo { this.maxPartsNum = maxPartsNum; this.database = database; this.tableName = tableName; - if (relativePath.contains(":/")) { // file:/tmp/xxx => tmp/xxx - this.relativePath = relativePath.substring(relativePath.indexOf(":/") + 2); + URI table_uri = URI.create(relativePath); + if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx + this.relativePath = table_uri.getPath().substring(1); } else { - this.relativePath = relativePath; + this.relativePath = table_uri.getPath(); } this.absolutePath = absolutePath; this.tableSchemaJson = tableSchemaJson;
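
Note on the ExtensionTableNode change above: the relative path is now derived through java.net.URI, so URI-style locations ("file:///...", "hdfs://host:port/...", "s3a://bucket/...") and bare absolute paths both collapse to a path with the leading '/' dropped. A minimal usage sketch follows; the class name, method name and sample locations are illustrative only, while the normalization rule itself is the one implemented in the constructor.

import java.net.URI;

public class RelativePathExample {
  // Same rule as the constructor change above: take URI.getPath() and drop one leading '/'.
  static String normalize(String location) {
    String path = URI.create(location).getPath();
    return path.startsWith("/") ? path.substring(1) : path;
  }

  public static void main(String[] args) {
    System.out.println(normalize("file:///tmp/lineitem_mergetree"));            // tmp/lineitem_mergetree
    System.out.println(normalize("hdfs://namenode:8020/user/gluten/lineitem")); // user/gluten/lineitem
    System.out.println(normalize("s3a://my-bucket/lineitem_mergetree_s3"));     // lineitem_mergetree_s3
  }
}

The previous string-based handling only recognized locations containing ":/", so going through URI keeps the behavior for file: paths while also producing a scheme-free relative path for hdfs:// and s3a:// table locations.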
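
SparkMergeTreeWriter::saveFileStatus and MetaDataHelper's extractPartMetaData (both added above) define the manifest written as metadata.gluten next to each part on the object store: for every file of the part, the file name, a tab, the content size, a newline, then the raw content. Below is a minimal, self-contained sketch of that framing, written in Java purely for illustration; the real code uses ClickHouse ReadBuffer/WriteBuffer on the C++ side and counts raw bytes, whereas this sketch assumes ASCII content, and the class name, helper names and sample file contents are made up.

import java.util.LinkedHashMap;
import java.util.Map;

public class PartManifestSketch {
  // Frame every file of a part as "<name>\t<size>\n<raw content>".
  static String serialize(Map<String, String> files) {
    StringBuilder out = new StringBuilder();
    for (Map.Entry<String, String> e : files.entrySet()) {
      out.append(e.getKey()).append('\t').append(e.getValue().length()).append('\n').append(e.getValue());
    }
    return out.toString();
  }

  // Inverse of serialize(): recover the name -> content map.
  static Map<String, String> parse(String blob) {
    Map<String, String> result = new LinkedHashMap<>();
    int pos = 0;
    while (pos < blob.length()) {
      int tab = blob.indexOf('\t', pos);
      int nl = blob.indexOf('\n', tab + 1);
      String name = blob.substring(pos, tab);
      int size = Integer.parseInt(blob.substring(tab + 1, nl));
      result.put(name, blob.substring(nl + 1, nl + 1 + size));
      pos = nl + 1 + size;
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> files = new LinkedHashMap<>();
    files.put("columns.txt", "columns format version: 1\n");
    files.put("count.txt", "42");
    System.out.println(parse(serialize(files)).equals(files)); // true
  }
}

restoreMetaData reads this manifest back on executors that have no local metadata for a part and re-creates each listed file on the local metadata disk before the MergeTree storage is used.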
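
GlutenHDFSObjectStorage::readObject splits the stored object's remote_path at the first '/' after the "//" of the scheme: the prefix becomes the HDFS endpoint handed to ReadBufferFromHDFS and the remainder is the in-cluster path. A tiny sketch of that split, with an assumed namenode address and object path used only for illustration.

public class HdfsRemotePathSplit {
  // Split "hdfs://host:port/path" at the first '/' after the authority, as readObject() does.
  static String[] split(String remotePath) {
    int beginOfPath = remotePath.indexOf('/', remotePath.indexOf("//") + 2);
    return new String[] {remotePath.substring(0, beginOfPath), remotePath.substring(beginOfPath)};
  }

  public static void main(String[] args) {
    String[] parts = split("hdfs://namenode:8020/user/gluten/lineitem_hdfs/all_1_1_0/metadata.gluten");
    System.out.println(parts[0]); // hdfs://namenode:8020
    System.out.println(parts[1]); // /user/gluten/lineitem_hdfs/all_1_1_0/metadata.gluten
  }
}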