Skip to content

Commit

Permalink
Use proper interrupt support on IO
Browse files Browse the repository at this point in the history
  • Loading branch information
blootsvoets committed Oct 11, 2023
1 parent 06de816 commit 6c39236
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
2 changes: 1 addition & 1 deletion radar-commons-kotlin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ First of all, it contains a cache implementation to use with coroutines, with su
import java.io.IOException

val dirCache = CachedMap<String, FileInfo> {
withContext(Dispatchers.IO) {
runInterruptible(Dispatchers.IO) {
client.fetchDirectoryMap()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
Expand Down Expand Up @@ -49,15 +49,13 @@ suspend fun <T> Future<T>.suspendGet(
}
}
try {
withContext(Dispatchers.IO) {
runInterruptible(Dispatchers.IO) {
if (duration != null) {
get(duration.inWholeMilliseconds, TimeUnit.MILLISECONDS)
} else {
get()
}
}
} catch (ex: InterruptedException) {
throw CancellationException("Future was interrupted", ex)
} finally {
channel.send(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package org.radarbase.mock.data

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
Expand Down Expand Up @@ -56,7 +56,7 @@ class MockRecordValidatorTest {
fun validateEnum() = runTest {
val config = makeConfig()
config.valueSchema = ApplicationServerStatus::class.java.name
withContext(Dispatchers.IO) {
runInterruptible(Dispatchers.IO) {
config.getDataFile(root).bufferedWriter().use { writer ->
writer.append("key.projectId,key.userId,key.sourceId,value.time,value.serverStatus,value.ipAddress\n")
writer.append("test,a,b,1,UNKNOWN,\n")
Expand Down Expand Up @@ -199,7 +199,7 @@ class MockRecordValidatorTest {

private suspend fun writeConfig(write: Writer.() -> Unit): MockDataConfig {
val config = makeConfig()
withContext(Dispatchers.IO) {
runInterruptible(Dispatchers.IO) {
config.getDataFile(root).bufferedWriter().use(write)
}
return config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import io.ktor.util.reflect.TypeInfo
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -181,11 +182,11 @@ class RestKafkaSender(config: Config) : KafkaSender {
return true
}
val lastState = try {
val response = withContext(Dispatchers.IO) {
val response = scope.async {
restClient.head {
url("")
}
}
}.await()
if (response.status.isSuccess()) {
_connectionState.didConnect()
ConnectionState.State.CONNECTED
Expand Down

0 comments on commit 6c39236

Please sign in to comment.