From 639822f0d0fe4727994d576bdf82467d526149be Mon Sep 17 00:00:00 2001 From: Ao Li Date: Thu, 28 Mar 2024 12:16:23 -0400 Subject: [PATCH] Handle Deadlocks. --- .../cmu/pasta/sfuzz/core/GlobalContext.kt | 60 ++++++++++++------- .../cmu/pasta/sfuzz/core/RuntimeDelegate.kt | 32 +++++++--- .../cmu/pasta/sfuzz/core/ThreadContext.kt | 7 +-- .../core/concurrency/locks/LockManager.kt | 14 +++++ 4 files changed, 78 insertions(+), 35 deletions(-) diff --git a/core/src/main/kotlin/cmu/pasta/sfuzz/core/GlobalContext.kt b/core/src/main/kotlin/cmu/pasta/sfuzz/core/GlobalContext.kt index 2758fbba..641f0d89 100644 --- a/core/src/main/kotlin/cmu/pasta/sfuzz/core/GlobalContext.kt +++ b/core/src/main/kotlin/cmu/pasta/sfuzz/core/GlobalContext.kt @@ -72,6 +72,7 @@ object GlobalContext { for (thread in registeredThreads.values) { if (thread.state == ThreadState.Paused) { thread.state = ThreadState.Enabled + lockManager.threadUnblockedDueToDeadlock(thread.thread) break } } @@ -96,9 +97,9 @@ object GlobalContext { fun done(result: AnalysisResult) { loggers.forEach { it.executionDone(result) } - lockManager.done() assert(lockManager.waitingThreads.isEmpty()) assert(syncManager.synchronizationPoints.isEmpty()) + lockManager.done() registeredThreads.clear() scheduler.done() } @@ -138,7 +139,7 @@ object GlobalContext { val t = Thread.currentThread() if (!registeredThreads[t.id]!!.unparkSignaled) { registeredThreads[t.id]?.pendingOperation = PausedOperation() - registeredThreads[t.id]?.state = ThreadState.Parked + registeredThreads[t.id]?.state = ThreadState.Paused scheduleNextOperation(false) } else { registeredThreads[t.id]!!.unparkSignaled = false @@ -153,14 +154,14 @@ object GlobalContext { if (context.state == ThreadState.Running) { return } - assert(context.state == ThreadState.Parked) + assert(context.state == ThreadState.Paused) syncManager.signal(t) context.block() } fun threadUnpark(t: Thread) { val context = registeredThreads[t.id]!! - if (context.state != ThreadState.Parked) { + if (context.state != ThreadState.Paused) { context.unparkSignaled = true } else { syncManager.createWait(t, 1) @@ -170,7 +171,7 @@ object GlobalContext { } fun threadUnparkDone(t: Thread) { - if (registeredThreads[t.id]!!.state == ThreadState.Parked) { + if (registeredThreads[t.id]!!.state == ThreadState.Paused) { // SFuzz only needs to wait if `t` is parked and then // waken up by this `unpark` operation. syncManager.wait(t) @@ -193,6 +194,7 @@ object GlobalContext { } objectNotifyAll(t) registeredThreads[t.id]?.state = ThreadState.Completed + lockManager.threadUnblockedDueToDeadlock(t) // We do not want to send notify all because // we don't have monitor lock here. var size = 0 @@ -216,20 +218,7 @@ object GlobalContext { unlockImpl(t, t.id, false, false, true) syncManager.synchronizationPoints.remove(System.identityHashCode(t)) } - try { - scheduleNextOperation(false) - } catch (e: TargetTerminateException) { - // If deadlock detected let's try to unblock one thread and continue. - if (e.status == -1) { - for (thread in registeredThreads.values) { - if (thread.state == ThreadState.Paused) { - thread.state = ThreadState.Running - thread.unblock() - break - } - } - } - } + scheduleNextOperationAndCheckDeadlock(false) } } @@ -251,9 +240,10 @@ object GlobalContext { context.pendingOperation = ThreadResumeOperation() context.state = ThreadState.Enabled } else { - lockManager.addWaitingThread(waitingObject, Thread.currentThread()) context.pendingOperation = PausedOperation() context.state = ThreadState.Paused + checkDeadLock() + lockManager.addWaitingThread(waitingObject, Thread.currentThread()) } unlockImpl(lockObject, t, true, true, lockObject == waitingObject) @@ -266,7 +256,7 @@ object GlobalContext { Thread.yield() } lockUnlockDone(lockObject) - scheduleNextOperation(false) + scheduleNextOperationAndCheckDeadlock(false) } } @@ -296,6 +286,7 @@ object GlobalContext { // We want to also catch interrupt exception here. } } + lockManager.threadWaitsFor.remove(t.id) // If a thread is enabled, the lock must be available. assert(lockManager.lock(lockObject, t.id, false, true)) context.checkInterrupt() @@ -551,11 +542,12 @@ object GlobalContext { registeredThreads[t]?.pendingOperation = PausedOperation() registeredThreads[t]?.state = ThreadState.Paused } + checkDeadLock() executor.submit { while (registeredThreads[t]!!.thread.state == Thread.State.RUNNABLE) { Thread.yield() } - scheduleNextOperation(false) + scheduleNextOperationAndCheckDeadlock(false) } } @@ -587,6 +579,30 @@ object GlobalContext { } } + fun scheduleNextOperationAndCheckDeadlock(shouldBlockCurrentThread: Boolean) { + try { + scheduleNextOperation(shouldBlockCurrentThread) + } catch (e: TargetTerminateException) { + for (thread in registeredThreads.values) { + if (thread.state == ThreadState.Paused) { + thread.state = ThreadState.Enabled + lockManager.threadUnblockedDueToDeadlock(thread.thread) + scheduleNextOperation(shouldBlockCurrentThread) + break + } + } + } + } + + fun checkDeadLock() { + val deadLock = registeredThreads.values.toList().none { it.schedulable() } + if (deadLock) { + registeredThreads[Thread.currentThread().id]!!.state = ThreadState.Enabled + lockManager.threadUnblockedDueToDeadlock(Thread.currentThread()) + throw TargetTerminateException(-1) + } + } + fun scheduleNextOperation(shouldBlockCurrentThread: Boolean) { // Our current design makes sure that reschedule is only called // by scheduled thread. diff --git a/core/src/main/kotlin/cmu/pasta/sfuzz/core/RuntimeDelegate.kt b/core/src/main/kotlin/cmu/pasta/sfuzz/core/RuntimeDelegate.kt index 90b0b7a6..2da92fc6 100644 --- a/core/src/main/kotlin/cmu/pasta/sfuzz/core/RuntimeDelegate.kt +++ b/core/src/main/kotlin/cmu/pasta/sfuzz/core/RuntimeDelegate.kt @@ -43,12 +43,17 @@ class RuntimeDelegate : Delegate() { } override fun onThreadStart(t: Thread) { - if (checkEntered()) return + if (checkEntered()) { + skipFunctionEntered.set(1 + skipFunctionEntered.get()) + return + } GlobalContext.threadStart(t) + skipFunctionEntered.set(1 + skipFunctionEntered.get()) entered.set(false) } override fun onThreadStartDone(t: Thread) { + skipFunctionEntered.set(skipFunctionEntered.get() - 1) if (checkEntered()) return GlobalContext.threadStartDone(t) entered.set(false) @@ -68,8 +73,11 @@ class RuntimeDelegate : Delegate() { override fun onObjectWait(o: Any) { if (checkEntered()) return - GlobalContext.objectWait(o) - entered.set(false) + try { + GlobalContext.objectWait(o) + } finally { + entered.set(false) + } } override fun onObjectWaitDone(o: Any) { @@ -166,9 +174,12 @@ class RuntimeDelegate : Delegate() { skipFunctionEntered.set(1 + skipFunctionEntered.get()) return } - GlobalContext.conditionAwait(o) - entered.set(false) - skipFunctionEntered.set(1 + skipFunctionEntered.get()) + try { + GlobalContext.conditionAwait(o) + } finally { + entered.set(false) + skipFunctionEntered.set(1 + skipFunctionEntered.get()) + } } override fun onConditionAwaitDone(o: Condition) { @@ -353,9 +364,12 @@ class RuntimeDelegate : Delegate() { skipFunctionEntered.set(1 + skipFunctionEntered.get()) return } - GlobalContext.latchAwait(latch) - entered.set(false) - skipFunctionEntered.set(skipFunctionEntered.get() + 1) + try { + GlobalContext.latchAwait(latch) + } finally { + entered.set(false) + skipFunctionEntered.set(skipFunctionEntered.get() + 1) + } } override fun onLatchAwaitDone(latch: CountDownLatch) { diff --git a/core/src/main/kotlin/cmu/pasta/sfuzz/core/ThreadContext.kt b/core/src/main/kotlin/cmu/pasta/sfuzz/core/ThreadContext.kt index c6fcfbaa..eb7b1758 100644 --- a/core/src/main/kotlin/cmu/pasta/sfuzz/core/ThreadContext.kt +++ b/core/src/main/kotlin/cmu/pasta/sfuzz/core/ThreadContext.kt @@ -8,14 +8,11 @@ enum class ThreadState { Enabled, Running, Paused, - Parked, Completed, - // Thread is started but not yet available. - STARTED, } class ThreadContext(val thread: Thread, val index: Int) { - var state = ThreadState.STARTED + var state = ThreadState.Paused var unparkSignaled = false var interruptSignaled = false @@ -31,6 +28,8 @@ class ThreadContext(val thread: Thread, val index: Int) { sync.block() } + fun schedulable() = state == ThreadState.Enabled || state == ThreadState.Running + fun unblock() { if (sync.isBlocked) { sync.unblock() diff --git a/core/src/main/kotlin/cmu/pasta/sfuzz/core/concurrency/locks/LockManager.kt b/core/src/main/kotlin/cmu/pasta/sfuzz/core/concurrency/locks/LockManager.kt index 2cd5f466..80629648 100644 --- a/core/src/main/kotlin/cmu/pasta/sfuzz/core/concurrency/locks/LockManager.kt +++ b/core/src/main/kotlin/cmu/pasta/sfuzz/core/concurrency/locks/LockManager.kt @@ -11,9 +11,19 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock class LockManager { val lockContextManager = ReferencedContextManager { ReentrantLockContext() } val waitingThreads = mutableMapOf>() + val threadWaitsFor = mutableMapOf() val conditionToLock = mutableMapOf() val lockToConditions = mutableMapOf>() + fun threadUnblockedDueToDeadlock(t: Thread) { + val id = threadWaitsFor[t.id] ?: return + waitingThreads[id]?.remove(t.id) + if (waitingThreads[id]?.isEmpty() == true) { + waitingThreads.remove(id) + } + threadWaitsFor.remove(t.id) + } + fun getLockContext(lock: Any): LockContext { return lockContextManager.getLockContext(lock) } @@ -38,13 +48,16 @@ class LockManager { if (id !in waitingThreads) { waitingThreads[id] = mutableListOf() } + assert(t.id !in waitingThreads[id]!!) waitingThreads[id]!!.add(t.id) + threadWaitsFor[t.id] = id } // TODO(aoli): can we merge this logic with `objectNotifyImply`? fun threadInterruptDuringObjectWait(waitingObject: Any, lockObject: Any, context: ThreadContext) { val id = System.identityHashCode(waitingObject) val lockContext = getLockContext(lockObject) + threadWaitsFor.remove(context.thread.id) waitingThreads[id]?.remove(context.thread.id) if (waitingThreads[id]?.isEmpty() == true) { waitingThreads.remove(id) @@ -83,6 +96,7 @@ class LockManager { fun done() { assert(waitingThreads.isEmpty()) + assert(threadWaitsFor.isEmpty()) conditionToLock.clear() lockToConditions.clear() lockContextManager.done()