Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5328] Batch supports priority scheduling #5352

Closed
wants to merge 16 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
final val KYUUBI_ENGINE_NAME = "kyuubi.engine.name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
priority: Int = 10,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a magic code here. Default priority is worth to be extracted to explicit static value , and further mentioned in config docs. Or it's confusing for user and developers reviewing the UTs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's at least document it should be consistent with the metadata table DDL definition

peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|request_args,
|create_time,
|engine_type,
|cluster_manager
|cluster_manager,
|priority
Copy link
Member

@turboFei turboFei Oct 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see the metadata table schema change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review, metadata table schema changed in #5329

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @zwangsheng

|)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

JdbcUtils.withConnection { connection =>
Expand All @@ -190,15 +191,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
valueAsString(metadata.requestArgs),
metadata.createTime,
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
metadata.clusterManager.orNull)
metadata.clusterManager.orNull,
metadata.priority)
}
}

override def pickMetadata(kyuubiInstance: String): Option[Metadata] = synchronized {
JdbcUtils.executeQueryWithRowMapper(
s"""SELECT identifier FROM $METADATA_TABLE
|WHERE state=?
|ORDER BY create_time ASC LIMIT 1
|ORDER BY priority DESC, create_time ASC LIMIT 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we discussed before, there would cause significant performance issues on MySQL 5.7(which is adopted widely at the current stage), we should provide a switch to allow user disable this feature

|""".stripMargin) { stmt =>
stmt.setString(1, OperationState.INITIALIZED.toString)
} { resultSet =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
Expand Down Expand Up @@ -177,7 +178,8 @@ class KyuubiBatchSession(
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
clusterManager = batchJobSubmissionOp.builder.clusterManager(),
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
Expand Down Expand Up @@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
requestConf = conf,
requestArgs = batchRequest.getArgs.asScala.toSeq,
createTime = System.currentTimeMillis(),
engineType = batchRequest.getBatchType)
engineType = batchRequest.getBatchType,
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))

// there is a chance that operation failed w/ duplicated key error
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))
Expand Down
Loading