Skip to content

Commit

Permalink
Support timed operations. (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
aoli-al authored Nov 4, 2024
1 parent 93796c2 commit 568d3fe
Show file tree
Hide file tree
Showing 29 changed files with 404 additions and 200 deletions.
208 changes: 141 additions & 67 deletions core/src/main/kotlin/org/pastalab/fray/core/RunContext.kt

Large diffs are not rendered by default.

205 changes: 112 additions & 93 deletions core/src/main/kotlin/org/pastalab/fray/core/RuntimeDelegate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
return result
}

override fun onObjectWait(o: Any) {
override fun onObjectWait(o: Any, timeout: Long) {
if (checkEntered()) return
try {
context.objectWait(o)
context.objectWait(o, timeout != 0L)
} finally {
entered.set(false)
}
Expand Down Expand Up @@ -254,53 +254,45 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
}

override fun onConditionAwait(o: Condition) {
if (checkEntered()) {
onSkipMethod("Condition.await")
return
}
try {
context.conditionAwait(o, true)
} finally {
entered.set(false)
onSkipMethod("Condition.await")
}
onConditionAwaitImpl(o, true, false)
}

override fun onConditionAwaitUninterruptibly(o: Condition) {
fun onConditionAwaitImpl(o: Condition, canInterrupt: Boolean, timed: Boolean): Boolean {
if (checkEntered()) {
onSkipMethod("Condition.await")
return
return true
}
try {
context.conditionAwait(o, false)
context.conditionAwait(o, canInterrupt, timed)
} finally {
entered.set(false)
onSkipMethod("Condition.await")
}
return false
}

override fun onConditionAwaitDone(o: Condition) {
fun onConditionAwaitDoneImpl(o: Condition, canInterrupt: Boolean): Boolean {
if (!onSkipMethodDone("Condition.await")) {
return
return true
}
if (checkEntered()) return
if (checkEntered()) return true
try {
context.conditionAwaitDone(o, true)
return context.conditionAwaitDone(o, canInterrupt)
} finally {
entered.set(false)
}
}

override fun onConditionAwaitUninterruptibly(o: Condition) {
onConditionAwaitImpl(o, false, false)
}

override fun onConditionAwaitDone(o: Condition) {
onConditionAwaitDoneImpl(o, true)
}

override fun onConditionAwaitUninterruptiblyDone(o: Condition) {
if (!onSkipMethodDone("Condition.await")) {
return
}
if (checkEntered()) return
try {
context.conditionAwaitDone(o, false)
} finally {
entered.set(false)
}
onConditionAwaitDoneImpl(o, false)
}

override fun onConditionSignal(o: Condition) {
Expand Down Expand Up @@ -425,24 +417,40 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
return true
}

override fun onThreadPark() {
if (checkEntered()) return
fun onThreadParkImpl(): Boolean {
if (checkEntered()) {
onSkipMethod("Thread.park")
return true
}
try {
context.threadPark()
} finally {
entered.set(false)
onSkipMethod("Thread.park")
}
return false
}

override fun onThreadParkDone() {
override fun onThreadPark() {
onThreadParkImpl()
}

fun onThreadParkDoneImpl(timed: Boolean) {
if (!onSkipMethodDone("Thread.park")) {
return
}
if (checkEntered()) return
try {
context.threadParkDone()
context.threadParkDone(timed)
} finally {
entered.set(false)
}
}

override fun onThreadParkDone() {
onThreadParkDoneImpl(false)
}

override fun onThreadUnpark(t: Thread?) {
if (t == null) return
if (checkEntered()) {
Expand Down Expand Up @@ -578,34 +586,49 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
onSkipMethodDone("Semaphore.reducePermits")
}

override fun onLatchAwait(latch: CountDownLatch) {
fun onLatchAwaitImpl(latch: CountDownLatch, timed: Boolean): Boolean {
if (checkEntered()) {
onSkipMethod("Latch.await")
return
return true
}
try {
context.latchAwait(latch)
context.latchAwait(latch, timed)
} finally {
entered.set(false)
onSkipMethod("Latch.await")
}
return false
}

override fun onLatchAwait(latch: CountDownLatch) {
onLatchAwaitImpl(latch, false)
}

fun onLatchAwaitDoneImpl(latch: CountDownLatch): Boolean {
onSkipMethodDone("Latch.await")
if (checkEntered()) return true
try {
return context.latchAwaitDone(latch)
} finally {
entered.set(false)
}
}

override fun onLatchAwaitTimeout(latch: CountDownLatch, timeout: Long, unit: TimeUnit): Boolean {
if (context.config.executionInfo.timedOpAsYield) {
onYield()
return false
} else {
if (onLatchAwaitImpl(latch, true)) {
onSkipMethodDone("Latch.await")
return latch.await(timeout, unit)
}
try {
latch.await()
return true
} catch (e: InterruptedException) {
// Do nothing
}
return onLatchAwaitDoneImpl(latch)
}

override fun onLatchAwaitDone(latch: CountDownLatch) {
onSkipMethodDone("Latch.await")
if (checkEntered()) return
context.latchAwaitDone(latch)
entered.set(false)
onLatchAwaitDoneImpl(latch)
}

override fun onLatchCountDown(latch: CountDownLatch) {
Expand Down Expand Up @@ -664,81 +687,77 @@ class RuntimeDelegate(val context: RunContext) : org.pastalab.fray.runtime.Deleg
}

override fun onThreadParkNanos(nanos: Long) {
if (context.config.executionInfo.timedOpAsYield) {
onYield()
} else {
LockSupport.park()
if (onThreadParkImpl()) {
onSkipMethodDone("Thread.park")
LockSupport.parkNanos(nanos)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
}

override fun onThreadParkUntil(nanos: Long) {
if (context.config.executionInfo.timedOpAsYield) {
onYield()
} else {
LockSupport.park()
override fun onThreadParkUntil(deadline: Long) {
if (onThreadParkImpl()) {
onSkipMethodDone("Thread.park")
LockSupport.parkUntil(deadline)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
}

override fun onThreadParkNanosWithBlocker(blocker: Any?, nanos: Long) {
if (checkEntered()) {
try {
LockSupport.parkNanos(nanos)
} finally {
entered.set(false)
}
}
entered.set(false)
if (context.config.executionInfo.timedOpAsYield) {
onYield()
} else {
LockSupport.park(blocker)
if (onThreadParkImpl()) {
onSkipMethodDone("Thread.park")
LockSupport.parkNanos(blocker, nanos)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
}

override fun onThreadParkUntilWithBlocker(blocker: Any?, nanos: Long) {
if (context.config.executionInfo.timedOpAsYield) {
onYield()
} else {
LockSupport.park(blocker)
override fun onThreadParkUntilWithBlocker(blocker: Any?, deadline: Long) {
if (onThreadParkImpl()) {
onSkipMethodDone("Thread.park")
LockSupport.parkUntil(blocker, deadline)
return
}
LockSupport.park()
onThreadParkDoneImpl(true)
}

override fun onConditionAwaitTime(o: Condition, time: Long, unit: TimeUnit): Boolean {
if (context.config.executionInfo.timedOpAsYield) {
onYield()
return false
} else {
o.await()
return true
if (onConditionAwaitImpl(o, true, true)) {
val result = o.await(time, unit)
onSkipMethodDone("Condition.await")
return result
}
o.await()
return onConditionAwaitDoneImpl(o, true)
}

override fun onConditionAwaitNanos(o: Condition, nanos: Long): Long {
if (checkEntered()) {
try {
return o.awaitNanos(nanos)
} finally {
entered.set(false)
}
if (onConditionAwaitImpl(o, true, true)) {
val result = o.awaitNanos(nanos)
onSkipMethodDone("Condition.await")
return result
}
entered.set(false)
if (context.config.executionInfo.timedOpAsYield) {
onYield()
o.await()
if (onConditionAwaitDoneImpl(o, true)) {
return 0
} else {
o.await()
return nanos
return nanos - 1
}
}

override fun onConditionAwaitUntil(o: Condition, deadline: Date): Boolean {
if (context.config.executionInfo.timedOpAsYield) {
onYield()
return false
} else {
o.await()
return true
if (onConditionAwaitImpl(o, true, true)) {
val result = o.awaitUntil(deadline)
onSkipMethodDone("Condition.await")
return result
}
o.await()
return onConditionAwaitDoneImpl(o, true)
}

override fun onThreadIsInterrupted(result: Boolean, t: Thread): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ class Sync(val goal: Int) : Any() {
fun unblock() {
count += 1
signaler.add(Thread.currentThread().name)
if (count > goal) {
println("?")
}
assert(count <= goal)
if (count == goal) {
(this as Object).notify()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class CountDownLatchContext(var count: Long) : Interruptible {
override fun interrupt(tid: Long) {
val lockWaiter = latchWaiters[tid] ?: return
if (lockWaiter.canInterrupt) {
lockWaiter.thread.pendingOperation = ThreadResumeOperation()
lockWaiter.thread.pendingOperation = ThreadResumeOperation(false)
lockWaiter.thread.state = ThreadState.Enabled
latchWaiters.remove(tid)
}
Expand All @@ -35,7 +35,7 @@ class CountDownLatchContext(var count: Long) : Interruptible {
count = 0
var threads = 0
for (lockWaiter in latchWaiters.values) {
lockWaiter.thread.pendingOperation = ThreadResumeOperation()
lockWaiter.thread.pendingOperation = ThreadResumeOperation(true)
lockWaiter.thread.state = ThreadState.Enabled
threads += 1
}
Expand All @@ -55,7 +55,7 @@ class CountDownLatchContext(var count: Long) : Interruptible {
if (count == 0L) {
var threads = 0
for (lockWaiter in latchWaiters.values) {
lockWaiter.thread.pendingOperation = ThreadResumeOperation()
lockWaiter.thread.pendingOperation = ThreadResumeOperation(true)
lockWaiter.thread.state = ThreadState.Enabled
threads += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
import org.pastalab.fray.core.ThreadContext
import org.pastalab.fray.core.ThreadState
import org.pastalab.fray.core.concurrency.operations.ConditionWakeBlocking
import org.pastalab.fray.core.concurrency.operations.ObjectWakeBlocking
import org.pastalab.fray.core.concurrency.operations.ConditionWakeBlocked
import org.pastalab.fray.core.concurrency.operations.ObjectWakeBlocked

class LockManager {
val lockContextManager = ReferencedContextManager<LockContext> { ReentrantLockContext() }
Expand Down Expand Up @@ -70,10 +70,11 @@ class LockManager {
}

// TODO(aoli): can we merge this logic with `objectNotifyImply`?
fun threadInterruptDuringObjectWait(
fun objectWaitUnblockedWithoutNotify(
waitingObject: Any,
lockObject: Any,
context: ThreadContext
context: ThreadContext,
isTimeout: Boolean
): Boolean {
val id = System.identityHashCode(waitingObject)
val lockContext = getLockContext(lockObject)
Expand All @@ -84,9 +85,9 @@ class LockManager {
}
addWakingThread(lockObject, context)
if (waitingObject == lockObject) {
context.pendingOperation = ObjectWakeBlocking(waitingObject)
context.pendingOperation = ObjectWakeBlocked(waitingObject, !isTimeout)
} else {
context.pendingOperation = ConditionWakeBlocking(waitingObject as Condition)
context.pendingOperation = ConditionWakeBlocked(waitingObject as Condition, !isTimeout)
}
if (lockContext.canLock(context.thread.id)) {
context.state = ThreadState.Enabled
Expand Down
Loading

0 comments on commit 568d3fe

Please sign in to comment.