From b24d94e74f3cc6648c09d0d44ead781b1b2e70b1 Mon Sep 17 00:00:00 2001 From: zwangsheng Date: Mon, 16 Oct 2023 22:43:02 +0800 Subject: [PATCH] [KYUUBI #5328] Batch supports priority scheduling ### _Why are the changes needed?_ Follow #5329 and close #5328: 1. Add a new config `kyuubi.metadata.store.jdbc.priority.enabled` to control whether priority scheduling is enabled, because users may experience performance issues when using MySQL 5.7 as the metastore backend with the Kyuubi batch v2 priority feature enabled. 2. When priority scheduling is enabled, `KyuubiBatchService` picks the next metadata job with `ORDER BY priority DESC, create_time ASC`. 3. Insert metadata with the `priority` field; the default priority value is `10`. 4. Add a new config `kyuubi.batch.priority` to set the priority of each batch. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5352 from zwangsheng/KYUUBI#5328. 
Closes #5328 687ed1ed6 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala 58621b557 [zwangsheng] fix comments 1bf81e75c [zwangsheng] fix style 7ed2551b3 [zwangsheng] update default priority desc & improve UT 21ceccb01 [zwangsheng] fix doc 27fc5e825 [zwangsheng] enrich desc c0bbc0dfd [zwangsheng] fix style 6b8d0f091 [zwangsheng] fix comment 67eb2524d [zwangsheng] fix comment e1705c34d [zwangsheng] Add config to control whether pick order by priority or not 129a46729 [zwangsheng] Add unit test for pickBatchForSubmitting fcaf85d92 [zwangsheng] Fix unit test f7ca2219e [zwangsheng] Fix unit test 8d4b276ff [wangsheng] fix code style 4c6b99090 [wangsheng] fix comments 654ad843a [zwangsheng] [KYUUBI #5328][V2] Kyuubi Server Pick Metadata job with priority Lead-authored-by: zwangsheng Co-authored-by: wangsheng <2213335496@qq.com> Co-authored-by: Cheng Pan Signed-off-by: Cheng Pan --- docs/configuration/settings.md | 1 + .../kyuubi/config/KyuubiReservedKeys.scala | 3 + .../kyuubi/server/metadata/api/Metadata.scala | 3 + .../metadata/jdbc/JDBCMetadataStore.scala | 12 ++-- .../metadata/jdbc/JDBCMetadataStoreConf.scala | 12 ++++ .../kyuubi/session/KyuubiBatchSession.scala | 4 +- .../kyuubi/session/KyuubiSessionManager.scala | 5 +- .../metadata/MetadataManagerSuite.scala | 67 +++++++++++++++++-- 8 files changed, 96 insertions(+), 11 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 2869a59cd92..d9d8d95efb1 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -347,6 +347,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.metadata.store.jdbc.database.type | SQLITE | The database type for server jdbc metadata store.
  • (Deprecated) DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.
  • SQLITE: SQLite3, JDBC driver `org.sqlite.JDBC`.
  • MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.
  • CUSTOM: User-defined database type, need to specify corresponding JDBC driver.
  • Note that: The JDBC datasource is powered by HiKariCP, for datasource properties, please specify them with the prefix: kyuubi.metadata.store.jdbc.datasource. For example, kyuubi.metadata.store.jdbc.datasource.connectionTimeout=10000. | string | 1.6.0 | | kyuubi.metadata.store.jdbc.driver | <undefined> | JDBC driver class name for server jdbc metadata store. | string | 1.6.0 | | kyuubi.metadata.store.jdbc.password || The password for server JDBC metadata store. | string | 1.6.0 | +| kyuubi.metadata.store.jdbc.priority.enabled | false | Whether to enable the priority scheduling for batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy for batch job scheduling. Note: this feature may cause significant performance issues when using MySQL 5.7 as the metastore backend due to the lack of support for mixed order index. See more details at KYUUBI #5329. | boolean | 1.8.0 | | kyuubi.metadata.store.jdbc.url | jdbc:sqlite:kyuubi_state_store.db | The JDBC url for server JDBC metadata store. By default, it is a SQLite database url, and the state information is not shared across kyuubi instances. To enable high availability for multiple kyuubi instances, please specify a production JDBC url. | string | 1.6.0 | | kyuubi.metadata.store.jdbc.user || The username for server JDBC metadata store. 
| string | 1.6.0 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala index eb209caec99..592425a4b4c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala @@ -26,6 +26,9 @@ object KyuubiReservedKeys { final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign" final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user" final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url" + // default priority is 10, higher priority will be scheduled first + // when enabled metadata store priority feature + final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority" final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded" final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id" final val KYUUBI_ENGINE_ID = "kyuubi.engine.id" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala index 3e3d9482841..0553cf90b54 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala @@ -78,6 +78,9 @@ case class Metadata( engineState: String = null, engineError: Option[String] = None, endTime: Long = 0L, + // keep consistent with table creation DDL + // find why we set 10 as default in KYUUBI #5329 + priority: Int = 10, peerInstanceClosed: Boolean = false) { def appMgrInfo: ApplicationManagerInfo = { ApplicationManagerInfo( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index dcb9c0f6685..419fa844750 100644 
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { case CUSTOM => new GenericDatabaseDialect } + private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED) + private val datasourceProperties = JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf) private val hikariConfig = new HikariConfig(datasourceProperties) @@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { |request_args, |create_time, |engine_type, - |cluster_manager + |cluster_manager, + |priority |) - |VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + |VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |""".stripMargin JdbcUtils.withConnection { connection => @@ -190,7 +193,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { valueAsString(metadata.requestArgs), metadata.createTime, Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull, - metadata.clusterManager.orNull) + metadata.clusterManager.orNull, + metadata.priority) } } @@ -198,7 +202,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { JdbcUtils.executeQueryWithRowMapper( s"""SELECT identifier FROM $METADATA_TABLE |WHERE state=? 
- |ORDER BY create_time ASC LIMIT 1 + |ORDER BY ${if (priorityEnabled) "priority DESC, " else ""}create_time ASC LIMIT 1 |""".stripMargin) { stmt => stmt.setString(1, OperationState.INITIALIZED.toString) } { resultSet => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala index dd5d741382f..292cf417483 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala @@ -93,4 +93,16 @@ object JDBCMetadataStoreConf { .serverOnly .stringConf .createWithDefault("") + + val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.metadata.store.jdbc.priority.enabled") + .doc("Whether to enable the priority scheduling for batch impl v2. " + + "When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy " + + "for batch job scheduling. Note: this feature may cause significant performance issues " + + "when using MySQL 5.7 as the metastore backend due to the lack of support " + + "for mixed order index. 
See more details at KYUUBI #5329.") + .version("1.8.0") + .serverOnly + .booleanConf + .createWithDefault(false) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index 8e4c5137fbf..8489e6d307b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.client.util.BatchUtils._ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} @@ -181,7 +182,8 @@ class KyuubiBatchSession( requestArgs = batchArgs, createTime = createTime, engineType = batchType, - clusterManager = batchJobSubmissionOp.builder.clusterManager()) + clusterManager = batchJobSubmissionOp.builder.clusterManager(), + priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10)) // there is a chance that operation failed w/ duplicated key error sessionManager.insertMetadata(newMetadata) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 8d323469959..02a3ee32c7c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest} import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY import org.apache.kyuubi.config.KyuubiConf import 
org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY} import org.apache.kyuubi.credentials.HadoopCredentialsManager import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.metrics.MetricsConstants._ @@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { requestConf = conf, requestArgs = batchRequest.getArgs.asScala.toSeq, createTime = System.currentTimeMillis(), - engineType = batchRequest.getBatchType) + engineType = batchRequest.getBatchType, + priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10)) // there is a chance that operation failed w/ duplicated key error metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false)) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala index 564b5ebe939..fe7fa586858 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala @@ -28,7 +28,9 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.metrics.MetricsConstants._ +import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED import org.apache.kyuubi.session.SessionType class MetadataManagerSuite extends KyuubiFunSuite { @@ -142,6 +144,58 @@ class MetadataManagerSuite extends KyuubiFunSuite { } } + test("[KYUUBI #5328] Test 
MetadataManager#pickBatchForSubmitting in order") { + // build mock batch jobs + val mockKyuubiInstance = "mock_kyuubi_instance" + val time = System.currentTimeMillis() + val mockBatchJob1 = newMetadata( + identifier = "mock_batch_job_1", + state = OperationState.INITIALIZED.toString, + createTime = time + 10000, + // larger than default priority 10 + priority = 20) + val mockBatchJob2 = newMetadata( + identifier = "mock_batch_job_2", + state = OperationState.INITIALIZED.toString, + createTime = time) + val mockBatchJob3 = newMetadata( + identifier = "mock_batch_job_3", + state = OperationState.INITIALIZED.toString, + createTime = time + 5000) + + withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) { + metadataManager => + metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false) + metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false) + metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false) + + // pick the highest priority batch job + val metadata1 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance) + assert(metadata1.exists(m => m.identifier === "mock_batch_job_1")) + + // pick the oldest batch job when same priority + val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance) + assert(metadata2.exists(m => m.identifier === "mock_batch_job_2")) + + val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance) + assert(metadata3.exists(m => m.identifier === "mock_batch_job_3")) + } + + withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) { + metadataManager => + metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false) + metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false) + metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false) + + // pick the oldest batch job + val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance) + assert(metadata2.exists(m => 
m.identifier === "mock_batch_job_2")) + + val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance) + assert(metadata3.exists(m => m.identifier === "mock_batch_job_3")) + } + } + private def withMetadataManager( confOverlay: Map[String, String], newMetadataMgr: () => MetadataManager = () => new MetadataManager())( @@ -169,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite { } } - private def newMetadata(): Metadata = { + private def newMetadata( + identifier: String = UUID.randomUUID().toString, + state: String = OperationState.PENDING.toString, + createTime: Long = System.currentTimeMillis(), + priority: Int = 10): Metadata = { Metadata( - identifier = UUID.randomUUID().toString, + identifier = identifier, sessionType = SessionType.BATCH, realUser = "kyuubi", username = "kyuubi", ipAddress = "127.0.0.1", kyuubiInstance = "localhost:10009", - state = "PENDING", + state = state, resource = "intern", className = "org.apache.kyuubi.SparkWC", requestName = "kyuubi_batch", requestConf = Map("spark.master" -> "local"), requestArgs = Seq("100"), - createTime = System.currentTimeMillis(), + createTime = createTime, engineType = "spark", + priority = priority, clusterManager = Some("local")) } }