[KYUUBI #6201] Distinguish the collect mode of different engines
# 🔍 Description
## Issue References 🔗

## Describe Your Solution 🔧

Currently, the Spark, JDBC, and Trino engines support incremental collection, but they share the single configuration option `kyuubi.operation.incremental.collect`. Sometimes it's necessary to enable incremental collection only for specific engines, which requires distinguishing between them. This change introduces per-engine options (`kyuubi.engine.spark.operation.incremental.collect`, `kyuubi.engine.trino.operation.incremental.collect`, and `kyuubi.engine.jdbc.operation.incremental.collect`) that fall back to the shared option when unset.
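
As a sketch of the intended semantics (assuming `KyuubiConf`'s `set`/`get` API and the new entries added in this change), the snippet below enables incremental collection for the Spark engine only, while the Trino and JDBC entries keep falling back to the shared option:

```scala
import org.apache.kyuubi.config.KyuubiConf

// Minimal sketch: enable incremental collection for the Spark engine only.
// The shared kyuubi.operation.incremental.collect stays at its default (false).
val conf = KyuubiConf(loadSysDefault = false)
  .set("kyuubi.engine.spark.operation.incremental.collect", "true")

// The Spark-specific entry reads its own key.
assert(conf.get(KyuubiConf.ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT))
// Trino and JDBC entries fall back to the shared option, still false.
assert(!conf.get(KyuubiConf.ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT))
assert(!conf.get(KyuubiConf.ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT))
```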

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6201 from lsm1/branch-distinguish-engine-collect-mode.

Closes #6201

3c43d2c [senmiaoliu] fix conf version
aa53231 [senmiaoliu] distinguish kyuubi.operation.incremental.collect for engine type

Authored-by: senmiaoliu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
lsm1 authored and pan3793 committed Mar 20, 2024
1 parent 31469fa commit 67f099a
Showing 8 changed files with 138 additions and 111 deletions.
docs/configuration/settings.md: 195 changes (99 additions & 96 deletions)

Large diffs are not rendered by default.

JdbcOperationManager.scala

```diff
@@ -20,7 +20,7 @@ import java.util
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, OPERATION_INCREMENTAL_COLLECT}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT}
 import org.apache.kyuubi.engine.jdbc.dialect.{JdbcDialect, JdbcDialects}
 import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
 import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
@@ -41,9 +41,9 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
     val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
-    val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+    val incrementalCollect = normalizedConf.get(ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT.key).map(
       _.toBoolean).getOrElse(
-      session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
+      session.sessionManager.getConf.get(ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT))
     val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(_.toInt)
       .getOrElse(session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
     val executeStatement =
```
SparkSQLOperationManager.scala

```diff
@@ -35,7 +35,8 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
   def this() = this(classOf[SparkSQLOperationManager].getSimpleName)
 
   private lazy val planOnlyModeDefault = getConf.get(OPERATION_PLAN_ONLY_MODE)
-  private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT)
+  private lazy val operationIncrementalCollectDefault =
+    getConf.get(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT)
   private lazy val operationLanguageDefault = getConf.get(OPERATION_LANGUAGE)
   private lazy val operationConvertCatalogDatabaseDefault =
     getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)
@@ -83,8 +84,10 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
         spark.conf.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
         mode match {
           case NoneMode =>
-            val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
-              .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
+            val incrementalCollect =
+              spark.conf.getOption(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT.key)
+                .orElse(spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key))
+                .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
             // TODO: respect the config of the operation ExecuteStatement, if it was set.
             val resultFormat = spark.conf.get(OPERATION_RESULT_FORMAT.key, "thrift")
             resultFormat.toLowerCase match {
```
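
The lookup above keeps backward compatibility at the session level: the engine-specific key takes priority, a legacy `kyuubi.operation.incremental.collect` set on the session is still honored, and only then does the server-side default apply. A self-contained sketch of that precedence, with a plain `Map` standing in for `spark.conf` (the helper name is hypothetical):

```scala
// Hypothetical standalone model of the resolution chain in the diff above.
def resolveIncrementalCollect(
    sessionConf: Map[String, String],
    serverSideDefault: Boolean): Boolean =
  sessionConf.get("kyuubi.engine.spark.operation.incremental.collect")
    .orElse(sessionConf.get("kyuubi.operation.incremental.collect"))
    .map(_.toBoolean)
    .getOrElse(serverSideDefault)

// A session that still sets only the legacy key keeps working.
assert(resolveIncrementalCollect(
  Map("kyuubi.operation.incremental.collect" -> "true"),
  serverSideDefault = false))
// The engine-specific key wins when both are present.
assert(!resolveIncrementalCollect(
  Map(
    "kyuubi.engine.spark.operation.incremental.collect" -> "false",
    "kyuubi.operation.incremental.collect" -> "true"),
  serverSideDefault = true))
```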
SparkOperationSuite.scala

```diff
@@ -337,7 +337,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
   test("test fetch orientation with incremental collect mode") {
     val sql = "SELECT id FROM range(2)"
 
-    withSessionConf(Map(KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true"))()() {
+    withSessionConf(Map(KyuubiConf.ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT.key -> "true"))()() {
       withSessionHandle { (client, handle) =>
         val req = new TExecuteStatementReq()
         req.setSessionHandle(handle)
```
TrinoOperationManager.scala

```diff
@@ -44,9 +44,9 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
         return catalogDatabaseOperation
       }
     }
-    val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+    val incrementalCollect = normalizedConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT.key).map(
      _.toBoolean).getOrElse(
-      session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
+      session.sessionManager.getConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT))
     val operation =
       new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
     addOperation(operation)
```
TrinoOperationIncrementCollectSuite.scala

```diff
@@ -17,10 +17,10 @@
 
 package org.apache.kyuubi.engine.trino.operation
 
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, OPERATION_INCREMENTAL_COLLECT}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT}
 
 class TrinoOperationIncrementCollectSuite extends TrinoOperationSuite {
   override def withKyuubiConf: Map[String, String] = Map(
     ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory",
-    OPERATION_INCREMENTAL_COLLECT.key -> "true")
+    ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT.key -> "true")
 }
```
TrinoOperationSuite.scala

```diff
@@ -604,7 +604,7 @@ class TrinoOperationSuite extends WithTrinoEngine with TrinoQueryTests {
 
     val tFetchResultsReq3 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_PRIOR, 1)
     val tFetchResultsResp3 = client.FetchResults(tFetchResultsReq3)
-    if (kyuubiConf.get(OPERATION_INCREMENTAL_COLLECT)) {
+    if (kyuubiConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT)) {
       assert(tFetchResultsResp3.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
     } else {
       assert(tFetchResultsResp3.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
@@ -615,7 +615,7 @@
 
     val tFetchResultsReq4 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_FIRST, 3)
     val tFetchResultsResp4 = client.FetchResults(tFetchResultsReq4)
-    if (kyuubiConf.get(OPERATION_INCREMENTAL_COLLECT)) {
+    if (kyuubiConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT)) {
       assert(tFetchResultsResp4.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
     } else {
       assert(tFetchResultsResp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
```
KyuubiConf.scala

```diff
@@ -2033,8 +2033,7 @@ object KyuubiConf {
     buildConf("kyuubi.operation.incremental.collect")
       .internal
       .doc("When true, the executor side result will be sequentially calculated and returned to" +
-        s" the Spark driver side. Note that, ${OPERATION_RESULT_MAX_ROWS.key} will be ignored" +
-        " on incremental collect mode.")
+        s" the engine side.")
       .version("1.4.0")
       .booleanConf
       .createWithDefault(false)
@@ -2700,6 +2699,14 @@
       .stringConf
       .createWithDefault("yyyy-MM-dd HH:mm:ss.SSS")
 
+  val ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.spark.operation.incremental.collect")
+      .doc("When true, the result will be sequentially calculated and returned to" +
+        s" the Spark driver. Note that ${OPERATION_RESULT_MAX_ROWS.key} will be ignored" +
+        " on incremental collect mode. It falls back to `kyuubi.operation.incremental.collect`.")
+      .version("1.10.0")
+      .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
+
   val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.session.engine.spark.initialize.sql")
       .doc("The initialize sql for Spark session. " +
@@ -2729,6 +2736,13 @@
       .stringConf
       .createOptional
 
+  val ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.trino.operation.incremental.collect")
+      .doc("When true, the result will be sequentially calculated and returned to" +
+        " the Trino engine. It falls back to `kyuubi.operation.incremental.collect`.")
+      .version("1.10.0")
+      .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
+
   val ENGINE_HIVE_MEMORY: ConfigEntry[String] =
     buildConf("kyuubi.engine.hive.memory")
       .doc("The heap memory for the Hive query engine")
@@ -3089,6 +3103,13 @@
       .intConf
       .createWithDefault(1000)
 
+  val ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.jdbc.operation.incremental.collect")
+      .doc("When true, the result will be sequentially calculated and returned to" +
+        " the JDBC engine. It falls back to `kyuubi.operation.incremental.collect`.")
+      .version("1.10.0")
+      .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
+
   val ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED: ConfigEntry[Boolean] =
     buildConf("kyuubi.engine.operation.convert.catalog.database.enabled")
       .doc("When set to true, The engine converts the JDBC methods of set/get Catalog " +
```
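
All three new entries are declared with `.fallbackConf(OPERATION_INCREMENTAL_COLLECT)`, so deployments that only set the legacy key keep their current behavior, while an explicitly set engine-specific key overrides it. A minimal sketch, assuming `KyuubiConf`'s `set`/`get` API resolves fallback entries as described:

```scala
import org.apache.kyuubi.config.KyuubiConf

// Sketch: the legacy key enables every engine through the fallback,
// and a single engine can still opt out explicitly.
val conf = KyuubiConf(loadSysDefault = false)
  .set("kyuubi.operation.incremental.collect", "true")
  .set("kyuubi.engine.trino.operation.incremental.collect", "false")

// Spark and JDBC inherit the legacy value via the fallback...
assert(conf.get(KyuubiConf.ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT))
assert(conf.get(KyuubiConf.ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT))
// ...while Trino's explicit setting wins over the fallback.
assert(!conf.get(KyuubiConf.ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT))
```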
