diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index a6efe608574..a7e6520577e 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -120,101 +120,102 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Engine -| Key | Default | Meaning | Type | Since | -|----------------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.engine.chat.ernie.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the ernie bot server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 | -| kyuubi.engine.chat.ernie.http.proxy | <undefined> | HTTP proxy url for API calling in ernie bot engine. e.g. http://127.0.0.1:1088 | string | 1.9.0 | -| kyuubi.engine.chat.ernie.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after ernie bot server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 | -| kyuubi.engine.chat.ernie.model | completions | ID of the model used in ernie bot. Available models are completions_pro, ernie_bot_8k, completions and eb-instant[Model overview](https://cloud.baidu.com/doc/WENXINWORKSHOP/s/6lp69is2a). | string | 1.9.0 | -| kyuubi.engine.chat.ernie.token | <undefined> | The token to access ernie bot open API, which could be got at https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Ilkkrb0i5 | string | 1.9.0 | -| kyuubi.engine.chat.extra.classpath | <undefined> | The extra classpath for the Chat engine, for configuring the location of the SDK and etc. | string | 1.8.0 | -| kyuubi.engine.chat.gpt.apiKey | <undefined> | The key to access OpenAI open API, which could be got at https://platform.openai.com/account/api-keys | string | 1.8.0 | -| kyuubi.engine.chat.gpt.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the Chat GPT server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 | -| kyuubi.engine.chat.gpt.http.proxy | <undefined> | HTTP proxy url for API calling in Chat GPT engine. e.g. http://127.0.0.1:1087 | string | 1.8.0 | -| kyuubi.engine.chat.gpt.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after Chat GPT server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 | -| kyuubi.engine.chat.gpt.model | gpt-3.5-turbo | ID of the model used in ChatGPT. Available models refer to OpenAI's [Model overview](https://platform.openai.com/docs/models/overview). | string | 1.8.0 | -| kyuubi.engine.chat.java.options | <undefined> | The extra Java options for the Chat engine | string | 1.8.0 | -| kyuubi.engine.chat.memory | 1g | The heap memory for the Chat engine | string | 1.8.0 | -| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates: | string | 1.8.0 | -| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 | -| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | set | 1.2.0 | -| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | set | 1.2.0 | -| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 | -| kyuubi.engine.deregister.job.max.failures | 4 | Number of failures of job before deregistering the engine. | int | 1.2.0 | -| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger. | string | 1.3.0 | -| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go. Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 | -| kyuubi.engine.flink.application.jars | <undefined> | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 | -| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 | -| kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialize sql for Flink engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | -| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | -| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | -| kyuubi.engine.hive.deploy.mode | LOCAL | Configures the hive engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.9.0 | -| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go. | seq | 1.7.0 | -| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 | -| kyuubi.engine.hive.java.options | <undefined> | The extra Java options for the Hive query engine | string | 1.6.0 | -| kyuubi.engine.hive.memory | 1g | The heap memory for the Hive query engine | string | 1.6.0 | -| kyuubi.engine.initialize.sql | SHOW DATABASES | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly active HiveClient. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.2.0 | -| kyuubi.engine.jdbc.connection.password | <undefined> | The password is used for connecting to server | string | 1.6.0 | -| kyuubi.engine.jdbc.connection.propagateCredential | false | Whether to use the session's user and password to connect to database | boolean | 1.8.0 | -| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 | -| kyuubi.engine.jdbc.connection.provider | <undefined> | A JDBC connection provider plugin for the Kyuubi Server to establish a connection to the JDBC URL. The configuration value should be a subclass of `org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider`. Kyuubi provides the following built-in implementations:
  • doris: For establishing Doris connections.
  • mysql: For establishing MySQL connections.
  • phoenix: For establishing Phoenix connections.
  • postgresql: For establishing PostgreSQL connections.
  • starrocks: For establishing StarRocks connections.
  • | string | 1.6.0 | -| kyuubi.engine.jdbc.connection.url | <undefined> | The server url that engine will connect to | string | 1.6.0 | -| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 | -| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 | -| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 | -| kyuubi.engine.jdbc.fetch.size | 1000 | The fetch size of JDBC engine | int | 1.9.0 | -| kyuubi.engine.jdbc.initialize.sql | SELECT 1 | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SELECT 1` to eagerly active JDBCClient. | seq | 1.8.0 | -| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 | -| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 | -| kyuubi.engine.jdbc.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. | seq | 1.8.0 | -| kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 | -| kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 | -| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 | -| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 | -| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 | -| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session. | string | 1.7.0 | -| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 | -| kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 | -| kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 | -| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are: | string | 1.2.0 | -| kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 | -| kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 | -| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 | -| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go. | seq | 1.7.0 | -| kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | -| kyuubi.engine.spark.output.mode | AUTO | The output mode of Spark engine: | string | 1.9.0 | -| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | -| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | -| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | -| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. | duration | 1.7.1 | -| kyuubi.engine.trino.connection.keystore.password | <undefined> | The keystore password used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.keystore.path | <undefined> | The keystore path used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.keystore.type | <undefined> | The keystore type used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.password | <undefined> | The password used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.truststore.password | <undefined> | The truststore password used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.truststore.path | <undefined> | The truststore path used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.truststore.type | <undefined> | The truststore type used for connecting to trino cluster | string | 1.8.0 | -| kyuubi.engine.trino.connection.user | <undefined> | The user used for connecting to trino cluster | string | 1.9.0 | -| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go. | seq | 1.7.0 | -| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 | -| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 | -| kyuubi.engine.trino.memory | 1g | The heap memory for the Trino query engine | string | 1.6.0 | -| kyuubi.engine.type | SPARK_SQL | Specify the detailed engine supported by Kyuubi. The engine type bindings to SESSION scope. This configuration is experimental. Currently, available configs are: | string | 1.4.0 | -| kyuubi.engine.ui.retainedSessions | 200 | The number of SQL client sessions kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | -| kyuubi.engine.ui.retainedStatements | 200 | The number of statements kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | -| kyuubi.engine.ui.stop.enabled | true | When true, allows Kyuubi engine to be killed from the Spark Web UI. | boolean | 1.3.0 | -| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 | -| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 | -| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 | -| kyuubi.engine.yarn.app.name | <undefined> | The YARN app name when the engine deploy mode is YARN. | string | 1.9.0 | -| kyuubi.engine.yarn.cores | 1 | kyuubi engine container core number when the engine deploy mode is YARN. | int | 1.9.0 | -| kyuubi.engine.yarn.java.options | <undefined> | The extra Java options for the AM when the engine deploy mode is YARN. | string | 1.9.0 | -| kyuubi.engine.yarn.memory | 1024 | kyuubi engine container memory in mb when the engine deploy mode is YARN. | int | 1.9.0 | -| kyuubi.engine.yarn.priority | <undefined> | kyuubi engine yarn priority when the engine deploy mode is YARN. | int | 1.9.0 | -| kyuubi.engine.yarn.queue | default | kyuubi engine yarn queue when the engine deploy mode is YARN. | string | 1.9.0 | -| kyuubi.engine.yarn.stagingDir | <undefined> | Staging directory used while submitting kyuubi engine to YARN, It should be a absolute path in HDFS. | string | 1.9.0 | -| kyuubi.engine.yarn.submit.timeout | PT30S | The engine submit timeout for YARN application. | duration | 1.7.2 | -| kyuubi.engine.yarn.tags | <undefined> | kyuubi engine yarn tags when the engine deploy mode is YARN. | seq | 1.9.0 | +| Key | Default | Meaning | Type | Since | +|----------------------------------------------------------|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| +| kyuubi.engine.chat.ernie.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the ernie bot server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 | +| kyuubi.engine.chat.ernie.http.proxy | <undefined> | HTTP proxy url for API calling in ernie bot engine. e.g. http://127.0.0.1:1088 | string | 1.9.0 | +| kyuubi.engine.chat.ernie.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after ernie bot server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 | +| kyuubi.engine.chat.ernie.model | completions | ID of the model used in ernie bot. Available models are completions_pro, ernie_bot_8k, completions and eb-instant[Model overview](https://cloud.baidu.com/doc/WENXINWORKSHOP/s/6lp69is2a). | string | 1.9.0 | +| kyuubi.engine.chat.ernie.token | <undefined> | The token to access ernie bot open API, which could be got at https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Ilkkrb0i5 | string | 1.9.0 | +| kyuubi.engine.chat.extra.classpath | <undefined> | The extra classpath for the Chat engine, for configuring the location of the SDK and etc. | string | 1.8.0 | +| kyuubi.engine.chat.gpt.apiKey | <undefined> | The key to access OpenAI open API, which could be got at https://platform.openai.com/account/api-keys | string | 1.8.0 | +| kyuubi.engine.chat.gpt.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the Chat GPT server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 | +| kyuubi.engine.chat.gpt.http.proxy | <undefined> | HTTP proxy url for API calling in Chat GPT engine. e.g. http://127.0.0.1:1087 | string | 1.8.0 | +| kyuubi.engine.chat.gpt.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after Chat GPT server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 | +| kyuubi.engine.chat.gpt.model | gpt-3.5-turbo | ID of the model used in ChatGPT. Available models refer to OpenAI's [Model overview](https://platform.openai.com/docs/models/overview). | string | 1.8.0 | +| kyuubi.engine.chat.java.options | <undefined> | The extra Java options for the Chat engine | string | 1.8.0 | +| kyuubi.engine.chat.memory | 1g | The heap memory for the Chat engine | string | 1.8.0 | +| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates: | string | 1.8.0 | +| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 | +| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | set | 1.2.0 | +| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | set | 1.2.0 | +| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 | +| kyuubi.engine.deregister.job.max.failures | 4 | Number of failures of job before deregistering the engine. | int | 1.2.0 | +| kyuubi.engine.doAs.enabled | true | Whether to enable user impersonation on launching engine. When enabled, for engines which supports user impersonation, e.g. SPARK, depends on the `kyuubi.engine.share.level`, different users will be used to launch the engine. Otherwise, Kyuubi Server's user will always be used to launch the engine. | boolean | 1.9.0 | +| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger. | string | 1.3.0 | +| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go. Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 | +| kyuubi.engine.flink.application.jars | <undefined> | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 | +| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 | +| kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialize sql for Flink engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | +| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | +| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | +| kyuubi.engine.hive.deploy.mode | LOCAL | Configures the hive engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.9.0 | +| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go. | seq | 1.7.0 | +| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 | +| kyuubi.engine.hive.java.options | <undefined> | The extra Java options for the Hive query engine | string | 1.6.0 | +| kyuubi.engine.hive.memory | 1g | The heap memory for the Hive query engine | string | 1.6.0 | +| kyuubi.engine.initialize.sql | SHOW DATABASES | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly active HiveClient. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.2.0 | +| kyuubi.engine.jdbc.connection.password | <undefined> | The password is used for connecting to server | string | 1.6.0 | +| kyuubi.engine.jdbc.connection.propagateCredential | false | Whether to use the session's user and password to connect to database | boolean | 1.8.0 | +| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 | +| kyuubi.engine.jdbc.connection.provider | <undefined> | A JDBC connection provider plugin for the Kyuubi Server to establish a connection to the JDBC URL. The configuration value should be a subclass of `org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider`. Kyuubi provides the following built-in implementations:
  • doris: For establishing Doris connections.
  • mysql: For establishing MySQL connections.
  • phoenix: For establishing Phoenix connections.
  • postgresql: For establishing PostgreSQL connections.
  • starrocks: For establishing StarRocks connections.
  • | string | 1.6.0 | +| kyuubi.engine.jdbc.connection.url | <undefined> | The server url that engine will connect to | string | 1.6.0 | +| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 | +| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 | +| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 | +| kyuubi.engine.jdbc.fetch.size | 1000 | The fetch size of JDBC engine | int | 1.9.0 | +| kyuubi.engine.jdbc.initialize.sql | SELECT 1 | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SELECT 1` to eagerly active JDBCClient. | seq | 1.8.0 | +| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 | +| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 | +| kyuubi.engine.jdbc.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. | seq | 1.8.0 | +| kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 | +| kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 | +| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 | +| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 | +| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 | +| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session. | string | 1.7.0 | +| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 | +| kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 | +| kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 | +| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are: See also `kyuubi.engine.share.level.subdomain` and `kyuubi.engine.doAs.enabled`. | string | 1.2.0 | +| kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 | +| kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 | +| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 | +| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go. | seq | 1.7.0 | +| kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | +| kyuubi.engine.spark.output.mode | AUTO | The output mode of Spark engine: | string | 1.9.0 | +| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | +| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | +| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | +| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. | duration | 1.7.1 | +| kyuubi.engine.trino.connection.keystore.password | <undefined> | The keystore password used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.keystore.path | <undefined> | The keystore path used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.keystore.type | <undefined> | The keystore type used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.password | <undefined> | The password used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.truststore.password | <undefined> | The truststore password used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.truststore.path | <undefined> | The truststore path used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.truststore.type | <undefined> | The truststore type used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.user | <undefined> | The user used for connecting to trino cluster | string | 1.9.0 | +| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go. | seq | 1.7.0 | +| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 | +| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 | +| kyuubi.engine.trino.memory | 1g | The heap memory for the Trino query engine | string | 1.6.0 | +| kyuubi.engine.type | SPARK_SQL | Specify the detailed engine supported by Kyuubi. The engine type bindings to SESSION scope. This configuration is experimental. Currently, available configs are: | string | 1.4.0 | +| kyuubi.engine.ui.retainedSessions | 200 | The number of SQL client sessions kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | +| kyuubi.engine.ui.retainedStatements | 200 | The number of statements kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | +| kyuubi.engine.ui.stop.enabled | true | When true, allows Kyuubi engine to be killed from the Spark Web UI. | boolean | 1.3.0 | +| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 | +| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 | +| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 | +| kyuubi.engine.yarn.app.name | <undefined> | The YARN app name when the engine deploy mode is YARN. | string | 1.9.0 | +| kyuubi.engine.yarn.cores | 1 | kyuubi engine container core number when the engine deploy mode is YARN. | int | 1.9.0 | +| kyuubi.engine.yarn.java.options | <undefined> | The extra Java options for the AM when the engine deploy mode is YARN. | string | 1.9.0 | +| kyuubi.engine.yarn.memory | 1024 | kyuubi engine container memory in mb when the engine deploy mode is YARN. | int | 1.9.0 | +| kyuubi.engine.yarn.priority | <undefined> | kyuubi engine yarn priority when the engine deploy mode is YARN. | int | 1.9.0 | +| kyuubi.engine.yarn.queue | default | kyuubi engine yarn queue when the engine deploy mode is YARN. | string | 1.9.0 | +| kyuubi.engine.yarn.stagingDir | <undefined> | Staging directory used while submitting kyuubi engine to YARN, It should be a absolute path in HDFS. | string | 1.9.0 | +| kyuubi.engine.yarn.submit.timeout | PT30S | The engine submit timeout for YARN application. | duration | 1.7.2 | +| kyuubi.engine.yarn.tags | <undefined> | kyuubi engine yarn tags when the engine deploy mode is YARN. | seq | 1.9.0 | ### Event diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala index ea804575ecb..93153ad3620 100644 --- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala +++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala @@ -192,7 +192,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite test("Check if spark.kubernetes.executor.podNamePrefix is invalid") { Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid => conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid) - val builder = new SparkProcessBuilder("test", conf) + val builder = new SparkProcessBuilder("test", true, conf) val e = intercept[KyuubiException](builder.validateConf) assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" + s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index d68bc2646e2..9f09e9bb636 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2087,20 +2087,33 @@ object KyuubiConf { .version("1.5.0") .fallbackConf(ENGINE_CONNECTION_URL_USE_HOSTNAME) + val ENGINE_DO_AS_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.engine.doAs.enabled") + .doc("Whether to enable user impersonation on launching engine. When enabled, " + + "for engines which supports user impersonation, e.g. SPARK, depends on the " + + s"`kyuubi.engine.share.level`, different users will be used to launch the engine. " + + "Otherwise, Kyuubi Server's user will always be used to launch the engine.") + .version("1.9.0") + .booleanConf + .createWithDefault(true) + val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level") .doc("Engines will be shared in different levels, available configs are: ") + "
  • SERVER: the engine will be shared by Kyuubi servers, and the engine will be launched" + + " by Server's user.
  • " + + " " + + s" See also `${ENGINE_SHARE_LEVEL_SUBDOMAIN.key}` and `${ENGINE_DO_AS_ENABLED.key}`.") .version("1.2.0") .fallbackConf(LEGACY_ENGINE_SHARE_LEVEL) @@ -2115,8 +2128,8 @@ object KyuubiConf { " all the capacity of the Trino." + "
  • HIVE_SQL: specify this engine type will launch a Hive engine which can provide" + " all the capacity of the Hive Server2.
  • " + - "
  • JDBC: specify this engine type will launch a JDBC engine which can forward " + - " queries to the database system through the certain JDBC driver, " + + "
  • JDBC: specify this engine type will launch a JDBC engine which can forward" + + " queries to the database system through the certain JDBC driver," + " for now, it supports Doris, MySQL, Phoenix, PostgreSQL and StarRocks.
  • " + "
  • CHAT: specify this engine type will launch a Chat engine.
  • " + "") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index eb9c7ab47c9..0d1b0adc775 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -49,12 +49,13 @@ import org.apache.kyuubi.server.KyuubiServer * The description and functionality of an engine at server side * * @param conf Engine configuration - * @param user Caller of the engine + * @param sessionUser Caller of the engine * @param engineRefId Id of the corresponding session in which the engine is created */ private[kyuubi] class EngineRef( conf: KyuubiConf, - user: String, + sessionUser: String, + doAsEnabled: Boolean, groupProvider: GroupProvider, engineRefId: String, engineManager: KyuubiApplicationManager, @@ -88,15 +89,18 @@ private[kyuubi] class EngineRef( private var builder: ProcBuilder = _ - private[kyuubi] def getEngineRefId(): String = engineRefId + private[kyuubi] def getEngineRefId: String = engineRefId - // Launcher of the engine - private[kyuubi] val appUser: String = shareLevel match { + // user for routing session to the engine + private[kyuubi] val routingUser: String = shareLevel match { case SERVER => Utils.currentUser - case GROUP => groupProvider.primaryGroup(user, conf.getAll.asJava) - case _ => user + case GROUP => groupProvider.primaryGroup(sessionUser, conf.getAll.asJava) + case _ => sessionUser } + // user for launching engine + private[kyuubi] val appUser: String = if (doAsEnabled) routingUser else Utils.currentUser + @VisibleForTesting private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match { case subdomain if clientPoolSize > 0 && (subdomain.isEmpty || enginePoolIgnoreSubdomain) => @@ -110,7 +114,7 @@ private[kyuubi] class EngineRef( val snPath = DiscoveryPaths.makePath( s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum", - appUser, + routingUser, clientPoolName) DiscoveryClientProvider.withDiscoveryClient(conf) { client => client.getAndIncrement(snPath) @@ -128,7 +132,7 @@ private[kyuubi] class EngineRef( */ @VisibleForTesting private[kyuubi] val defaultEngineName: String = { - val commonNamePrefix = s"kyuubi_${shareLevel}_${engineType}_${appUser}" + val commonNamePrefix = s"kyuubi_${shareLevel}_${engineType}_${routingUser}" shareLevel match { case CONNECTION => s"${commonNamePrefix}_$engineRefId" case _ => s"${commonNamePrefix}_${subdomain}_$engineRefId" @@ -151,8 +155,8 @@ private[kyuubi] class EngineRef( private[kyuubi] lazy val engineSpace: String = { val commonParent = s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType" shareLevel match { - case CONNECTION => DiscoveryPaths.makePath(commonParent, appUser, engineRefId) - case _ => DiscoveryPaths.makePath(commonParent, appUser, subdomain) + case CONNECTION => DiscoveryPaths.makePath(commonParent, routingUser, engineRefId) + case _ => DiscoveryPaths.makePath(commonParent, routingUser, subdomain) } } @@ -167,7 +171,7 @@ private[kyuubi] class EngineRef( val lockPath = DiscoveryPaths.makePath( s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock", - appUser, + routingUser, subdomain) discoveryClient.tryWithLock( lockPath, @@ -188,19 +192,25 @@ private[kyuubi] class EngineRef( builder = engineType match { case SPARK_SQL => conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) - new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case FLINK_SQL => conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName) - new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case TRINO => - new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case HIVE_SQL => conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName) - HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog, defaultEngineName) + HiveProcessBuilder( + appUser, + doAsEnabled, + conf, + engineRefId, + extraEngineLog, + defaultEngineName) case JDBC => - new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + new JdbcProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case CHAT => - new ChatProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) } MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) @@ -222,7 +232,7 @@ private[kyuubi] class EngineRef( while (engineRef.isEmpty) { if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { exitValue = Some(process.exitValue()) - if (exitValue != Some(0)) { + if (!exitValue.contains(0)) { val error = builder.getError MetricsSystem.tracing { ms => ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser)) @@ -246,7 +256,7 @@ private[kyuubi] class EngineRef( // even the submit process succeeds, the application might meet failure when initializing, // check the engine application state from engine manager and fast fail on engine terminate - if (engineRef.isEmpty && exitValue == Some(0)) { + if (engineRef.isEmpty && exitValue.contains(0)) { Option(engineManager).foreach { engineMgr => if (lastApplicationInfo.isDefined) { TimeUnit.SECONDS.sleep(1) @@ -341,7 +351,7 @@ private[kyuubi] class EngineRef( discoveryClient: DiscoveryClient, hostPort: (String, Int)): Option[ServiceNodeInfo] = { val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace) - serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption + serviceNodes.find { sn => (sn.host, sn.port) == hostPort } } def close(): Unit = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 23196bf1ded..8a8f59ffe99 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -99,6 +99,8 @@ trait ProcBuilder { protected def proxyUser: String + protected def doAsEnabled: Boolean + protected val commands: Iterable[String] def conf: KyuubiConf diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala index ddf88e14924..5336e838991 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._ class ChatProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, val engineRefId: String, val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging { @VisibleForTesting - def this(proxyUser: String, conf: KyuubiConf) { - this(proxyUser, conf, "") + def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) { + this(proxyUser, doAsEnabled, conf, "") } /** diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index a1e8cdcd38b..4ae714deefa 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -38,14 +38,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._ */ class FlinkProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, val engineRefId: String, val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging { @VisibleForTesting - def this(proxyUser: String, conf: KyuubiConf) { - this(proxyUser, conf, "") + def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) { + this(proxyUser, doAsEnabled, conf, "") } val flinkHome: String = getEngineHome(shortName) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala index 2d4145ff522..903e06575cc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala @@ -37,14 +37,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, val engineRefId: String, val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging { @VisibleForTesting - def this(proxyUser: String, conf: KyuubiConf) { - this(proxyUser, conf, "") + def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) { + this(proxyUser, doAsEnabled, conf, "") } protected val hiveHome: String = getEngineHome(shortName) @@ -121,16 +122,17 @@ object HiveProcessBuilder extends Logging { def apply( appUser: String, + doAsEnabled: Boolean, conf: KyuubiConf, engineRefId: String, extraEngineLog: Option[OperationLog], defaultEngineName: String): HiveProcessBuilder = { DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match { - case LOCAL => new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + case LOCAL => new HiveProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case YARN => warn(s"Hive on YARN model is experimental.") conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName)) - new HiveYarnModeProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + new HiveYarnModeProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case other => throw new KyuubiException(s"Unsupported deploy mode: $other") } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala index ba842cbd4d5..96e1c0b7cfb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala @@ -40,10 +40,12 @@ import org.apache.kyuubi.util.command.CommandLineUtils.{confKeyValue, confKeyVal */ class HiveYarnModeProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, override val engineRefId: String, override val extraEngineLog: Option[OperationLog] = None) - extends HiveProcessBuilder(proxyUser, conf, engineRefId, extraEngineLog) with Logging { + extends HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog) + with Logging { override protected def mainClass: String = "org.apache.kyuubi.engine.hive.deploy.HiveYarnModeSubmitter" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala index 2d08d510199..de93fd44b55 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._ class JdbcProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, val engineRefId: String, val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging { @VisibleForTesting - def this(proxyUser: String, conf: KyuubiConf) { - this(proxyUser, conf, "") + def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) { + this(proxyUser, doAsEnabled, conf, "") } /** diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 0167f95516d..813e9dce775 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -34,7 +34,8 @@ class SparkBatchProcessBuilder( batchConf: Map[String, String], batchArgs: Seq[String], override val extraEngineLog: Option[OperationLog]) - extends SparkProcessBuilder(proxyUser, conf, batchId, extraEngineLog) { +// TODO respect doAsEnabled + extends SparkProcessBuilder(proxyUser, true, conf, batchId, extraEngineLog) { import SparkProcessBuilder._ override protected lazy val commands: Iterable[String] = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 972284f5c06..10e1e6ce9e5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -34,6 +34,7 @@ import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManage import org.apache.kyuubi.engine.KubernetesApplicationOperation.{KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT} import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.util.{KubernetesUtils, Validator} @@ -41,14 +42,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, val engineRefId: String, val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging { @VisibleForTesting - def this(proxyUser: String, conf: KyuubiConf) { - this(proxyUser, conf, "") + def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) { + this(proxyUser, doAsEnabled, conf, "") } import SparkProcessBuilder._ @@ -135,14 +137,12 @@ class SparkProcessBuilder( var allConf = conf.getAll // if enable sasl kerberos authentication for zookeeper, need to upload the server keytab file - if (AuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE)) - == AuthTypes.KERBEROS) { + if (AuthTypes.withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) { allConf = allConf ++ zkAuthKeytabFileConf(allConf) } // pass spark engine log path to spark conf (allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach { - case (k, v) => - buffer ++= confKeyValue(convertConfigKey(k), v) + case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v) } setupKerberos(buffer) @@ -157,10 +157,12 @@ class SparkProcessBuilder( protected def setupKerberos(buffer: mutable.Buffer[String]): Unit = { // if the keytab is specified, PROXY_USER is not supported tryKeytab() match { - case None => + case None if doAsEnabled => setSparkUserName(proxyUser, buffer) buffer += PROXY_USER buffer += proxyUser + case None => // doAs disabled + setSparkUserName(Utils.currentUser, buffer) case Some(name) => setSparkUserName(name, buffer) } @@ -175,10 +177,15 @@ class SparkProcessBuilder( try { val ugi = UserGroupInformation .loginUserFromKeytabAndReturnUGI(principal.get, keytab.get) - if (ugi.getShortUserName != proxyUser) { + if (doAsEnabled && ugi.getShortUserName != proxyUser) { warn(s"The session proxy user: $proxyUser is not same with " + - s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " + - s"Fallback to use proxy user.") + s"spark principal: ${ugi.getShortUserName}, skip using keytab. " + + "Fallback to use proxy user.") + None + } else if (!doAsEnabled && ugi.getShortUserName != Utils.currentUser) { + warn(s"The server's user: ${Utils.currentUser} is not same with " + + s"spark principal: ${ugi.getShortUserName}, skip using keytab. " + + "Fallback to use server's user.") None } else { Some(ugi.getShortUserName) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala index 96502fb9607..c07c92fd85b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._ class TrinoProcessBuilder( override val proxyUser: String, + override val doAsEnabled: Boolean, override val conf: KyuubiConf, val engineRefId: String, val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging { @VisibleForTesting - def this(proxyUser: String, conf: KyuubiConf) { - this(proxyUser, conf, "") + def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) { + this(proxyUser, doAsEnabled, conf, "") } override protected def module: String = "kyuubi-trino-engine" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index 54a7c96029a..d0289abb99a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -118,11 +118,10 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi } protected def sendCredentialsIfNeeded(): Unit = { - val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager] sessionManager.credentialsManager.sendCredentialsIfNeeded( session.handle.identifier.toString, - appUser, + session.asInstanceOf[KyuubiSessionImpl].engine.appUser, client.sendCredentials) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala index cfbd2a0ca9a..07c365bc96c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala @@ -70,7 +70,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool } override protected def applicationInfoMap: Option[Map[String, String]] = { - super.applicationInfoMap.map { _ + ("refId" -> session.engine.getEngineRefId()) } + super.applicationInfoMap.map { _ + ("refId" -> session.engine.getEngineRefId) } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index e34f7b2a06d..c0634455f9e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -49,6 +49,7 @@ class KyuubiSessionImpl( conf: Map[String, String], sessionManager: KyuubiSessionManager, sessionConf: KyuubiConf, + doAsEnabled: Boolean, parser: KyuubiParser) extends KyuubiSession(protocol, user, password, ipAddress, conf, sessionManager) { @@ -77,6 +78,7 @@ class KyuubiSessionImpl( lazy val engine: EngineRef = new EngineRef( sessionConf, user, + doAsEnabled, sessionManager.groupProvider, handle.identifier.toString, sessionManager.applicationManager, @@ -333,7 +335,7 @@ class KyuubiSessionImpl( engineAliveFailCount = engineAliveFailCount + 1 if (now - engineLastAlive > engineAliveTimeout && engineAliveFailCount >= engineAliveMaxFailCount) { - error(s"The engineRef[${engine.getEngineRefId()}] is marked as not alive " + error(s"The engineRef[${engine.getEngineRefId}] is marked as not alive " + s"due to a lack of recent successful alive probes. " + s"The time since last successful probe: " + s"${now - engineLastAlive} ms exceeds the timeout of $engineAliveTimeout ms. " @@ -342,7 +344,7 @@ class KyuubiSessionImpl( false } else { warn( - s"The engineRef[${engine.getEngineRefId()}] alive probe fails, " + + s"The engineRef[${engine.getEngineRefId}] alive probe fails, " + s"${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms, " + s"and has failed $engineAliveFailCount times.", e) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 0696af74fa4..8d84c86beb1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -87,6 +87,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { password: String, ipAddress: String, conf: Map[String, String]): Session = { + val userConf = this.getConf.getUserDefaults(user) new KyuubiSessionImpl( protocol, user, @@ -94,7 +95,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { ipAddress, conf, this, - this.getConf.getUserDefaults(user), + userConf, + userConf.get(ENGINE_DO_AS_ENABLED), parser) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala index f49ebbeb13f..de33ada4586 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala @@ -33,7 +33,7 @@ trait BatchTestHelper { val sparkBatchTestAppName = "Spark Pi" // the app name is hard coded in spark example code val sparkBatchTestResource: Option[String] = { - val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) + val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", true, KyuubiConf()) Paths.get(sparkProcessBuilder.sparkHome, "examples", "jars").toFile.listFiles().find( _.getName.startsWith("spark-examples")) map (_.getCanonicalPath) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala index 4341c541584..b2b266322d1 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala @@ -74,7 +74,7 @@ trait EngineRefTests extends KyuubiFunSuite { domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _)) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val engine = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}", @@ -90,7 +90,7 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val appName = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val appName = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(appName.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL", @@ -102,7 +102,8 @@ trait EngineRefTests extends KyuubiFunSuite { k => conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN) conf.set(k.key, "abc") - val appName2 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val appName2 = + new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(appName2.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}", @@ -121,7 +122,7 @@ trait EngineRefTests extends KyuubiFunSuite { val primaryGroupName = PluginLoader.loadGroupProvider(conf).primaryGroup(user, Map.empty[String, String].asJava) - val engineRef = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engineRef = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engineRef.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", @@ -134,7 +135,8 @@ trait EngineRefTests extends KyuubiFunSuite { k => conf.unset(k) conf.set(k.key, "abc") - val engineRef2 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engineRef2 = + new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engineRef2.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_${GROUP}_${SPARK_SQL}", @@ -151,7 +153,7 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val appName = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val appName = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(appName.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", @@ -160,7 +162,7 @@ trait EngineRefTests extends KyuubiFunSuite { assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id") conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc") - val appName2 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val appName2 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(appName2.engineSpace === DiscoveryPaths.makePath( s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", @@ -177,31 +179,31 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(ENGINE_POOL_SIZE, -1) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val engine1 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine1 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine1.subdomain === "abc") // unset subdomain and disable engine pool conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN) conf.set(ENGINE_POOL_SIZE, -1) - val engine2 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine2 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine2.subdomain === "default") // set subdomain and 1 <= engine pool size < threshold conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc") conf.set(ENGINE_POOL_SIZE, 1) - val engine3 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine3 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine3.subdomain === "abc") // unset subdomain and 1 <= engine pool size < threshold conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN) conf.set(ENGINE_POOL_SIZE, 3) - val engine4 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine4 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine4.subdomain.startsWith("engine-pool-")) // unset subdomain and engine pool size > threshold conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN) conf.set(ENGINE_POOL_SIZE, 100) - val engine5 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine5 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) val engineNumber = Integer.parseInt(engine5.subdomain.substring(12)) val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get assert(engineNumber <= threshold) @@ -211,7 +213,7 @@ trait EngineRefTests extends KyuubiFunSuite { val enginePoolName = "test-pool" conf.set(ENGINE_POOL_NAME, enginePoolName) conf.set(ENGINE_POOL_SIZE, 3) - val engine6 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine6 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine6.subdomain.startsWith(s"$enginePoolName-")) conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN) @@ -222,7 +224,7 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString()) conf.set(ENGINE_POOL_SELECT_POLICY, "POLLING") (0 until 10).foreach { i => - val engine7 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine7 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) val engineNumber = Integer.parseInt(engine7.subdomain.substring(pool_name.length + 1)) assert(engineNumber == (i % conf.get(ENGINE_POOL_SIZE))) } @@ -237,7 +239,7 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString()) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val engine = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) var port1 = 0 var port2 = 0 @@ -294,6 +296,7 @@ trait EngineRefTests extends KyuubiFunSuite { new EngineRef( cloned, user, + true, PluginLoader.loadGroupProvider(conf), id, null).getOrCreate(client) @@ -326,20 +329,20 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(ENGINE_POOL_SIZE, -1) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val engine1 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine1 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine1.subdomain === "abc") conf.set(ENGINE_POOL_SIZE, 1) - val engine2 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine2 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine2.subdomain === "abc") conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN) - val engine3 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine3 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine3.subdomain.startsWith("engine-pool-")) conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc") conf.set(ENGINE_POOL_IGNORE_SUBDOMAIN, true) - val engine4 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine4 = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) assert(engine4.subdomain.startsWith("engine-pool-")) } @@ -352,7 +355,7 @@ trait EngineRefTests extends KyuubiFunSuite { conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString()) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") - val engine = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine = new EngineRef(conf, user, true, PluginLoader.loadGroupProvider(conf), id, null) DiscoveryClientProvider.withDiscoveryClient(conf) { client => val hp = engine.getOrCreate(client) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala index 40fc818706c..a27506e13ee 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefWithZookeeperSuite.scala @@ -80,6 +80,7 @@ class EngineRefWithZookeeperSuite extends EngineRefTests { new EngineRef( conf1, user, + true, PluginLoader.loadGroupProvider(conf), UUID.randomUUID().toString, null) @@ -95,6 +96,7 @@ class EngineRefWithZookeeperSuite extends EngineRefTests { new EngineRef( conf2, user, + true, PluginLoader.loadGroupProvider(conf), UUID.randomUUID().toString, null) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala index bdb0fa787fb..b667d9c39ed 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala @@ -75,7 +75,7 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite { .set("spark.abc", id) .set("spark.master", "local") .set(SESSION_IDLE_TIMEOUT, Duration.ofMinutes(3).toMillis) - val builder = new SparkProcessBuilder(user, conf) + val builder = new SparkProcessBuilder(user, true, conf) builder.start assert(jps.isSupported(ApplicationManagerInfo(builder.clusterManager()))) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 84be010ed4b..4c0e30d8daa 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -147,14 +147,14 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine" test("session mode - all hadoop related environment variables are configured") { - val builder = new FlinkProcessBuilder("vinoyang", sessionModeConf) { + val builder = new FlinkProcessBuilder("vinoyang", true, sessionModeConf) { override def env: Map[String, String] = envWithAllHadoop } matchActualAndExpectedSessionMode(builder) } test("session mode - only FLINK_HADOOP_CLASSPATH environment variables are configured") { - val builder = new FlinkProcessBuilder("vinoyang", sessionModeConf) { + val builder = new FlinkProcessBuilder("vinoyang", true, sessionModeConf) { override def env: Map[String, String] = envDefault + (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop") } @@ -162,7 +162,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { } test("application mode - all hadoop related environment variables are configured") { - val builder = new FlinkProcessBuilder("paullam", applicationModeConf) { + val builder = new FlinkProcessBuilder("paullam", true, applicationModeConf) { override def env: Map[String, String] = envWithAllHadoop } matchActualAndExpectedApplicationMode(builder) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala index a2f39633ca4..5d37bad3044 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilderSuite.scala @@ -26,7 +26,7 @@ class HiveProcessBuilderSuite extends KyuubiFunSuite { test("hive process builder") { val conf = KyuubiConf().set("kyuubi.on", "off") - val builder = new HiveProcessBuilder("kyuubi", conf) { + val builder = new HiveProcessBuilder("kyuubi", true, conf) { override def env: Map[String, String] = super.env + (HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop") } val commands = builder.toString.split('\n') @@ -39,7 +39,7 @@ class HiveProcessBuilderSuite extends KyuubiFunSuite { test("default engine memory") { val conf = KyuubiConf() .set(ENGINE_HIVE_EXTRA_CLASSPATH, "/hadoop") - val builder = new HiveProcessBuilder("kyuubi", conf) + val builder = new HiveProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-Xmx1g")) } @@ -48,7 +48,7 @@ class HiveProcessBuilderSuite extends KyuubiFunSuite { val conf = KyuubiConf() .set(ENGINE_HIVE_MEMORY, "5g") .set(ENGINE_HIVE_EXTRA_CLASSPATH, "/hadoop") - val builder = new HiveProcessBuilder("kyuubi", conf) + val builder = new HiveProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-Xmx5g")) } @@ -59,14 +59,14 @@ class HiveProcessBuilderSuite extends KyuubiFunSuite { .set( ENGINE_HIVE_JAVA_OPTIONS, "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") - val builder = new HiveProcessBuilder("kyuubi", conf) + val builder = new HiveProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")) } test("set engine extra classpath") { val conf = KyuubiConf().set(ENGINE_HIVE_EXTRA_CLASSPATH, "/dummy_classpath/*") - val builder = new HiveProcessBuilder("kyuubi", conf) + val builder = new HiveProcessBuilder("kyuubi", true, conf) val commands = builder.toString assert(commands.contains("/dummy_classpath/*")) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala index 7c896309c5a..d913204bf21 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala @@ -25,7 +25,7 @@ class HiveYarnModeProcessBuilderSuite extends KyuubiFunSuite { test("hive yarn mode process builder") { val conf = KyuubiConf().set("kyuubi.on", "off") - val builder = new HiveYarnModeProcessBuilder("kyuubi", conf, "") { + val builder = new HiveYarnModeProcessBuilder("kyuubi", true, conf, "") { override def env: Map[String, String] = super.env + ("HIVE_CONF_DIR" -> "/etc/hive/conf") + (HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop") } @@ -40,7 +40,7 @@ class HiveYarnModeProcessBuilderSuite extends KyuubiFunSuite { test("hadoop conf dir") { val conf = KyuubiConf().set("kyuubi.on", "off") - val builder = new HiveYarnModeProcessBuilder("kyuubi", conf, "") { + val builder = new HiveYarnModeProcessBuilder("kyuubi", true, conf, "") { override def env: Map[String, String] = super.env + ("HADOOP_CONF_DIR" -> "/etc/hadoop/conf") + (HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop") @@ -51,7 +51,7 @@ class HiveYarnModeProcessBuilderSuite extends KyuubiFunSuite { test("yarn conf dir") { val conf = KyuubiConf().set("kyuubi.on", "off") - val builder = new HiveYarnModeProcessBuilder("kyuubi", conf, "") { + val builder = new HiveYarnModeProcessBuilder("kyuubi", true, conf, "") { override def env: Map[String, String] = super.env + ("YARN_CONF_DIR" -> "/etc/hadoop/yarn/conf") + (HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilderSuite.scala index 2be39d0f319..7d3be568dca 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilderSuite.scala @@ -26,7 +26,7 @@ class JdbcProcessBuilderSuite extends KyuubiFunSuite { val conf = KyuubiConf().set("kyuubi.on", "off") .set(ENGINE_JDBC_CONNECTION_URL.key, "") .set(ENGINE_JDBC_CONNECTION_PASSWORD.key, "123456") - val builder = new JdbcProcessBuilder("kyuubi", conf) + val builder = new JdbcProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("bin/java"), "wrong exec") assert(command.contains("--conf kyuubi.session.user=kyuubi")) @@ -38,7 +38,7 @@ class JdbcProcessBuilderSuite extends KyuubiFunSuite { test("capture error from jdbc process builder") { val e1 = intercept[IllegalArgumentException]( - new JdbcProcessBuilder("kyuubi", KyuubiConf()).processBuilder) + new JdbcProcessBuilder("kyuubi", true, KyuubiConf()).processBuilder) assert(e1.getMessage contains s"Jdbc server url can not be null! Please set ${ENGINE_JDBC_CONNECTION_URL.key}") } @@ -46,7 +46,7 @@ class JdbcProcessBuilderSuite extends KyuubiFunSuite { test("default engine memory") { val conf = KyuubiConf() .set(ENGINE_JDBC_CONNECTION_URL.key, "") - val builder = new JdbcProcessBuilder("kyuubi", conf) + val builder = new JdbcProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-Xmx1g")) } @@ -55,7 +55,7 @@ class JdbcProcessBuilderSuite extends KyuubiFunSuite { val conf = KyuubiConf() .set(ENGINE_JDBC_MEMORY, "5g") .set(ENGINE_JDBC_CONNECTION_URL.key, "") - val builder = new JdbcProcessBuilder("kyuubi", conf) + val builder = new JdbcProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-Xmx5g")) } @@ -66,7 +66,7 @@ class JdbcProcessBuilderSuite extends KyuubiFunSuite { .set( ENGINE_JDBC_JAVA_OPTIONS, "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") - val builder = new JdbcProcessBuilder("kyuubi", conf) + val builder = new JdbcProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")) } @@ -75,7 +75,7 @@ class JdbcProcessBuilderSuite extends KyuubiFunSuite { val conf = KyuubiConf() .set(ENGINE_JDBC_CONNECTION_URL.key, "") .set(ENGINE_JDBC_EXTRA_CLASSPATH, "/dummy_classpath/*") - val builder = new JdbcProcessBuilder("kyuubi", conf) + val builder = new JdbcProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("/dummy_classpath/*")) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 8cbbed5af40..74c28539b2e 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -41,7 +41,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { private def conf = KyuubiConf().set("kyuubi.on", "off") test("spark process builder") { - val builder = new SparkProcessBuilder("kentyao", conf) + val builder = new SparkProcessBuilder("kentyao", true, conf) val commands = builder.toString.split(' ') assert(commands(2) === "org.apache.kyuubi.engine.spark.SparkSQLEngine") assert(commands.contains("spark.kyuubi.on=off")) @@ -57,7 +57,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { } test("capture error from spark process builder") { - val processBuilder = new SparkProcessBuilder("kentyao", conf.set("spark.ui.port", "abc")) + val processBuilder = new SparkProcessBuilder("kentyao", true, conf.set("spark.ui.port", "abc")) processBuilder.start eventually(timeout(90.seconds), interval(500.milliseconds)) { val error = processBuilder.getError @@ -67,7 +67,10 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { } val processBuilder1 = - new SparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy")) + new SparkProcessBuilder( + "kentyao", + true, + conf.set("spark.hive.metastore.uris", "thrift://dummy")) processBuilder1.start eventually(timeout(90.seconds), interval(500.milliseconds)) { @@ -79,7 +82,10 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { test("engine log truncation") { val pb = - new SparkProcessBuilder("kentyao", conf.set("spark.hive.metastore.uris", "thrift://dummy")) + new SparkProcessBuilder( + "kentyao", + true, + conf.set("spark.hive.metastore.uris", "thrift://dummy")) pb.start eventually(timeout(90.seconds), interval(500.milliseconds)) { val error1 = pb.getError @@ -89,6 +95,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { val pb2 = new SparkProcessBuilder( "kentyao", + true, conf.set("spark.hive.metastore.uris", "thrift://dummy") .set(KyuubiConf.ENGINE_ERROR_MAX_SIZE, 200)) pb2.start @@ -99,7 +106,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { } val pb3 = - new SparkProcessBuilder("kentyao", conf.set("spark.kerberos.principal", testPrincipal)) + new SparkProcessBuilder("kentyao", true, conf.set("spark.kerberos.principal", testPrincipal)) pb3.start eventually(timeout(90.seconds), interval(500.milliseconds)) { val error1 = pb3.getError @@ -110,29 +117,29 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { } test("proxy user or keytab") { - val b1 = new SparkProcessBuilder("kentyao", conf) + val b1 = new SparkProcessBuilder("kentyao", true, conf) assert(b1.toString.contains("--proxy-user kentyao")) val conf1 = conf.set("spark.kerberos.principal", testPrincipal) - val b2 = new SparkProcessBuilder("kentyao", conf1) + val b2 = new SparkProcessBuilder("kentyao", true, conf1) assert(b2.toString.contains("--proxy-user kentyao")) val conf2 = conf.set("spark.kerberos.keytab", testKeytab) - val b3 = new SparkProcessBuilder("kentyao", conf2) + val b3 = new SparkProcessBuilder("kentyao", true, conf2) assert(b3.toString.contains("--proxy-user kentyao")) tryWithSecurityEnabled { val conf3 = conf.set("spark.kerberos.principal", testPrincipal) .set("spark.kerberos.keytab", "testKeytab") - val b4 = new SparkProcessBuilder(Utils.currentUser, conf3) + val b4 = new SparkProcessBuilder(Utils.currentUser, true, conf3) assert(b4.toString.contains(s"--proxy-user ${Utils.currentUser}")) val conf4 = conf.set("spark.kerberos.principal", testPrincipal) .set("spark.kerberos.keytab", testKeytab) - val b5 = new SparkProcessBuilder("kentyao", conf4) + val b5 = new SparkProcessBuilder("kentyao", true, conf4) assert(b5.toString.contains("--proxy-user kentyao")) - val b6 = new SparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), conf4) + val b6 = new SparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), true, conf4) assert(!b6.toString.contains("--proxy-user kentyao")) } } @@ -228,13 +235,13 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { val conf = KyuubiConf() conf.set(ENGINE_SPARK_MAIN_RESOURCE, hdfsPath) - val b1 = new SparkProcessBuilder("test", conf) + val b1 = new SparkProcessBuilder("test", true, conf) assert(b1.mainResource.get.startsWith("hdfs://")) assert(b1.mainResource.get == hdfsPath) // user specified jar not exist, get default jar and expect not equals conf.set(ENGINE_SPARK_MAIN_RESOURCE, jarPath.toString) - val b2 = new SparkProcessBuilder("test", conf) + val b2 = new SparkProcessBuilder("test", true, conf) assert(b2.mainResource.getOrElse("") != jarPath.toString) } @@ -244,7 +251,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { conf.set("spark.vino", "yang") conf.set("kent", "yao") conf.set("hadoop.kent", "yao") - val builder = new SparkProcessBuilder("", conf) + val builder = new SparkProcessBuilder("", true, conf) val commands = builder.toString.split(' ') assert(commands.contains("spark.kyuubi.kent=yao")) assert(commands.contains("spark.vino=yang")) @@ -258,14 +265,14 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { conf.set(HighAvailabilityConf.HA_ZK_AUTH_KEYTAB.key, testKeytab) conf.set(HighAvailabilityConf.HA_ZK_AUTH_PRINCIPAL.key, testPrincipal) - val b1 = new SparkProcessBuilder("test", conf) + val b1 = new SparkProcessBuilder("test", true, conf) assert(b1.toString.contains(s"--conf spark.files=$testKeytab")) } test("SparkProcessBuilder commands immutable") { val conf = KyuubiConf(false) val engineRefId = UUID.randomUUID().toString - val pb = new SparkProcessBuilder("", conf, engineRefId) + val pb = new SparkProcessBuilder("", true, conf, engineRefId) assert(pb.toString.contains(engineRefId)) val engineRefId2 = UUID.randomUUID().toString conf.set("spark.yarn.tags", engineRefId2) @@ -276,7 +283,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { test("SparkProcessBuilder build spark engine with SPARK_USER_NAME") { val proxyName = "kyuubi" val conf1 = KyuubiConf(false).set("spark.master", "k8s://test:12345") - val b1 = new SparkProcessBuilder(proxyName, conf1) + val b1 = new SparkProcessBuilder(proxyName, true, conf1) val c1 = b1.toString.split(' ') assert(c1.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName")) assert(c1.contains(s"spark.executorEnv.SPARK_USER_NAME=$proxyName")) @@ -286,7 +293,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { .set("spark.kerberos.principal", testPrincipal) .set("spark.kerberos.keytab", testKeytab) val name = ServiceUtils.getShortName(testPrincipal) - val b2 = new SparkProcessBuilder(name, conf2) + val b2 = new SparkProcessBuilder(name, true, conf2) val c2 = b2.toString.split(' ') assert(c2.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$name")) assert(c2.contains(s"spark.executorEnv.SPARK_USER_NAME=$name")) @@ -295,14 +302,14 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { // Test no-kubernetes case val conf3 = KyuubiConf(false) - val b3 = new SparkProcessBuilder(proxyName, conf3) + val b3 = new SparkProcessBuilder(proxyName, true, conf3) val c3 = b3.toString.split(' ') assert(!c3.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName")) assert(!c3.contains(s"spark.executorEnv.SPARK_USER_NAME=$proxyName")) } test("[KYUUBI #5009] Test pass spark engine log path to spark conf") { - val b1 = new SparkProcessBuilder("kyuubi", conf) + val b1 = new SparkProcessBuilder("kyuubi", true, conf) assert( b1.toString.contains( s"$CONF spark.$KYUUBI_ENGINE_LOG_PATH_KEY=${b1.engineLog.getAbsolutePath}")) @@ -313,6 +320,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { val appName = "test-app" val processBuilder = new SparkProcessBuilder( "kyuubi", + true, conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"), engineRefId) val conf1 = Map(APP_KEY -> "test-app") @@ -339,6 +347,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { assert(driverPodName4 === Some(s"kyuubi-test-$engineRefId-driver")) val newProcessBuilder = new SparkProcessBuilder( "kyuubi", + true, conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster").set( KUBERNETES_FORCIBLY_REWRITE_DRIVER_POD_NAME, true), @@ -353,6 +362,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { val appName = "test-app" val processBuilder = new SparkProcessBuilder( "kyuubi", + true, conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"), engineRefId) val conf1 = Map(APP_KEY -> "test-app") @@ -375,6 +385,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { assert(execPodNamePrefix3 === Some(s"kyuubi-$engineRefId")) val newProcessBuilder = new SparkProcessBuilder( "kyuubi", + true, conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster").set( KUBERNETES_FORCIBLY_REWRITE_EXEC_POD_NAME_PREFIX, true), @@ -386,7 +397,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { } test("extract spark core scala version") { - val builder = new SparkProcessBuilder("kentyao", KyuubiConf(false)) + val builder = new SparkProcessBuilder("kentyao", true, KyuubiConf(false)) Seq( "spark-core_2.13-3.4.1.jar", "spark-core_2.13-3.5.0-abc-20230921.jar", @@ -428,19 +439,19 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { test("default spark.yarn.maxAppAttempts conf in yarn mode") { val conf1 = KyuubiConf(false) conf1.set("spark.master", "k8s://test:12345") - val builder1 = new SparkProcessBuilder("", conf1) + val builder1 = new SparkProcessBuilder("", true, conf1) val commands1 = builder1.toString.split(' ') assert(!commands1.contains("spark.yarn.maxAppAttempts")) val conf2 = KyuubiConf(false) conf2.set("spark.master", "yarn") - val builder2 = new SparkProcessBuilder("", conf2) + val builder2 = new SparkProcessBuilder("", true, conf2) val commands2 = builder2.toString.split(' ') assert(commands2.contains("spark.yarn.maxAppAttempts=1")) } } class FakeSparkProcessBuilder(config: KyuubiConf) - extends SparkProcessBuilder("fake", config) { + extends SparkProcessBuilder("fake", true, config) { override protected lazy val commands: Iterable[String] = Seq("ls") } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilderSuite.scala index a4dfad186a1..f6cafb0d2da 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilderSuite.scala @@ -28,7 +28,7 @@ class TrinoProcessBuilderSuite extends KyuubiFunSuite { val conf = KyuubiConf() .set(ENGINE_TRINO_CONNECTION_URL, "dummy_url") .set(ENGINE_TRINO_CONNECTION_CATALOG, "dummy_catalog") - val builder = new TrinoProcessBuilder("kyuubi", conf) + val builder = new TrinoProcessBuilder("kyuubi", true, conf) val commands = builder.toString.split("\n") assert(commands.head.contains("java")) assert(builder.toString.contains(s"--conf ${KYUUBI_SESSION_USER_KEY}=kyuubi")) @@ -39,7 +39,7 @@ class TrinoProcessBuilderSuite extends KyuubiFunSuite { test("capture error from trino process builder") { val e1 = intercept[IllegalArgumentException]( - new TrinoProcessBuilder("kyuubi", KyuubiConf()).processBuilder) + new TrinoProcessBuilder("kyuubi", true, KyuubiConf()).processBuilder) assert(e1.getMessage contains s"Trino server url can not be null! Please set ${ENGINE_TRINO_CONNECTION_URL.key}") } @@ -48,7 +48,7 @@ class TrinoProcessBuilderSuite extends KyuubiFunSuite { val conf = KyuubiConf() .set(ENGINE_TRINO_CONNECTION_URL, "dummy_url") .set(ENGINE_TRINO_CONNECTION_CATALOG, "dummy_catalog") - val builder = new TrinoProcessBuilder("kyuubi", conf) + val builder = new TrinoProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-Xmx1g")) } @@ -58,7 +58,7 @@ class TrinoProcessBuilderSuite extends KyuubiFunSuite { .set(ENGINE_TRINO_CONNECTION_URL, "dummy_url") .set(ENGINE_TRINO_CONNECTION_CATALOG, "dummy_catalog") .set(ENGINE_TRINO_MEMORY, "5g") - val builder = new TrinoProcessBuilder("kyuubi", conf) + val builder = new TrinoProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-Xmx5g")) } @@ -70,7 +70,7 @@ class TrinoProcessBuilderSuite extends KyuubiFunSuite { .set( ENGINE_TRINO_JAVA_OPTIONS, "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") - val builder = new TrinoProcessBuilder("kyuubi", conf) + val builder = new TrinoProcessBuilder("kyuubi", true, conf) val command = builder.toString assert(command.contains("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")) } @@ -80,7 +80,7 @@ class TrinoProcessBuilderSuite extends KyuubiFunSuite { .set(ENGINE_TRINO_CONNECTION_URL, "dummy_url") .set(ENGINE_TRINO_CONNECTION_CATALOG, "dummy_catalog") .set(ENGINE_TRINO_EXTRA_CLASSPATH, "/dummy_classpath/*") - val builder = new TrinoProcessBuilder("kyuubi", conf) + val builder = new TrinoProcessBuilder("kyuubi", true, conf) val commands = builder.toString assert(commands.contains("/dummy_classpath/*")) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index c69a97cefd6..95aa3de025d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -273,7 +273,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") val engine = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL", @@ -317,7 +323,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") val engine = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL", @@ -363,7 +375,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val id = UUID.randomUUID().toString val engine = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL", Utils.currentUser, @@ -398,9 +416,15 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") // In EngineRef, when use hive.server2.proxy.user or kyuubi.session.proxy.user - // the user is the proxyUser, and in our test it is normalUser + // the sessionUser is the proxyUser, and in our test it is normalUser val engine = - new EngineRef(conf.clone, user = normalUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + sessionUser = normalUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) // so as the firstChild in engineSpace we use normalUser val engineSpace = DiscoveryPaths.makePath( @@ -459,7 +483,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") val engine = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL", @@ -503,7 +533,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") val engine = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL", @@ -554,7 +590,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val id1 = UUID.randomUUID().toString val engine1 = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id1, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id1, + null) val engineSpace1 = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL", Utils.currentUser, @@ -562,7 +604,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val id2 = UUID.randomUUID().toString val engine2 = - new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id2, null) + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id2, + null) val engineSpace2 = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL", Utils.currentUser, @@ -619,9 +667,15 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") // In EngineRef, when use hive.server2.proxy.user or kyuubi.session.proxy.user - // the user is the proxyUser, and in our test it is normalUser + // the sessionUser is the proxyUser, and in our test it is normalUser val engine = - new EngineRef(conf.clone, user = normalUser, PluginLoader.loadGroupProvider(conf), id, null) + new EngineRef( + conf.clone, + sessionUser = normalUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) // so as the firstChild in engineSpace we use normalUser val engineSpace = DiscoveryPaths.makePath( diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala index 986b171c142..0c262d2389a 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala @@ -60,7 +60,8 @@ class AdminCtlSuite extends RestClientTestHelper with TestPrematureExit { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") val user = ldapUser - val engine = new EngineRef(conf.clone, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine = + new EngineRef(conf.clone, user, true, PluginLoader.loadGroupProvider(conf), id, null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL", diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala index 4a05dc079c0..43eb06482e7 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala @@ -53,7 +53,8 @@ class AdminRestApiSuite extends RestClientTestHelper { conf.set(KyuubiConf.AUTHENTICATION_METHOD, Seq("LDAP", "CUSTOM")) conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") val user = ldapUser - val engine = new EngineRef(conf.clone, user, PluginLoader.loadGroupProvider(conf), id, null) + val engine = + new EngineRef(conf.clone, user, true, PluginLoader.loadGroupProvider(conf), id, null) val engineSpace = DiscoveryPaths.makePath( s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL", diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala index 8e33eb38229..104a3161f82 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala @@ -30,7 +30,7 @@ class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper override val sparkBatchTestMainClass: String = null // For PySpark, mainClass isn't needed. override val sparkBatchTestAppName: String = "PythonPi" override val sparkBatchTestResource: Option[String] = { - val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) + val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", true, KyuubiConf()) val piScript = Paths.get(sparkProcessBuilder.sparkHome, "examples/src/main/python/pi.py") Some(piScript.toAbsolutePath.toString)