diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 779be1f..2f75e59 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -37,20 +37,20 @@ jobs: **/build/reports/ **/build/test-results/ -# test-sbt: -# runs-on: ubuntu-latest -# steps: -# - uses: actions/checkout@v4 -# - name: Set up JDK 21 -# uses: actions/setup-java@v4 -# with: -# cache: 'sbt' -# java-version: 21 -# distribution: 'temurin' -# - name: Setup Gradle -# uses: gradle/actions/setup-gradle@v3 -# - name: Setup sbt -# uses: sbt/setup-sbt@v1 -# - name: Build and test -# run: sbt -v publishLocalGradleDependencies ++test -# working-directory: ./tasks-scala + test-sbt: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 21 + uses: actions/setup-java@v4 + with: + cache: 'sbt' + java-version: 21 + distribution: 'temurin' + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + - name: Setup sbt + uses: sbt/setup-sbt@v1 + - name: Build and test + run: sbt -v publishLocalGradleDependencies ++test + working-directory: ./tasks-scala diff --git a/build.gradle.kts b/build.gradle.kts index 2d4f0db..40d3151 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -3,6 +3,7 @@ import com.github.benmanes.gradle.versions.updates.DependencyUpdatesTask val projectVersion = property("project.version").toString() plugins { + id("org.jetbrains.dokka") id("com.github.ben-manes.versions") } @@ -10,6 +11,24 @@ repositories { mavenCentral() } +buildscript { + dependencies { + classpath("org.jetbrains.dokka:dokka-base:2.0.0") + // classpath("org.jetbrains.dokka:kotlin-as-java-plugin:2.0.0") + } +} + +//dokka { +// dokkaPublications.html { +// outputDirectory.set(rootDir.resolve("build/dokka")) +// outputDirectory.set(file("build/dokka")) +// } +//} + +tasks.dokkaHtmlMultiModule { + outputDirectory.set(file("build/dokka")) +} + tasks.named("dependencyUpdates").configure { fun isNonStable(version: String): Boolean { val stableKeyword = listOf("RELEASE", "FINAL", "GA").any { version.uppercase().contains(it) } diff --git a/gradle.properties b/gradle.properties index 98f733a..f30a13c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,3 +2,6 @@ kotlin.code.style=official # TO BE modified whenever a new version is released project.version=0.0.3 + +# https://kotlinlang.org/docs/dokka-migration.html#sync-your-project-with-gradle +org.jetbrains.dokka.experimental.gradle.pluginMode=V2EnabledWithHelpers diff --git a/settings.gradle.kts b/settings.gradle.kts index 98f724f..f09c77b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,9 @@ rootProject.name = "tasks" include("tasks-jvm") +include("tasks-kotlin") +include("tasks-kotlin-coroutines") +include("tasks-scala") pluginManagement { repositories { diff --git a/tasks-kotlin-coroutines/api/tasks-kotlin-coroutines.api b/tasks-kotlin-coroutines/api/tasks-kotlin-coroutines.api new file mode 100644 index 0000000..86a457c --- /dev/null +++ b/tasks-kotlin-coroutines/api/tasks-kotlin-coroutines.api @@ -0,0 +1,9 @@ +public final class org/funfix/tasks/kotlin/CoroutinesJvmKt { + public static final fun fromSuspended (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun fromSuspended$default (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runSuspended (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun runSuspended$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runSuspended-A-R0woo (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun runSuspended-A-R0woo$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + diff --git a/tasks-kotlin-coroutines/build.gradle.kts b/tasks-kotlin-coroutines/build.gradle.kts new file mode 100644 index 0000000..808d7fb --- /dev/null +++ b/tasks-kotlin-coroutines/build.gradle.kts @@ -0,0 +1,70 @@ +@file:OptIn(ExperimentalKotlinGradlePluginApi::class) + +import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi +import org.jetbrains.kotlin.gradle.dsl.ExplicitApiMode + +plugins { + id("tasks.kmp-project") +} + +mavenPublishing { + pom { + name = "Tasks / Kotlin Coroutines" + description = "Integration with Kotlin's Coroutines" + } +} + +kotlin { + sourceSets { + val commonMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-kotlin")) + implementation(libs.kotlinx.coroutines.core) + } + } + + val commonTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jvmMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-jvm")) + implementation(project(":tasks-kotlin")) + implementation(libs.kotlinx.coroutines.core) + } + } + + val jvmTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jsMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-kotlin")) + implementation(libs.kotlinx.coroutines.core) + } + } + } +} diff --git a/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/coroutines.kt b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/coroutines.kt new file mode 100644 index 0000000..f0c3f19 --- /dev/null +++ b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/coroutines.kt @@ -0,0 +1,40 @@ +package org.funfix.tasks.kotlin + +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Similar with `runBlocking`, however this is a "suspended" function, + * to be executed in the context of [kotlinx.coroutines]. + * + * NOTES: + * - The [CoroutineDispatcher], made available via the "coroutine context", is + * used to execute the task, being passed to the task's implementation as an + * `Executor`. + * - The coroutine's cancellation protocol cooperates with that of [Task], + * so cancelling the coroutine will also cancel the task (including the + * possibility for back-pressuring on the fiber's completion after + * cancellation). + * + * @param executor is an override of the `Executor` to be used for executing + * the task. If `null`, the `Executor` will be derived from the + * `CoroutineDispatcher` + */ +public expect suspend fun Task.runSuspended( + executor: Executor? = null +): T + +/** + * See documentation for [Task.runSuspended]. + */ +public expect suspend fun PlatformTask.runSuspended( + executor: Executor? = null +): T + +/** + * Creates a [Task] from a suspended block of code. + */ +public expect suspend fun Task.Companion.fromSuspended( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + block: suspend () -> T +): Task diff --git a/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/internals.kt b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/internals.kt new file mode 100644 index 0000000..b67a24c --- /dev/null +++ b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/internals.kt @@ -0,0 +1,15 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.coroutineContext + +/** + * Internal API: gets the current [CoroutineDispatcher] from the coroutine context. + */ +internal suspend fun currentDispatcher(): CoroutineDispatcher { + // Access the coroutineContext to get the ContinuationInterceptor + val continuationInterceptor = coroutineContext[ContinuationInterceptor] + return continuationInterceptor as? CoroutineDispatcher ?: Dispatchers.Default +} diff --git a/tasks-kotlin-coroutines/src/jsMain/kotlin/org/funfix/tasks/kotlin/coroutines.js.kt b/tasks-kotlin-coroutines/src/jsMain/kotlin/org/funfix/tasks/kotlin/coroutines.js.kt new file mode 100644 index 0000000..95a6105 --- /dev/null +++ b/tasks-kotlin-coroutines/src/jsMain/kotlin/org/funfix/tasks/kotlin/coroutines.js.kt @@ -0,0 +1,115 @@ +@file:OptIn(DelicateCoroutinesApi::class) + +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.resumeWithException + +public actual suspend fun PlatformTask.runSuspended( + executor: Executor? +): T = run { + val executorOrDefault = executor ?: buildExecutor(currentDispatcher()) + suspendCancellableCoroutine { cont -> + val contCallback = cont.asCompletionCallback() + try { + val token = this.invoke(executorOrDefault, contCallback) + cont.invokeOnCancellation { + token.cancel() + } + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + contCallback(Outcome.Failure(e)) + } + } +} + +internal fun buildExecutor(dispatcher: CoroutineDispatcher): Executor = + DispatcherExecutor(dispatcher) + +internal fun buildCoroutineDispatcher( + @Suppress("UNUSED_PARAMETER") executor: Executor +): CoroutineDispatcher = + // Building this CoroutineDispatcher from an Executor is problematic, and there's no + // point in even trying on top of JS engines. + Dispatchers.Default + +private class DispatcherExecutor(val dispatcher: CoroutineDispatcher) : Executor { + override fun execute(command: Runnable) { + if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { + dispatcher.dispatch( + EmptyCoroutineContext, + kotlinx.coroutines.Runnable { command.run() } + ) + } else { + command.run() + } + } + + override fun toString(): String = + dispatcher.toString() +} + +internal fun CancellableContinuation.asCompletionCallback(): Callback { + var isActive = true + return { outcome -> + if (outcome is Outcome.Failure) { + UncaughtExceptionHandler.rethrowIfFatal(outcome.exception) + } + if (isActive) { + isActive = false + when (outcome) { + is Outcome.Success -> + resume(outcome.value) { _, _, _ -> + // on cancellation? + } + is Outcome.Failure -> + resumeWithException(outcome.exception) + is Outcome.Cancellation -> + resumeWithException(kotlinx.coroutines.CancellationException()) + } + } else if (outcome is Outcome.Failure) { + UncaughtExceptionHandler.logOrRethrow(outcome.exception) + } + } +} + +/** + * Creates a [Task] from a suspended block of code. + */ +public actual suspend fun Task.Companion.fromSuspended( + coroutineContext: CoroutineContext, + block: suspend () -> T +): Task = + Task.fromAsync { executor, callback -> + val job = GlobalScope.launch( + buildCoroutineDispatcher(executor) + coroutineContext + ) { + try { + val r = block() + callback(Outcome.Success(r)) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + when (e) { + is CancellationException, is TaskCancellationException -> + callback(Outcome.Cancellation) + else -> + callback(Outcome.Failure(e)) + } + } + } + Cancellable { + job.cancel() + } + } + +public actual suspend fun Task.runSuspended(executor: Executor?): T = + asPlatform.runSuspended(executor) diff --git a/tasks-kotlin-coroutines/src/jvmMain/kotlin/org/funfix/tasks/kotlin/coroutines.jvm.kt b/tasks-kotlin-coroutines/src/jvmMain/kotlin/org/funfix/tasks/kotlin/coroutines.jvm.kt new file mode 100644 index 0000000..aff9371 --- /dev/null +++ b/tasks-kotlin-coroutines/src/jvmMain/kotlin/org/funfix/tasks/kotlin/coroutines.jvm.kt @@ -0,0 +1,114 @@ +@file:JvmName("CoroutinesJvmKt") +@file:OptIn(DelicateCoroutinesApi::class) + +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.asExecutor +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import org.funfix.tasks.jvm.CompletionCallback +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.resumeWithException +import org.funfix.tasks.jvm.Outcome + +public actual suspend fun PlatformTask.runSuspended(executor: Executor?): T = + run { + val executorOrDefault = executor ?: currentDispatcher().asExecutor() + suspendCancellableCoroutine { cont -> + val contCallback = CoroutineAsCompletionCallback(cont) + try { + val token = runAsync(executorOrDefault, contCallback) + cont.invokeOnCancellation { + token.cancel() + } + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + contCallback.onFailure(e) + } + } + } + +/** + * Internal API: wraps a [CancellableContinuation] into a [CompletionCallback]. + */ +internal class CoroutineAsCompletionCallback( + private val cont: CancellableContinuation +) : CompletionCallback { + private val isActive = AtomicBoolean(true) + + private inline fun completeWith(crossinline block: () -> Unit): Boolean = + if (isActive.getAndSet(false)) { + block() + true + } else { + false + } + + override fun onOutcome(outcome: Outcome) { + when (outcome) { + is Outcome.Success -> onSuccess(outcome.value) + is Outcome.Failure -> onFailure(outcome.exception) + is Outcome.Cancellation -> onCancellation() + } + } + + override fun onSuccess(value: T) { + completeWith { + cont.resume(value) { _, _, _ -> + // on cancellation? + } + } + } + + override fun onFailure(e: Throwable) { + if (!completeWith { + cont.resumeWithException(e) + }) { + UncaughtExceptionHandler.logOrRethrow(e) + } + } + + override fun onCancellation() { + completeWith { + cont.resumeWithException(kotlinx.coroutines.CancellationException()) + } + } +} + +public actual suspend fun Task.Companion.fromSuspended( + coroutineContext: CoroutineContext, + block: suspend () -> T +): Task = Task( + PlatformTask.fromAsync { executor, callback -> + val job = GlobalScope.launch( + executor.asCoroutineDispatcher() + coroutineContext + ) { + try { + val r = block() + callback.onSuccess(r) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + when (e) { + is CancellationException, + is TaskCancellationException, + is InterruptedException -> + callback.onCancellation() + else -> + callback.onFailure(e) + } + } + } + Cancellable { + job.cancel() + } + } +) + +public actual suspend fun Task.runSuspended(executor: Executor?): T = + asPlatform.runSuspended(executor) diff --git a/tasks-kotlin/api/tasks-kotlin.api b/tasks-kotlin/api/tasks-kotlin.api new file mode 100644 index 0000000..78fee6d --- /dev/null +++ b/tasks-kotlin/api/tasks-kotlin.api @@ -0,0 +1,120 @@ +public final class org/funfix/tasks/kotlin/ExecutorsJvmKt { + public static final fun getSharedIOExecutor ()Ljava/util/concurrent/Executor; + public static final fun getTrampolineExecutor ()Ljava/util/concurrent/Executor; +} + +public final class org/funfix/tasks/kotlin/Fiber : org/funfix/tasks/jvm/Cancellable { + public static final synthetic fun box-impl (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/kotlin/Fiber; + public fun cancel ()V + public static fun cancel-impl (Lorg/funfix/tasks/jvm/Fiber;)V + public static fun constructor-impl (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/jvm/Fiber; + public fun equals (Ljava/lang/Object;)Z + public static fun equals-impl (Lorg/funfix/tasks/jvm/Fiber;Ljava/lang/Object;)Z + public static final fun equals-impl0 (Lorg/funfix/tasks/jvm/Fiber;Lorg/funfix/tasks/jvm/Fiber;)Z + public final fun getAsPlatform ()Lorg/funfix/tasks/jvm/Fiber; + public fun hashCode ()I + public static fun hashCode-impl (Lorg/funfix/tasks/jvm/Fiber;)I + public fun toString ()Ljava/lang/String; + public static fun toString-impl (Lorg/funfix/tasks/jvm/Fiber;)Ljava/lang/String; + public final synthetic fun unbox-impl ()Lorg/funfix/tasks/jvm/Fiber; +} + +public final class org/funfix/tasks/kotlin/FiberJvmKt { + public static final fun asKotlin (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/jvm/Fiber; + public static final fun awaitAsync-bilpdk0 (Lorg/funfix/tasks/jvm/Fiber;Lkotlin/jvm/functions/Function1;)Lorg/funfix/tasks/jvm/Cancellable; + public static final fun awaitBlocking-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)Ljava/lang/Object; + public static final fun awaitBlockingTimed-MI-qbaI (Lorg/funfix/tasks/jvm/Fiber;J)Ljava/lang/Object; + public static final fun getOutcomeOrNull-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/kotlin/Outcome; + public static final fun getResultOrThrow-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)Ljava/lang/Object; + public static final fun joinAsync-bilpdk0 (Lorg/funfix/tasks/jvm/Fiber;Ljava/lang/Runnable;)Lorg/funfix/tasks/jvm/Cancellable; + public static final fun joinBlocking-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)V + public static final fun joinBlockingTimed-MI-qbaI (Lorg/funfix/tasks/jvm/Fiber;J)V +} + +public abstract interface class org/funfix/tasks/kotlin/Outcome { + public static final field Companion Lorg/funfix/tasks/kotlin/Outcome$Companion; + public fun getOrThrow ()Ljava/lang/Object; +} + +public final class org/funfix/tasks/kotlin/Outcome$Cancellation : org/funfix/tasks/kotlin/Outcome { + public static final field INSTANCE Lorg/funfix/tasks/kotlin/Outcome$Cancellation; + public fun equals (Ljava/lang/Object;)Z + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class org/funfix/tasks/kotlin/Outcome$Companion { + public final fun cancellation ()Lorg/funfix/tasks/kotlin/Outcome; + public final fun failure (Ljava/lang/Throwable;)Lorg/funfix/tasks/kotlin/Outcome; + public final fun success (Ljava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome; +} + +public final class org/funfix/tasks/kotlin/Outcome$Failure : org/funfix/tasks/kotlin/Outcome { + public fun (Ljava/lang/Throwable;)V + public final fun component1 ()Ljava/lang/Throwable; + public final fun copy (Ljava/lang/Throwable;)Lorg/funfix/tasks/kotlin/Outcome$Failure; + public static synthetic fun copy$default (Lorg/funfix/tasks/kotlin/Outcome$Failure;Ljava/lang/Throwable;ILjava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome$Failure; + public fun equals (Ljava/lang/Object;)Z + public final fun getException ()Ljava/lang/Throwable; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class org/funfix/tasks/kotlin/Outcome$Success : org/funfix/tasks/kotlin/Outcome { + public fun (Ljava/lang/Object;)V + public final fun component1 ()Ljava/lang/Object; + public final fun copy (Ljava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome$Success; + public static synthetic fun copy$default (Lorg/funfix/tasks/kotlin/Outcome$Success;Ljava/lang/Object;ILjava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome$Success; + public fun equals (Ljava/lang/Object;)Z + public final fun getValue ()Ljava/lang/Object; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class org/funfix/tasks/kotlin/Task { + public static final field Companion Lorg/funfix/tasks/kotlin/Task$Companion; + public static final synthetic fun box-impl (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/kotlin/Task; + public static fun constructor-impl (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/jvm/Task; + public fun equals (Ljava/lang/Object;)Z + public static fun equals-impl (Lorg/funfix/tasks/jvm/Task;Ljava/lang/Object;)Z + public static final fun equals-impl0 (Lorg/funfix/tasks/jvm/Task;Lorg/funfix/tasks/jvm/Task;)Z + public static final fun getAsJava-impl (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/jvm/Task; + public final fun getAsPlatform ()Lorg/funfix/tasks/jvm/Task; + public fun hashCode ()I + public static fun hashCode-impl (Lorg/funfix/tasks/jvm/Task;)I + public fun toString ()Ljava/lang/String; + public static fun toString-impl (Lorg/funfix/tasks/jvm/Task;)Ljava/lang/String; + public final synthetic fun unbox-impl ()Lorg/funfix/tasks/jvm/Task; +} + +public final class org/funfix/tasks/kotlin/Task$Companion { +} + +public final class org/funfix/tasks/kotlin/TaskJvmKt { + public static final fun ensureRunningOnExecutor-EZXAkWY (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;)Lorg/funfix/tasks/jvm/Task; + public static synthetic fun ensureRunningOnExecutor-EZXAkWY$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;ILjava/lang/Object;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromAsync (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function2;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromBlockingFuture (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromBlockingIO (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromCancellableFuture (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromCompletionStage (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun runAsync-A-R0woo (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/jvm/functions/Function1;)Lorg/funfix/tasks/jvm/Cancellable; + public static synthetic fun runAsync-A-R0woo$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lorg/funfix/tasks/jvm/Cancellable; + public static final fun runBlocking-EZXAkWY (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;)Ljava/lang/Object; + public static synthetic fun runBlocking-EZXAkWY$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runBlockingTimed-4GGJJa0 (Lorg/funfix/tasks/jvm/Task;JLjava/util/concurrent/Executor;)Ljava/lang/Object; + public static synthetic fun runBlockingTimed-4GGJJa0$default (Lorg/funfix/tasks/jvm/Task;JLjava/util/concurrent/Executor;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runFiber-EZXAkWY (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;)Lorg/funfix/tasks/jvm/Fiber; + public static synthetic fun runFiber-EZXAkWY$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;ILjava/lang/Object;)Lorg/funfix/tasks/jvm/Fiber; +} + +public final class org/funfix/tasks/kotlin/TaskKt { + public static final fun asKotlin (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/jvm/Task; +} + +public final class org/funfix/tasks/kotlin/UncaughtExceptionHandler { + public static final field INSTANCE Lorg/funfix/tasks/kotlin/UncaughtExceptionHandler; + public final fun logOrRethrow (Ljava/lang/Throwable;)V + public final fun rethrowIfFatal (Ljava/lang/Throwable;)V +} + diff --git a/tasks-kotlin/build.gradle.kts b/tasks-kotlin/build.gradle.kts new file mode 100644 index 0000000..40a4518 --- /dev/null +++ b/tasks-kotlin/build.gradle.kts @@ -0,0 +1,59 @@ +@file:OptIn(ExperimentalKotlinGradlePluginApi::class) + +import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi +import org.jetbrains.kotlin.gradle.dsl.ExplicitApiMode + +plugins { + id("tasks.kmp-project") +} + +mavenPublishing { + pom { + name = "Tasks / Kotlin" + description = "Integration with Kotlin Multiplatform" + } +} + +kotlin { + sourceSets { + val commonMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + } + + val commonTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jvmMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-jvm")) + compileOnly(libs.jetbrains.annotations) + } + } + + val jvmTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jsMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + } + } +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Cancellable.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Cancellable.kt new file mode 100644 index 0000000..d7b550c --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Cancellable.kt @@ -0,0 +1,22 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * Represents a non-blocking piece of logic that triggers the cancellation + * procedure of an asynchronous computation. + * + * MUST NOT block the calling thread. Interruption of the computation + * isn't guaranteed to have happened after this call returns. + * + * MUST BE idempotent, i.e. calling it multiple times should have the same + * effect as calling it once. + * + * MUST BE thread-safe. + */ +public expect fun interface Cancellable { + /** + * Triggers the cancellation of the computation. + */ + public fun cancel() +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Outcome.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Outcome.kt new file mode 100644 index 0000000..6f4dd4d --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Outcome.kt @@ -0,0 +1,59 @@ +package org.funfix.tasks.kotlin + +/** + * Represents the result of a computation. + * + * This is a union type that can signal: + * - a successful result, via [Outcome.Success] + * - a failure (with an exception), via [Outcome.Failure] + * - a cancelled computation, via [Outcome.Cancellation] + */ +public sealed interface Outcome { + public val orThrow: T + /** + * Returns the successful result of a computation, or throws an exception + * if the computation failed or was cancelled. + * + * @throws TaskCancellationException in case this is an [Outcome.Cancellation] + * @throws Throwable in case this is an [Outcome.Failure] + */ + @Throws(TaskCancellationException::class) + get() = + when (this) { + is Success -> value + is Failure -> throw exception + is Cancellation -> throw TaskCancellationException("Task was cancelled") + } + + /** + * Returned in case the task was successful. + */ + public data class Success(val value: T): Outcome + + /** + * Returned in case the task failed with an exception. + */ + public data class Failure(val exception: Throwable): Outcome + + /** + * Returned in case the task was cancelled. + */ + public data object Cancellation: Outcome + + public companion object { + /** + * Constructs a successful [Outcome] with the given value. + */ + public fun success(value: T): Outcome = Success(value) + + /** + * Constructs a failed [Outcome] with the given exception. + */ + public fun failure(e: Throwable): Outcome = Failure(e) + + /** + * Constructs a cancelled [Outcome]. + */ + public fun cancellation(): Outcome = Cancellation + } +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Task.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Task.kt new file mode 100644 index 0000000..7e3b021 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Task.kt @@ -0,0 +1,121 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * An alias for a platform-specific implementation that powers [Task]. + */ +public expect class PlatformTask + +/** + * Kotlin-specific callback type used for signaling the completion of running + * tasks. + */ +public typealias Callback = (Outcome) -> Unit + +/** + * A task is a computation that can be executed asynchronously. + * + * In the vocabulary of "reactive streams", this is a "cold data source", + * meaning that the computation hasn't executed yet, and when it will execute, + * the result won't get cached (memoized). In the vocabulary of + * "functional programming", this is a pure value, being somewhat equivalent + * to `IO`. + * + * This is designed to be a compile-time type that's going to be erased at + * runtime. Therefore, for the JVM at least, when using it in your APIs, it + * won't pollute it with Kotlin-specific wrappers. + */ +public expect value class Task public constructor( + public val asPlatform: PlatformTask +) { + // Companion object currently doesn't do anything, but we + // need to define one to make the class open for extensions. + public companion object +} + +/** + * Converts a platform task to a Kotlin task. + * + * E.g., can convert a `jvm.Task` to a `kotlin.Task`. + */ +public fun PlatformTask.asKotlin(): Task = + Task(this) + +/** + * Ensures that the task starts asynchronously and runs on the given executor, + * regardless of the `run` method that is used, or the injected executor in + * any of those methods. + * + * One example where this is useful is for blocking I/O operations, for + * ensuring that the task runs on the thread-pool meant for blocking I/O, + * regardless of what executor is passed to [runAsync]. + * + * Example: + * ```kotlin + * Task.fromBlockingIO { + * // Reads a file from disk + * Files.readString(Paths.get("file.txt")) + * }.ensureRunningOnExecutor( + * BlockingIOExecutor + * ) + * ``` + * + * Another use-case is for ensuring that the task runs asynchronously, on + * another thread. Otherwise, tasks may be able to execute on the current thread: + * + * ```kotlin + * val task = Task.fromBlockingIO { + * // Reads a file from disk + * Files.readString(Paths.get("file.txt")) + * } + * + * task + * // Ensuring the task runs on a different thread + * .ensureRunningOnExecutor() + * // Blocking the current thread for the result (JVM API) + * .runBlocking() + * ``` + * + * @param executor is the [Executor] used as an override. If `null`, then + * the executor injected (e.g., in [runAsync]) will be used. + */ +public expect fun Task.ensureRunningOnExecutor(executor: Executor? = null): Task + +/** + * Executes the task asynchronously. + * + * @param executor is the [Executor] to use for running the task + * @param callback is the callback given for signaling completion + * @return a [Cancellable] that can be used to cancel the running task + */ +public expect fun Task.runAsync( + executor: Executor? = null, + callback: Callback +): Cancellable + +/** + * Creates a task from an asynchronous computation, initiated on the current thread. + * + * This method ensures: + * 1. Idempotent cancellation + * 2. Trampolined execution to avoid stack-overflows + * + * The created task will execute the given function on the current + * thread, by using a "trampoline" to avoid stack overflows. This may + * be useful if the computation for initiating the async process is + * expected to be fast. However, if the computation can block the + * current thread, it is recommended to use [fromForkedAsync] instead, + * which will initiate the computation by yielding first (i.e., on the + * JVM this means the execution will start on a different thread). + * + * @param start is the function that will trigger the async computation, + * injecting a callback that will be used to signal the result, and an + * executor that can be used for creating additional threads. + * + * @return a new task that will execute the given builder function upon execution + * @see fromForkedAsync + */ +public expect fun Task.Companion.fromAsync( + start: (Executor, Callback) -> Cancellable +): Task diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.kt new file mode 100644 index 0000000..eda1499 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.kt @@ -0,0 +1,19 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * Utilities for handling uncaught exceptions. + */ +public expect object UncaughtExceptionHandler { + /** + * Used for filtering the fatal exceptions that should + * crash the process (e.g., `OutOfMemoryError`). + */ + public fun rethrowIfFatal(e: Throwable) + + /** + * Logs a caught exception, or rethrows it if it's fatal. + */ + public fun logOrRethrow(e: Throwable) +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/exceptions.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/exceptions.kt new file mode 100644 index 0000000..d29fa54 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/exceptions.kt @@ -0,0 +1,24 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * An exception that is thrown when waiting for the result of a task + * that has been cancelled. + * + * Note, this is unlike the JVM's `InterruptedException` or Kotlin's + * `CancellationException`, which are thrown when the current thread or fiber is + * interrupted. This exception is thrown when waiting for the result of a task + * that has been cancelled concurrently, but this doesn't mean that the current + * thread or fiber was interrupted. + */ +public expect open class TaskCancellationException(message: String?): Exception { + public constructor() +} + +/** + * Exception thrown when trying to get the result of a fiber that + * hasn't completed yet. + */ +public expect class FiberNotCompletedException public constructor() : + Exception diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/executors.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/executors.kt new file mode 100644 index 0000000..db83c28 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/executors.kt @@ -0,0 +1,49 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * An [Executor] is an abstraction for a thread-pool or a single-threaded + * event-loop, used for running tasks. + * + * On the JVM, this is an alias for the `java.util.concurrent.Executor` + * interface. On top of JavaScript, one way to implement this is via + * `setTimeout`. + */ +public expect fun interface Executor { + public fun execute(command: Runnable) +} + +/** + * A simple interface for a task that can be executed asynchronously. + * + * On the JVM, this is an alias for the `java.lang.Runnable` interface. + */ +public expect fun interface Runnable { + public fun run() +} + +/** + * The global executor, used for running tasks that don't specify an + * explicit executor. + * + * On top of the JVM, this is powered by "virtual threads" (project loom), if + * the runtime supports it (Java 21+). Otherwise, it's an unlimited "cached" + * thread-pool. On top of JavaScript, blocking I/O operations are not possible + * in the browser, and discouraged in Node.js. JS runtimes don't have + * multi-threading with shared-memory concurrency, so this will be just a plain + * executor. + */ +public expect val SharedIOExecutor: Executor + +/** + * An [Executor] that runs tasks on the current thread. + * + * Uses a [trampoline](https://en.wikipedia.org/wiki/Trampoline_(computing)) + * to ensure that recursive calls don't blow the stack. + * + * Using this executor is useful for making asynchronous callbacks stack-safe. + * Note, however, that the tasks get executed on the current thread, immediately, + * even if the implementation guards against stack overflows. + */ +public expect val TrampolineExecutor: Executor diff --git a/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTestUtils.kt b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTestUtils.kt new file mode 100644 index 0000000..aa186ec --- /dev/null +++ b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTestUtils.kt @@ -0,0 +1,20 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.withContext +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +interface AsyncTestUtils { + fun runTest( + context: CoroutineContext = EmptyCoroutineContext, + testBody: suspend TestScope.() -> Unit + ) { + kotlinx.coroutines.test.runTest(context) { + withContext(Dispatchers.Unconfined) { + testBody() + } + } + } +} diff --git a/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTests.kt b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTests.kt new file mode 100644 index 0000000..4e02a99 --- /dev/null +++ b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTests.kt @@ -0,0 +1,135 @@ +//package org.funfix.tasks.kotlin +// +//import kotlinx.coroutines.yield +//import kotlin.test.Test +//import kotlin.test.assertEquals +//import kotlin.test.fail +// +//class AsyncTests: AsyncTestUtils { +// @Test +// fun createAsync() = runTest { +// val task = taskFromAsync { executor, callback -> +// executor.execute { +// callback(Outcome.Success(1 + 1)) +// } +// EmptyCancellable +// } +// +// val r = task.executeSuspended() +// assertEquals(2, r) +// } +// +// @Test +// fun fromSuspendedHappy() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val r = task.executeSuspended() +// assertEquals(2, r) +// } +// +// @Test +// fun fromSuspendedFailure() = runTest { +// val e = RuntimeException("Boom") +// val task = taskFromSuspended { +// yield() +// throw e +// } +// +// try { +// task.executeSuspended() +// fail("Should have thrown") +// } catch (e: RuntimeException) { +// assertEquals("Boom", e.message) +// } +// } +// +// @Test +// fun simpleSuspendedChaining() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val task2 = taskFromSuspended { +// yield() +// task.executeSuspended() + 1 +// } +// +// val r = task2.executeSuspended() +// assertEquals(3, r) +// } +// +// @Test +// fun fiberChaining() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val task2 = taskFromSuspended { +// yield() +// task.executeFiber().awaitSuspended() + 1 +// } +// +// val r = task2.executeSuspended() +// assertEquals(3, r) +// } +// +// @Test +// fun complexChaining() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val task2 = taskFromSuspended { +// yield() +// task.executeSuspended() + 1 +// } +// +// val task3 = taskFromSuspended { +// yield() +// task2.executeFiber().awaitSuspended() + 1 +// } +// +// val task4 = taskFromSuspended { +// yield() +// val deferred = async { task3.executeSuspended() } +// deferred.await() + 1 +// } +// +// val r = task4.executeSuspended() +// assertEquals(5, r) +// } +// +// @Test +// fun cancellation() = runTest { +// val lock = Mutex() +// val latch = CompletableDeferred() +// val wasCancelled = CompletableDeferred() +// lock.lock() +// +// val job = async { +// taskFromSuspended { +// yield() +// latch.complete(Unit) +// try { +// lock.lock() +// } finally { +// wasCancelled.complete(Unit) +// lock.unlock() +// } +// }.executeSuspended() +// } +// +// withTimeout(5000) { latch.await() } +// job.cancel() +// +// withTimeout(5000) { +// wasCancelled.await() +// } +// } +//} diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Cancellable.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Cancellable.js.kt new file mode 100644 index 0000000..a90c1e0 --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Cancellable.js.kt @@ -0,0 +1,46 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual fun interface Cancellable { + public actual fun cancel() + + public companion object { + public val empty: Cancellable = + Cancellable {} + } +} + +internal class MutableCancellable : Cancellable { + private var ref: State = State.Active(Cancellable.empty, 0) + + override fun cancel() { + when (val current = ref) { + is State.Active -> { + ref = State.Cancelled + current.token.cancel() + } + State.Cancelled -> return + } + } + + fun set(token: Cancellable) { + while (true) { + when (val current = ref) { + is State.Active -> { + ref = State.Active(token, current.order + 1) + return + } + is State.Cancelled -> { + token.cancel() + return + } + } + } + } + + private sealed interface State { + data class Active(val token: Cancellable, val order: Int) : State + data object Cancelled : State + } +} diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/CancellablePromise.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/CancellablePromise.kt new file mode 100644 index 0000000..eccde07 --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/CancellablePromise.kt @@ -0,0 +1,17 @@ +package org.funfix.tasks.kotlin + +import kotlin.js.Promise + +/** + * This is a wrapper around a JavaScript [Promise] with a + * [Cancellable] reference attached. + * + * A standard JavaScript [Promise] is not connected to its + * asynchronous task and cannot be cancelled. Thus, if we want to cancel + * a task, we need to keep a reference to a [Cancellable] object that + * can do the job. + */ +public data class CancellablePromise( + val promise: Promise, + val cancellable: Cancellable +) diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Task.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Task.js.kt new file mode 100644 index 0000000..3457bfc --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Task.js.kt @@ -0,0 +1,73 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual class PlatformTask( + private val f: (Executor, (Outcome) -> Unit) -> Cancellable +) { + public operator fun invoke( + executor: Executor, + callback: (Outcome) -> Unit + ): Cancellable = + f(executor, callback) +} + +public actual value class Task public actual constructor( + public actual val asPlatform: PlatformTask +) { + public actual companion object +} + +public actual fun Task.runAsync( + executor: Executor?, + callback: (Outcome) -> Unit +): Cancellable { + val protected = callback.protect() + try { + return asPlatform.invoke( + executor ?: SharedIOExecutor, + protected + ) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + protected(Outcome.failure(e)) + return Cancellable.empty + } +} + +public actual fun Task.Companion.fromAsync( + start: (Executor, Callback) -> Cancellable +): Task = + Task(PlatformTask { executor, cb -> + val cRef = MutableCancellable() + TrampolineExecutor.execute { + cRef.set(start(executor, cb)) + } + cRef + }) + +internal fun Callback.protect(): Callback { + var isWaiting = true + return { o -> + if (o is Outcome.Failure) { + UncaughtExceptionHandler.logOrRethrow(o.exception) + } + if (isWaiting) { + isWaiting = false + TrampolineExecutor.execute { + this@protect.invoke(o) + } + } + } +} + +public actual fun Task.ensureRunningOnExecutor(executor: Executor?): Task = + Task(PlatformTask { injectedExecutor, callback -> + val ec = executor ?: injectedExecutor + val cRef = MutableCancellable() + ec.execute { + val c = this@ensureRunningOnExecutor.asPlatform.invoke(ec, callback) + cRef.set(c) + } + cRef + }) diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/exceptions.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/exceptions.js.kt new file mode 100644 index 0000000..bb51e11 --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/exceptions.js.kt @@ -0,0 +1,13 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual open class TaskCancellationException public actual constructor( + message: String? +): Exception(message) { + public actual constructor() : this(null) +} + +public actual class FiberNotCompletedException + public actual constructor(): Exception("Fiber not completed yet") + diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/executors.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/executors.js.kt new file mode 100644 index 0000000..ca602ae --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/executors.js.kt @@ -0,0 +1,129 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +import org.w3c.dom.WindowOrWorkerGlobalScope +import kotlin.js.Promise + +public actual fun interface Runnable { + public actual fun run() +} + +public actual fun interface Executor { + public actual fun execute(command: Runnable) +} + +public actual val SharedIOExecutor: Executor + get() = JSExecutor + +public actual val TrampolineExecutor: Executor + get() = Trampoline + +private external val self: dynamic +private external val global: dynamic + +private val globalOrSelfDynamic = + (self ?: global)!! +private val globalOrSelf = + globalOrSelfDynamic.unsafeCast() + +private object JSExecutor: Executor { + class NotSupported(execType: ExecType): Exception( + "Executor type $execType is not supported on this runtime" + ) + + private var execType = + if (globalOrSelfDynamic.setInterval != null) ExecType.ViaSetInterval + else if (globalOrSelfDynamic.setTimeout != null) ExecType.ViaSetTimeout + else ExecType.Trampolined + + inline fun withExecType(execType: ExecType, block: () -> T): T { + when (execType) { + ExecType.ViaSetInterval -> + if (globalOrSelfDynamic.setInterval == null) throw NotSupported(execType) + ExecType.ViaSetTimeout -> + if (globalOrSelfDynamic.setTimeout == null) throw NotSupported(execType) + ExecType.Trampolined -> + Unit + } + val oldRef = this.execType + this.execType = execType + try { + return block() + } finally { + this.execType = oldRef + } + } + + fun yield(): Promise { + return Promise { resolve, _ -> + execute { + resolve(Unit) + } + } + } + + override fun execute(command: Runnable) { + val handler: () -> Unit = { + try { + command.run() + } catch (e: Exception) { + UncaughtExceptionHandler.logOrRethrow(e) + } + } + when (execType) { + ExecType.ViaSetInterval -> + globalOrSelf.setInterval(handler) + ExecType.ViaSetTimeout -> + globalOrSelf.setTimeout(handler, -1) + ExecType.Trampolined -> + Trampoline.execute(command) + } + } + + sealed interface ExecType { + data object ViaSetInterval: ExecType + data object ViaSetTimeout: ExecType + data object Trampolined: ExecType + } +} + +private object Trampoline: Executor { + private var queue: MutableList? = null + + private fun eventLoop() { + while (true) { + val current = queue + if (current.isNullOrEmpty()) { + return + } + val next = current.removeFirstOrNull() + try { + next?.run() + } catch (e: Exception) { + UncaughtExceptionHandler.logOrRethrow(e) + } + } + } + + override fun execute(command: Runnable) { + val current = queue ?: mutableListOf() + current.add(command) + queue = current + try { + eventLoop() + } finally { + queue = null + } + } +} + +public actual object UncaughtExceptionHandler { + public actual fun rethrowIfFatal(e: Throwable) { + // Can we do something here? + } + + public actual fun logOrRethrow(e: Throwable) { + console.error(e) + } +} diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Fiber.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Fiber.jvm.kt new file mode 100644 index 0000000..1d5d358 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Fiber.jvm.kt @@ -0,0 +1,197 @@ +@file:JvmName("FiberJvmKt") + +package org.funfix.tasks.kotlin + +import org.jetbrains.annotations.Blocking +import org.jetbrains.annotations.NonBlocking +import java.util.concurrent.ExecutionException +import java.util.concurrent.TimeoutException +import kotlin.jvm.Throws +import kotlin.time.Duration +import kotlin.time.toJavaDuration + +public typealias PlatformFiber = org.funfix.tasks.jvm.Fiber + +/** + * A fiber is a running task being executed concurrently, and that can be + * joined/awaited or cancelled. + * + * This is the equivalent of Kotlin's `Deferred` type. + * + * This is designed to be a compile-time type that's going to be erased at + * runtime. Therefore, for the JVM at least, when using it in your APIs, it + * won't pollute it with Kotlin-specific wrappers. + */ +@JvmInline +public value class Fiber public constructor( + public val asPlatform: PlatformFiber +): Cancellable { + /** + * Cancels the fiber, which will eventually stop the running fiber (if + * it's still running), completing it via "cancellation". + * + * This manifests either in a [TaskCancellationException] being thrown by + * [resultOrThrow], or in the completion callback being triggered. + */ + @NonBlocking + override fun cancel(): Unit = asPlatform.cancel() +} + +/** + * Converts the source to a [Kotlin Fiber][Fiber]. + * + * E.g., can convert from a `jvm.Fiber` to a `kotlin.Fiber`. + */ +public fun PlatformFiber.asKotlin(): Fiber = + Fiber(this) + +/** + * Returns the result of the completed fiber. + * + * This method does not block for the result. In case the fiber is not + * completed, it throws [FiberNotCompletedException]. Therefore, by contract, + * it should be called only after the fiber was "joined". + * + * @return the result of the concurrent task, if successful. + * @throws TaskCancellationException if the task was cancelled concurrently, + * thus being completed via cancellation. + * @throws FiberNotCompletedException if the fiber is not completed yet. + * @throws Throwable if the task finished with an exception. + */ +public val Fiber.resultOrThrow: T + @NonBlocking + @Throws(TaskCancellationException::class, FiberNotCompletedException::class) + get() = asPlatform.resultOrThrow + +/** + * Returns the [Outcome] of the completed fiber, or `null` in case the + * fiber is not completed yet. + * + * This method does not block for the result. In case the fiber is not + * completed, it returns `null`. Therefore, it should be called after + * the fiber was "joined". + */ +public val Fiber.outcomeOrNull: Outcome? get() = + try { + Outcome.Success(asPlatform.resultOrThrow) + } catch (e: TaskCancellationException) { + Outcome.Cancellation + } catch (e: ExecutionException) { + Outcome.Failure(e.cause ?: e) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + Outcome.Failure(e) + } + +/** + * Waits until the fiber completes, and then runs the given callback to + * signal its completion. + * + * Completion includes cancellation. Triggering [Fiber.cancel] before + * [joinAsync] will cause the fiber to get cancelled, and then the + * "join" back-pressures on cancellation. + * + * @param onComplete is the callback to run when the fiber completes + * (successfully, or with failure, or cancellation) + */ +@NonBlocking +public fun Fiber.joinAsync(onComplete: Runnable): Cancellable = + asPlatform.joinAsync(onComplete) + +/** + * Waits until the fiber completes, and then runs the given callback + * to signal its completion. + * + * This method can be executed as many times as necessary, with the + * result of the `Fiber` being memoized. It can also be executed + * after the fiber has completed, in which case the callback will be + * executed immediately. + * + * @param callback will be called with the result when the fiber completes. + * + * @return a [Cancellable] that can be used to unregister the callback, + * in case the caller is no longer interested in the result. Note this + * does not cancel the fiber itself. + */ +@NonBlocking +public fun Fiber.awaitAsync(callback: Callback): Cancellable = + asPlatform.awaitAsync(callback.asJava()) + +/** + * Blocks the current thread until the fiber completes. + * + * This method does not return the outcome of the fiber. To check + * the outcome, use [resultOrThrow]. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running + * task. + */ +@Blocking +@Throws(InterruptedException::class) +public fun Fiber.joinBlocking(): Unit = + asPlatform.joinBlocking() + +/** + * Blocks the current thread until the fiber completes, then returns the + * result of the fiber. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running task. + * + * @throws TaskCancellationException if the fiber was cancelled concurrently. + * + * @throws Throwable if the task failed with an exception. + */ +@Blocking +@Throws(InterruptedException::class, TaskCancellationException::class) +public fun Fiber.awaitBlocking(): T = + try { + asPlatform.awaitBlocking() + } catch (e: ExecutionException) { + throw e.cause ?: e + } + +/** + * Blocks the current thread until the fiber completes, or until the + * timeout is reached. + * + * This method does not return the outcome of the fiber. To check the + * outcome, use [resultOrThrow]. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running + * task. + * + * @throws TimeoutException if the timeout is reached before the fiber + * completes. + */ +@Blocking +@Throws(InterruptedException::class, TimeoutException::class) +public fun Fiber.joinBlockingTimed(timeout: Duration): Unit = + asPlatform.joinBlockingTimed(timeout.toJavaDuration()) + +/** + * Blocks the current thread until the fiber completes, then returns the result of the fiber. + * + * @param timeout the maximum time to wait for the fiber to complete, before + * throwing a [TimeoutException]. + * + * @return the result of the fiber, if successful. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running + * task. + * @throws TimeoutException if the timeout is reached before the fiber completes. + * @throws TaskCancellationException if the fiber was cancelled concurrently. + * @throws Throwable if the task failed with an exception. + */ +@Blocking +@Throws(InterruptedException::class, TaskCancellationException::class, TimeoutException::class) +public fun Fiber.awaitBlockingTimed(timeout: Duration): T = + try { + asPlatform.awaitBlockingTimed(timeout.toJavaDuration()) + } catch (e: ExecutionException) { + throw e.cause ?: e + } + diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Task.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Task.jvm.kt new file mode 100644 index 0000000..0c8f59c --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Task.jvm.kt @@ -0,0 +1,220 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") +@file:JvmName("TaskJvmKt") + +package org.funfix.tasks.kotlin + +import org.jetbrains.annotations.Blocking +import org.jetbrains.annotations.NonBlocking +import java.util.concurrent.CompletionStage +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import java.util.concurrent.TimeoutException +import kotlin.jvm.Throws +import kotlin.time.Duration +import kotlin.time.toJavaDuration + +public actual typealias PlatformTask = org.funfix.tasks.jvm.Task + +@JvmInline +public actual value class Task public actual constructor( + public actual val asPlatform: PlatformTask +) { + /** + * Converts this task to a `jvm.Task`. + */ + public val asJava: PlatformTask get() = asPlatform + + public actual companion object +} + +@NonBlocking +public actual fun Task.ensureRunningOnExecutor(executor: Executor?): Task = + Task(when (executor) { + null -> asPlatform.ensureRunningOnExecutor() + else -> asPlatform.ensureRunningOnExecutor(executor) + }) + +@NonBlocking +public actual fun Task.runAsync( + executor: Executor?, + callback: Callback +): Cancellable = + when (executor) { + null -> asPlatform.runAsync(callback.asJava()) + else -> asPlatform.runAsync(executor, callback.asJava()) + } + +/** + * Executes the task concurrently and returns a [Fiber] that can be + * used to wait for the result or cancel the task. + * + * Similar to [runAsync], this method starts the execution on a different thread. + * + * @param executor is the [Executor] that may be used to run the task. + * + * @return a [Fiber] that can be used to wait for the outcome, + * or to cancel the running fiber. + */ +@NonBlocking +public fun Task.runFiber(executor: Executor? = null): Fiber = + Fiber(when (executor) { + null -> asPlatform.runFiber() + else -> asPlatform.runFiber(executor) + }) + +/** + * Executes the task and blocks until it completes, or the current thread gets + * interrupted (in which case the task is also cancelled). + * + * Given that the intention is to block the current thread for the result, the + * task starts execution on the current thread. + * + * @param executor the [Executor] that may be used to run the task. + * @return the successful result of the task. + * + * @throws InterruptedException if the current thread is interrupted, which also + * cancels the running task. Note that on interruption, the running concurrent + * task must also be interrupted, as this method always blocks for its + * interruption or completion. + * + * @throws Throwable if the task fails with an exception + */ +@Blocking +@Throws(InterruptedException::class) +public fun Task.runBlocking(executor: Executor? = null): T = + try { + when (executor) { + null -> asPlatform.runBlocking() + else -> asPlatform.runBlocking(executor) + } + } catch (e: ExecutionException) { + throw e.cause ?: e + } + +/** + * Executes the task and blocks until it completes, the timeout is reached, or + * the current thread is interrupted. + * + * **EXECUTION MODEL:** Execution starts on a different thread, by necessity, + * otherwise the execution could block the current thread indefinitely, without + * the possibility of interrupting the task after the timeout occurs. + * + * @param timeout the maximum time to wait for the task to complete before + * throwing a [TimeoutException]. + * + * @param executor the [Executor] that may be used to run the task. If one isn't + * provided, the execution will use [SharedIOExecutor] as the default. + * + * @return the successful result of the task. + * + * @throws InterruptedException if the current thread is interrupted. The + * running task is also cancelled, and this method does not return until + * `onCancel` is signaled. + * + * @throws TimeoutException if the task doesn't complete within the specified + * timeout. The running task is also cancelled on timeout, and this method does + * not return until `onCancel` is signaled. + * + * @throws Throwable if the task fails with an exception. + */ +@Blocking +@Throws(InterruptedException::class, TimeoutException::class) +public fun Task.runBlockingTimed( + timeout: Duration, + executor: Executor? = null +): T = + try { + when (executor) { + null -> asPlatform.runBlockingTimed(timeout.toJavaDuration()) + else -> asPlatform.runBlockingTimed(executor, timeout.toJavaDuration()) + } + } catch (e: ExecutionException) { + throw e.cause ?: e + } + +// Builders + +@NonBlocking +public actual fun Task.Companion.fromAsync( + start: (Executor, Callback) -> Cancellable +): Task = + Task(PlatformTask.fromAsync { executor, cb -> + start(executor, cb.asKotlin()) + }) + +/** + * Creates a task from a function executing blocking I/O. + * + * This uses Java's interruption protocol (i.e., [Thread.interrupt]) for + * cancelling the task. + */ +@NonBlocking +public fun Task.Companion.fromBlockingIO(block: () -> T): Task = + Task(PlatformTask.fromBlockingIO(block)) + +/** + * Creates a task from a [Future] builder. + * + * This is compatible with Java's interruption protocol and [Future.cancel], + * with the resulting task being cancellable. + * + * **NOTE:** Use [fromCompletionStage] for directly converting + * [java.util.concurrent.CompletableFuture] builders, because it is not possible + * to cancel such values, and the logic needs to reflect it. Better yet, use + * [fromCancellableFuture] for working with [CompletionStage] values that can be + * cancelled. + * + * @param builder is the function that will create the [Future] upon this task's + * execution. + * + * @return a new task that will complete with the result of the created `Future` + * upon execution + * + * @see fromCompletionStage + * @see fromCancellableFuture + */ +@NonBlocking +public fun Task.Companion.fromBlockingFuture(builder: () -> Future): Task = + Task(PlatformTask.fromBlockingFuture(builder)) + +/** + * Creates tasks from a builder of [CompletionStage]. + * + * **NOTE:** `CompletionStage` isn't cancellable, and the resulting task should + * reflect this (i.e., on cancellation, the listener should not receive an + * `onCancel` signal until the `CompletionStage` actually completes). + * + * Prefer using [fromCancellableFuture] for working with [CompletionStage] + * values that can be cancelled. + * + * @param builder is the function that will create the [CompletionStage] + * value. It's a builder because `Task` values are cold values + * (lazy, not executed yet). + * + * @return a new task that upon execution will complete with the result of + * the created `CancellableCompletionStage` + * + * @see fromCancellableFuture + */ +@NonBlocking +public fun Task.Companion.fromCompletionStage(builder: () -> CompletionStage): Task = + Task(PlatformTask.fromCompletionStage(builder)) + +/** + * Creates tasks from a builder of [CancellableFuture]. + * + * This is the recommended way to work with [CompletionStage] builders, because + * cancelling such values (e.g., [java.util.concurrent.CompletableFuture]) + * doesn't work for cancelling the connecting computation. As such, the user + * should provide an explicit [Cancellable] token that can be used. + * + * @param builder the function that will create the [CancellableFuture] value. + * It's a builder because [Task] values are cold values (lazy, not executed + * yet). + * + * @return a new task that upon execution will complete with the result of the + * created [CancellableFuture] + */ +@NonBlocking +public fun Task.Companion.fromCancellableFuture(builder: () -> CancellableFuture): Task = + Task(PlatformTask.fromCancellableFuture(builder)) diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.jvm.kt new file mode 100644 index 0000000..9d7679f --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.jvm.kt @@ -0,0 +1,16 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * Utilities for handling uncaught exceptions. + */ +public actual object UncaughtExceptionHandler { + public actual fun rethrowIfFatal(e: Throwable) { + org.funfix.tasks.jvm.UncaughtExceptionHandler.rethrowIfFatal(e) + } + + public actual fun logOrRethrow(e: Throwable) { + org.funfix.tasks.jvm.UncaughtExceptionHandler.logOrRethrow(e) + } +} diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/aliases.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/aliases.kt new file mode 100644 index 0000000..2ef84a3 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/aliases.kt @@ -0,0 +1,26 @@ +@file:JvmName("AliasesKt") +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual typealias Cancellable = org.funfix.tasks.jvm.Cancellable + +/** + * A `CancellableFuture` is a tuple of a `CompletableFuture` and a `Cancellable` + * reference. + * + * It's used to model the result of asynchronous computations that can be + * cancelled. Needed because `CompletableFuture` doesn't actually support + * cancellation. It's similar to [Fiber], which should be preferred, because + * it's more principled. `CancellableFuture` is useful for interop with + * Java libraries that use `CompletableFuture`. + */ +public typealias CancellableFuture = org.funfix.tasks.jvm.CancellableFuture + +public actual typealias TaskCancellationException = org.funfix.tasks.jvm.TaskCancellationException + +public actual typealias FiberNotCompletedException = org.funfix.tasks.jvm.Fiber.NotCompletedException + +public actual typealias Runnable = java.lang.Runnable + +public actual typealias Executor = java.util.concurrent.Executor diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/executors.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/executors.jvm.kt new file mode 100644 index 0000000..1480b94 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/executors.jvm.kt @@ -0,0 +1,11 @@ +@file:JvmName("ExecutorsJvmKt") + +package org.funfix.tasks.kotlin + +import org.funfix.tasks.jvm.TaskExecutors + +public actual val TrampolineExecutor: Executor + get() = TaskExecutors.trampoline() + +public actual val SharedIOExecutor: Executor + get() = TaskExecutors.sharedBlockingIO() diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/internals.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/internals.kt new file mode 100644 index 0000000..5d15ab3 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/internals.kt @@ -0,0 +1,26 @@ +@file:JvmName("InternalsJvmKt") + +package org.funfix.tasks.kotlin + +import org.funfix.tasks.jvm.CompletionCallback + + +internal typealias KotlinCallback = (Outcome) -> Unit + +internal fun CompletionCallback.asKotlin(): KotlinCallback = + { outcome -> + when (outcome) { + is Outcome.Success -> this.onSuccess(outcome.value) + is Outcome.Failure -> this.onFailure(outcome.exception) + is Outcome.Cancellation -> this.onCancellation() + } + } + +internal fun KotlinCallback.asJava(): CompletionCallback = + CompletionCallback { outcome -> + when (outcome) { + is org.funfix.tasks.jvm.Outcome.Success -> this@asJava(Outcome.Success(outcome.value)) + is org.funfix.tasks.jvm.Outcome.Failure -> this@asJava(Outcome.Failure(outcome.exception)) + is org.funfix.tasks.jvm.Outcome.Cancellation -> this@asJava(Outcome.Cancellation) + } + } diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskEnsureRunningOnExecutorTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskEnsureRunningOnExecutorTest.kt new file mode 100644 index 0000000..2d68f79 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskEnsureRunningOnExecutorTest.kt @@ -0,0 +1,34 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.ensureRunningOnExecutor +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.runBlocking +import org.junit.jupiter.api.Assertions.assertTrue +import java.util.concurrent.Executors +import kotlin.test.Test +import kotlin.test.assertEquals + +class TaskEnsureRunningOnExecutorTest { + @Test + @Suppress("DEPRECATION") + fun ensureRunningOnExecutorWorks() { + val ex = Executors.newCachedThreadPool { r -> + val th = Thread(r) + th.name = "my-thread-" + th.id + th + } + try { + val t = Task.fromBlockingIO { Thread.currentThread().name } + val n1 = t.runBlocking() + val n2 = t.ensureRunningOnExecutor().runBlocking() + val n3 = t.ensureRunningOnExecutor(ex).runBlocking() + + assertEquals(Thread.currentThread().name, n1) + assertTrue(n2.startsWith("tasks-io-"), "tasks-io") + assertTrue(n3.startsWith("my-thread-"), "my-thread") + } finally { + ex.shutdown() + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromAsyncTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromAsyncTest.kt new file mode 100644 index 0000000..79a1d37 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromAsyncTest.kt @@ -0,0 +1,56 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.Cancellable +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromAsync +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runFiber +import java.util.concurrent.CountDownLatch +import kotlin.test.Test +import kotlin.test.assertEquals + +class TaskFromAsyncTest { + @Test + fun `fromAsync happy path`() { + val task = Task.fromAsync { _, cb -> + cb(Outcome.success(1)) + Cancellable.getEmpty() + } + assertEquals(1, task.runBlocking()) + } + + @Test + fun `fromAsync with error`() { + val ex = RuntimeException("Boom!") + val task = Task.fromAsync { _, cb -> + cb(Outcome.failure(ex)) + Cancellable.getEmpty() + } + try { + task.runBlocking() + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `fromAsync can be cancelled`() { + val latch = CountDownLatch(1) + val task = Task.fromAsync { executor, cb -> + executor.execute { + latch.await() + cb(Outcome.Cancellation) + } + Cancellable { + latch.countDown() + } + } + val fiber = task.runFiber() + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromBlockingIOTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromBlockingIOTest.kt new file mode 100644 index 0000000..3493719 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromBlockingIOTest.kt @@ -0,0 +1,42 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runFiber +import kotlin.test.Test +import kotlin.test.assertEquals + +class TaskFromBlockingIOTest { + @Test + fun `fromBlockingIO (success)`() { + val task = Task.fromBlockingIO { 1 } + assertEquals(1, task.runBlocking()) + } + + @Test + fun `fromBlockingIO (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + try { + task.runBlocking() + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `fromBlockingIO (cancellation)`() { + val task: Task = Task.fromBlockingIO { + Thread.sleep(10000) + 1 + } + val fiber = task.runFiber() + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromFutureTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromFutureTest.kt new file mode 100644 index 0000000..e6726a3 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromFutureTest.kt @@ -0,0 +1,169 @@ +package org.funfix.tests + +import org.funfix.tasks.jvm.Cancellable +import org.funfix.tasks.jvm.CancellableFuture +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingFuture +import org.funfix.tasks.kotlin.fromCancellableFuture +import org.funfix.tasks.kotlin.fromCompletionStage +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runFiber +import java.util.concurrent.Callable +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.fail + +class TaskFromFutureTest { + @Test + fun `fromBlockingFuture (success)`() { + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromBlockingFuture { + ec.submit(Callable { 1 }) + } + assertEquals(1, task.runBlocking()) + } finally { + ec.shutdown() + } + } + + @Test + fun `fromBlockingFuture (failure)`() { + val ex = RuntimeException("Boom!") + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromBlockingFuture { + ec.submit { throw ex } + } + try { + task.runBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } finally { + ec.shutdown() + } + } + + @Test + fun `fromBlockingFuture (cancellation)`() { + val ec = Executors.newCachedThreadPool() + val wasStarted = CountDownLatch(1) + val wasCancelled = CountDownLatch(1) + try { + val task = Task.fromBlockingFuture { + ec.submit(Callable { + wasStarted.countDown() + try { + Thread.sleep(10000) + 1 + } catch (e: InterruptedException) { + wasCancelled.countDown() + throw e + } + }) + } + val fiber = task.runFiber() + TimedAwait.latchAndExpectCompletion(wasStarted, "wasStarted") + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + TimedAwait.latchAndExpectCompletion(wasCancelled, "wasCancelled") + } finally { + ec.shutdown() + } + } + + @Test + fun `fromCompletionStage (success)`() { + val task = Task.fromCompletionStage { + CompletableFuture.supplyAsync { 1 } + } + assertEquals(1, task.runBlocking()) + } + + @Test + fun `fromCompletionStage (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromCompletionStage { + CompletableFuture.supplyAsync { + throw ex + } + } + try { + task.runBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `fromCancellableFuture (success)`() { + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromCancellableFuture { + CancellableFuture( + CompletableFuture.supplyAsync { 1 }, + Cancellable.getEmpty() + ) + } + assertEquals(1, task.runBlocking()) + } finally { + ec.shutdown() + } + } + + @Test + fun `fromCancellableFuture (failure)`() { + val ex = RuntimeException("Boom!") + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromCancellableFuture { + CancellableFuture( + CompletableFuture.supplyAsync { + throw ex + }, + Cancellable.getEmpty() + ) + } + try { + task.runBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } finally { + ec.shutdown() + } + } + + @Test + fun `fromCancellableFuture (cancellation)`() { + val wasStarted = CountDownLatch(1) + val wasCancelled = CountDownLatch(1) + val task = Task.fromCancellableFuture { + CancellableFuture( + CompletableFuture.supplyAsync { + wasStarted.countDown() + TimedAwait.latchNoExpectations(wasCancelled) + } + ) { + wasCancelled.countDown() + } + } + val fiber = task.runFiber() + TimedAwait.latchAndExpectCompletion(wasStarted, "wasStarted") + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + TimedAwait.latchAndExpectCompletion(wasCancelled, "wasCancelled") + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunAsyncTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunAsyncTest.kt new file mode 100644 index 0000000..be81a16 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunAsyncTest.kt @@ -0,0 +1,105 @@ +package org.funfix.tests + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicReference +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.fail +import org.funfix.tasks.jvm.TaskCancellationException +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.runAsync +import org.junit.jupiter.api.Assertions.assertTrue + +class TaskRunAsyncTest { + @Test + fun `runAsync (success)`() { + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val task = Task.fromBlockingIO { 1 } + task.runAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Success(1), outcomeRef.get()) + assertEquals(1, outcomeRef.get()!!.orThrow) + } + + @Test + fun `runAsync (failure)`() { + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + task.runAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Failure(ex), outcomeRef.get()) + try { + outcomeRef.get()!!.orThrow + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runAsync (cancellation)`() { + val latch = CountDownLatch(1) + val wasStarted = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val task = Task.fromBlockingIO { + wasStarted.countDown() + Thread.sleep(10000) + 1 + } + val cancel = task.runAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(wasStarted, "wasStarted") + cancel.cancel() + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Cancellation, outcomeRef.get()) + try { + outcomeRef.get()!!.orThrow + fail("Expected exception") + } catch (e: TaskCancellationException) { + // expected + } + } + + @Test + @Suppress("DEPRECATION") + fun `runAsync runs with given executor`() { + val ec = Executors.newCachedThreadPool { r -> + val t = Thread(r) + t.isDaemon = true + t.name = "my-thread-${t.id}" + t + } + try { + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val task = Task.fromBlockingIO { + Thread.currentThread().name + } + task.runAsync(ec) { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + TimedAwait.latchAndExpectCompletion(latch) + assertTrue(outcomeRef.get()!!.orThrow.startsWith("my-thread-")) + } finally { + ec.shutdown() + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunBlockingTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunBlockingTest.kt new file mode 100644 index 0000000..5d8ae69 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunBlockingTest.kt @@ -0,0 +1,106 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.SharedIOExecutor +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runBlockingTimed +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.fail +import kotlin.time.toKotlinDuration + +class TaskRunBlockingTest { + @Test + fun `runBlocking (success)`() { + val task = Task.fromBlockingIO { 1 } + + assertEquals(1, task.runBlocking()) + assertEquals(1, task.runBlocking(SharedIOExecutor)) + } + + @Test + fun `runBlocking (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + + try { + task.runBlocking() + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + + try { + task.runBlocking(SharedIOExecutor) + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runBlocking (cancellation)`() { + val task: Task = Task.fromBlockingIO { + throw InterruptedException() + } + try { + task.runBlocking() + } catch (e: InterruptedException) { + // expected + } + try { + task.runBlocking(SharedIOExecutor) + } catch (e: InterruptedException) { + // expected + } + } + + @Test + fun `runBlockingTimed (success)`() { + val task = Task.fromBlockingIO { 1 } + + assertEquals(1, task.runBlockingTimed( + TimedAwait.TIMEOUT.toKotlinDuration() + )) + assertEquals(1, task.runBlockingTimed( + TimedAwait.TIMEOUT.toKotlinDuration(), + SharedIOExecutor + )) + } + + @Test + fun `runBlockingTimed (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration(), SharedIOExecutor) + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runBlockingTimed (cancellation)`() { + val task: Task = Task.fromBlockingIO { + throw InterruptedException() + } + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + fail("Expected exception") + } catch (e: InterruptedException) { + // expected + } + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration(), SharedIOExecutor) + fail("Expected exception") + } catch (e: InterruptedException) { + // expected + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunFiberTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunFiberTest.kt new file mode 100644 index 0000000..d4ecd20 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunFiberTest.kt @@ -0,0 +1,275 @@ +package org.funfix.tests + +import org.funfix.tasks.jvm.TaskCancellationException +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.awaitAsync +import org.funfix.tasks.kotlin.awaitBlocking +import org.funfix.tasks.kotlin.awaitBlockingTimed +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.joinAsync +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.joinBlockingTimed +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.resultOrThrow +import org.funfix.tasks.kotlin.runFiber +import org.junit.jupiter.api.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.test.assertEquals +import kotlin.test.fail +import kotlin.time.toKotlinDuration + +class TaskRunFiberTest { + @Test + fun `runFiber + joinAsync (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.joinAsync { + outcomeRef.set(fiber.outcomeOrNull!!) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Success(1), outcomeRef.get()) + } + + @Test + fun `runFiber + joinAsync (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.joinAsync { + outcomeRef.set(fiber.outcomeOrNull!!) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Failure(ex), outcomeRef.get()) + } + + @Test + fun `runFiber + joinAsync (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.joinAsync { + outcomeRef.set(fiber.outcomeOrNull!!) + latch.countDown() + } + + fiber.cancel() + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Cancellation, outcomeRef.get()) + } + + @Test + fun `runFiber + awaitAsync (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.awaitAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Success(1), outcomeRef.get()) + } + + @Test + fun `runFiber + awaitAsync (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.awaitAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Failure(ex), outcomeRef.get()) + } + + @Test + fun `runFiber + awaitAsync (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.awaitAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + fiber.cancel() + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Cancellation, outcomeRef.get()) + } + + @Test + fun `runFiber + joinBlocking (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + fiber.joinBlocking() + assertEquals(1, fiber.resultOrThrow) + assertEquals(Outcome.Success(1), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlocking (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + fiber.joinBlocking() + assertEquals(Outcome.Failure(ex), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlocking (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } + + @Test + fun `runFiber + awaitBlocking (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val result = fiber.awaitBlocking() + assertEquals(1, result) + } + + @Test + fun `runFiber + awaitBlocking (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + try { + fiber.awaitBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runFiber + awaitBlocking (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + try { + fiber.awaitBlocking() + fail("Expected exception") + } catch (e: TaskCancellationException) { + // expected + } + } + + @Test + fun `runFiber + joinBlockingTimed (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + fiber.joinBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(1, fiber.resultOrThrow) + assertEquals(Outcome.Success(1), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlockingTimed (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + fiber.joinBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(Outcome.Failure(ex), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlockingTimed (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + fiber.joinBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } + + @Test + fun `runFiber + awaitBlockingTimed (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val result = fiber.awaitBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(1, result) + } + + @Test + fun `runFiber + awaitBlockingTimed (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + try { + fiber.awaitBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runFiber + awaitBlockingTimed (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + try { + fiber.awaitBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + fail("Expected exception") + } catch (e: TaskCancellationException) { + // expected + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TimedAwait.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TimedAwait.kt new file mode 100644 index 0000000..3b89538 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TimedAwait.kt @@ -0,0 +1,31 @@ +package org.funfix.tests + +import java.time.Duration +import java.util.concurrent.* + +object TimedAwait { + val TIMEOUT: Duration = + if (System.getenv("CI") != null) Duration.ofSeconds(20) + else Duration.ofSeconds(10) + + @Throws(InterruptedException::class) + fun latchNoExpectations(latch: CountDownLatch): Boolean = + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + + @Throws(InterruptedException::class) + fun latchAndExpectCompletion(latch: CountDownLatch) = latchAndExpectCompletion(latch, "latch") + + @Throws(InterruptedException::class) + fun latchAndExpectCompletion(latch: CountDownLatch, name: String) { + assert(latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { "$name.await" } + } + + @Throws(InterruptedException::class, TimeoutException::class) + fun future(future: Future<*>) { + try { + future.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + } catch (e: ExecutionException) { + throw RuntimeException(e) + } + } +} diff --git a/tasks-scala/build.gradle.kts b/tasks-scala/build.gradle.kts new file mode 100644 index 0000000..1886ad0 --- /dev/null +++ b/tasks-scala/build.gradle.kts @@ -0,0 +1,18 @@ + +tasks.register("assemble", Exec::class) { + workingDir(project.projectDir) + + commandLine("./sbt", "package") +} + +tasks.register("executeShellCommand", Exec::class) { + // Set the command to execute. Example: "echo", "Hello, World!" + commandLine("echo", "Hello, World!") + commandLine("echo", project.projectDir) + + // Optionally, set the working directory + workingDir(project.rootDir) + + // Logging the output + standardOutput = System.out +} diff --git a/tasks-scala/build.sbt b/tasks-scala/build.sbt new file mode 100644 index 0000000..bd2ca3f --- /dev/null +++ b/tasks-scala/build.sbt @@ -0,0 +1,68 @@ +import Boilerplate.crossVersionSharedSources +import java.io.FileInputStream +import java.util.Properties + +ThisBuild / scalaVersion := "3.3.1" +ThisBuild / crossScalaVersions := Seq("2.13.14", scalaVersion.value) + +ThisBuild / resolvers ++= Seq(Resolver.mavenLocal) + +val publishLocalGradleDependencies = taskKey[Unit]("Builds and publishes gradle dependencies") +val props = settingKey[Properties]("Main project properties") + +ThisBuild / props := { + val projectProperties = new Properties() + val rootDir = (ThisBuild / baseDirectory).value + val fis = new FileInputStream(s"$rootDir/../gradle.properties") + projectProperties.load(fis) + projectProperties +} + +ThisBuild / version := { + val base = props.value.getProperty("project.version") + val isRelease = + sys.env.get("BUILD_RELEASE").filter(_.nonEmpty) + .orElse(Option(System.getProperty("buildRelease"))) + .exists(it => it == "true" || it == "1" || it == "yes" || it == "on") + if (isRelease) base else s"$base-SNAPSHOT" +} + +Global / onChangedBuildSource := ReloadOnSourceChanges + +lazy val root = project + .in(file(".")) + .settings( + publish := {}, + publishLocal := {}, + publishLocalGradleDependencies := { + import scala.sys.process.* + val rootDir = (ThisBuild / baseDirectory).value + val command = Process( + "./gradlew" :: "publishToMavenLocal" :: Nil, + new File(rootDir, ".."), + ) + val log = streams.value.log + val exitCode = command ! log + if (exitCode != 0) { + sys.error(s"Command failed with exit code $exitCode") + } + } + ) + .aggregate(coreJVM, coreJS) + +lazy val core = crossProject(JVMPlatform, JSPlatform) + .crossType(CrossType.Full) + .in(file("core")) + .settings(crossVersionSharedSources) + .settings( + name := "tasks-scala", + ) + .jvmSettings( + libraryDependencies ++= Seq( + "org.funfix" % "tasks-jvm" % version.value + ) + ) + +lazy val coreJVM = core.jvm +lazy val coreJS = core.js + diff --git a/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/CompletionCallback.scala b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/CompletionCallback.scala new file mode 100644 index 0000000..f4677d1 --- /dev/null +++ b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/CompletionCallback.scala @@ -0,0 +1,22 @@ +package org.funfix.tasks.scala + +import org.funfix.tasks.jvm.CompletionCallback as JavaCompletionCallback + +type CompletionCallback[-A] = Outcome[A] => Unit + +extension [A](cb: JavaCompletionCallback[_ >: A]) { + def asScala: CompletionCallback[A] = { + case Outcome.Success(value) => cb.onSuccess(value) + case Outcome.Failure(ex) => cb.onFailure(ex) + case Outcome.Cancellation => cb.onCancellation() + } +} + +extension [A](cb: CompletionCallback[A]) { + def asJava: JavaCompletionCallback[_ >: A] = + new JavaCompletionCallback[A] { + def onSuccess(value: A): Unit = cb(Outcome.Success(value)) + def onFailure(ex: Throwable): Unit = cb(Outcome.Failure(ex)) + def onCancellation(): Unit = cb(Outcome.Cancellation) + } +} diff --git a/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/Task.scala b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/Task.scala new file mode 100644 index 0000000..e4dce4a --- /dev/null +++ b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/Task.scala @@ -0,0 +1,24 @@ +package org.funfix.tasks.scala + +import org.funfix.tasks.jvm.Task as JavaTask + +opaque type Task[+A] = JavaTask[_ <: A] + +object Task { + def apply[A](underlying: JavaTask[_ <: A]): Task[A] = underlying + + def fromAsync[A](f: (Executor, CompletionCallback[A]) => Cancellable): Task[A] = + Task[A](JavaTask.fromAsync { (ec, cb) => + f(ec, cb.asScala) + }) + + extension [A](task: Task[A]) { + def asPlatform: JavaTask[_ <: A] = task + + def unsafeRunAsync(ec: Executor)(cb: CompletionCallback[A]): Cancellable = + unsafeRunAsync(cb)(using TaskExecutor(ec)) + + def unsafeRunAsync(cb: CompletionCallback[A])(using ec: TaskExecutor): Cancellable = + task.asPlatform.runAsync(ec, cb.asJava) + } +} diff --git a/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/TaskExecutor.scala b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/TaskExecutor.scala new file mode 100644 index 0000000..3c795eb --- /dev/null +++ b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/TaskExecutor.scala @@ -0,0 +1,27 @@ +package org.funfix.tasks.scala + +import org.funfix.tasks.jvm.TaskExecutors + +import scala.concurrent.ExecutionContext + +opaque type TaskExecutor <: Executor = Executor + +object TaskExecutor { + def apply(executor: Executor): TaskExecutor = executor + + lazy val compute: TaskExecutor = + TaskExecutor(ExecutionContext.global) + + lazy val blockingIO: TaskExecutor = + TaskExecutor(TaskExecutors.sharedBlockingIO()) + + lazy val trampoline: TaskExecutor = + TaskExecutor(TaskExecutors.trampoline()) + + object Givens { + given compute: TaskExecutor = + TaskExecutor.compute + given blockingIO: TaskExecutor = + TaskExecutor.blockingIO + } +} diff --git a/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/aliases.scala b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/aliases.scala new file mode 100644 index 0000000..acc7448 --- /dev/null +++ b/tasks-scala/core/jvm/src/main/scala-3/org/funfix/tasks/scala/aliases.scala @@ -0,0 +1,4 @@ +package org.funfix.tasks.scala + +type Cancellable = org.funfix.tasks.jvm.Cancellable +type Executor = java.util.concurrent.Executor \ No newline at end of file diff --git a/tasks-scala/core/shared/src/main/scala-3/org/funfix/tasks/scala/Outcome.scala b/tasks-scala/core/shared/src/main/scala-3/org/funfix/tasks/scala/Outcome.scala new file mode 100644 index 0000000..bfa2390 --- /dev/null +++ b/tasks-scala/core/shared/src/main/scala-3/org/funfix/tasks/scala/Outcome.scala @@ -0,0 +1,7 @@ +package org.funfix.tasks.scala + +enum Outcome[+A] { + case Success(value: A) + case Failure(exception: Throwable) + case Cancellation +} diff --git a/tasks-scala/project/Boilerplate.scala b/tasks-scala/project/Boilerplate.scala new file mode 100644 index 0000000..b768a4c --- /dev/null +++ b/tasks-scala/project/Boilerplate.scala @@ -0,0 +1,27 @@ +import sbt.* +import sbt.Keys.* + +object Boilerplate { + /** + * For working with Scala version-specific source files, allowing us to + * use 2.13 or 3.x specific APIs. + */ + lazy val crossVersionSharedSources: Seq[Setting[?]] = { + def scalaPartV = Def setting (CrossVersion partialVersion scalaVersion.value) + Seq(Compile, Test).map { sc => + (sc / unmanagedSourceDirectories) ++= { + (sc / unmanagedSourceDirectories).value + .filterNot(_.getPath.matches("^.*\\d+$")) + .flatMap { dir => + Seq( + scalaPartV.value match { + case Some((2, _)) => Seq(new File(dir.getPath + "-2")) + case Some((3, _)) => Seq(new File(dir.getPath + "-3")) + case _ => Nil + }, + ).flatten + } + } + } + } +} diff --git a/tasks-scala/project/build.properties b/tasks-scala/project/build.properties new file mode 100644 index 0000000..081fdbb --- /dev/null +++ b/tasks-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.10.0 diff --git a/tasks-scala/project/plugins.sbt b/tasks-scala/project/plugins.sbt new file mode 100644 index 0000000..8a47437 --- /dev/null +++ b/tasks-scala/project/plugins.sbt @@ -0,0 +1,4 @@ +addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") +addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.16.0") +addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.4") diff --git a/tasks-scala/sbt b/tasks-scala/sbt new file mode 100755 index 0000000..7d3f707 --- /dev/null +++ b/tasks-scala/sbt @@ -0,0 +1,818 @@ +#!/usr/bin/env bash + +set +e +declare builtin_sbt_version="1.10.0" +declare -a residual_args +declare -a java_args +declare -a scalac_args +declare -a sbt_commands +declare -a sbt_options +declare -a print_version +declare -a print_sbt_version +declare -a print_sbt_script_version +declare -a shutdownall +declare -a original_args +declare java_cmd=java +declare java_version +declare init_sbt_version=_to_be_replaced +declare sbt_default_mem=1024 +declare -r default_sbt_opts="" +declare -r default_java_opts="-Dfile.encoding=UTF-8" +declare sbt_verbose= +declare sbt_debug= +declare build_props_sbt_version= +declare use_sbtn= +declare no_server= +declare sbtn_command="$SBTN_CMD" +declare sbtn_version="1.10.0" + +### ------------------------------- ### +### Helper methods for BASH scripts ### +### ------------------------------- ### + +# Bash reimplementation of realpath to return the absolute path +realpathish () { +( + TARGET_FILE="$1" + FIX_CYGPATH="$2" + + cd "$(dirname "$TARGET_FILE")" + TARGET_FILE=$(basename "$TARGET_FILE") + + COUNT=0 + while [ -L "$TARGET_FILE" -a $COUNT -lt 100 ] + do + TARGET_FILE=$(readlink "$TARGET_FILE") + cd "$(dirname "$TARGET_FILE")" + TARGET_FILE=$(basename "$TARGET_FILE") + COUNT=$(($COUNT + 1)) + done + + TARGET_DIR="$(pwd -P)" + if [ "$TARGET_DIR" == "/" ]; then + TARGET_FILE="/$TARGET_FILE" + else + TARGET_FILE="$TARGET_DIR/$TARGET_FILE" + fi + + # make sure we grab the actual windows path, instead of cygwin's path. + if [[ "x$FIX_CYGPATH" != "x" ]]; then + echo "$(cygwinpath "$TARGET_FILE")" + else + echo "$TARGET_FILE" + fi +) +} + +# Uses uname to detect if we're in the odd cygwin environment. +is_cygwin() { + local os=$(uname -s) + case "$os" in + CYGWIN*) return 0 ;; + MINGW*) return 0 ;; + MSYS*) return 0 ;; + *) return 1 ;; + esac +} + +# TODO - Use nicer bash-isms here. +CYGWIN_FLAG=$(if is_cygwin; then echo true; else echo false; fi) + +# This can fix cygwin style /cygdrive paths so we get the +# windows style paths. +cygwinpath() { + local file="$1" + if [[ "$CYGWIN_FLAG" == "true" ]]; then #" + echo $(cygpath -w $file) + else + echo $file + fi +} + + +declare -r sbt_bin_dir="$(dirname "$(realpathish "$0")")" +declare -r sbt_home="$(dirname "$sbt_bin_dir")" + +echoerr () { + echo 1>&2 "$@" +} +vlog () { + [[ $sbt_verbose || $sbt_debug ]] && echoerr "$@" +} +dlog () { + [[ $sbt_debug ]] && echoerr "$@" +} + +jar_file () { + echo "$(cygwinpath "${sbt_home}/bin/sbt-launch.jar")" +} + +jar_url () { + local repo_base="$SBT_LAUNCH_REPO" + if [[ $repo_base == "" ]]; then + repo_base="https://repo1.maven.org/maven2" + fi + echo "$repo_base/org/scala-sbt/sbt-launch/$1/sbt-launch-$1.jar" +} + +download_url () { + local url="$1" + local jar="$2" + mkdir -p $(dirname "$jar") && { + if command -v curl > /dev/null; then + curl --silent -L "$url" --output "$jar" + elif command -v wget > /dev/null; then + wget --quiet -O "$jar" "$url" + fi + } && [[ -f "$jar" ]] +} + +acquire_sbt_jar () { + local launcher_sv="$1" + if [[ "$launcher_sv" == "" ]]; then + if [[ "$init_sbt_version" != "_to_be_replaced" ]]; then + launcher_sv="$init_sbt_version" + else + launcher_sv="$builtin_sbt_version" + fi + fi + local user_home && user_home=$(findProperty user.home) + download_jar="${user_home:-$HOME}/.cache/sbt/boot/sbt-launch/$launcher_sv/sbt-launch-$launcher_sv.jar" + if [[ -f "$download_jar" ]]; then + sbt_jar="$download_jar" + else + sbt_url=$(jar_url "$launcher_sv") + echoerr "downloading sbt launcher $launcher_sv" + download_url "$sbt_url" "${download_jar}.temp" + download_url "${sbt_url}.sha1" "${download_jar}.sha1" + if command -v shasum > /dev/null; then + if echo "$(cat "${download_jar}.sha1") ${download_jar}.temp" | shasum -c - > /dev/null; then + mv "${download_jar}.temp" "${download_jar}" + else + echoerr "failed to download launcher jar: $sbt_url (shasum mismatch)" + exit 2 + fi + else + mv "${download_jar}.temp" "${download_jar}" + fi + if [[ -f "$download_jar" ]]; then + sbt_jar="$download_jar" + else + echoerr "failed to download launcher jar: $sbt_url" + exit 2 + fi + fi +} + +acquire_sbtn () { + local sbtn_v="$1" + local user_home && user_home=$(findProperty user.home) + local p="${user_home:-$HOME}/.cache/sbt/boot/sbtn/$sbtn_v" + local target="$p/sbtn" + local archive_target= + local url= + local arch="x86_64" + if [[ "$OSTYPE" == "linux-gnu"* ]]; then + arch=$(uname -m) + if [[ "$arch" == "aarch64" ]] || [[ "$arch" == "x86_64" ]]; then + archive_target="$p/sbtn-${arch}-pc-linux-${sbtn_v}.tar.gz" + url="https://github.com/sbt/sbtn-dist/releases/download/v${sbtn_v}/sbtn-${arch}-pc-linux-${sbtn_v}.tar.gz" + else + echoerr "sbtn is not supported on $arch" + exit 2 + fi + elif [[ "$OSTYPE" == "darwin"* ]]; then + archive_target="$p/sbtn-universal-apple-darwin-${sbtn_v}.tar.gz" + url="https://github.com/sbt/sbtn-dist/releases/download/v${sbtn_v}/sbtn-universal-apple-darwin-${sbtn_v}.tar.gz" + elif [[ "$OSTYPE" == "cygwin" ]] || [[ "$OSTYPE" == "msys" ]] || [[ "$OSTYPE" == "win32" ]]; then + target="$p/sbtn.exe" + archive_target="$p/sbtn-x86_64-pc-win32-${sbtn_v}.zip" + url="https://github.com/sbt/sbtn-dist/releases/download/v${sbtn_v}/sbtn-x86_64-pc-win32-${sbtn_v}.zip" + else + echoerr "sbtn is not supported on $OSTYPE" + exit 2 + fi + + if [[ -f "$target" ]]; then + sbtn_command="$target" + else + echoerr "downloading sbtn ${sbtn_v} for ${arch}" + download_url "$url" "$archive_target" + if [[ "$OSTYPE" == "linux-gnu"* ]] || [[ "$OSTYPE" == "darwin"* ]]; then + tar zxf "$archive_target" --directory "$p" + else + unzip "$archive_target" -d "$p" + fi + sbtn_command="$target" + fi +} + +# execRunner should be called only once to give up control to java +execRunner () { + # print the arguments one to a line, quoting any containing spaces + [[ $sbt_verbose || $sbt_debug ]] && echo "# Executing command line:" && { + for arg; do + if printf "%s\n" "$arg" | grep -q ' '; then + printf "\"%s\"\n" "$arg" + else + printf "%s\n" "$arg" + fi + done + echo "" + } + + if [[ "$CYGWIN_FLAG" == "true" ]]; then + # In cygwin we loose the ability to re-hook stty if exec is used + # https://github.com/sbt/sbt-launcher-package/issues/53 + "$@" + else + exec "$@" + fi +} + +addJava () { + dlog "[addJava] arg = '$1'" + java_args=( "${java_args[@]}" "$1" ) +} +addSbt () { + dlog "[addSbt] arg = '$1'" + sbt_commands=( "${sbt_commands[@]}" "$1" ) +} +addResidual () { + dlog "[residual] arg = '$1'" + residual_args=( "${residual_args[@]}" "$1" ) +} +addDebugger () { + addJava "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$1" +} + +addMemory () { + dlog "[addMemory] arg = '$1'" + # evict memory related options + local xs=("${java_args[@]}") + java_args=() + for i in "${xs[@]}"; do + if ! [[ "${i}" == *-Xmx* ]] && ! [[ "${i}" == *-Xms* ]] && ! [[ "${i}" == *-Xss* ]] && ! [[ "${i}" == *-XX:MaxPermSize* ]] && ! [[ "${i}" == *-XX:MaxMetaspaceSize* ]] && ! [[ "${i}" == *-XX:ReservedCodeCacheSize* ]]; then + java_args+=("${i}") + fi + done + local ys=("${sbt_options[@]}") + sbt_options=() + for i in "${ys[@]}"; do + if ! [[ "${i}" == *-Xmx* ]] && ! [[ "${i}" == *-Xms* ]] && ! [[ "${i}" == *-Xss* ]] && ! [[ "${i}" == *-XX:MaxPermSize* ]] && ! [[ "${i}" == *-XX:MaxMetaspaceSize* ]] && ! [[ "${i}" == *-XX:ReservedCodeCacheSize* ]]; then + sbt_options+=("${i}") + fi + done + # a ham-fisted attempt to move some memory settings in concert + local mem=$1 + local codecache=$(( $mem / 8 )) + (( $codecache > 128 )) || codecache=128 + (( $codecache < 512 )) || codecache=512 + local class_metadata_size=$(( $codecache * 2 )) + if [[ -z $java_version ]]; then + java_version=$(jdk_version) + fi + + addJava "-Xms${mem}m" + addJava "-Xmx${mem}m" + addJava "-Xss4M" + addJava "-XX:ReservedCodeCacheSize=${codecache}m" + (( $java_version >= 8 )) || addJava "-XX:MaxPermSize=${class_metadata_size}m" +} + +addDefaultMemory() { + # if we detect any of these settings in ${JAVA_OPTS} or ${JAVA_TOOL_OPTIONS} we need to NOT output our settings. + # The reason is the Xms/Xmx, if they don't line up, cause errors. + if [[ "${java_args[@]}" == *-Xmx* ]] || \ + [[ "${java_args[@]}" == *-Xms* ]] || \ + [[ "${java_args[@]}" == *-Xss* ]] || \ + [[ "${java_args[@]}" == *-XX:+UseCGroupMemoryLimitForHeap* ]] || \ + [[ "${java_args[@]}" == *-XX:MaxRAM* ]] || \ + [[ "${java_args[@]}" == *-XX:InitialRAMPercentage* ]] || \ + [[ "${java_args[@]}" == *-XX:MaxRAMPercentage* ]] || \ + [[ "${java_args[@]}" == *-XX:MinRAMPercentage* ]]; then + : + elif [[ "${JAVA_TOOL_OPTIONS}" == *-Xmx* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-Xms* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-Xss* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-XX:+UseCGroupMemoryLimitForHeap* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-XX:MaxRAM* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-XX:InitialRAMPercentage* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-XX:MaxRAMPercentage* ]] || \ + [[ "${JAVA_TOOL_OPTIONS}" == *-XX:MinRAMPercentage* ]] ; then + : + elif [[ "${sbt_options[@]}" == *-Xmx* ]] || \ + [[ "${sbt_options[@]}" == *-Xms* ]] || \ + [[ "${sbt_options[@]}" == *-Xss* ]] || \ + [[ "${sbt_options[@]}" == *-XX:+UseCGroupMemoryLimitForHeap* ]] || \ + [[ "${sbt_options[@]}" == *-XX:MaxRAM* ]] || \ + [[ "${sbt_options[@]}" == *-XX:InitialRAMPercentage* ]] || \ + [[ "${sbt_options[@]}" == *-XX:MaxRAMPercentage* ]] || \ + [[ "${sbt_options[@]}" == *-XX:MinRAMPercentage* ]] ; then + : + else + addMemory $sbt_default_mem + fi +} + +addSbtScriptProperty () { + if [[ "${java_args[@]}" == *-Dsbt.script=* ]]; then + : + else + sbt_script=$0 + # Use // to replace all spaces with %20. + sbt_script=${sbt_script// /%20} + addJava "-Dsbt.script=$sbt_script" + fi +} + +require_arg () { + local type="$1" + local opt="$2" + local arg="$3" + if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then + echo "$opt requires <$type> argument" + exit 1 + fi +} + +is_function_defined() { + declare -f "$1" > /dev/null +} + +# parses JDK version from the -version output line. +# 8 for 1.8.0_nn, 9 for 9-ea etc, and "no_java" for undetected +jdk_version() { + local result + local lines=$("$java_cmd" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n') + local IFS=$'\n' + for line in $lines; do + if [[ (-z $result) && ($line = *"version \""*) ]] + then + local ver=$(echo $line | sed -e 's/.*version "\(.*\)"\(.*\)/\1/; 1q') + # on macOS sed doesn't support '?' + if [[ $ver = "1."* ]] + then + result=$(echo $ver | sed -e 's/1\.\([0-9]*\)\(.*\)/\1/; 1q') + else + result=$(echo $ver | sed -e 's/\([0-9]*\)\(.*\)/\1/; 1q') + fi + fi + done + if [[ -z $result ]] + then + result=no_java + fi + echo "$result" +} + +# Find the first occurrence of the given property name and returns its value by looking at: +# - properties set by command-line options, +# - JAVA_OPTS environment variable, +# - SBT_OPTS environment variable, +# - _JAVA_OPTIONS environment variable and +# - JAVA_TOOL_OPTIONS environment variable +# in that order. +findProperty() { + local -a java_opts_array + local -a sbt_opts_array + local -a _java_options_array + local -a java_tool_options_array + read -a java_opts_array <<< "$JAVA_OPTS" + read -a sbt_opts_array <<< "$SBT_OPTS" + read -a _java_options_array <<< "$_JAVA_OPTIONS" + read -a java_tool_options_array <<< "$JAVA_TOOL_OPTIONS" + + local args_to_check=( + "${java_args[@]}" + "${java_opts_array[@]}" + "${sbt_opts_array[@]}" + "${_java_options_array[@]}" + "${java_tool_options_array[@]}") + + for opt in "${args_to_check[@]}"; do + if [[ "$opt" == -D$1=* ]]; then + echo "${opt#-D$1=}" + return + fi + done +} + +# Extracts the preloaded directory from either -Dsbt.preloaded, -Dsbt.global.base or -Duser.home +# in that order. +getPreloaded() { + local preloaded && preloaded=$(findProperty sbt.preloaded) + [ "$preloaded" ] && echo "$preloaded" && return + + local global_base && global_base=$(findProperty sbt.global.base) + [ "$global_base" ] && echo "$global_base/preloaded" && return + + local user_home && user_home=$(findProperty user.home) + echo "${user_home:-$HOME}/.sbt/preloaded" +} + +syncPreloaded() { + local source_preloaded="$sbt_home/lib/local-preloaded/" + local target_preloaded="$(getPreloaded)" + if [[ "$init_sbt_version" == "" ]]; then + # FIXME: better $init_sbt_version detection + init_sbt_version="$(ls -1 "$source_preloaded/org/scala-sbt/sbt/")" + fi + [[ -f "$target_preloaded/org/scala-sbt/sbt/$init_sbt_version/" ]] || { + # lib/local-preloaded exists (This is optional) + [[ -d "$source_preloaded" ]] && { + command -v rsync >/dev/null 2>&1 && { + mkdir -p "$target_preloaded" + rsync --recursive --links --perms --times --ignore-existing "$source_preloaded" "$target_preloaded" || true + } + } + } +} + +# Detect that we have java installed. +checkJava() { + local required_version="$1" + # Now check to see if it's a good enough version + local good_enough="$(expr $java_version ">=" $required_version)" + if [[ "$java_version" == "" ]]; then + echo + echo "No Java Development Kit (JDK) installation was detected." + echo Please go to http://www.oracle.com/technetwork/java/javase/downloads/ and download. + echo + exit 1 + elif [[ "$good_enough" != "1" ]]; then + echo + echo "The Java Development Kit (JDK) installation you have is not up to date." + echo $script_name requires at least version $required_version+, you have + echo version $java_version + echo + echo Please go to http://www.oracle.com/technetwork/java/javase/downloads/ and download + echo a valid JDK and install before running $script_name. + echo + exit 1 + fi +} + +copyRt() { + local at_least_9="$(expr $java_version ">=" 9)" + if [[ "$at_least_9" == "1" ]]; then + # The grep for java9-rt-ext- matches the filename prefix printed in Export.java + java9_ext=$("$java_cmd" "${sbt_options[@]}" "${java_args[@]}" \ + -jar "$sbt_jar" --rt-ext-dir | grep java9-rt-ext- | tr -d '\r') + java9_rt=$(echo "$java9_ext/rt.jar") + vlog "[copyRt] java9_rt = '$java9_rt'" + if [[ ! -f "$java9_rt" ]]; then + echo copying runtime jar... + mkdir -p "$java9_ext" + "$java_cmd" \ + "${sbt_options[@]}" \ + "${java_args[@]}" \ + -jar "$sbt_jar" \ + --export-rt \ + "${java9_rt}" + fi + addJava "-Dscala.ext.dirs=${java9_ext}" + fi +} + +run() { + # Copy preloaded repo to user's preloaded directory + syncPreloaded + + # no jar? download it. + [[ -f "$sbt_jar" ]] || acquire_sbt_jar "$sbt_version" || { + exit 1 + } + + # TODO - java check should be configurable... + checkJava "6" + + # Java 9 support + copyRt + + # If we're in cygwin, we should use the windows config, and terminal hacks + if [[ "$CYGWIN_FLAG" == "true" ]]; then #" + stty -icanon min 1 -echo > /dev/null 2>&1 + addJava "-Djline.terminal=jline.UnixTerminal" + addJava "-Dsbt.cygwin=true" + fi + + if [[ $print_sbt_version ]]; then + execRunner "$java_cmd" -jar "$sbt_jar" "sbtVersion" | tail -1 | sed -e 's/\[info\]//g' + elif [[ $print_sbt_script_version ]]; then + echo "$init_sbt_version" + elif [[ $print_version ]]; then + execRunner "$java_cmd" -jar "$sbt_jar" "sbtVersion" | tail -1 | sed -e 's/\[info\]/sbt version in this project:/g' + echo "sbt script version: $init_sbt_version" + elif [[ $shutdownall ]]; then + local sbt_processes=( $(jps -v | grep sbt-launch | cut -f1 -d ' ') ) + for procId in "${sbt_processes[@]}"; do + kill -9 $procId + done + echo "shutdown ${#sbt_processes[@]} sbt processes" + else + # run sbt + execRunner "$java_cmd" \ + "${java_args[@]}" \ + "${sbt_options[@]}" \ + "${java_tool_options[@]}" \ + -jar "$sbt_jar" \ + "${sbt_commands[@]}" \ + "${residual_args[@]}" + fi + + exit_code=$? + + # Clean up the terminal from cygwin hacks. + if [[ "$CYGWIN_FLAG" == "true" ]]; then #" + stty icanon echo > /dev/null 2>&1 + fi + exit $exit_code +} + +declare -ra noshare_opts=(-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy) +declare -r sbt_opts_file=".sbtopts" +declare -r build_props_file="$(pwd)/project/build.properties" +declare -r etc_sbt_opts_file="/etc/sbt/sbtopts" +# this allows /etc/sbt/sbtopts location to be changed +declare -r etc_file="${SBT_ETC_FILE:-$etc_sbt_opts_file}" +declare -r dist_sbt_opts_file="${sbt_home}/conf/sbtopts" +declare -r win_sbt_opts_file="${sbt_home}/conf/sbtconfig.txt" +declare sbt_jar="$(jar_file)" + +usage() { + cat < path to global settings/plugins directory (default: ~/.sbt) + --sbt-boot path to shared boot directory (default: ~/.sbt/boot in 0.11 series) + --sbt-cache path to global cache directory (default: operating system specific) + --ivy path to local Ivy repository (default: ~/.ivy2) + --mem set memory options (default: $sbt_default_mem) + --no-share use all local caches; no sharing + --no-global uses global caches, but does not use global ~/.sbt directory. + --jvm-debug Turn on JVM debugging, open at the given port. + --batch disable interactive mode + + # sbt version (default: from project/build.properties if present, else latest release) + --sbt-version use the specified version of sbt + --sbt-jar use the specified jar as the sbt launcher + + --java-home alternate JAVA_HOME + + # jvm options and output control + JAVA_OPTS environment variable, if unset uses "$default_java_opts" + .jvmopts if this file exists in the current directory, its contents + are appended to JAVA_OPTS + SBT_OPTS environment variable, if unset uses "$default_sbt_opts" + .sbtopts if this file exists in the current directory, its contents + are prepended to the runner args + /etc/sbt/sbtopts if this file exists, it is prepended to the runner args + -Dkey=val pass -Dkey=val directly to the java runtime + -J-X pass option -X directly to the java runtime + (-J is stripped) + +In the case of duplicated or conflicting options, the order above +shows precedence: JAVA_OPTS lowest, command line options highest. +EOM +} + +process_my_args () { + while [[ $# -gt 0 ]]; do + case "$1" in + -batch|--batch) exec + + -sbt-create|--sbt-create) sbt_create=true && shift ;; + + new) sbt_new=true && addResidual "$1" && shift ;; + + *) addResidual "$1" && shift ;; + esac + done + + # Now, ensure sbt version is used. + [[ "${sbt_version}XXX" != "XXX" ]] && addJava "-Dsbt.version=$sbt_version" + + # Confirm a user's intent if the current directory does not look like an sbt + # top-level directory and neither the -sbt-create option nor the "new" + # command was given. + [[ -f ./build.sbt || -d ./project || -n "$sbt_create" || -n "$sbt_new" ]] || { + echo "[warn] Neither build.sbt nor a 'project' directory in the current directory: $(pwd)" + while true; do + echo 'c) continue' + echo 'q) quit' + + read -p '? ' || exit 1 + case "$REPLY" in + c|C) break ;; + q|Q) exit 1 ;; + esac + done + } +} + +## map over argument array. this is used to process both command line arguments and SBT_OPTS +map_args () { + local options=() + local commands=() + while [[ $# -gt 0 ]]; do + case "$1" in + -no-colors|--no-colors) options=( "${options[@]}" "-Dsbt.log.noformat=true" ) && shift ;; + -timings|--timings) options=( "${options[@]}" "-Dsbt.task.timings=true" "-Dsbt.task.timings.on.shutdown=true" ) && shift ;; + -traces|--traces) options=( "${options[@]}" "-Dsbt.traces=true" ) && shift ;; + --supershell=*) options=( "${options[@]}" "-Dsbt.supershell=${1:13}" ) && shift ;; + -supershell=*) options=( "${options[@]}" "-Dsbt.supershell=${1:12}" ) && shift ;; + -no-server|--no-server) options=( "${options[@]}" "-Dsbt.io.virtual=false" "-Dsbt.server.autostart=false" ) && shift ;; + --color=*) options=( "${options[@]}" "-Dsbt.color=${1:8}" ) && shift ;; + -color=*) options=( "${options[@]}" "-Dsbt.color=${1:7}" ) && shift ;; + -no-share|--no-share) options=( "${options[@]}" "${noshare_opts[@]}" ) && shift ;; + -no-global|--no-global) options=( "${options[@]}" "-Dsbt.global.base=$(pwd)/project/.sbtboot" ) && shift ;; + -ivy|--ivy) require_arg path "$1" "$2" && options=( "${options[@]}" "-Dsbt.ivy.home=$2" ) && shift 2 ;; + -sbt-boot|--sbt-boot) require_arg path "$1" "$2" && options=( "${options[@]}" "-Dsbt.boot.directory=$2" ) && shift 2 ;; + -sbt-dir|--sbt-dir) require_arg path "$1" "$2" && options=( "${options[@]}" "-Dsbt.global.base=$2" ) && shift 2 ;; + -debug|--debug) commands=( "${commands[@]}" "-debug" ) && shift ;; + -debug-inc|--debug-inc) options=( "${options[@]}" "-Dxsbt.inc.debug=true" ) && shift ;; + *) options=( "${options[@]}" "$1" ) && shift ;; + esac + done + declare -p options + declare -p commands +} + +process_args () { + while [[ $# -gt 0 ]]; do + case "$1" in + -h|-help|--help) usage; exit 1 ;; + -v|-verbose|--verbose) sbt_verbose=1 && shift ;; + -V|-version|--version) print_version=1 && shift ;; + --numeric-version) print_sbt_version=1 && shift ;; + --script-version) print_sbt_script_version=1 && shift ;; + shutdownall) shutdownall=1 && shift ;; + -d|-debug|--debug) sbt_debug=1 && addSbt "-debug" && shift ;; + -client|--client) use_sbtn=1 && shift ;; + --server) use_sbtn=0 && shift ;; + + -mem|--mem) require_arg integer "$1" "$2" && addMemory "$2" && shift 2 ;; + -jvm-debug|--jvm-debug) require_arg port "$1" "$2" && addDebugger $2 && shift 2 ;; + -batch|--batch) exec = 2 )) || ( (( $sbtBinaryV_1 >= 1 )) && (( $sbtBinaryV_2 >= 4 )) ); then + if [[ "$use_sbtn" == "1" ]]; then + echo "true" + else + echo "false" + fi + else + echo "false" + fi +} + +runNativeClient() { + vlog "[debug] running native client" + detectNativeClient + [[ -f "$sbtn_command" ]] || acquire_sbtn "$sbtn_version" || { + exit 1 + } + for i in "${!original_args[@]}"; do + if [[ "${original_args[i]}" = "--client" ]]; then + unset 'original_args[i]' + fi + done + sbt_script=$0 + sbt_script=${sbt_script/ /%20} + execRunner "$sbtn_command" "--sbt-script=$sbt_script" "${original_args[@]}" +} + +original_args=("$@") + +# Here we pull in the default settings configuration. +[[ -f "$dist_sbt_opts_file" ]] && set -- $(loadConfigFile "$dist_sbt_opts_file") "$@" + +# Here we pull in the global settings configuration. +[[ -f "$etc_file" ]] && set -- $(loadConfigFile "$etc_file") "$@" + +# Pull in the project-level config file, if it exists. +[[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@" + +# Pull in the project-level java config, if it exists. +[[ -f ".jvmopts" ]] && export JAVA_OPTS="$JAVA_OPTS $(loadConfigFile .jvmopts)" + +# Pull in default JAVA_OPTS +[[ -z "${JAVA_OPTS// }" ]] && export JAVA_OPTS="$default_java_opts" + +[[ -f "$build_props_file" ]] && loadPropFile "$build_props_file" + +java_args=($JAVA_OPTS) +sbt_options0=(${SBT_OPTS:-$default_sbt_opts}) +java_tool_options=($JAVA_TOOL_OPTIONS) +if [[ "$SBT_NATIVE_CLIENT" == "true" ]]; then + use_sbtn=1 +fi + +# Split SBT_OPTS into options/commands +miniscript=$(map_args "${sbt_options0[@]}") && eval "${miniscript/options/sbt_options}" && \ +eval "${miniscript/commands/sbt_additional_commands}" + +# Combine command line options/commands and commands from SBT_OPTS +miniscript=$(map_args "$@") && eval "${miniscript/options/cli_options}" && eval "${miniscript/commands/cli_commands}" +args1=( "${cli_options[@]}" "${cli_commands[@]}" "${sbt_additional_commands[@]}" ) + +# process the combined args, then reset "$@" to the residuals +process_args "${args1[@]}" +vlog "[sbt_options] $(declare -p sbt_options)" + +if [[ "$(isRunNativeClient)" == "true" ]]; then + set -- "${residual_args[@]}" + argumentCount=$# + runNativeClient +else + java_version="$(jdk_version)" + vlog "[process_args] java_version = '$java_version'" + addDefaultMemory + addSbtScriptProperty + set -- "${residual_args[@]}" + argumentCount=$# + run +fi