Skip to content

Commit

Permalink
Make thread stopping in ThreadBasedExecutor more safe (#2503)
Browse files Browse the repository at this point in the history
Make `ThreadBasedExecutor` avoid race condition and not call `Thread.stop()` during while executing clean up callback
  • Loading branch information
IlyaMuravjov authored Aug 15, 2023
1 parent 0312936 commit 188f682
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
55 changes: 41 additions & 14 deletions utbot-core/src/main/kotlin/org/utbot/common/ThreadUtil.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.utbot.common

import java.util.WeakHashMap
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock
import kotlin.properties.ReadOnlyProperty
import kotlin.random.Random
import kotlin.reflect.KProperty


Expand All @@ -25,15 +27,34 @@ class ThreadBasedExecutor {
val threadLocal by threadLocalLazy { ThreadBasedExecutor() }
}

// there's no `WeakHashSet`, so we use `WeakHashMap` with dummy values
private val timedOutThreads = WeakHashMap<Thread, Unit>()
/**
* Used to avoid calling [Thread.stop] during clean up.
*
* @see runCleanUpIfTimedOut
*/
private val timeOutCleanUpLock = ReentrantLock()

/**
* `null` when either:
* - no tasks have yet been run
* - current task timed out, and we are waiting for its thread to die
*/
@Volatile
private var thread: Thread? = null

private var requestQueue = ArrayBlockingQueue<() -> Any?>(1)
private var responseQueue = ArrayBlockingQueue<Result<Any?>>(1)

fun isCurrentThreadTimedOut(): Boolean =
Thread.currentThread() in timedOutThreads
/**
* Can be called from lambda passed to [invokeWithTimeout].
* [ThreadBasedExecutor] guarantees that it won't attempt to terminate [cleanUpBlock] with [Thread.stop].
*/
fun runCleanUpIfTimedOut(cleanUpBlock: () -> Unit) {
timeOutCleanUpLock.withLock {
if (thread == null)
cleanUpBlock()
}
}

/**
* Invoke [action] with timeout.
Expand Down Expand Up @@ -64,16 +85,22 @@ class ThreadBasedExecutor {
if (res == null) {
try {
val t = thread ?: return res
timedOutThreads[t] = Unit
thread = null
t.interrupt()
t.join(10)
if (t.isAlive)
@Suppress("DEPRECATION")
t.stop()
// to avoid race condition we need to wait for `t` to die
while (t.isAlive) {
timeOutCleanUpLock.withLock {
@Suppress("DEPRECATION")
t.stop()
}
// If somebody catches `ThreadDeath`, for now we
// just wait for at most 10s and throw another one.
//
// A better approach may be to kill instrumented process.
t.join(10_000)
}
} catch (_: Throwable) {}

thread = null

}
return res
}
Expand All @@ -90,9 +117,9 @@ class ThreadBasedExecutor {
requestQueue = ArrayBlockingQueue<() -> Any?>(1)
responseQueue = ArrayBlockingQueue<Result<Any?>>(1)

thread = thread(name = "executor", isDaemon = true) {
thread = thread(name = "executor @${Random.nextInt(10_000)}", isDaemon = true) {
try {
while (true) {
while (thread === Thread.currentThread()) {
val next = requestQueue.take()
responseQueue.offer(kotlin.runCatching { next() })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class PhasesController(
try {
phase.block()
} finally {
if (executor.isCurrentThreadTimedOut())
executor.runCleanUpIfTimedOut {
instrumentationContext.onPhaseTimeout(phase)
}
}
}
} ?: throw TimeoutException("Timeout $timeoutForCurrentPhase ms for phase ${phase.javaClass.simpleName} elapsed, controller timeout - $timeout")
Expand Down

0 comments on commit 188f682

Please sign in to comment.