Skip to content

Commit

Permalink
[KYUUBI #6696] Fix Trino Status Printer to Prevent Thread Leak
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

## Describe Your Solution 🔧

- use `newDaemonSingleThreadScheduledExecutor` avoid `timer` thread leak
- reduce same status info out

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6696 from lsm1/branch-fix-trino-printer.

Closes #6696

01f917c [senmiaoliu] fix style
0d20fd1 [senmiaoliu] fix trino info printer thread leak

Authored-by: senmiaoliu <[email protected]>
Signed-off-by: senmiaoliu <[email protected]>
  • Loading branch information
lsm1 committed Sep 19, 2024
1 parent 635c793 commit 8056235
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.kyuubi.engine.trino

import java.util.{Timer, TimerTask}
import java.util.concurrent.Executors
import java.util.OptionalDouble
import java.util.concurrent.{Executors, TimeUnit}

import scala.annotation.tailrec
import scala.collection.JavaConverters._
Expand All @@ -38,6 +38,7 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_DEBUG
import org.apache.kyuubi.engine.trino.TrinoConf.DATA_PROCESSING_POOL_SIZE
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.ThreadUtils

/**
* Trino client communicate with trino cluster.
Expand All @@ -58,7 +59,9 @@ class TrinoStatement(
private lazy val showProcess = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS)
private lazy val showDebug = kyuubiConf.get(ENGINE_TRINO_SHOW_PROGRESS_DEBUG)

private lazy val timer = new Timer("refresh status info", true)
private val timer =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Trino-Status-Printer", false)
private var lastStats: OptionalDouble = OptionalDouble.empty()

implicit val ec: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(dataProcessingPoolSize))
Expand Down Expand Up @@ -105,7 +108,7 @@ class TrinoStatement(
getData()
}
} else {
timer.cancel()
timer.shutdown()
Verify.verify(trino.isFinished)
if (operationLog.isDefined && showProcess) {
TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug)
Expand Down Expand Up @@ -153,18 +156,22 @@ class TrinoStatement(
}
def printStatusInfo(): Unit = {
if (operationLog.isDefined && showProcess) {
timer.schedule(
new TimerTask {
override def run(): Unit = {
if (trino.isRunning) {
TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug)
}
timer.scheduleWithFixedDelay(
() => {
if (trino.isRunning) {
lastStats =
TrinoStatusPrinter.printStatusInfo(trino, operationLog.get, showDebug, lastStats)
}
},
500L,
kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL))
kyuubiConf.get(KyuubiConf.ENGINE_TRINO_SHOW_PROGRESS_UPDATE_INTERVAL),
TimeUnit.MILLISECONDS)
}
}

def stopPrinter(): Unit = {
timer.shutdown()
}
}

object TrinoStatement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kyuubi.engine.trino

import java.util.OptionalDouble
import java.util.concurrent.TimeUnit._

import io.airlift.units.DataSize
Expand All @@ -35,7 +36,8 @@ object TrinoStatusPrinter {
def printStatusInfo(
client: StatementClient,
operationLog: OperationLog,
debug: Boolean = false): Unit = {
debug: Boolean = false,
lastStats: OptionalDouble = null): OptionalDouble = {
val out = new TrinoConsoleProgressBar(operationLog)
val results =
if (client.isRunning) {
Expand All @@ -46,11 +48,16 @@ object TrinoStatusPrinter {

val stats = results.getStats

if (lastStats != null &&
stats.getProgressPercentage.equals(lastStats)) {
return lastStats
}

val wallTime = Duration.succinctDuration(stats.getElapsedTimeMillis(), MILLISECONDS)

val nodes = stats.getNodes
if ((nodes == 0) || (stats.getTotalSplits == 0)) {
return
return stats.getProgressPercentage
}

// Query 12, FINISHED, 1 node
Expand Down Expand Up @@ -122,6 +129,7 @@ object TrinoStatusPrinter {
s"[${formatCountRate(stats.getProcessedRows(), wallTime, false)} rows/s, " +
s"${formatDataRate(DataSize.ofBytes(stats.getProcessedBytes()), wallTime, true)}]"
out.printLine(statsLine)
stats.getProgressPercentage
}

def percentage(count: Double, total: Double): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
trinoStatement.stopPrinter()
shutdownTimeoutMonitor()
}
}
Expand Down

0 comments on commit 8056235

Please sign in to comment.