Skip to content

Commit

Permalink
Fix deadlock caused by spurious wakeups. (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
aoli-al authored Nov 5, 2024
1 parent ff39159 commit fb21845
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 55 deletions.
53 changes: 38 additions & 15 deletions core/src/main/kotlin/org/pastalab/fray/core/RunContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ class RunContext(val config: Configuration) {
thread.pendingOperation =
when (pendingOperation) {
is ObjectWaitBlock -> {
ObjectWakeBlocked(pendingOperation.o, false)
ObjectWakeBlocked(pendingOperation.o, true)
}
is ConditionAwaitBlocked -> {
ConditionWakeBlocked(pendingOperation.condition, false)
ConditionWakeBlocked(
pendingOperation.condition, pendingOperation.canInterrupt, true)
}
is ObjectWakeBlocked -> {
pendingOperation
Expand Down Expand Up @@ -311,6 +312,7 @@ class RunContext(val config: Configuration) {
context.state = ThreadState.Enabled
scheduleNextOperation(true)

context.pendingOperation = ThreadResumeOperation(true)
// First we need to check if current thread is interrupted.
if (canInterrupt) {
context.checkInterrupt()
Expand Down Expand Up @@ -379,11 +381,11 @@ class RunContext(val config: Configuration) {
// We will unblock here only if the scheduler
// decides to run it.
while (context.state != ThreadState.Running) {
syncManager.signal(lockObject)
try {
if (context.interruptSignaled) {
Thread.interrupted()
}
syncManager.signal(lockObject)
if (waitingObject is Condition) {
// TODO(aoli): Is this necessary?
if (canInterrupt) {
Expand All @@ -400,19 +402,19 @@ class RunContext(val config: Configuration) {
}
// If a thread is enabled, the lock must be available.
assert(lockManager.lock(lockObject, context, false, true, false))
val pendingOperation = context.pendingOperation
assert(pendingOperation is ThreadResumeOperation)
if (canInterrupt) {
context.checkInterrupt()
}
val pendingOperation = context.pendingOperation
assert(pendingOperation is ThreadResumeOperation)
return (pendingOperation as ThreadResumeOperation).noTimeout
}

fun threadInterrupt(t: Thread) {
val context = registeredThreads[t.id]!!
context.interruptSignaled = true

if (context.state == ThreadState.Running || context.state == ThreadState.Enabled) {
if (context.state == ThreadState.Running) {
return
}

Expand All @@ -422,24 +424,31 @@ class RunContext(val config: Configuration) {
is ObjectWaitBlock -> {
if (lockManager.objectWaitUnblockedWithoutNotify(
pendingOperation.o, pendingOperation.o, context, false)) {
syncManager.createWait(pendingOperation.o, 1)
waitingObject = pendingOperation.o
}
}
is ObjectWakeBlocked -> {
if (context.state == ThreadState.Enabled) {
waitingObject = pendingOperation.o
}
}
is ConditionWakeBlocked -> {
if (pendingOperation.canInterrupt && context.state == ThreadState.Enabled) {
waitingObject = lockManager.lockFromCondition(pendingOperation.condition)
}
}
is ConditionAwaitBlocked -> {
if (pendingOperation.canInterrupt) {
val lock = lockManager.lockFromCondition(pendingOperation.condition)
if (lockManager.objectWaitUnblockedWithoutNotify(
pendingOperation.condition, lock, context, false)) {
syncManager.createWait(lock, 1)
waitingObject = lock
}
}
}
is CountDownLatchAwaitBlocking -> {
context.pendingOperation = ThreadResumeOperation(true)
context.state = ThreadState.Enabled
syncManager.createWait(pendingOperation.latch, 1)
waitingObject = pendingOperation.latch
}
is ParkBlocked -> {
Expand All @@ -452,6 +461,7 @@ class RunContext(val config: Configuration) {
}

if (waitingObject != null) {
syncManager.createWait(waitingObject, 1)
registeredThreads[Thread.currentThread().id]!!.pendingOperation =
InterruptPendingOperation(waitingObject)
}
Expand Down Expand Up @@ -524,9 +534,13 @@ class RunContext(val config: Configuration) {
val context = registeredThreads[t]!!
lockManager.addWakingThread(lockObject, context)
if (waitingObject == lockObject) {
context.pendingOperation = ObjectWakeBlocked(waitingObject, false)
context.pendingOperation = ObjectWakeBlocked(waitingObject, true)
} else {
context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, false)
context.pendingOperation =
ConditionWakeBlocked(
waitingObject as Condition,
(context.pendingOperation as ConditionAwaitBlocked).canInterrupt,
true)
}
it.remove(t)
if (it.size == 0) {
Expand Down Expand Up @@ -554,9 +568,13 @@ class RunContext(val config: Configuration) {
// We cannot enable the thread immediately because
// the thread is still waiting for the monitor lock.
if (waitingObject == lockObject) {
context.pendingOperation = ObjectWakeBlocked(waitingObject, false)
context.pendingOperation = ObjectWakeBlocked(waitingObject, true)
} else {
context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, false)
context.pendingOperation =
ConditionWakeBlocked(
waitingObject as Condition,
(context.pendingOperation as ConditionAwaitBlocked).canInterrupt,
true)
}
lockManager.addWakingThread(lockObject, context)
}
Expand Down Expand Up @@ -828,10 +846,12 @@ class RunContext(val config: Configuration) {
val pendingOperation = thread.pendingOperation
when (pendingOperation) {
is ObjectWaitBlock -> {
thread.pendingOperation = ObjectWakeBlocked(pendingOperation.o, false)
thread.pendingOperation = ObjectWakeBlocked(pendingOperation.o, true)
}
is ConditionAwaitBlocked -> {
thread.pendingOperation = ConditionWakeBlocked(pendingOperation.condition, false)
thread.pendingOperation =
ConditionWakeBlocked(
pendingOperation.condition, pendingOperation.canInterrupt, true)
}
is CountDownLatchAwaitBlocking -> {
val releasedThreads = latchManager.release(pendingOperation.latch)
Expand Down Expand Up @@ -917,6 +937,9 @@ class RunContext(val config: Configuration) {
.toList()
.filter { it.state == ThreadState.Enabled }
.sortedBy { it.thread.id }
if (mainExiting && (currentThreadId == mainThreadId || enabledOperations.size > 1)) {
enabledOperations = enabledOperations.filter { it.thread.id != mainThreadId }
}
}

if (enabledOperations.isEmpty()) {
Expand Down
100 changes: 68 additions & 32 deletions core/src/main/kotlin/org/pastalab/fray/core/RuntimeDelegate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,11 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
}

override fun onLatchAwaitTimeout(latch: CountDownLatch, timeout: Long, unit: TimeUnit): Boolean {
if (onLatchAwaitImpl(latch, true)) {
onSkipMethodDone("Latch.await")
return latch.await(timeout, unit)
}
try {
if (onLatchAwaitImpl(latch, true)) {
onSkipMethodDone("Latch.await")
return latch.await(timeout, unit)
}
latch.await()
} catch (e: InterruptedException) {
// Do nothing
Expand Down Expand Up @@ -692,8 +692,11 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
LockSupport.parkNanos(nanos)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
try {
LockSupport.park()
} finally {
onThreadParkDoneImpl(true)
}
}

override fun onThreadParkUntil(deadline: Long) {
Expand All @@ -702,8 +705,11 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
LockSupport.parkUntil(deadline)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
try {
LockSupport.park()
} finally {
onThreadParkDoneImpl(true)
}
}

override fun onThreadParkNanosWithBlocker(blocker: Any?, nanos: Long) {
Expand All @@ -712,51 +718,81 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
LockSupport.parkNanos(blocker, nanos)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
try {
LockSupport.park()
} finally {
onThreadParkDoneImpl(true)
}
}

override fun onThreadParkUntilWithBlocker(blocker: Any?, deadline: Long) {
if (onThreadParkImpl()) {
onSkipMethodDone("Thread.park")
LockSupport.parkUntil(blocker, deadline)
try {
LockSupport.parkUntil(blocker, deadline)
} finally {
onSkipMethodDone("Thread.park")
}
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
try {
LockSupport.park()
} finally {
onThreadParkDoneImpl(true)
}
}

override fun onConditionAwaitTime(o: Condition, time: Long, unit: TimeUnit): Boolean {
if (onConditionAwaitImpl(o, true, true)) {
val result = o.await(time, unit)
onSkipMethodDone("Condition.await")
return result
try {
if (onConditionAwaitImpl(o, true, true)) {
try {
return o.await(time, unit)
} finally {
onSkipMethodDone("Condition.await")
}
}
o.await()
} catch (e: Throwable) {
onConditionAwaitDoneImpl(o, true)
throw e
}
o.await()
return onConditionAwaitDoneImpl(o, true)
}

override fun onConditionAwaitNanos(o: Condition, nanos: Long): Long {
if (onConditionAwaitImpl(o, true, true)) {
val result = o.awaitNanos(nanos)
onSkipMethodDone("Condition.await")
return result
try {
if (onConditionAwaitImpl(o, true, true)) {
try {
return o.awaitNanos(nanos)
} finally {
onSkipMethodDone("Condition.await")
}
}
o.await()
} catch (e: Throwable) {
onConditionAwaitDoneImpl(o, true)
throw e
}
o.await()
if (onConditionAwaitDoneImpl(o, true)) {
return 0
return if (onConditionAwaitDoneImpl(o, true)) {
0
} else {
return nanos - 1
nanos - 1
}
}

override fun onConditionAwaitUntil(o: Condition, deadline: Date): Boolean {
if (onConditionAwaitImpl(o, true, true)) {
val result = o.awaitUntil(deadline)
onSkipMethodDone("Condition.await")
return result
try {
if (onConditionAwaitImpl(o, true, true)) {
try {
return o.awaitUntil(deadline)
} finally {
onSkipMethodDone("Condition.await")
}
}
o.await()
} catch (e: Throwable) {
onConditionAwaitDoneImpl(o, true)
throw e
}
o.await()
return onConditionAwaitDoneImpl(o, true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
import org.pastalab.fray.core.ThreadContext
import org.pastalab.fray.core.ThreadState
import org.pastalab.fray.core.concurrency.operations.ConditionAwaitBlocked
import org.pastalab.fray.core.concurrency.operations.ConditionWakeBlocked
import org.pastalab.fray.core.concurrency.operations.ObjectWakeBlocked

Expand Down Expand Up @@ -87,7 +88,11 @@ class LockManager {
if (waitingObject == lockObject) {
context.pendingOperation = ObjectWakeBlocked(waitingObject, !isTimeout)
} else {
context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, !isTimeout)
context.pendingOperation =
ConditionWakeBlocked(
waitingObject as Condition,
(context.pendingOperation as ConditionAwaitBlocked).canInterrupt,
!isTimeout)
}
if (lockContext.canLock(context.thread.id)) {
context.state = ThreadState.Enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ package org.pastalab.fray.core.concurrency.operations

import java.util.concurrent.locks.Condition

class ConditionWakeBlocked(val condition: Condition, val noTimeout: Boolean) :
NonRacingOperation() {}
class ConditionWakeBlocked(
val condition: Condition,
val canInterrupt: Boolean,
val noTimeout: Boolean
) : NonRacingOperation() {}
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package org.pastalab.fray.core.observers

import java.io.File
import kotlinx.serialization.json.Json
import org.pastalab.fray.core.ThreadContext

class ScheduleVerifier(val schedules: List<ScheduleRecording>) : ScheduleObserver {
constructor(
path: String
) : this(Json.decodeFromString<List<ScheduleRecording>>(File(path).readText()))

var index = 0

override fun onExecutionStart() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.pastalab.fray.test.success.condition;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionAwaitTimeoutInterrupt {
public static void main(String[] args) {
Lock l = new ReentrantLock();
Condition c = l.newCondition();
AtomicBoolean flag = new AtomicBoolean(false);
Thread t = new Thread(() -> {
l.lock();
try {
c.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
flag.set(true);
}
l.unlock();
});
t.start();
Thread.yield();
t.interrupt();
l.lock();
if (!t.isInterrupted()) {
assert(flag.get());
}
l.unlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static void main(String[] args) {
l.lock();
try {
boolean result = c.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS);
assert(!result);
assert(result == false);

boolean result2 = c.awaitUntil(new java.util.Date(System.currentTimeMillis() + 1000));
assert(!result2);
Expand Down
Loading

0 comments on commit fb21845

Please sign in to comment.