Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5328] Batch supports priority scheduling #5352

Closed
wants to merge 16 commits into from
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.metadata.store.jdbc.database.type | SQLITE | The database type for server jdbc metadata store.<ul> <li>(Deprecated) DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>SQLITE: SQLite3, JDBC driver `org.sqlite.JDBC`.</li> <li>MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to specify corresponding JDBC driver.</li> Note that: The JDBC datasource is powered by HiKariCP, for datasource properties, please specify them with the prefix: kyuubi.metadata.store.jdbc.datasource. For example, kyuubi.metadata.store.jdbc.datasource.connectionTimeout=10000. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.driver | &lt;undefined&gt; | JDBC driver class name for server jdbc metadata store. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.password || The password for server JDBC metadata store. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.priority.enabled | false | Whether to enable the priority scheduling for batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy for batch job scheduling. Note: this feature may cause significant performance issues when using MySQL 5.7 as the metastore backend due to the lack of support for mixed order index. See more details at KYUUBI #5329. | boolean | 1.8.0 |
| kyuubi.metadata.store.jdbc.url | jdbc:sqlite:kyuubi_state_store.db | The JDBC url for server JDBC metadata store. By default, it is a SQLite database url, and the state information is not shared across kyuubi instances. To enable high availability for multiple kyuubi instances, please specify a production JDBC url. | string | 1.6.0 |
| kyuubi.metadata.store.jdbc.user || The username for server JDBC metadata store. | string | 1.6.0 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
// default priority is 10, higher priority will be scheduled first
// when enabled metadata store priority feature
final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
// keep consistent with table creation DDL
// find why we set 10 as default in KYUUBI #5329
priority: Int = 10,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a magic code here. Default priority is worth to be extracted to explicit static value , and further mentioned in config docs. Or it's confusing for user and developers reviewing the UTs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's at least document it should be consistent with the metadata table DDL definition

peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
case CUSTOM => new GenericDatabaseDialect
}

private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED)

private val datasourceProperties =
JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
private val hikariConfig = new HikariConfig(datasourceProperties)
Expand Down Expand Up @@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|request_args,
|create_time,
|engine_type,
|cluster_manager
|cluster_manager,
|priority
Copy link
Member

@turboFei turboFei Oct 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see the metadata table schema change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review, metadata table schema changed in #5329

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @zwangsheng

|)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

JdbcUtils.withConnection { connection =>
Expand All @@ -190,15 +193,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
valueAsString(metadata.requestArgs),
metadata.createTime,
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
metadata.clusterManager.orNull)
metadata.clusterManager.orNull,
metadata.priority)
}
}

override def pickMetadata(kyuubiInstance: String): Option[Metadata] = synchronized {
JdbcUtils.executeQueryWithRowMapper(
s"""SELECT identifier FROM $METADATA_TABLE
|WHERE state=?
|ORDER BY create_time ASC LIMIT 1
|ORDER BY ${if (priorityEnabled) "priority DESC, " else ""}create_time ASC LIMIT 1
|""".stripMargin) { stmt =>
stmt.setString(1, OperationState.INITIALIZED.toString)
} { resultSet =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,16 @@ object JDBCMetadataStoreConf {
.serverOnly
.stringConf
.createWithDefault("")

val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.metadata.store.jdbc.priority.enabled")
.doc("Whether to enable the priority scheduling for batch impl v2. " +
"When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy " +
"for batch job scheduling. Note: this feature may cause significant performance issues " +
"when using MySQL 5.7 as the metastore backend due to the lack of support " +
"for mixed order index. See more details at KYUUBI #5329.")
.version("1.8.0")
.serverOnly
.booleanConf
.createWithDefault(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
Expand Down Expand Up @@ -177,7 +178,8 @@ class KyuubiBatchSession(
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
clusterManager = batchJobSubmissionOp.builder.clusterManager(),
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
Expand Down Expand Up @@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
requestConf = conf,
requestArgs = batchRequest.getArgs.asScala.toSeq,
createTime = System.currentTimeMillis(),
engineType = batchRequest.getBatchType)
engineType = batchRequest.getBatchType,
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED
import org.apache.kyuubi.session.SessionType

class MetadataManagerSuite extends KyuubiFunSuite {
Expand Down Expand Up @@ -142,6 +144,58 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}

test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
// build mock batch jobs
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = newMetadata(
identifier = "mock_batch_job_1",
state = OperationState.INITIALIZED.toString,
createTime = time + 10000,
// larger than default priority 10
priority = 20)
val mockBatchJob2 = newMetadata(
identifier = "mock_batch_job_2",
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = newMetadata(
identifier = "mock_batch_job_3",
state = OperationState.INITIALIZED.toString,
createTime = time + 5000)

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) {
zwangsheng marked this conversation as resolved.
Show resolved Hide resolved
metadataManager =>
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)

// pick the highest priority batch job
val metadata1 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata1.exists(m => m.identifier === "mock_batch_job_1"))

// pick the oldest batch job when same priority
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))

val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
}

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) {
metadataManager =>
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)

// pick the oldest batch job
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))

val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
}
}

private def withMetadataManager(
confOverlay: Map[String, String],
newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
Expand Down Expand Up @@ -169,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}

private def newMetadata(): Metadata = {
private def newMetadata(
identifier: String = UUID.randomUUID().toString,
state: String = OperationState.PENDING.toString,
createTime: Long = System.currentTimeMillis(),
priority: Int = 10): Metadata = {
Metadata(
identifier = UUID.randomUUID().toString,
identifier = identifier,
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
ipAddress = "127.0.0.1",
kyuubiInstance = "localhost:10009",
state = "PENDING",
state = state,
resource = "intern",
className = "org.apache.kyuubi.SparkWC",
requestName = "kyuubi_batch",
requestConf = Map("spark.master" -> "local"),
requestArgs = Seq("100"),
createTime = System.currentTimeMillis(),
createTime = createTime,
engineType = "spark",
priority = priority,
clusterManager = Some("local"))
}
}