Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from apache:master #49

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ io.vertx:vertx-grpc
com.squareup.retrofit2:retrofit
com.squareup.okhttp3:okhttp
org.apache.kafka:kafka-clients
org.lz4:lz4-java
org.xerial.snappy:snappy-java
org.xerial:sqlite-jdbc

BSD
Expand All @@ -319,7 +317,6 @@ jline:jline
com.thoughtworks.paranamer:paranamer
com.google.protobuf:protobuf-java-util
com.google.protobuf:protobuf-java
com.github.luben:zstd-jni
org.postgresql:postgresql

Eclipse Distribution License - v 1.0
Expand Down
16 changes: 0 additions & 16 deletions NOTICE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -1063,19 +1063,3 @@ which can be obtained at:
* license/LICENSE.kafka.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/apache/kafka

This product optionally depends on 'snappy-java', Snappy compression and
decompression for Java, which can be obtained at:

* LICENSE:
* license/LICENSE.snappy-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/xerial/snappy-java

This product optionally depends on 'lz4-java', Lz4 compression and
decompression for Java, which can be obtained at:

* LICENSE:
* license/LICENSE.lz4-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/lz4/lz4-java
3 changes: 0 additions & 3 deletions dev/dependencyList
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ log4j-api/2.24.2//log4j-api-2.24.2.jar
log4j-core/2.24.2//log4j-core-2.24.2.jar
log4j-slf4j-impl/2.24.2//log4j-slf4j-impl-2.24.2.jar
logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
metrics-core/4.2.26//metrics-core-4.2.26.jar
metrics-jmx/4.2.26//metrics-jmx-4.2.26.jar
metrics-json/4.2.26//metrics-json-4.2.26.jar
Expand Down Expand Up @@ -173,7 +172,6 @@ simpleclient_tracer_otel_agent/0.16.0//simpleclient_tracer_otel_agent-0.16.0.jar
slf4j-api/1.7.36//slf4j-api-1.7.36.jar
snakeyaml-engine/2.7//snakeyaml-engine-2.7.jar
snakeyaml/2.2//snakeyaml-2.2.jar
snappy-java/1.1.10.5//snappy-java-1.1.10.5.jar
sqlite-jdbc/3.46.1.3//sqlite-jdbc-3.46.1.3.jar
swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
swagger-core/2.2.1//swagger-core-2.2.1.jar
Expand All @@ -185,4 +183,3 @@ units/1.7//units-1.7.jar
vertx-core/4.5.3//vertx-core-4.5.3.jar
vertx-grpc/4.5.3//vertx-grpc-4.5.3.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zstd-jni/1.5.5-1//zstd-jni-1.5.5-1.jar
37 changes: 19 additions & 18 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

101 changes: 51 additions & 50 deletions docs/monitor/metrics.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,19 @@ object KyuubiConf {
.intConf
.createWithDefault(65536)

val METADATA_SEARCH_WINDOW: OptionalConfigEntry[Long] =
buildConf("kyuubi.metadata.search.window")
.doc("The time window to restrict user queries to metadata within a specific period, " +
"starting from the current time to the past. It only affects `GET /api/v1/batches` API. " +
"You may want to set this to short period to improve query performance and reduce load " +
"on the metadata store when administer want to reserve the metadata for long time. " +
"The side-effects is that, the metadata created outside the window will not be " +
"invisible to users. If it is undefined, all metadata will be visible for users.")
.version("1.10.1")
.timeConf
.checkValue(_ > 0, "must be positive number")
.createOptional

val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object MetricsConstants {
final val OPERATION_TOTAL: String = OPERATION + "total"
final val OPERATION_STATE: String = OPERATION + "state"
final val OPERATION_EXEC_TIME: String = OPERATION + "exec_time"
final val OPERATION_BATCH_PENDING_MAX_ELAPSE: String = OPERATION + "batch_pending_max_elapse"

final private val BACKEND_SERVICE = KYUUBI + "backend_service."
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ class BatchJobSubmission(
Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
}
}

def getPendingElapsedTime: Long = {
if (state == OperationState.PENDING) {
System.currentTimeMillis() - createTime
} else {
0L
}
}
}

object BatchJobSubmission {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import org.eclipse.jetty.servlet.{ErrorPageErrorHandler, FilterHolder}
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
import org.apache.kyuubi.service.authentication.{AuthTypes, AuthUtils}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.{JavaUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

Expand Down Expand Up @@ -202,6 +204,14 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}

private def getBatchPendingMaxElapse(): Long = {
val batchPendingElapseTimes = sessionManager.allSessions().map {
case session: KyuubiBatchSession => session.batchJobSubmissionOp.getPendingElapsedTime
case _ => 0L
}
if (batchPendingElapseTimes.isEmpty) 0L else batchPendingElapseTimes.max
}

def waitForServerStarted(): Unit = {
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
Expand All @@ -220,6 +230,9 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
isStarted.set(true)
startBatchChecker()
recoverBatchSessions()
MetricsSystem.tracing { ms =>
ms.registerGauge(OPERATION_BATCH_PENDING_MAX_ELAPSE, getBatchPendingMaxElapse, 0)
}
} catch {
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
fe.getConf.get(ENGINE_SECURITY_ENABLED)
private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE)
private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE)
private lazy val metadataSearchWindow = fe.getConf.get(METADATA_SEARCH_WINDOW)

private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
Expand Down Expand Up @@ -420,13 +421,15 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}

val createTimeFilter =
math.max(createTime, metadataSearchWindow.map(System.currentTimeMillis() - _).getOrElse(0L))
val filter = MetadataFilter(
sessionType = SessionType.BATCH,
engineType = batchType,
username = batchUser,
state = batchState,
requestName = batchName,
createTime = createTime,
createTime = createTimeFilter,
endTime = endTime)
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size, desc)
new GetBatchesResponse(from, batches.size, batches.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@ class MetadataManager extends AbstractService("MetadataManager") {
from: Int,
size: Int,
desc: Boolean = false): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, desc)).map(
buildBatch)
withMetadataRequestMetrics(_metadataStore.getMetadataList(
filter,
from,
size,
// if create_file field is set, order by create_time, which is faster, otherwise by key_id
orderBy = if (filter.createTime > 0) Some("create_time") else Some("key_id"),
direction = if (desc) "DESC" else "ASC")).map(buildBatch)
}

def countBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ trait MetadataStore extends Closeable {
* @param from the metadata offset.
* @param size the size to get.
* @param desc the order of metadata list.
* @param orderBy the order by column, default is the auto increment primary key, `key_id`.
* @param direction the order direction, default is `ASC`.
* @return selected metadata list.
*/
def getMetadataList(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata]
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata]

/**
* Count the metadata list with filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata] = {
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata] = {
val queryBuilder = new StringBuilder
val params = ListBuffer[Any]()
queryBuilder.append("SELECT ")
queryBuilder.append(METADATA_COLUMNS)
queryBuilder.append(s" FROM $METADATA_TABLE")
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
queryBuilder.append(" ORDER BY key_id ")
queryBuilder.append(if (desc) "DESC " else "ASC ")
orderBy.foreach(o => queryBuilder.append(s" ORDER BY $o $direction "))
queryBuilder.append(dialect.limitClause(size, from))
val query = queryBuilder.toString
JdbcUtils.withConnection { connection =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.queryParam("from", "0")
.queryParam("size", "1")
.queryParam("desc", "true")
.queryParam("createTime", "1")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
Expand Down
15 changes: 14 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,20 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Loading