diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala index 51deee84a35..b8d0bba3165 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala @@ -17,8 +17,8 @@ package org.apache.kyuubi.engine.trino -import java.util.{Timer, TimerTask} -import java.util.concurrent.Executors +import java.util.OptionalDouble +import java.util.concurrent.{Executors, TimeUnit} import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -38,6 +38,7 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_DEBUG import org.apache.kyuubi.engine.trino.TrinoConf.DATA_PROCESSING_POOL_SIZE import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.ThreadUtils /** * Trino client communicate with trino cluster. @@ -58,7 +59,9 @@ class TrinoStatement( private lazy val showProcess = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS) private lazy val showDebug = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS_DEBUG) - private lazy val timer = new Timer("refresh status info", true) + private val timer = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("Trino-Status-Printer", false) + private var lastStats: OptionalDouble = OptionalDouble.empty() implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(dataProcessingPoolSize)) @@ -105,7 +108,7 @@ class TrinoStatement( getData() } } else { - timer.cancel() + timer.shutdown() Verify.verify(trino.isFinished) if (operationLog.isDefined && showProcess) { TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug) @@ -153,18 +156,22 @@ class TrinoStatement( } def printStatusInfo(): Unit = { if (operationLog.isDefined && showProcess) { - timer.schedule( - new TimerTask { - override def run(): Unit = { - if (trino.isRunning) { - TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug) - } + timer.scheduleWithFixedDelay( + () => { + if (trino.isRunning) { + lastStats = + TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug, lastStats) } }, 500L, - kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL)) + kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL), + TimeUnit.MILLISECONDS) } } + + def stopPrinter(): Unit = { + timer.shutdown() + } } object TrinoStatement { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala index 2654f54133b..70a4d88aa52 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatusPrinter.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.engine.trino +import java.util.OptionalDouble import java.util.concurrent.TimeUnit._ import io.airlift.units.DataSize @@ -35,7 +36,8 @@ object TrinoStatusPrinter { def printStatusInfo( client: StatementClient, operationLog: OperationLog, - debug: Boolean = false): Unit = { + debug: Boolean = false, + lastStats: OptionalDouble = null): OptionalDouble = { val out = new TrinoConsoleProgressBar(operationLog) val results = if (client.isRunning) { @@ -46,11 +48,16 @@ object TrinoStatusPrinter { val stats = results.getStats + if (lastStats != null && + stats.getProgressPercentage.equals(lastStats)) { + return lastStats + } + val wallTime = Duration.succinctDuration(stats.getElapsedTimeMillis(), MILLISECONDS) val nodes = stats.getNodes if ((nodes == 0) || (stats.getTotalSplits == 0)) { - return + return stats.getProgressPercentage } // Query 12, FINISHED, 1 node @@ -122,6 +129,7 @@ object TrinoStatusPrinter { s"[${formatCountRate(stats.getProcessedRows(), wallTime, false)} rows/s, " + s"${formatDataRate(DataSize.ofBytes(stats.getProcessedBytes()), wallTime, true)}]" out.printLine(statsLine) + stats.getProgressPercentage } def percentage(count: Double, total: Double): Int = { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 0ba2297c394..250b8d64b1e 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -127,6 +127,7 @@ class ExecuteStatement( } catch { onError(cancel = true) } finally { + trinoStatement.stopPrinter() shutdownTimeoutMonitor() } }