Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the happy path in parXOrAccumulate #3370

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ public final class arrow/fx/coroutines/ExitCase$Failure : arrow/fx/coroutines/Ex
public fun toString ()Ljava/lang/String;
}

public final class arrow/fx/coroutines/FailureValue {
public static final field INSTANCE Larrow/fx/coroutines/FailureValue;
public final fun bindNel (Larrow/core/raise/RaiseAccumulate;Ljava/lang/Object;)Ljava/lang/Object;
public final fun failureValue (Ljava/lang/Object;)Ljava/lang/Object;
public final fun mightFail (Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/FlowExtensions {
public static final fun fixedRate (JZLkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun fixedRate$default (JZLkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package arrow.fx.coroutines
import arrow.core.raise.RaiseAccumulate
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.flattenOrAccumulate
import arrow.core.mapOrAccumulate
import arrow.core.raise.Raise
import arrow.core.raise.either
import arrow.fx.coroutines.FailureValue.bindNel
import arrow.fx.coroutines.FailureValue.mightFail
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
Expand Down Expand Up @@ -59,16 +60,13 @@ public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<Error, List<B>> =
coroutineScope {
val semaphore = Semaphore(concurrency)
map {
async(context) {
either {
semaphore.withPermit {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}
parMap(context, concurrency) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate(combine)
}.mapOrAccumulate(combine) { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
Expand All @@ -77,13 +75,13 @@ public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<Error, List<B>> =
coroutineScope {
map {
async(context) {
either {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
parMap(context) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate(combine)
}.mapOrAccumulate(combine) { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
Expand All @@ -92,28 +90,25 @@ public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>> =
coroutineScope {
val semaphore = Semaphore(concurrency)
map {
async(context) {
either {
semaphore.withPermit {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}
parMap(context, concurrency) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate()
}.mapOrAccumulate { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>> =
coroutineScope {
map {
async(context) {
either {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
parMap(context) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate()
}.mapOrAccumulate { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}
Loading
Loading