Skip to content

Commit

Permalink
Merge branch 'apache:master' into trino-scala13
Browse files Browse the repository at this point in the history
  • Loading branch information
naive-zhang authored Dec 8, 2024
2 parents 6a038ec + dc3ac89 commit 852398d
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 99 deletions.
3 changes: 0 additions & 3 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ io.vertx:vertx-grpc
com.squareup.retrofit2:retrofit
com.squareup.okhttp3:okhttp
org.apache.kafka:kafka-clients
org.lz4:lz4-java
org.xerial.snappy:snappy-java
org.xerial:sqlite-jdbc

BSD
Expand All @@ -319,7 +317,6 @@ jline:jline
com.thoughtworks.paranamer:paranamer
com.google.protobuf:protobuf-java-util
com.google.protobuf:protobuf-java
com.github.luben:zstd-jni
org.postgresql:postgresql

Eclipse Distribution License - v 1.0
Expand Down
16 changes: 0 additions & 16 deletions NOTICE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -1063,19 +1063,3 @@ which can be obtained at:
* license/LICENSE.kafka.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/apache/kafka

This product optionally depends on 'snappy-java', Snappy compression and
decompression for Java, which can be obtained at:

* LICENSE:
* license/LICENSE.snappy-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/xerial/snappy-java

This product optionally depends on 'lz4-java', Lz4 compression and
decompression for Java, which can be obtained at:

* LICENSE:
* license/LICENSE.lz4-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/lz4/lz4-java
2 changes: 2 additions & 0 deletions charts/kyuubi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ metrics:
enabled: false
# List of service endpoints serving metrics to be scraped by Prometheus, see Prometheus Operator docs for more details
endpoints: []
# endpoints:
# - port: prometheus
# Additional labels to be used to make ServiceMonitor discovered by Prometheus
labels: {}

Expand Down
3 changes: 0 additions & 3 deletions dev/dependencyList
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ log4j-api/2.24.2//log4j-api-2.24.2.jar
log4j-core/2.24.2//log4j-core-2.24.2.jar
log4j-slf4j-impl/2.24.2//log4j-slf4j-impl-2.24.2.jar
logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
metrics-core/4.2.26//metrics-core-4.2.26.jar
metrics-jmx/4.2.26//metrics-jmx-4.2.26.jar
metrics-json/4.2.26//metrics-json-4.2.26.jar
Expand Down Expand Up @@ -173,7 +172,6 @@ simpleclient_tracer_otel_agent/0.16.0//simpleclient_tracer_otel_agent-0.16.0.jar
slf4j-api/1.7.36//slf4j-api-1.7.36.jar
snakeyaml-engine/2.7//snakeyaml-engine-2.7.jar
snakeyaml/2.2//snakeyaml-2.2.jar
snappy-java/1.1.10.5//snappy-java-1.1.10.5.jar
sqlite-jdbc/3.46.1.3//sqlite-jdbc-3.46.1.3.jar
swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
swagger-core/2.2.1//swagger-core-2.2.1.jar
Expand All @@ -185,4 +183,3 @@ units/1.7//units-1.7.jar
vertx-core/4.5.3//vertx-core-4.5.3.jar
vertx-grpc/4.5.3//vertx-grpc-4.5.3.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zstd-jni/1.5.5-1//zstd-jni-1.5.5-1.jar
37 changes: 19 additions & 18 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

101 changes: 51 additions & 50 deletions docs/monitor/metrics.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,19 @@ object KyuubiConf {
.intConf
.createWithDefault(65536)

val METADATA_SEARCH_WINDOW: OptionalConfigEntry[Long] =
buildConf("kyuubi.metadata.search.window")
.doc("The time window to restrict user queries to metadata within a specific period, " +
"starting from the current time to the past. It only affects `GET /api/v1/batches` API. " +
"You may want to set this to short period to improve query performance and reduce load " +
"on the metadata store when administer want to reserve the metadata for long time. " +
"The side-effects is that, the metadata created outside the window will not be " +
"invisible to users. If it is undefined, all metadata will be visible for users.")
.version("1.10.1")
.timeConf
.checkValue(_ > 0, "must be positive number")
.createOptional

val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object MetricsConstants {
final val OPERATION_TOTAL: String = OPERATION + "total"
final val OPERATION_STATE: String = OPERATION + "state"
final val OPERATION_EXEC_TIME: String = OPERATION + "exec_time"
final val OPERATION_BATCH_PENDING_MAX_ELAPSE: String = OPERATION + "batch_pending_max_elapse"

final private val BACKEND_SERVICE = KYUUBI + "backend_service."
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ class BatchJobSubmission(
Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
}
}

def getPendingElapsedTime: Long = {
if (state == OperationState.PENDING) {
System.currentTimeMillis() - createTime
} else {
0L
}
}
}

object BatchJobSubmission {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import org.eclipse.jetty.servlet.{ErrorPageErrorHandler, FilterHolder}
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
import org.apache.kyuubi.service.authentication.{AuthTypes, AuthUtils}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.{JavaUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

Expand Down Expand Up @@ -202,6 +204,14 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}

private def getBatchPendingMaxElapse(): Long = {
val batchPendingElapseTimes = sessionManager.allSessions().map {
case session: KyuubiBatchSession => session.batchJobSubmissionOp.getPendingElapsedTime
case _ => 0L
}
if (batchPendingElapseTimes.isEmpty) 0L else batchPendingElapseTimes.max
}

def waitForServerStarted(): Unit = {
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
Expand All @@ -220,6 +230,9 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
isStarted.set(true)
startBatchChecker()
recoverBatchSessions()
MetricsSystem.tracing { ms =>
ms.registerGauge(OPERATION_BATCH_PENDING_MAX_ELAPSE, getBatchPendingMaxElapse, 0)
}
} catch {
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
fe.getConf.get(ENGINE_SECURITY_ENABLED)
private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE)
private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE)
private lazy val metadataSearchWindow = fe.getConf.get(METADATA_SEARCH_WINDOW)

private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
Expand Down Expand Up @@ -420,13 +421,15 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}

val createTimeFilter =
math.max(createTime, metadataSearchWindow.map(System.currentTimeMillis() - _).getOrElse(0L))
val filter = MetadataFilter(
sessionType = SessionType.BATCH,
engineType = batchType,
username = batchUser,
state = batchState,
requestName = batchName,
createTime = createTime,
createTime = createTimeFilter,
endTime = endTime)
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size, desc)
new GetBatchesResponse(from, batches.size, batches.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@ class MetadataManager extends AbstractService("MetadataManager") {
from: Int,
size: Int,
desc: Boolean = false): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, desc)).map(
buildBatch)
withMetadataRequestMetrics(_metadataStore.getMetadataList(
filter,
from,
size,
// if create_file field is set, order by create_time, which is faster, otherwise by key_id
orderBy = if (filter.createTime > 0) Some("create_time") else Some("key_id"),
direction = if (desc) "DESC" else "ASC")).map(buildBatch)
}

def countBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ trait MetadataStore extends Closeable {
* @param from the metadata offset.
* @param size the size to get.
* @param desc the order of metadata list.
* @param orderBy the order by column, default is the auto increment primary key, `key_id`.
* @param direction the order direction, default is `ASC`.
* @return selected metadata list.
*/
def getMetadataList(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata]
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata]

/**
* Count the metadata list with filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata] = {
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata] = {
val queryBuilder = new StringBuilder
val params = ListBuffer[Any]()
queryBuilder.append("SELECT ")
queryBuilder.append(METADATA_COLUMNS)
queryBuilder.append(s" FROM $METADATA_TABLE")
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
queryBuilder.append(" ORDER BY key_id ")
queryBuilder.append(if (desc) "DESC " else "ASC ")
orderBy.foreach(o => queryBuilder.append(s" ORDER BY $o $direction "))
queryBuilder.append(dialect.limitClause(size, from))
val query = queryBuilder.toString
JdbcUtils.withConnection { connection =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.queryParam("from", "0")
.queryParam("size", "1")
.queryParam("desc", "true")
.queryParam("createTime", "1")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
Expand Down
15 changes: 14 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,20 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down

0 comments on commit 852398d

Please sign in to comment.