Distinguish the collect mode of different engines #6201

Closed
Changes from 1 commit

docs/configuration/settings.md: 3 changes (3 additions, 0 deletions)

Large diffs are not rendered by default.

JdbcOperationManager.scala

@@ -20,7 +20,7 @@ import java.util

 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, OPERATION_INCREMENTAL_COLLECT}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT}
 import org.apache.kyuubi.engine.jdbc.dialect.{JdbcDialect, JdbcDialects}
 import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
 import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader

@@ -41,9 +41,9 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
     val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
-    val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+    val incrementalCollect = normalizedConf.get(ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT.key).map(
       _.toBoolean).getOrElse(
-      session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
+      session.sessionManager.getConf.get(ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT))
     val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(_.toInt)
       .getOrElse(session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
     val executeStatement =

SparkSQLOperationManager.scala

@@ -35,7 +35,8 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
   def this() = this(classOf[SparkSQLOperationManager].getSimpleName)

   private lazy val planOnlyModeDefault = getConf.get(OPERATION_PLAN_ONLY_MODE)
-  private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT)
+  private lazy val operationIncrementalCollectDefault =
+    getConf.get(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT)
   private lazy val operationLanguageDefault = getConf.get(OPERATION_LANGUAGE)
   private lazy val operationConvertCatalogDatabaseDefault =
     getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)

@@ -83,8 +84,10 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
           spark.conf.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
           mode match {
             case NoneMode =>
-              val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
-                .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
+              val incrementalCollect =
+                spark.conf.getOption(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT.key)
+                  .orElse(spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key))
+                  .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
               // TODO: respect the config of the operation ExecuteStatement, if it was set.
               val resultFormat = spark.conf.get(OPERATION_RESULT_FORMAT.key, "thrift")
               resultFormat.toLowerCase match {
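
The new lookup consults the engine-specific key first and only then the legacy key, so sessions that still set kyuubi.operation.incremental.collect keep their behaviour. A minimal, self-contained sketch of that resolution order; the object and method names are hypothetical, only the two config keys come from this patch:

// Sketch of the precedence implemented in SparkSQLOperationManager above:
// session-level engine key > session-level legacy key > engine-side default.
object IncrementalCollectResolution extends App {
  val engineKey = "kyuubi.engine.spark.operation.incremental.collect"
  val legacyKey = "kyuubi.operation.incremental.collect"

  def resolve(sessionConf: Map[String, String], engineDefault: Boolean): Boolean =
    sessionConf.get(engineKey)             // 1. engine-specific key set in the session
      .orElse(sessionConf.get(legacyKey))  // 2. legacy key set in the session
      .map(_.toBoolean)
      .getOrElse(engineDefault)            // 3. default resolved on the engine side

  assert(resolve(Map(engineKey -> "true"), engineDefault = false))
  assert(resolve(Map(legacyKey -> "true"), engineDefault = false))
  assert(!resolve(Map.empty, engineDefault = false))
}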

SparkOperationSuite.scala

@@ -337,7 +337,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
   test("test fetch orientation with incremental collect mode") {
     val sql = "SELECT id FROM range(2)"

-    withSessionConf(Map(KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true"))()() {
+    withSessionConf(Map(KyuubiConf.ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT.key -> "true"))()() {
       withSessionHandle { (client, handle) =>
         val req = new TExecuteStatementReq()
         req.setSessionHandle(handle)

TrinoOperationManager.scala

@@ -44,9 +44,9 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
         return catalogDatabaseOperation
       }
     }
-    val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+    val incrementalCollect = normalizedConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT.key).map(
      _.toBoolean).getOrElse(
-      session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
+      session.sessionManager.getConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT))
     val operation =
       new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
     addOperation(operation)

TrinoOperationIncrementCollectSuite.scala

@@ -17,10 +17,10 @@

 package org.apache.kyuubi.engine.trino.operation

-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, OPERATION_INCREMENTAL_COLLECT}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT}

 class TrinoOperationIncrementCollectSuite extends TrinoOperationSuite {
   override def withKyuubiConf: Map[String, String] = Map(
     ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory",
-    OPERATION_INCREMENTAL_COLLECT.key -> "true")
+    ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT.key -> "true")
 }

TrinoOperationSuite.scala

@@ -604,7 +604,7 @@ class TrinoOperationSuite extends WithTrinoEngine with TrinoQueryTests {

     val tFetchResultsReq3 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_PRIOR, 1)
     val tFetchResultsResp3 = client.FetchResults(tFetchResultsReq3)
-    if (kyuubiConf.get(OPERATION_INCREMENTAL_COLLECT)) {
+    if (kyuubiConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT)) {
       assert(tFetchResultsResp3.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
     } else {
       assert(tFetchResultsResp3.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)

@@ -615,7 +615,7 @@ class TrinoOperationSuite extends WithTrinoEngine with TrinoQueryTests {

     val tFetchResultsReq4 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_FIRST, 3)
     val tFetchResultsResp4 = client.FetchResults(tFetchResultsReq4)
-    if (kyuubiConf.get(OPERATION_INCREMENTAL_COLLECT)) {
+    if (kyuubiConf.get(ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT)) {
       assert(tFetchResultsResp4.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
     } else {
       assert(tFetchResultsResp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
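
With incremental collect enabled, the Trino engine streams rows through a one-pass iterator, which is why these assertions expect ERROR_STATUS for the backward orientations FETCH_PRIOR and FETCH_FIRST. A rough sketch of that constraint, using hypothetical class names rather than Kyuubi's actual result-set types:

// Sketch only: a one-pass, iterator-backed result cannot rewind, so backward
// fetch orientations fail while incremental collect is in effect.
class IncrementalRows(rows: Iterator[Seq[Any]]) {
  def fetchNext(max: Int): Seq[Seq[Any]] = rows.take(max).toSeq
  def fetchPrior(max: Int): Seq[Seq[Any]] =
    throw new UnsupportedOperationException("cannot fetch backwards from a streamed result")
}

object IncrementalRowsDemo extends App {
  val rs = new IncrementalRows(Iterator(Seq(0), Seq(1)))
  println(rs.fetchNext(1))  // List(List(0))
  try rs.fetchPrior(1) catch {
    case e: UnsupportedOperationException => println(s"expected failure: ${e.getMessage}")
  }
}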

KyuubiConf.scala

@@ -1985,8 +1985,7 @@ object KyuubiConf {
     buildConf("kyuubi.operation.incremental.collect")
       .internal
       .doc("When true, the executor side result will be sequentially calculated and returned to" +
-        s" the Spark driver side. Note that, ${OPERATION_RESULT_MAX_ROWS.key} will be ignored" +
-        " on incremental collect mode.")
+        s" the engine side.")
       .version("1.4.0")
       .booleanConf
       .createWithDefault(false)

@@ -2652,6 +2651,14 @@
       .stringConf
       .createWithDefault("yyyy-MM-dd HH:mm:ss.SSS")

+  val ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.spark.operation.incremental.collect")
+      .doc("When true, the result will be sequentially calculated and returned to" +
+        s" the Spark driver. Note that, ${OPERATION_RESULT_MAX_ROWS.key} will be ignored" +
+        " on incremental collect mode. It falls back to `kyuubi.operation.incremental.collect`.")
+      .version("1.9.0")

Review comment (Member): 1.9.0 is out, use 1.10.0

+      .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
+
   val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.session.engine.spark.initialize.sql")
       .doc("The initialize sql for Spark session. " +

@@ -2681,6 +2688,13 @@
       .stringConf
       .createOptional

+  val ENGINE_TRINO_OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.trino.operation.incremental.collect")
+      .doc("When true, the result will be sequentially calculated and returned to" +
+        " the Trino engine. It falls back to `kyuubi.operation.incremental.collect`.")
+      .version("1.9.0")
+      .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
+
   val ENGINE_HIVE_MEMORY: ConfigEntry[String] =
     buildConf("kyuubi.engine.hive.memory")
       .doc("The heap memory for the Hive query engine")

@@ -3041,6 +3055,13 @@
       .intConf
       .createWithDefault(1000)

+  val ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.jdbc.operation.incremental.collect")
+      .doc("When true, the result will be sequentially calculated and returned to" +
+        " the JDBC engine. It falls back to `kyuubi.operation.incremental.collect`.")
+      .version("1.9.0")
+      .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
+
   val ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED: ConfigEntry[Boolean] =
     buildConf("kyuubi.engine.operation.convert.catalog.database.enabled")
       .doc("When set to true, The engine converts the JDBC methods of set/get Catalog " +
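
All three new entries are declared with .fallbackConf(OPERATION_INCREMENTAL_COLLECT), so an unset engine-specific key inherits whatever kyuubi.operation.incremental.collect resolves to, and only when both are unset does the default of false apply. A small sketch of the resulting per-engine behaviour, assuming a plain key/value map in place of Kyuubi's ConfigEntry machinery (the object and method names are hypothetical):

// Sketch only: the per-engine entries let incremental collect be toggled for one
// engine without affecting the others, while the legacy key still covers all of them.
object PerEngineCollectMode extends App {
  val legacyKey = "kyuubi.operation.incremental.collect"
  val engineKeys = Seq(
    "kyuubi.engine.spark.operation.incremental.collect",
    "kyuubi.engine.trino.operation.incremental.collect",
    "kyuubi.engine.jdbc.operation.incremental.collect")

  // fallbackConf semantics: engine key, else legacy key, else the legacy default (false).
  def effective(conf: Map[String, String])(engineKey: String): Boolean =
    conf.get(engineKey).orElse(conf.get(legacyKey)).exists(_.toBoolean)

  val sparkOnly = Map("kyuubi.engine.spark.operation.incremental.collect" -> "true")
  val everywhere = Map(legacyKey -> "true")

  println(engineKeys.map(effective(sparkOnly)))   // List(true, false, false)
  println(engineKeys.map(effective(everywhere)))  // List(true, true, true)
}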