diff --git a/coroutines/src/commonMain/kotlin/com/motorro/rxlcemodel/coroutines/lceUtils.kt b/coroutines/src/commonMain/kotlin/com/motorro/rxlcemodel/coroutines/lceUtils.kt index d6c0232b..f53d264e 100644 --- a/coroutines/src/commonMain/kotlin/com/motorro/rxlcemodel/coroutines/lceUtils.kt +++ b/coroutines/src/commonMain/kotlin/com/motorro/rxlcemodel/coroutines/lceUtils.kt @@ -18,7 +18,17 @@ import com.motorro.rxlcemodel.lce.combine import com.motorro.rxlcemodel.lce.map import coroutinesRunCatching import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.transform /** @@ -128,6 +138,25 @@ fun Flow>.flatMapSingleData(mapper: } } +/** + * Maps each [DATA_1] to flow for [DATA_2] and combines with original state. + * If error occurs in [mapper] emits [LceState.Error]. + * Example: Using original [DATA_1] as a parameter switch to new [DATA_2] LCE flow + * @param DATA_1 Source data type + * @param DATA_2 Resulting data type + * @param mapper Data mapper + */ +@OptIn(ExperimentalCoroutinesApi::class) +fun Flow>.flatMapLatestFlow(mapper: suspend (data: DATA_1) -> Flow>): Flow> = + flatMapLatest { state1: LceState -> + when(val data1 = state1.data) { + null -> flowOf(state1.combine(LceState.Loading(null, false)) { _,_ -> null }) + else -> coroutinesRunCatching { mapper(data1).map { state1.combine(it) { _, data2 -> data2 } } }.getOrElse { + flowOf(LceState.Error(null, false, it)) + } + } + } + /** * Creates a use-case wrapper that converts [DATA_1] to [DATA_2] * @receiver Original model diff --git a/coroutines/src/commonTest/kotlin/com/motorro/rxlcemodel/coroutines/LceUtilsKtTest.kt b/coroutines/src/commonTest/kotlin/com/motorro/rxlcemodel/coroutines/LceUtilsKtTest.kt index fbea3656..db93622a 100644 --- a/coroutines/src/commonTest/kotlin/com/motorro/rxlcemodel/coroutines/LceUtilsKtTest.kt +++ b/coroutines/src/commonTest/kotlin/com/motorro/rxlcemodel/coroutines/LceUtilsKtTest.kt @@ -16,10 +16,19 @@ package com.motorro.rxlcemodel.coroutines import com.motorro.rxlcemodel.lce.LceState import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -264,6 +273,58 @@ class LceUtilsKtTest { assertEquals(1, result.size) } + @Test + fun mapsLceFlowToOtherFlow() = runTest { + val error = Exception("error") + + fun mapper(input: Int) = flowOf( + LceState.Content(input.toString(), true) + ) + + val source = flowOf( + LceState.Loading(null, false), + LceState.Loading(1, true), + LceState.Content(2, true), + LceState.Error(null, false, error), + LceState.Error(3, true, error), + LceState.Terminated + ) + + assertEquals( + listOf( + LceState.Loading(null, false), + LceState.Loading("1", true), + LceState.Content("2", true), + LceState.Error(null, false, error), + LceState.Error("3", true, error), + LceState.Terminated + ), + source.flatMapLatestFlow(::mapper).toList() + ) + } + + @Test + fun catchesMapperErrorInFlowMapper() = runTest { + val error = Exception("error") + + suspend fun mapper(input: Int) = suspendCoroutine>> { + it.resumeWithException(error) + } + + val source = flowOf( + LceState.Loading(null, false), + LceState.Loading(1, true) + ) + + assertEquals( + listOf( + LceState.Loading(null, false), + LceState.Error(null, false, error), + ), + source.flatMapLatestFlow(::mapper).toList() + ) + } + @Test fun mapsUseCaseData() = runTest { val error = Exception() diff --git a/rx/src/main/kotlin/com/motorro/rxlcemodel/rx/lceUtils.kt b/rx/src/main/kotlin/com/motorro/rxlcemodel/rx/lceUtils.kt index df6d7f68..95fb5d82 100644 --- a/rx/src/main/kotlin/com/motorro/rxlcemodel/rx/lceUtils.kt +++ b/rx/src/main/kotlin/com/motorro/rxlcemodel/rx/lceUtils.kt @@ -130,6 +130,23 @@ fun Observable>.flatMapSingleData(ma } } +/** + * Maps each [DATA_1] to flow for [DATA_2] and combines with original state. + * If error occurs in [mapper] emits [LceState.Error]. + * Example: Using original [DATA_1] as a parameter switch to new [DATA_2] LCE flow + * @param DATA_1 Source data type + * @param DATA_2 Resulting data type + * @param mapper Data mapper + */ +fun Observable>.flatMapLatestFlow(mapper: (data: DATA_1) -> Observable>): Observable> { + return switchMap { state1 -> + when(val data1 = state1.data) { + null -> Observable.just(state1.combine(LceState.Loading(null, false)) { _, _ -> null }) + else -> mapper(data1).map { state1.combine(it) { _, data2 -> data2 } }.errorToLce() + } + } +} + /** * Creates a use-case wrapper that converts [DATA_1] to [DATA_2] * @receiver Original model diff --git a/rx/src/test/kotlin/com/motorro/rxlcemodel/rx/LceUtilsKtTest.kt b/rx/src/test/kotlin/com/motorro/rxlcemodel/rx/LceUtilsKtTest.kt index 9f5caadf..4499c48f 100644 --- a/rx/src/test/kotlin/com/motorro/rxlcemodel/rx/LceUtilsKtTest.kt +++ b/rx/src/test/kotlin/com/motorro/rxlcemodel/rx/LceUtilsKtTest.kt @@ -13,6 +13,7 @@ package com.motorro.rxlcemodel.rx +import com.motorro.rxlcemodel.lce.LceState import io.reactivex.rxjava3.core.Completable import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Single @@ -22,7 +23,6 @@ import io.reactivex.rxjava3.subjects.Subject import org.junit.Test import java.io.IOException import kotlin.test.assertEquals -import com.motorro.rxlcemodel.lce.LceState class LceUtilsKtTest { companion object { @@ -229,6 +229,55 @@ class LceUtilsKtTest { ) } + @Test + fun mapsLceFlowToOtherFlow() { + val error = IOException("error") + + fun mapper(input: Int) = Observable.just>( + LceState.Content(input.toString(), true) + ) + + val source = Observable.just( + LceState.Loading(null, false), + LceState.Loading(1, true), + LceState.Content(2, true), + LceState.Error(null, false, error), + LceState.Error(3, true, error), + LceState.Terminated + ) + + source.flatMapLatestFlow(::mapper) + .test() + .assertNoErrors() + .assertValues( + LceState.Loading(null, false), + LceState.Loading("1", true), + LceState.Content("2", true), + LceState.Error(null, false, error), + LceState.Error("3", true, error), + LceState.Terminated + ) + } + + @Test + fun catchesMapperErrorInFlowMapper() { + val error = Exception("error") + + fun mapper(input: Int) = Observable.error>(error) + + val source = Observable.just>( + LceState.Loading(null, false), + LceState.Loading(1, true) + ) + + source.flatMapLatestFlow(::mapper) + .test() + .assertNoErrors() + .assertValues( + LceState.Loading(null, false), + LceState.Error(null, false, error) + ) + } @Test fun createsRefreshingStream() {