Skip to content

Commit

Permalink
[KYUUBI #6829] Add metrics for batch pending max elapse time
Browse files Browse the repository at this point in the history
### Why are the changes needed?

1. add metrics `kyuubi.operartion.batch_pending_max_elapse` for the batch pending max elapse time, which is helpful for batch health monitoring, and we can send alert if the batch pending elapse time too long
2. For `GET /api/v1/batches` api, limit the max time window for listing batches, which is helpful that, we want to reserve more metadata in kyuubi server end, for example: 90 days, but for list batches, we just want to allow user to search the last 7 days. It is optional. And if `create_time` is specified, order by `create_time` instead of `key_id`.
https://github.com/apache/kyuubi/blob/68a6f48da53dd0ad2e20b450a41ca600b8c1e1d2/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.8.0.mysql.sql#L32

### How was this patch tested?

GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #6829 from turboFei/batch_pending_time.

Closes #6829

ee4f931 [Wang, Fei] docs
bf8169a [Wang, Fei] comments
f493a2a [Wang, Fei] new config
ab7b6db [Wang, Fei] ut
1680175 [Wang, Fei] in memory session
510a30b [Wang, Fei] batchSearchWindow opt
1e93dd2 [Wang, Fei] save

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
turboFei authored and pan3793 committed Dec 5, 2024
1 parent f2cacd3 commit 3167692
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 76 deletions.
37 changes: 19 additions & 18 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

101 changes: 51 additions & 50 deletions docs/monitor/metrics.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,19 @@ object KyuubiConf {
.intConf
.createWithDefault(65536)

val METADATA_SEARCH_WINDOW: OptionalConfigEntry[Long] =
buildConf("kyuubi.metadata.search.window")
.doc("The time window to restrict user queries to metadata within a specific period, " +
"starting from the current time to the past. It only affects `GET /api/v1/batches` API. " +
"You may want to set this to short period to improve query performance and reduce load " +
"on the metadata store when administer want to reserve the metadata for long time. " +
"The side-effects is that, the metadata created outside the window will not be " +
"invisible to users. If it is undefined, all metadata will be visible for users.")
.version("1.10.1")
.timeConf
.checkValue(_ > 0, "must be positive number")
.createOptional

val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object MetricsConstants {
final val OPERATION_TOTAL: String = OPERATION + "total"
final val OPERATION_STATE: String = OPERATION + "state"
final val OPERATION_EXEC_TIME: String = OPERATION + "exec_time"
final val OPERATION_BATCH_PENDING_MAX_ELAPSE: String = OPERATION + "batch_pending_max_elapse"

final private val BACKEND_SERVICE = KYUUBI + "backend_service."
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ class BatchJobSubmission(
Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
}
}

def getPendingElapsedTime: Long = {
if (state == OperationState.PENDING) {
System.currentTimeMillis() - createTime
} else {
0L
}
}
}

object BatchJobSubmission {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import org.eclipse.jetty.servlet.{ErrorPageErrorHandler, FilterHolder}
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
import org.apache.kyuubi.service.authentication.{AuthTypes, AuthUtils}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.{JavaUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

Expand Down Expand Up @@ -202,6 +204,14 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}

private def getBatchPendingMaxElapse(): Long = {
val batchPendingElapseTimes = sessionManager.allSessions().map {
case session: KyuubiBatchSession => session.batchJobSubmissionOp.getPendingElapsedTime
case _ => 0L
}
if (batchPendingElapseTimes.isEmpty) 0L else batchPendingElapseTimes.max
}

def waitForServerStarted(): Unit = {
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
Expand All @@ -220,6 +230,9 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
isStarted.set(true)
startBatchChecker()
recoverBatchSessions()
MetricsSystem.tracing { ms =>
ms.registerGauge(OPERATION_BATCH_PENDING_MAX_ELAPSE, getBatchPendingMaxElapse, 0)
}
} catch {
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
fe.getConf.get(ENGINE_SECURITY_ENABLED)
private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE)
private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE)
private lazy val metadataSearchWindow = fe.getConf.get(METADATA_SEARCH_WINDOW)

private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
Expand Down Expand Up @@ -420,13 +421,15 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}

val createTimeFilter =
math.max(createTime, metadataSearchWindow.map(System.currentTimeMillis() - _).getOrElse(0L))
val filter = MetadataFilter(
sessionType = SessionType.BATCH,
engineType = batchType,
username = batchUser,
state = batchState,
requestName = batchName,
createTime = createTime,
createTime = createTimeFilter,
endTime = endTime)
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size, desc)
new GetBatchesResponse(from, batches.size, batches.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@ class MetadataManager extends AbstractService("MetadataManager") {
from: Int,
size: Int,
desc: Boolean = false): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, desc)).map(
buildBatch)
withMetadataRequestMetrics(_metadataStore.getMetadataList(
filter,
from,
size,
// if create_file field is set, order by create_time, which is faster, otherwise by key_id
orderBy = if (filter.createTime > 0) Some("create_time") else Some("key_id"),
direction = if (desc) "DESC" else "ASC")).map(buildBatch)
}

def countBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ trait MetadataStore extends Closeable {
* @param from the metadata offset.
* @param size the size to get.
* @param desc the order of metadata list.
* @param orderBy the order by column, default is the auto increment primary key, `key_id`.
* @param direction the order direction, default is `ASC`.
* @return selected metadata list.
*/
def getMetadataList(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata]
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata]

/**
* Count the metadata list with filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata] = {
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata] = {
val queryBuilder = new StringBuilder
val params = ListBuffer[Any]()
queryBuilder.append("SELECT ")
queryBuilder.append(METADATA_COLUMNS)
queryBuilder.append(s" FROM $METADATA_TABLE")
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
queryBuilder.append(" ORDER BY key_id ")
queryBuilder.append(if (desc) "DESC " else "ASC ")
orderBy.foreach(o => queryBuilder.append(s" ORDER BY $o $direction "))
queryBuilder.append(dialect.limitClause(size, from))
val query = queryBuilder.toString
JdbcUtils.withConnection { connection =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.queryParam("from", "0")
.queryParam("size", "1")
.queryParam("desc", "true")
.queryParam("createTime", "1")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
Expand Down

0 comments on commit 3167692

Please sign in to comment.