diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala index e5dc43a9d2d..288c55b0b25 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala @@ -27,6 +27,10 @@ object KyuubiReservedKeys { final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user" final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url" final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded" + /** + * default priority is 10, higher priority will be scheduled first + * when enabled metadata store priority feature + */ final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority" final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id" final val KYUUBI_ENGINE_ID = "kyuubi.engine.id" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala index f2474dcd2c4..0553cf90b54 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala @@ -78,6 +78,8 @@ case class Metadata( engineState: String = null, engineError: Option[String] = None, endTime: Long = 0L, + // keep consistent with table creation DDL + // find why we set 10 as default in KYUUBI #5329 priority: Int = 10, peerInstanceClosed: Boolean = false) { def appMgrInfo: ApplicationManagerInfo = { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala index 2b42b7f021a..fe7fa586858 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala @@ -26,7 +26,6 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ -import org.apache.kyuubi.engine.EngineType import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.operation.OperationState @@ -146,35 +145,26 @@ class MetadataManagerSuite extends KyuubiFunSuite { } test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") { + // build mock batch jobs + val mockKyuubiInstance = "mock_kyuubi_instance" + val time = System.currentTimeMillis() + val mockBatchJob1 = newMetadata( + identifier = "mock_batch_job_1", + state = OperationState.INITIALIZED.toString, + createTime = time + 10000, + // larger than default priority 10 + priority = 20) + val mockBatchJob2 = newMetadata( + identifier = "mock_batch_job_2", + state = OperationState.INITIALIZED.toString, + createTime = time) + val mockBatchJob3 = newMetadata( + identifier = "mock_batch_job_3", + state = OperationState.INITIALIZED.toString, + createTime = time + 5000) + withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) { metadataManager => - val mockKyuubiInstance = "mock_kyuubi_instance" - val time = System.currentTimeMillis() - val mockBatchJob1 = Metadata( - identifier = "mock_batch_job_1", - sessionType = SessionType.BATCH, - realUser = "kyuubi", - username = "kyuubi", - engineType = EngineType.SPARK_SQL.toString, - state = OperationState.INITIALIZED.toString, - priority = 20, - createTime = time + 10000) - val mockBatchJob2 = Metadata( - identifier = "mock_batch_job_2", - sessionType = SessionType.BATCH, - realUser = "kyuubi", - username = "kyuubi", - engineType = EngineType.SPARK_SQL.toString, - state = OperationState.INITIALIZED.toString, - createTime = time) - val mockBatchJob3 = Metadata( - identifier = "mock_batch_job_3", - sessionType = SessionType.BATCH, - realUser = "kyuubi", - username = "kyuubi", - engineType = EngineType.SPARK_SQL.toString, - state = OperationState.INITIALIZED.toString, - createTime = time + 10000) metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false) metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false) metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false) @@ -193,33 +183,6 @@ class MetadataManagerSuite extends KyuubiFunSuite { withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) { metadataManager => - val mockKyuubiInstance = "mock_kyuubi_instance" - val time = System.currentTimeMillis() - val mockBatchJob1 = Metadata( - identifier = "mock_batch_job_1", - sessionType = SessionType.BATCH, - realUser = "kyuubi", - username = "kyuubi", - engineType = EngineType.SPARK_SQL.toString, - state = OperationState.INITIALIZED.toString, - priority = 20, - createTime = time + 10000) - val mockBatchJob2 = Metadata( - identifier = "mock_batch_job_2", - sessionType = SessionType.BATCH, - realUser = "kyuubi", - username = "kyuubi", - engineType = EngineType.SPARK_SQL.toString, - state = OperationState.INITIALIZED.toString, - createTime = time) - val mockBatchJob3 = Metadata( - identifier = "mock_batch_job_3", - sessionType = SessionType.BATCH, - realUser = "kyuubi", - username = "kyuubi", - engineType = EngineType.SPARK_SQL.toString, - state = OperationState.INITIALIZED.toString, - createTime = time + 500) metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false) metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false) metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false) @@ -260,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite { } } - private def newMetadata(): Metadata = { + private def newMetadata( + identifier: String = UUID.randomUUID().toString, + state: String = OperationState.PENDING.toString, + createTime: Long = System.currentTimeMillis(), + priority: Int = 10): Metadata = { Metadata( - identifier = UUID.randomUUID().toString, + identifier = identifier, sessionType = SessionType.BATCH, realUser = "kyuubi", username = "kyuubi", ipAddress = "127.0.0.1", kyuubiInstance = "localhost:10009", - state = "PENDING", + state = state, resource = "intern", className = "org.apache.kyuubi.SparkWC", requestName = "kyuubi_batch", requestConf = Map("spark.master" -> "local"), requestArgs = Seq("100"), - createTime = System.currentTimeMillis(), + createTime = createTime, engineType = "spark", + priority = priority, clusterManager = Some("local")) } }