Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5328] Batch supports priority scheduling #5352

Closed
wants to merge 16 commits into from
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.metadata.store.jdbc.database.type | SQLITE | The database type for server jdbc metadata store.<ul> <li>(Deprecated) DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>SQLITE: SQLite3, JDBC driver `org.sqlite.JDBC`.</li> <li>MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to specify corresponding JDBC driver.</li> Note that: The JDBC datasource is powered by HiKariCP, for datasource properties, please specify them with the prefix: kyuubi.metadata.store.jdbc.datasource. For example, kyuubi.metadata.store.jdbc.datasource.connectionTimeout=10000. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.driver | &lt;undefined&gt; | JDBC driver class name for server jdbc metadata store. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.password || The password for server JDBC metadata store. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.priority.enabled | false | Whether to enable picking up jobs from the metadata store in order of priority.If this feature is disabled, setting kyuubi.batch.priority will not take effect.However, it is recommended to enable this feature if you already have a metadata store that can support mixed order indexing(such as MySQL 8).See why recommend this in https://github.com/apache/kyuubi/pull/5329 | boolean | 1.8.0 |
| kyuubi.metadata.store.jdbc.url | jdbc:sqlite:kyuubi_state_store.db | The JDBC url for server JDBC metadata store. By default, it is a SQLite database url, and the state information is not shared across kyuubi instances. To enable high availability for multiple kyuubi instances, please specify a production JDBC url. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.user || The username for server JDBC metadata store. | string | 1.6.0 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
final val KYUUBI_ENGINE_NAME = "kyuubi.engine.name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
priority: Int = 10,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a magic code here. Default priority is worth to be extracted to explicit static value , and further mentioned in config docs. Or it's confusing for user and developers reviewing the UTs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's at least document it should be consistent with the metadata table DDL definition

peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
case CUSTOM => new GenericDatabaseDialect
}

private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED)

private val datasourceProperties =
JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
private val hikariConfig = new HikariConfig(datasourceProperties)
Expand Down Expand Up @@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|request_args,
|create_time,
|engine_type,
|cluster_manager
|cluster_manager,
|priority
Copy link
Member

@turboFei turboFei Oct 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see the metadata table schema change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review, metadata table schema changed in #5329

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @zwangsheng

|)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

JdbcUtils.withConnection { connection =>
Expand All @@ -190,15 +193,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
valueAsString(metadata.requestArgs),
metadata.createTime,
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
metadata.clusterManager.orNull)
metadata.clusterManager.orNull,
metadata.priority)
}
}

override def pickMetadata(kyuubiInstance: String): Option[Metadata] = synchronized {
JdbcUtils.executeQueryWithRowMapper(
s"""SELECT identifier FROM $METADATA_TABLE
|WHERE state=?
|ORDER BY create_time ASC LIMIT 1
|ORDER BY ${if (priorityEnabled) "priority DESC," else ""} create_time ASC LIMIT 1
pan3793 marked this conversation as resolved.
Show resolved Hide resolved
|""".stripMargin) { stmt =>
stmt.setString(1, OperationState.INITIALIZED.toString)
} { resultSet =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,16 @@ object JDBCMetadataStoreConf {
.serverOnly
.stringConf
.createWithDefault("")

val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.metadata.store.jdbc.priority.enabled")
.doc("Whether to enable picking up jobs from the metadata store in order of priority." +
"If this feature is disabled, setting kyuubi.batch.priority will not take effect." +
"However, it is recommended to enable this feature if you already have a metadata store " +
"that can support mixed order indexing(such as MySQL 8)." +
"See why recommend this in https://github.com/apache/kyuubi/pull/5329")
Copy link
Member

@pan3793 pan3793 Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggested change:

Whether to enable the priority scheduling for batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy for batch job scheduling. Note: it's recommended to use MySQL 8.0 as metadata backend when enabling this feature, since it may cause significant performance issues on MySQL 5.7 due to the lack of support for mixed order index. See more details at KYUUBI #5329.

Also, please take care of the space between each line.

.version("1.8.0")
.serverOnly
.booleanConf
.createWithDefault(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
Expand Down Expand Up @@ -177,7 +178,8 @@ class KyuubiBatchSession(
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
clusterManager = batchJobSubmissionOp.builder.clusterManager(),
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
Expand Down Expand Up @@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
requestConf = conf,
requestArgs = batchRequest.getArgs.asScala.toSeq,
createTime = System.currentTimeMillis(),
engineType = batchRequest.getBatchType)
engineType = batchRequest.getBatchType,
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineType
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED
import org.apache.kyuubi.session.SessionType

class MetadataManagerSuite extends KyuubiFunSuite {
Expand Down Expand Up @@ -142,6 +145,94 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}

test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) {
zwangsheng marked this conversation as resolved.
Show resolved Hide resolved
metadataManager =>
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = Metadata(
identifier = "mock_batch_job_1",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
priority = 20,
createTime = time + 10000)
val mockBatchJob2 = Metadata(
identifier = "mock_batch_job_2",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = Metadata(
identifier = "mock_batch_job_3",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time + 10000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be extracted to a dedicate method for generating mocked batch job objects?
And make sure same job objects are the same in positive and negative tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Reused and modified newMetadata method to build mock batch job and keep use same mock object in both positive and negative case.

metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)

// pick the highest priority batch job
val metadata1 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata1.exists(m => m.identifier === "mock_batch_job_1"))

// pick the oldest batch job when same priority
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))

val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
}

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) {
metadataManager =>
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = Metadata(
identifier = "mock_batch_job_1",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
priority = 20,
createTime = time + 10000)
val mockBatchJob2 = Metadata(
identifier = "mock_batch_job_2",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = Metadata(
identifier = "mock_batch_job_3",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time + 500)
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)

// pick the oldest batch job
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))

val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
}
}

private def withMetadataManager(
confOverlay: Map[String, String],
newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
Expand Down
Loading