Skip to content

Commit

Permalink
update default priority desc & improve UT
Browse files Browse the repository at this point in the history
  • Loading branch information
zwangsheng committed Oct 12, 2023
1 parent 21ceccb commit 7ed2551
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
/**
* default priority is 10, higher priority will be scheduled first
* when enabled metadata store priority feature
*/
final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
// keep consistent with table creation DDL
// find why we set 10 as default in KYUUBI #5329
priority: Int = 10,
peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineType
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.operation.OperationState
Expand Down Expand Up @@ -146,35 +145,26 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}

test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
// build mock batch jobs
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = newMetadata(
identifier = "mock_batch_job_1",
state = OperationState.INITIALIZED.toString,
createTime = time + 10000,
// larger than default priority 10
priority = 20)
val mockBatchJob2 = newMetadata(
identifier = "mock_batch_job_2",
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = newMetadata(
identifier = "mock_batch_job_3",
state = OperationState.INITIALIZED.toString,
createTime = time + 5000)

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) {
metadataManager =>
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = Metadata(
identifier = "mock_batch_job_1",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
priority = 20,
createTime = time + 10000)
val mockBatchJob2 = Metadata(
identifier = "mock_batch_job_2",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = Metadata(
identifier = "mock_batch_job_3",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time + 10000)
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)
Expand All @@ -193,33 +183,6 @@ class MetadataManagerSuite extends KyuubiFunSuite {

withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) {
metadataManager =>
val mockKyuubiInstance = "mock_kyuubi_instance"
val time = System.currentTimeMillis()
val mockBatchJob1 = Metadata(
identifier = "mock_batch_job_1",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
priority = 20,
createTime = time + 10000)
val mockBatchJob2 = Metadata(
identifier = "mock_batch_job_2",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time)
val mockBatchJob3 = Metadata(
identifier = "mock_batch_job_3",
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
engineType = EngineType.SPARK_SQL.toString,
state = OperationState.INITIALIZED.toString,
createTime = time + 500)
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)
Expand Down Expand Up @@ -260,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite {
}
}

private def newMetadata(): Metadata = {
private def newMetadata(
identifier: String = UUID.randomUUID().toString,
state: String = OperationState.PENDING.toString,
createTime: Long = System.currentTimeMillis(),
priority: Int = 10): Metadata = {
Metadata(
identifier = UUID.randomUUID().toString,
identifier = identifier,
sessionType = SessionType.BATCH,
realUser = "kyuubi",
username = "kyuubi",
ipAddress = "127.0.0.1",
kyuubiInstance = "localhost:10009",
state = "PENDING",
state = state,
resource = "intern",
className = "org.apache.kyuubi.SparkWC",
requestName = "kyuubi_batch",
requestConf = Map("spark.master" -> "local"),
requestArgs = Seq("100"),
createTime = System.currentTimeMillis(),
createTime = createTime,
engineType = "spark",
priority = priority,
clusterManager = Some("local"))
}
}

0 comments on commit 7ed2551

Please sign in to comment.