[CORE] Basic runnable version of ACBO (Advanced CBO) (#5058)
zhztheplayer authored Mar 27, 2024
1 parent 001a5e1 commit c1e1cca
Showing 88 changed files with 10,658 additions and 574 deletions.
44 changes: 30 additions & 14 deletions .github/workflows/velox_docker.yml
@@ -71,8 +71,8 @@ jobs:
strategy:
fail-fast: false
matrix:
os: ["ubuntu:20.04", "ubuntu:22.04"]
spark: ["spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5"]
os: [ "ubuntu:20.04", "ubuntu:22.04" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5" ]
java: [ "java-8", "java-17" ]
# Spark supports JDK17 since 3.3 and later, see https://issues.apache.org/jira/browse/SPARK-33772
exclude:
@@ -119,9 +119,9 @@ jobs:
strategy:
fail-fast: false
matrix:
os: ["centos:7", "centos:8"]
spark: ["spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5"]
java: ["java-8", "java-17"]
os: [ "centos:7", "centos:8" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5" ]
java: [ "java-8", "java-17" ]
# Spark supports JDK17 since 3.3 and later, see https://issues.apache.org/jira/browse/SPARK-33772
exclude:
- spark: spark-3.2
@@ -156,24 +156,40 @@ jobs:
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
-      - name: Build and run TPCH/DS
+      - name: Set environment variables
        run: |
-          cd $GITHUB_WORKSPACE/
-          export MAVEN_HOME=/usr/lib/maven
-          export PATH=${PATH}:${MAVEN_HOME}/bin
+          echo "MAVEN_HOME=/usr/lib/maven" >> $GITHUB_ENV
+          echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
          if [ "${{ matrix.java }}" = "java-17" ]; then
-            export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
+            echo "JAVA_HOME=/usr/lib/jvm/java-17-openjdk" >> $GITHUB_ENV
          else
-            export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
+            echo "JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk" >> $GITHUB_ENV
          fi
+      - name: Build gluten-it
+        run: |
          echo "JAVA_HOME: $JAVA_HOME"
+          cd $GITHUB_WORKSPACE/
          mvn clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
-          cd $GITHUB_WORKSPACE/tools/gluten-it
-          mvn clean install -P${{ matrix.spark }} -P${{ matrix.java }} \
-          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+          cd $GITHUB_WORKSPACE/tools/gluten-it
+          mvn clean install -P${{ matrix.spark }} -P${{ matrix.java }}
+      - name: Run TPC-H / TPC-DS
+        run: |
+          echo "JAVA_HOME: $JAVA_HOME"
+          cd $GITHUB_WORKSPACE/tools/gluten-it
+          GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
            --local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
            --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
+      - name: Run TPC-H / TPC-DS with ACBO
+        run: |
+          echo "JAVA_HOME: $JAVA_HOME"
+          cd $GITHUB_WORKSPACE/tools/gluten-it
+          GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
+            --extra-conf=spark.gluten.sql.advanced.cbo.enabled=true \
+          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
+            --extra-conf=spark.gluten.sql.advanced.cbo.enabled=true
# run-tpc-test-centos8-oom-randomkill:
# needs: build-native-lib
7 changes: 7 additions & 0 deletions backends-velox/pom.xml
@@ -50,6 +50,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>io.glutenproject</groupId>
+      <artifactId>gluten-cbo-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.planner

import io.glutenproject.cbo.{Cbo, CboSuiteBase}
import io.glutenproject.cbo.path.CboPath
import io.glutenproject.cbo.property.PropertySet
import io.glutenproject.cbo.rule.{CboRule, Shape, Shapes}
import io.glutenproject.planner.property.GlutenProperties.{Conventions, Schemas}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution._
import org.apache.spark.sql.test.SharedSparkSession

class VeloxCboSuite extends SharedSparkSession {
import VeloxCboSuite._

test("C2R, R2C - basic") {
val in = RowUnary(RowLeaf())
val planner = newCbo().newPlanner(in)
val out = planner.plan()
assert(out == RowUnary(RowLeaf()))
}

test("C2R, R2C - explicitly requires any properties") {
val in = RowUnary(RowLeaf())
val planner =
newCbo().newPlanner(in, PropertySet(List(Conventions.ANY, Schemas.ANY)))
val out = planner.plan()
assert(out == RowUnary(RowLeaf()))
}

test("C2R, R2C - requires columnar output") {
val in = RowUnary(RowLeaf())
val planner =
newCbo().newPlanner(in, PropertySet(List(Conventions.VANILLA_COLUMNAR, Schemas.ANY)))
val out = planner.plan()
assert(out == RowToColumnarExec(RowUnary(RowLeaf())))
}

test("C2R, R2C - insert c2rs / r2cs") {
val in =
ColumnarUnary(RowUnary(RowUnary(ColumnarUnary(RowUnary(RowUnary(ColumnarUnary(RowLeaf())))))))
val planner =
newCbo().newPlanner(in, PropertySet(List(Conventions.ROW_BASED, Schemas.ANY)))
val out = planner.plan()
assert(out == ColumnarToRowExec(ColumnarUnary(
RowToColumnarExec(RowUnary(RowUnary(ColumnarToRowExec(ColumnarUnary(RowToColumnarExec(
RowUnary(RowUnary(ColumnarToRowExec(ColumnarUnary(RowToColumnarExec(RowLeaf()))))))))))))))
val paths = planner.newState().memoState().collectAllPaths(CboPath.INF_DEPTH).toList
val pathCount = paths.size
assert(pathCount == 165)
}

test("C2R, R2C - Row unary convertible to Columnar") {
object ConvertRowUnaryToColumnar extends CboRule[SparkPlan] {
override def shift(node: SparkPlan): Iterable[SparkPlan] = node match {
case RowUnary(child) => List(ColumnarUnary(child))
case _ => List.empty
}

override def shape(): Shape[SparkPlan] = Shapes.fixedHeight(1)
}

val in =
ColumnarUnary(RowUnary(RowUnary(ColumnarUnary(RowUnary(RowUnary(ColumnarUnary(RowLeaf())))))))
val planner =
newCbo(List(ConvertRowUnaryToColumnar))
.newPlanner(in, PropertySet(List(Conventions.ROW_BASED, Schemas.ANY)))
val out = planner.plan()
assert(out == ColumnarToRowExec(ColumnarUnary(ColumnarUnary(ColumnarUnary(
ColumnarUnary(ColumnarUnary(ColumnarUnary(ColumnarUnary(RowToColumnarExec(RowLeaf()))))))))))
val paths = planner.newState().memoState().collectAllPaths(CboPath.INF_DEPTH).toList
val pathCount = paths.size
assert(pathCount == 1094)
}
}

object VeloxCboSuite extends CboSuiteBase {
def newCbo(): Cbo[SparkPlan] = {
GlutenOptimization().asInstanceOf[Cbo[SparkPlan]]
}

def newCbo(cboRules: Seq[CboRule[SparkPlan]]): Cbo[SparkPlan] = {
GlutenOptimization(cboRules).asInstanceOf[Cbo[SparkPlan]]
}

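  // Stub operators for exercising the planner only: doExecute() is never called in these
  // tests, so it simply throws; supportsColumnar marks each node as row-based or columnar.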
case class RowLeaf() extends LeafExecNode {
override def supportsColumnar: Boolean = false
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}

case class RowUnary(child: SparkPlan) extends UnaryExecNode {
override def supportsColumnar: Boolean = false
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): RowUnary =
copy(child = newChild)
}

case class ColumnarUnary(child: SparkPlan) extends UnaryExecNode {
override def supportsColumnar: Boolean = true
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): ColumnarUnary =
copy(child = newChild)
}
}
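
For quick orientation, the helpers above compose into the following usage pattern (a minimal sketch assembled purely from calls already present in this suite; nothing here is new API):

  // Build a toy plan from the stub operators, then ask ACBO for a row-based result.
  val in = RowUnary(RowLeaf())
  val planner = newCbo().newPlanner(in, PropertySet(List(Conventions.ROW_BASED, Schemas.ANY)))
  val out: SparkPlan = planner.plan() // transitions like RowToColumnarExec are inserted as needed

  // After planning, the memo state can be inspected, e.g. to count the explored paths.
  val pathCount = planner.newState().memoState().collectAllPaths(CboPath.INF_DEPTH).size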
1 change: 1 addition & 0 deletions docs/Configuration.md
@@ -22,6 +22,7 @@ You can add these configurations into spark-defaults.conf to enable or disable t
| spark.plugins | To load Gluten's components by Spark's plug-in loader | io.glutenproject.GlutenPlugin |
| spark.shuffle.manager | To turn on Gluten Columnar Shuffle Plugin | org.apache.spark.shuffle.sort.ColumnarShuffleManager |
| spark.gluten.enabled | Enable Gluten, default is true. Just an experimental property. Recommend to enable/disable Gluten through the setting for `spark.plugins`. | true |
| spark.gluten.sql.advanced.cbo.enabled | Experimental: Enables Gluten's advanced CBO (ACBO) features during physical planning, e.g., a more efficient fallback strategy. The option can be turned on and off independently of vanilla Spark's CBO settings. Note that Gluten's query optimizer may still adopt a subset of its advanced CBO capabilities even when this option is off; enabling it makes Gluten consider CBO-based optimization more aggressively. This feature is still in development and may not yet bring performance benefits. See the sketch after this table. | false |
| spark.gluten.sql.columnar.maxBatchSize | Number of rows to be processed in each batch. Default value is 4096. | 4096 |
| spark.gluten.memory.isolation | (Experimental) Enable isolated memory mode. If true, Gluten controls the maximum off-heap memory can be used by each task to X, X = executor memory / max task slots. It's recommended to set true if Gluten serves concurrent queries within a single session, since not all memory Gluten allocated is guaranteed to be spillable. In the case, the feature should be enabled to avoid OOM. Note when true, setting spark.memory.storageFraction to a lower value is suggested since storage memory is considered non-usable by Gluten. | false |
| spark.gluten.sql.columnar.scanOnly | When enabled, this config will overwrite all other operators' enabling, and only Scan and Filter pushdown will be offloaded to native. | false |
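
To make the new switch concrete, here is a hedged sketch of enabling it on a session (the three config keys are the ones documented in this table; the builder boilerplate is standard Spark and not part of this commit):

  import org.apache.spark.sql.SparkSession

  // Assumed setup: Gluten plugin + columnar shuffle, plus the new experimental ACBO flag.
  val spark = SparkSession.builder()
    .config("spark.plugins", "io.glutenproject.GlutenPlugin")
    .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
    .config("spark.gluten.sql.advanced.cbo.enabled", "true") // off by default
    .getOrCreate()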
12 changes: 12 additions & 0 deletions gluten-cbo/common/pom.xml
@@ -0,0 +1,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.glutenproject</groupId>
<artifactId>gluten-cbo</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>gluten-cbo-common</artifactId>
<name>Gluten Cbo Common</name>
</project>
