Skip to content

Commit

Permalink
[KYUUBI #5328] Batch supports priority scheduling
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

Follow #5329 and close #5328:

1. Add new config `kyuubi.metadata.store.jdbc.priority.enabled` to control whether enable priority scheduling, due to users may experience performance issues when using MySQL5.7 as metastore backend and enabling kyuubi batch v2 priority feature.
2. When priority scheduling is enabled, `KyuubiBatchService` picks metadata job with `ORDER BY priority DESC, create_time ASC`.
3. Insert metadata with priority field, default priority value is `10`.
4. Add new config `kyuubi.batch.priority` for each batch priority.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5352 from zwangsheng/KYUUBI#5328.

Closes #5328

687ed1e [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
58621b5 [zwangsheng] fix comments
1bf81e7 [zwangsheng] fix style
7ed2551 [zwangsheng] update default priority desc & improve UT
21ceccb [zwangsheng] fix doc
27fc5e8 [zwangsheng] enrich desc
c0bbc0d [zwangsheng] fix style
6b8d0f0 [zwangsheng] fix comment
67eb252 [zwangsheng] fix comment
e1705c3 [zwangsheng] Add config to control whether pick order by priority or not
129a467 [zwangsheng] Add unit test for pickBatchForSubmitting
fcaf85d [zwangsheng] Fix unit test
f7ca221 [zwangsheng] Fix unit test
8d4b276 [wangsheng] fix code style
4c6b990 [wangsheng] fix comments
654ad84 [zwangsheng] [KYUUBI #5328][V2] Kyuubi Server Pick Metadata job with priority

Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: wangsheng <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit b24d94e)
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
3 people committed Oct 16, 2023
1 parent 59e8d69 commit 160624d
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.metadata.store.jdbc.database.type | SQLITE | The database type for server jdbc metadata store.<ul> <li>(Deprecated) DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>SQLITE: SQLite3, JDBC driver `org.sqlite.JDBC`.</li> <li>MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to specify corresponding JDBC driver.</li> Note that: The JDBC datasource is powered by HiKariCP, for datasource properties, please specify them with the prefix: kyuubi.metadata.store.jdbc.datasource. For example, kyuubi.metadata.store.jdbc.datasource.connectionTimeout=10000. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.driver | &lt;undefined&gt; | JDBC driver class name for server jdbc metadata store. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.password || The password for server JDBC metadata store. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.priority.enabled | false | Whether to enable the priority scheduling for batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy for batch job scheduling. Note: this feature may cause significant performance issues when using MySQL 5.7 as the metastore backend due to the lack of support for mixed order index. See more details at KYUUBI #5329. | boolean | 1.8.0 |
| kyuubi.metadata.store.jdbc.url | jdbc:sqlite:kyuubi_state_store.db | The JDBC url for server JDBC metadata store. By default, it is a SQLite database url, and the state information is not shared across kyuubi instances. To enable high availability for multiple kyuubi instances, please specify a production JDBC url. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.user || The username for server JDBC metadata store. | string | 1.6.0 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
// default priority is 10, higher priority will be scheduled first
// when enabled metadata store priority feature
final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
// keep consistent with table creation DDL
// find why we set 10 as default in KYUUBI #5329
priority: Int = 10,
peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
case CUSTOM => new GenericDatabaseDialect
}

private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED)

private val datasourceProperties =
JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
private val hikariConfig = new HikariConfig(datasourceProperties)
Expand Down Expand Up @@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|request_args,
|create_time,
|engine_type,
|cluster_manager
|cluster_manager,
|priority
|)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

JdbcUtils.withConnection { connection =>
Expand All @@ -190,15 +193,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
valueAsString(metadata.requestArgs),
metadata.createTime,
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
metadata.clusterManager.orNull)
metadata.clusterManager.orNull,
metadata.priority)
}
}

override def pickMetadata(kyuubiInstance: String): Option[Metadata] = synchronized {
JdbcUtils.executeQueryWithRowMapper(
s"""SELECT identifier FROM $METADATA_TABLE
|WHERE state=?
|ORDER BY create_time ASC LIMIT 1
|ORDER BY ${if (priorityEnabled) "priority DESC, " else ""}create_time ASC LIMIT 1
|""".stripMargin) { stmt =>
stmt.setString(1, OperationState.INITIALIZED.toString)
} { resultSet =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,16 @@ object JDBCMetadataStoreConf {
.serverOnly
.stringConf
.createWithDefault("")

val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.metadata.store.jdbc.priority.enabled")
.doc("Whether to enable the priority scheduling for batch impl v2. " +
"When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy " +
"for batch job scheduling. Note: this feature may cause significant performance issues " +
"when using MySQL 5.7 as the metastore backend due to the lack of support " +
"for mixed order index. See more details at KYUUBI #5329.")
.version("1.8.0")
.serverOnly
.booleanConf
.createWithDefault(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
Expand Down Expand Up @@ -181,7 +182,8 @@ class KyuubiBatchSession(
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
clusterManager = batchJobSubmissionOp.builder.clusterManager(),
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
Expand Down Expand Up @@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
requestConf = conf,
requestArgs = batchRequest.getArgs.asScala.toSeq,
createTime = System.currentTimeMillis(),
engineType = batchRequest.getBatchType)
engineType = batchRequest.getBatchType,
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED
import org.apache.kyuubi.session.SessionType

class MetadataManagerSuite extends KyuubiFunSuite {
Expand Down Expand Up @@ -142,6 +144,58 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}

test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
// build mock batch jobs
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = newMetadata(
identifier = "mock_batch_job_1",
state = OperationState.INITIALIZED.toString,
createTime = time + 10000,
// larger than default priority 10
priority = 20)
val mockBatchJob2 = newMetadata(
identifier = "mock_batch_job_2",
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = newMetadata(
identifier = "mock_batch_job_3",
state = OperationState.INITIALIZED.toString,
createTime = time + 5000)

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) {
metadataManager =>
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)

// pick the highest priority batch job
val metadata1 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata1.exists(m => m.identifier === "mock_batch_job_1"))

// pick the oldest batch job when same priority
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))

val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
}

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) {
metadataManager =>
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)

// pick the oldest batch job
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))

val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
}
}

private def withMetadataManager(
confOverlay: Map[String, String],
newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
Expand Down Expand Up @@ -169,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}

private def newMetadata(): Metadata = {
private def newMetadata(
identifier: String = UUID.randomUUID().toString,
state: String = OperationState.PENDING.toString,
createTime: Long = System.currentTimeMillis(),
priority: Int = 10): Metadata = {
Metadata(
identifier = UUID.randomUUID().toString,
identifier = identifier,
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
ipAddress = "127.0.0.1",
kyuubiInstance = "localhost:10009",
state = "PENDING",
state = state,
resource = "intern",
className = "org.apache.kyuubi.SparkWC",
requestName = "kyuubi_batch",
requestConf = Map("spark.master" -> "local"),
requestArgs = Seq("100"),
createTime = System.currentTimeMillis(),
createTime = createTime,
engineType = "spark",
priority = priority,
clusterManager = Some("local"))
}
}

0 comments on commit 160624d

Please sign in to comment.