Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make thread stopping in ThreadBasedExecutor more safe #2503

Merged
merged 1 commit into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions utbot-core/src/main/kotlin/org/utbot/common/ThreadUtil.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.utbot.common

import java.util.WeakHashMap
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock
import kotlin.properties.ReadOnlyProperty
import kotlin.random.Random
import kotlin.reflect.KProperty


Expand All @@ -25,15 +27,34 @@ class ThreadBasedExecutor {
val threadLocal by threadLocalLazy { ThreadBasedExecutor() }
}

// there's no `WeakHashSet`, so we use `WeakHashMap` with dummy values
private val timedOutThreads = WeakHashMap<Thread, Unit>()
/**
* Used to avoid calling [Thread.stop] during clean up.
*
* @see runCleanUpIfTimedOut
*/
private val timeOutCleanUpLock = ReentrantLock()

/**
* `null` when either:
* - no tasks have yet been run
* - current task timed out, and we are waiting for its thread to die
*/
@Volatile
private var thread: Thread? = null

private var requestQueue = ArrayBlockingQueue<() -> Any?>(1)
private var responseQueue = ArrayBlockingQueue<Result<Any?>>(1)

fun isCurrentThreadTimedOut(): Boolean =
Thread.currentThread() in timedOutThreads
/**
* Can be called from lambda passed to [invokeWithTimeout].
* [ThreadBasedExecutor] guarantees that it won't attempt to terminate [cleanUpBlock] with [Thread.stop].
*/
fun runCleanUpIfTimedOut(cleanUpBlock: () -> Unit) {
timeOutCleanUpLock.withLock {
if (thread == null)
cleanUpBlock()
}
}

/**
* Invoke [action] with timeout.
Expand Down Expand Up @@ -64,16 +85,22 @@ class ThreadBasedExecutor {
if (res == null) {
try {
val t = thread ?: return res
timedOutThreads[t] = Unit
thread = null
t.interrupt()
t.join(10)
if (t.isAlive)
@Suppress("DEPRECATION")
t.stop()
// to avoid race condition we need to wait for `t` to die
while (t.isAlive) {
timeOutCleanUpLock.withLock {
@Suppress("DEPRECATION")
t.stop()
}
// If somebody catches `ThreadDeath`, for now we
// just wait for at most 10s and throw another one.
//
// A better approach may be to kill instrumented process.
t.join(10_000)
}
} catch (_: Throwable) {}

thread = null

}
return res
}
Expand All @@ -90,9 +117,9 @@ class ThreadBasedExecutor {
requestQueue = ArrayBlockingQueue<() -> Any?>(1)
responseQueue = ArrayBlockingQueue<Result<Any?>>(1)

thread = thread(name = "executor", isDaemon = true) {
thread = thread(name = "executor @${Random.nextInt(10_000)}", isDaemon = true) {
try {
while (true) {
while (thread === Thread.currentThread()) {
val next = requestQueue.take()
responseQueue.offer(kotlin.runCatching { next() })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class PhasesController(
try {
phase.block()
} finally {
if (executor.isCurrentThreadTimedOut())
executor.runCleanUpIfTimedOut {
instrumentationContext.onPhaseTimeout(phase)
}
}
}
} ?: throw TimeoutException("Timeout $timeoutForCurrentPhase ms for phase ${phase.javaClass.simpleName} elapsed, controller timeout - $timeout")
Expand Down