Skip to content

Commit

Permalink
Cleanup, and make GlobalContext a class
Browse files Browse the repository at this point in the history
  • Loading branch information
aoli-al committed Aug 5, 2024
1 parent e784917 commit 7da360e
Show file tree
Hide file tree
Showing 28 changed files with 324 additions and 343 deletions.
51 changes: 24 additions & 27 deletions core/src/main/kotlin/cmu/pasta/fray/core/GlobalContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import cmu.pasta.fray.core.concurrency.locks.SemaphoreManager
import cmu.pasta.fray.core.concurrency.operations.*
import cmu.pasta.fray.core.logger.LoggerBase
import cmu.pasta.fray.core.scheduler.Choice
import cmu.pasta.fray.core.scheduler.FifoScheduler
import cmu.pasta.fray.core.scheduler.Scheduler
import cmu.pasta.fray.instrumentation.memory.VolatileManager
import cmu.pasta.fray.runtime.DeadlockException
import cmu.pasta.fray.runtime.Delegate
Expand All @@ -22,6 +20,7 @@ import java.io.PrintWriter
import java.io.StringWriter
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.locks.Condition
Expand All @@ -32,14 +31,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
import kotlin.system.exitProcess

// TODO(aoli): make this a class maybe?
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
object GlobalContext {
class GlobalContext(val config: Configuration) {
val registeredThreads = mutableMapOf<Long, ThreadContext>()
var currentThreadId: Long = -1
var mainThreadId: Long = -1
var scheduler: Scheduler = FifoScheduler()
var config: Configuration? = null
var bugFound: Throwable? = null
var mainExiting = false
var nanoTime = System.nanoTime()
Expand All @@ -51,7 +47,7 @@ object GlobalContext {
private var step = 0
val syncManager = SynchronizationManager()
val loggers = mutableListOf<LoggerBase>()
var executor =
var executor: ExecutorService =
Executors.newSingleThreadExecutor { r ->
object : HelperThread() {
override fun run() {
Expand All @@ -72,7 +68,7 @@ object GlobalContext {
}

fun reportError(e: Throwable) {
if (bugFound == null && !config!!.executionInfo.ignoreUnhandledExceptions) {
if (bugFound == null && !config.executionInfo.ignoreUnhandledExceptions) {
bugFound = e
val sw = StringWriter()
sw.append("Error found: ${e}\n")
Expand All @@ -92,7 +88,7 @@ object GlobalContext {
for (logger in loggers) {
logger.applicationEvent(sw.toString())
}
if (config!!.exploreMode || config!!.noExitWhenBugFound) {
if (config.exploreMode || config.noExitWhenBugFound) {
return
}
loggers.forEach { it.executionDone(bugFound != null) }
Expand Down Expand Up @@ -165,13 +161,13 @@ object GlobalContext {
}

fun done() {
loggers.forEach { it.executionDone(bugFound != null && config!!.exploreMode) }
loggers.forEach { it.executionDone(bugFound != null && config.exploreMode) }
loggers.clear()
assert(lockManager.waitingThreads.isEmpty())
assert(syncManager.synchronizationPoints.isEmpty())
lockManager.done()
registeredThreads.clear()
scheduler.done()
config.scheduler.done()
}

fun shutDown() {
Expand Down Expand Up @@ -281,7 +277,7 @@ object GlobalContext {
var size = 0
lockManager.getLockContext(t).wakingThreads.let {
for (thread in it) {
registeredThreads[thread]!!.state = ThreadState.Enabled
thread.value.state = ThreadState.Enabled
}
size = it.size
}
Expand Down Expand Up @@ -323,7 +319,7 @@ object GlobalContext {
unlockImpl(lockObject, t, true, true, lockObject == waitingObject)
checkDeadlock {
context.pendingOperation = ThreadResumeOperation()
assert(lockManager.lock(lockObject, t, false, true, false))
assert(lockManager.lock(lockObject, context, false, true, false))
syncManager.removeWait(lockObject)
context.state = ThreadState.Running
}
Expand Down Expand Up @@ -375,7 +371,7 @@ object GlobalContext {
}
}
// If a thread is enabled, the lock must be available.
assert(lockManager.lock(lockObject, t.id, false, true, false))
assert(lockManager.lock(lockObject, context, false, true, false))
if (canInterrupt) {
context.checkInterrupt()
}
Expand Down Expand Up @@ -461,7 +457,7 @@ object GlobalContext {
val t = it.removeFirst()
lockManager.threadWaitsFor.remove(t)
val context = registeredThreads[t]!!
lockManager.addWakingThread(lockObject, context.thread)
lockManager.addWakingThread(lockObject, context)
if (waitingObject == lockObject) {
context.pendingOperation = ObjectWakeBlocking(waitingObject)
} else {
Expand Down Expand Up @@ -497,7 +493,7 @@ object GlobalContext {
} else {
context.pendingOperation = ConditionWakeBlocking(waitingObject as Condition)
}
lockManager.addWakingThread(lockObject, context.thread)
lockManager.addWakingThread(lockObject, context)
}
lockManager.waitingThreads.remove(id)
}
Expand Down Expand Up @@ -545,7 +541,7 @@ object GlobalContext {
// synchronized(lock) {
// lock.unlock();
// }
while (!lockManager.lock(lock, t, shouldBlock, false, canInterrupt) && shouldBlock) {
while (!lockManager.lock(lock, context, shouldBlock, false, canInterrupt) && shouldBlock) {
context.state = ThreadState.Paused
context.pendingOperation = LockBlocking(lock)
// We want to block current thread because we do
Expand Down Expand Up @@ -588,7 +584,7 @@ object GlobalContext {
isMonitorLock: Boolean
) {
var waitingThreads =
if (lockManager.unlock(lock, tid, unlockBecauseOfWait)) {
if (lockManager.unlock(lock, tid, unlockBecauseOfWait, bugFound != null)) {
lockManager.getNumThreadsBlockBy(lock, isMonitorLock)
} else {
0
Expand Down Expand Up @@ -649,7 +645,8 @@ object GlobalContext {
context.state = ThreadState.Enabled
scheduleNextOperation(true)

while (!semaphoreManager.acquire(sem, permits, shouldBlock, canInterrupt) && shouldBlock) {
while (!semaphoreManager.acquire(sem, permits, shouldBlock, canInterrupt, context) &&
shouldBlock) {
context.state = ThreadState.Paused

scheduleNextOperation(true)
Expand All @@ -672,7 +669,7 @@ object GlobalContext {
}

fun fieldOperation(obj: Any?, owner: String, name: String, type: MemoryOpType) {
if (!config!!.executionInfo.interleaveMemoryOps && !volatileManager.isVolatile(owner, name))
if (!config.executionInfo.interleaveMemoryOps && !volatileManager.isVolatile(owner, name))
return
val objIds = mutableListOf<Int>()
if (obj != null) {
Expand All @@ -690,7 +687,7 @@ object GlobalContext {
}

fun arrayOperation(obj: Any, index: Int, type: MemoryOpType) {
if (!config!!.executionInfo.interleaveMemoryOps) return
if (!config.executionInfo.interleaveMemoryOps) return
val objId = System.identityHashCode(obj)
memoryOperation((31 * objId) + index, type)
}
Expand All @@ -708,9 +705,9 @@ object GlobalContext {
}

fun latchAwait(latch: CountDownLatch) {
if (latchManager.await(latch, true)) {
val t = Thread.currentThread().id
val context = registeredThreads[t]!!
val t = Thread.currentThread().id
val context = registeredThreads[t]!!
if (latchManager.await(latch, true, context)) {
context.pendingOperation = CountDownLatchAwaitBlocking(latch)
context.state = ThreadState.Paused
checkDeadlock {
Expand Down Expand Up @@ -847,7 +844,7 @@ object GlobalContext {
}

step += 1
if (config!!.executionInfo.maxScheduledStep in 1 ..< step &&
if (config.executionInfo.maxScheduledStep in 1 ..< step &&
!currentThread.isExiting &&
Thread.currentThread() !is HelperThread &&
!(mainExiting && currentThreadId == mainThreadId)) {
Expand All @@ -857,11 +854,11 @@ object GlobalContext {
throw e
}

val nextThread = scheduler.scheduleNextOperation(enabledOperations)
val nextThread = config.scheduler.scheduleNextOperation(enabledOperations)
val index = enabledOperations.indexOf(nextThread)
currentThreadId = nextThread.thread.id

if (enabledOperations.size > 1 || config!!.fullSchedule) {
if (enabledOperations.size > 1 || config.fullSchedule) {
loggers.forEach {
it.newOperationScheduled(
nextThread.pendingOperation,
Expand Down
Loading

0 comments on commit 7da360e

Please sign in to comment.