Skip to content

Commit

Permalink
[KYUUBI #5328][V2] Kyuubi Server Pick Metadata job with priority
Browse files Browse the repository at this point in the history
  • Loading branch information
zwangsheng committed Oct 7, 2023
1 parent 2690d6d commit 654ad84
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
final val KYUUBI_SESSION_PRIORITY = "kyuubi.session.priority"
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
priority: Option[Int] = None,
peerInstanceClosed: Boolean = false) {
def appMgrInfo: ApplicationManagerInfo = {
ApplicationManagerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|request_args,
|create_time,
|engine_type,
|cluster_manager
|cluster_manager,
|priority
|)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

JdbcUtils.withConnection { connection =>
Expand All @@ -190,15 +191,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
valueAsString(metadata.requestArgs),
metadata.createTime,
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
metadata.clusterManager.orNull)
metadata.clusterManager.orNull,
metadata.priority.orNull)
}
}

override def pickMetadata(kyuubiInstance: String): Option[Metadata] = synchronized {
JdbcUtils.executeQueryWithRowMapper(
s"""SELECT identifier FROM $METADATA_TABLE
|WHERE state=?
|ORDER BY create_time ASC LIMIT 1
|ORDER BY priority DESC, create_time ASC LIMIT 1
|""".stripMargin) { stmt =>
stmt.setString(1, OperationState.INITIALIZED.toString)
} { resultSet =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_PRIORITY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
Expand Down Expand Up @@ -177,7 +178,8 @@ class KyuubiBatchSession(
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
clusterManager = batchJobSubmissionOp.builder.clusterManager(),
priority = conf.get(KYUUBI_SESSION_PRIORITY).map(_.toInt))

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
package org.apache.kyuubi.session

import java.util.concurrent.{Semaphore, TimeUnit}

import scala.collection.JavaConverters._

import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
Expand Down Expand Up @@ -237,7 +234,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
requestConf = conf,
requestArgs = batchRequest.getArgs.asScala.toSeq,
createTime = System.currentTimeMillis(),
engineType = batchRequest.getBatchType)
engineType = batchRequest.getBatchType,
priority = conf.get(KYUUBI_SESSION_PRIORITY).map(_.toInt))

// there is a chance that operation failed w/ duplicated key error
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))
Expand Down

0 comments on commit 654ad84

Please sign in to comment.