diff --git a/core/src/main/kotlin/org/pastalab/fray/core/RunContext.kt b/core/src/main/kotlin/org/pastalab/fray/core/RunContext.kt index 99fb019..8564b29 100644 --- a/core/src/main/kotlin/org/pastalab/fray/core/RunContext.kt +++ b/core/src/main/kotlin/org/pastalab/fray/core/RunContext.kt @@ -123,10 +123,11 @@ class RunContext(val config: Configuration) { thread.pendingOperation = when (pendingOperation) { is ObjectWaitBlock -> { - ObjectWakeBlocked(pendingOperation.o, false) + ObjectWakeBlocked(pendingOperation.o, true) } is ConditionAwaitBlocked -> { - ConditionWakeBlocked(pendingOperation.condition, false) + ConditionWakeBlocked( + pendingOperation.condition, pendingOperation.canInterrupt, true) } is ObjectWakeBlocked -> { pendingOperation @@ -311,6 +312,7 @@ class RunContext(val config: Configuration) { context.state = ThreadState.Enabled scheduleNextOperation(true) + context.pendingOperation = ThreadResumeOperation(true) // First we need to check if current thread is interrupted. if (canInterrupt) { context.checkInterrupt() @@ -379,11 +381,11 @@ class RunContext(val config: Configuration) { // We will unblock here only if the scheduler // decides to run it. while (context.state != ThreadState.Running) { - syncManager.signal(lockObject) try { if (context.interruptSignaled) { Thread.interrupted() } + syncManager.signal(lockObject) if (waitingObject is Condition) { // TODO(aoli): Is this necessary? if (canInterrupt) { @@ -400,11 +402,11 @@ class RunContext(val config: Configuration) { } // If a thread is enabled, the lock must be available. assert(lockManager.lock(lockObject, context, false, true, false)) + val pendingOperation = context.pendingOperation + assert(pendingOperation is ThreadResumeOperation) if (canInterrupt) { context.checkInterrupt() } - val pendingOperation = context.pendingOperation - assert(pendingOperation is ThreadResumeOperation) return (pendingOperation as ThreadResumeOperation).noTimeout } @@ -412,7 +414,7 @@ class RunContext(val config: Configuration) { val context = registeredThreads[t.id]!! context.interruptSignaled = true - if (context.state == ThreadState.Running || context.state == ThreadState.Enabled) { + if (context.state == ThreadState.Running) { return } @@ -422,16 +424,24 @@ class RunContext(val config: Configuration) { is ObjectWaitBlock -> { if (lockManager.objectWaitUnblockedWithoutNotify( pendingOperation.o, pendingOperation.o, context, false)) { - syncManager.createWait(pendingOperation.o, 1) waitingObject = pendingOperation.o } } + is ObjectWakeBlocked -> { + if (context.state == ThreadState.Enabled) { + waitingObject = pendingOperation.o + } + } + is ConditionWakeBlocked -> { + if (pendingOperation.canInterrupt && context.state == ThreadState.Enabled) { + waitingObject = lockManager.lockFromCondition(pendingOperation.condition) + } + } is ConditionAwaitBlocked -> { if (pendingOperation.canInterrupt) { val lock = lockManager.lockFromCondition(pendingOperation.condition) if (lockManager.objectWaitUnblockedWithoutNotify( pendingOperation.condition, lock, context, false)) { - syncManager.createWait(lock, 1) waitingObject = lock } } @@ -439,7 +449,6 @@ class RunContext(val config: Configuration) { is CountDownLatchAwaitBlocking -> { context.pendingOperation = ThreadResumeOperation(true) context.state = ThreadState.Enabled - syncManager.createWait(pendingOperation.latch, 1) waitingObject = pendingOperation.latch } is ParkBlocked -> { @@ -452,6 +461,7 @@ class RunContext(val config: Configuration) { } if (waitingObject != null) { + syncManager.createWait(waitingObject, 1) registeredThreads[Thread.currentThread().id]!!.pendingOperation = InterruptPendingOperation(waitingObject) } @@ -524,9 +534,13 @@ class RunContext(val config: Configuration) { val context = registeredThreads[t]!! lockManager.addWakingThread(lockObject, context) if (waitingObject == lockObject) { - context.pendingOperation = ObjectWakeBlocked(waitingObject, false) + context.pendingOperation = ObjectWakeBlocked(waitingObject, true) } else { - context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, false) + context.pendingOperation = + ConditionWakeBlocked( + waitingObject as Condition, + (context.pendingOperation as ConditionAwaitBlocked).canInterrupt, + true) } it.remove(t) if (it.size == 0) { @@ -554,9 +568,13 @@ class RunContext(val config: Configuration) { // We cannot enable the thread immediately because // the thread is still waiting for the monitor lock. if (waitingObject == lockObject) { - context.pendingOperation = ObjectWakeBlocked(waitingObject, false) + context.pendingOperation = ObjectWakeBlocked(waitingObject, true) } else { - context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, false) + context.pendingOperation = + ConditionWakeBlocked( + waitingObject as Condition, + (context.pendingOperation as ConditionAwaitBlocked).canInterrupt, + true) } lockManager.addWakingThread(lockObject, context) } @@ -828,10 +846,12 @@ class RunContext(val config: Configuration) { val pendingOperation = thread.pendingOperation when (pendingOperation) { is ObjectWaitBlock -> { - thread.pendingOperation = ObjectWakeBlocked(pendingOperation.o, false) + thread.pendingOperation = ObjectWakeBlocked(pendingOperation.o, true) } is ConditionAwaitBlocked -> { - thread.pendingOperation = ConditionWakeBlocked(pendingOperation.condition, false) + thread.pendingOperation = + ConditionWakeBlocked( + pendingOperation.condition, pendingOperation.canInterrupt, true) } is CountDownLatchAwaitBlocking -> { val releasedThreads = latchManager.release(pendingOperation.latch) @@ -917,6 +937,9 @@ class RunContext(val config: Configuration) { .toList() .filter { it.state == ThreadState.Enabled } .sortedBy { it.thread.id } + if (mainExiting && (currentThreadId == mainThreadId || enabledOperations.size > 1)) { + enabledOperations = enabledOperations.filter { it.thread.id != mainThreadId } + } } if (enabledOperations.isEmpty()) { diff --git a/core/src/main/kotlin/org/pastalab/fray/core/RuntimeDelegate.kt b/core/src/main/kotlin/org/pastalab/fray/core/RuntimeDelegate.kt index af30666..f2b44e2 100644 --- a/core/src/main/kotlin/org/pastalab/fray/core/RuntimeDelegate.kt +++ b/core/src/main/kotlin/org/pastalab/fray/core/RuntimeDelegate.kt @@ -615,11 +615,11 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg } override fun onLatchAwaitTimeout(latch: CountDownLatch, timeout: Long, unit: TimeUnit): Boolean { - if (onLatchAwaitImpl(latch, true)) { - onSkipMethodDone("Latch.await") - return latch.await(timeout, unit) - } try { + if (onLatchAwaitImpl(latch, true)) { + onSkipMethodDone("Latch.await") + return latch.await(timeout, unit) + } latch.await() } catch (e: InterruptedException) { // Do nothing @@ -692,8 +692,11 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg LockSupport.parkNanos(nanos) return } - LockSupport.park() - onThreadParkDoneImpl(true) + try { + LockSupport.park() + } finally { + onThreadParkDoneImpl(true) + } } override fun onThreadParkUntil(deadline: Long) { @@ -702,8 +705,11 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg LockSupport.parkUntil(deadline) return } - LockSupport.park() - onThreadParkDoneImpl(true) + try { + LockSupport.park() + } finally { + onThreadParkDoneImpl(true) + } } override fun onThreadParkNanosWithBlocker(blocker: Any?, nanos: Long) { @@ -712,51 +718,81 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg LockSupport.parkNanos(blocker, nanos) return } - LockSupport.park() - onThreadParkDoneImpl(true) + try { + LockSupport.park() + } finally { + onThreadParkDoneImpl(true) + } } override fun onThreadParkUntilWithBlocker(blocker: Any?, deadline: Long) { if (onThreadParkImpl()) { - onSkipMethodDone("Thread.park") - LockSupport.parkUntil(blocker, deadline) + try { + LockSupport.parkUntil(blocker, deadline) + } finally { + onSkipMethodDone("Thread.park") + } return } - LockSupport.park() - onThreadParkDoneImpl(true) + try { + LockSupport.park() + } finally { + onThreadParkDoneImpl(true) + } } override fun onConditionAwaitTime(o: Condition, time: Long, unit: TimeUnit): Boolean { - if (onConditionAwaitImpl(o, true, true)) { - val result = o.await(time, unit) - onSkipMethodDone("Condition.await") - return result + try { + if (onConditionAwaitImpl(o, true, true)) { + try { + return o.await(time, unit) + } finally { + onSkipMethodDone("Condition.await") + } + } + o.await() + } catch (e: Throwable) { + onConditionAwaitDoneImpl(o, true) + throw e } - o.await() return onConditionAwaitDoneImpl(o, true) } override fun onConditionAwaitNanos(o: Condition, nanos: Long): Long { - if (onConditionAwaitImpl(o, true, true)) { - val result = o.awaitNanos(nanos) - onSkipMethodDone("Condition.await") - return result + try { + if (onConditionAwaitImpl(o, true, true)) { + try { + return o.awaitNanos(nanos) + } finally { + onSkipMethodDone("Condition.await") + } + } + o.await() + } catch (e: Throwable) { + onConditionAwaitDoneImpl(o, true) + throw e } - o.await() - if (onConditionAwaitDoneImpl(o, true)) { - return 0 + return if (onConditionAwaitDoneImpl(o, true)) { + 0 } else { - return nanos - 1 + nanos - 1 } } override fun onConditionAwaitUntil(o: Condition, deadline: Date): Boolean { - if (onConditionAwaitImpl(o, true, true)) { - val result = o.awaitUntil(deadline) - onSkipMethodDone("Condition.await") - return result + try { + if (onConditionAwaitImpl(o, true, true)) { + try { + return o.awaitUntil(deadline) + } finally { + onSkipMethodDone("Condition.await") + } + } + o.await() + } catch (e: Throwable) { + onConditionAwaitDoneImpl(o, true) + throw e } - o.await() return onConditionAwaitDoneImpl(o, true) } diff --git a/core/src/main/kotlin/org/pastalab/fray/core/concurrency/locks/LockManager.kt b/core/src/main/kotlin/org/pastalab/fray/core/concurrency/locks/LockManager.kt index 653a9dd..1245fae 100644 --- a/core/src/main/kotlin/org/pastalab/fray/core/concurrency/locks/LockManager.kt +++ b/core/src/main/kotlin/org/pastalab/fray/core/concurrency/locks/LockManager.kt @@ -6,6 +6,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock import org.pastalab.fray.core.ThreadContext import org.pastalab.fray.core.ThreadState +import org.pastalab.fray.core.concurrency.operations.ConditionAwaitBlocked import org.pastalab.fray.core.concurrency.operations.ConditionWakeBlocked import org.pastalab.fray.core.concurrency.operations.ObjectWakeBlocked @@ -87,7 +88,11 @@ class LockManager { if (waitingObject == lockObject) { context.pendingOperation = ObjectWakeBlocked(waitingObject, !isTimeout) } else { - context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, !isTimeout) + context.pendingOperation = + ConditionWakeBlocked( + waitingObject as Condition, + (context.pendingOperation as ConditionAwaitBlocked).canInterrupt, + !isTimeout) } if (lockContext.canLock(context.thread.id)) { context.state = ThreadState.Enabled diff --git a/core/src/main/kotlin/org/pastalab/fray/core/concurrency/operations/ConditionWakeBlocked.kt b/core/src/main/kotlin/org/pastalab/fray/core/concurrency/operations/ConditionWakeBlocked.kt index b0bcd43..12dfe70 100644 --- a/core/src/main/kotlin/org/pastalab/fray/core/concurrency/operations/ConditionWakeBlocked.kt +++ b/core/src/main/kotlin/org/pastalab/fray/core/concurrency/operations/ConditionWakeBlocked.kt @@ -2,5 +2,8 @@ package org.pastalab.fray.core.concurrency.operations import java.util.concurrent.locks.Condition -class ConditionWakeBlocked(val condition: Condition, val noTimeout: Boolean) : - NonRacingOperation() {} +class ConditionWakeBlocked( + val condition: Condition, + val canInterrupt: Boolean, + val noTimeout: Boolean +) : NonRacingOperation() {} diff --git a/core/src/main/kotlin/org/pastalab/fray/core/observers/ScheduleVerifier.kt b/core/src/main/kotlin/org/pastalab/fray/core/observers/ScheduleVerifier.kt index c213d06..bce48c7 100644 --- a/core/src/main/kotlin/org/pastalab/fray/core/observers/ScheduleVerifier.kt +++ b/core/src/main/kotlin/org/pastalab/fray/core/observers/ScheduleVerifier.kt @@ -1,8 +1,14 @@ package org.pastalab.fray.core.observers +import java.io.File +import kotlinx.serialization.json.Json import org.pastalab.fray.core.ThreadContext class ScheduleVerifier(val schedules: List) : ScheduleObserver { + constructor( + path: String + ) : this(Json.decodeFromString>(File(path).readText())) + var index = 0 override fun onExecutionStart() { diff --git a/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutInterrupt.java b/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutInterrupt.java new file mode 100644 index 0000000..0b503f3 --- /dev/null +++ b/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutInterrupt.java @@ -0,0 +1,31 @@ +package org.pastalab.fray.test.success.condition; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class ConditionAwaitTimeoutInterrupt { + public static void main(String[] args) { + Lock l = new ReentrantLock(); + Condition c = l.newCondition(); + AtomicBoolean flag = new AtomicBoolean(false); + Thread t = new Thread(() -> { + l.lock(); + try { + c.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + flag.set(true); + } + l.unlock(); + }); + t.start(); + Thread.yield(); + t.interrupt(); + l.lock(); + if (!t.isInterrupted()) { + assert(flag.get()); + } + l.unlock(); + } +} diff --git a/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNoDeadlock.java b/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNoDeadlock.java index b38c997..8125f75 100644 --- a/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNoDeadlock.java +++ b/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNoDeadlock.java @@ -11,7 +11,7 @@ public static void main(String[] args) { l.lock(); try { boolean result = c.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS); - assert(!result); + assert(result == false); boolean result2 = c.awaitUntil(new java.util.Date(System.currentTimeMillis() + 1000)); assert(!result2); diff --git a/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNotifyInterrupt.java b/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNotifyInterrupt.java new file mode 100644 index 0000000..3a55641 --- /dev/null +++ b/integration-test/src/main/java/org/pastalab/fray/test/success/condition/ConditionAwaitTimeoutNotifyInterrupt.java @@ -0,0 +1,35 @@ +package org.pastalab.fray.test.success.condition; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class ConditionAwaitTimeoutNotifyInterrupt { + public static void main(String[] args) { + Lock l = new ReentrantLock(); + Condition c = l.newCondition(); + AtomicBoolean flag = new AtomicBoolean(false); + Thread t = new Thread(() -> { + l.lock(); + try { + c.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + flag.set(true); + } + l.unlock(); + }); + t.start(); +// Thread.yield(); + l.lock(); +// c.signal(); + l.unlock(); +// Thread.yield(); + t.interrupt(); + l.lock(); + if (!t.isInterrupted()) { + assert(flag.get()); + } + l.unlock(); + } +} diff --git a/integration-test/src/main/java/org/pastalab/fray/test/success/park/ParkTimeoutInterrupt.java b/integration-test/src/main/java/org/pastalab/fray/test/success/park/ParkTimeoutInterrupt.java new file mode 100644 index 0000000..c318cce --- /dev/null +++ b/integration-test/src/main/java/org/pastalab/fray/test/success/park/ParkTimeoutInterrupt.java @@ -0,0 +1,13 @@ +package org.pastalab.fray.test.success.park; + +import java.util.concurrent.locks.LockSupport; + +public class ParkTimeoutInterrupt { + public static void main(String[] args) { + Thread t = new Thread(() -> { + LockSupport.parkNanos(1000000000); + }); + t.start(); + t.interrupt(); + } +} diff --git a/integration-test/src/test/java/org/pastalab/fray/test/FrayTestCase.java b/integration-test/src/test/java/org/pastalab/fray/test/FrayTestCase.java index 10a7fe1..1d04768 100644 --- a/integration-test/src/test/java/org/pastalab/fray/test/FrayTestCase.java +++ b/integration-test/src/test/java/org/pastalab/fray/test/FrayTestCase.java @@ -2,21 +2,25 @@ import io.github.classgraph.ClassGraph; import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; import org.pastalab.fray.core.TestRunner; import org.pastalab.fray.core.command.Configuration; import org.pastalab.fray.core.command.ExecutionInfo; +import org.pastalab.fray.core.command.LambdaExecutor; import org.pastalab.fray.core.command.MethodExecutor; +import org.pastalab.fray.core.observers.ScheduleRecording; +import org.pastalab.fray.core.observers.ScheduleVerifier; import org.pastalab.fray.core.randomness.ControlledRandom; import org.pastalab.fray.core.scheduler.FifoScheduler; import org.pastalab.fray.core.scheduler.POSScheduler; import org.pastalab.fray.core.scheduler.RandomScheduler; import org.pastalab.fray.runtime.TargetTerminateException; +import org.pastalab.fray.test.success.condition.ConditionAwaitTimeoutInterrupt; +import org.pastalab.fray.test.success.condition.ConditionAwaitTimeoutNotifyInterrupt; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; +import java.io.File; +import java.util.*; import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.DynamicTest.dynamicTest; @@ -60,6 +64,45 @@ private DynamicTest populateTest(String className, boolean testShouldFail) { }); } + @Test + public void testOne() throws Throwable { + System.setProperty("fray.recordSchedule", "true"); + Configuration config = new Configuration( + new ExecutionInfo( + new LambdaExecutor(() -> { + ConditionAwaitTimeoutNotifyInterrupt.main(new String[]{}); + return null; + }), + false, + true, + false, + -1 + ), + "/tmp/report2", + 100, + 60, + new RandomScheduler(new ControlledRandom( + new ArrayList<>(List.of(1, 1, 1, 0)), + new ArrayList<>(), + new Random() + )), + new ControlledRandom( + new ArrayList<>(List.of(0, 0, 0, 0, 0, 0)), + new ArrayList<>(), + new Random() + ), + true, + false, + true, + false, + false, + false + ); +// config.getScheduleObservers().add(new ScheduleVerifier("/tmp/report/recording_0/recording.json")); + TestRunner runner = new TestRunner(config); + runner.run(); + } + @TestFactory public List testCases() { List tests = new ArrayList<>();