Skip to content

Commit

Permalink
flatMapLatestFlow function
Browse files Browse the repository at this point in the history
  • Loading branch information
motorro committed Dec 5, 2023
1 parent 1bb0f28 commit d6ba59e
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@ import com.motorro.rxlcemodel.lce.combine
import com.motorro.rxlcemodel.lce.map
import coroutinesRunCatching
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.transform


/**
Expand Down Expand Up @@ -128,6 +138,25 @@ fun <DATA_1: Any, DATA_2: Any> Flow<LceState<DATA_1>>.flatMapSingleData(mapper:
}
}

/**
* Maps each [DATA_1] to flow for [DATA_2] and combines with original state.
* If error occurs in [mapper] emits [LceState.Error].
* Example: Using original [DATA_1] as a parameter switch to new [DATA_2] LCE flow
* @param DATA_1 Source data type
* @param DATA_2 Resulting data type
* @param mapper Data mapper
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun <DATA_1: Any, DATA_2: Any> Flow<LceState<DATA_1>>.flatMapLatestFlow(mapper: suspend (data: DATA_1) -> Flow<LceState<DATA_2>>): Flow<LceState<DATA_2>> =
flatMapLatest { state1: LceState<DATA_1> ->
when(val data1 = state1.data) {
null -> flowOf(state1.combine(LceState.Loading(null, false)) { _,_ -> null })
else -> coroutinesRunCatching { mapper(data1).map { state1.combine(it) { _, data2 -> data2 } } }.getOrElse {
flowOf(LceState.Error(null, false, it))
}
}
}

/**
* Creates a use-case wrapper that converts [DATA_1] to [DATA_2]
* @receiver Original model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@ package com.motorro.rxlcemodel.coroutines
import com.motorro.rxlcemodel.lce.LceState
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down Expand Up @@ -264,6 +273,58 @@ class LceUtilsKtTest {
assertEquals(1, result.size)
}

@Test
fun mapsLceFlowToOtherFlow() = runTest {
val error = Exception("error")

fun mapper(input: Int) = flowOf(
LceState.Content(input.toString(), true)
)

val source = flowOf(
LceState.Loading(null, false),
LceState.Loading(1, true),
LceState.Content(2, true),
LceState.Error(null, false, error),
LceState.Error(3, true, error),
LceState.Terminated
)

assertEquals(
listOf(
LceState.Loading(null, false),
LceState.Loading("1", true),
LceState.Content("2", true),
LceState.Error(null, false, error),
LceState.Error("3", true, error),
LceState.Terminated
),
source.flatMapLatestFlow(::mapper).toList()
)
}

@Test
fun catchesMapperErrorInFlowMapper() = runTest {
val error = Exception("error")

suspend fun mapper(input: Int) = suspendCoroutine<Flow<LceState<String>>> {
it.resumeWithException(error)
}

val source = flowOf(
LceState.Loading(null, false),
LceState.Loading(1, true)
)

assertEquals(
listOf(
LceState.Loading(null, false),
LceState.Error(null, false, error),
),
source.flatMapLatestFlow(::mapper).toList()
)
}

@Test
fun mapsUseCaseData() = runTest {
val error = Exception()
Expand Down
17 changes: 17 additions & 0 deletions rx/src/main/kotlin/com/motorro/rxlcemodel/rx/lceUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,23 @@ fun <DATA_1: Any, DATA_2: Any> Observable<LceState<DATA_1>>.flatMapSingleData(ma
}
}

/**
* Maps each [DATA_1] to flow for [DATA_2] and combines with original state.
* If error occurs in [mapper] emits [LceState.Error].
* Example: Using original [DATA_1] as a parameter switch to new [DATA_2] LCE flow
* @param DATA_1 Source data type
* @param DATA_2 Resulting data type
* @param mapper Data mapper
*/
fun <DATA_1: Any, DATA_2: Any> Observable<LceState<DATA_1>>.flatMapLatestFlow(mapper: (data: DATA_1) -> Observable<LceState<DATA_2>>): Observable<LceState<DATA_2>> {
return switchMap { state1 ->
when(val data1 = state1.data) {
null -> Observable.just(state1.combine(LceState.Loading(null, false)) { _, _ -> null })
else -> mapper(data1).map { state1.combine(it) { _, data2 -> data2 } }.errorToLce()
}
}
}

/**
* Creates a use-case wrapper that converts [DATA_1] to [DATA_2]
* @receiver Original model
Expand Down
51 changes: 50 additions & 1 deletion rx/src/test/kotlin/com/motorro/rxlcemodel/rx/LceUtilsKtTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package com.motorro.rxlcemodel.rx

import com.motorro.rxlcemodel.lce.LceState
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
Expand All @@ -22,7 +23,6 @@ import io.reactivex.rxjava3.subjects.Subject
import org.junit.Test
import java.io.IOException
import kotlin.test.assertEquals
import com.motorro.rxlcemodel.lce.LceState

class LceUtilsKtTest {
companion object {
Expand Down Expand Up @@ -229,6 +229,55 @@ class LceUtilsKtTest {
)
}

@Test
fun mapsLceFlowToOtherFlow() {
val error = IOException("error")

fun mapper(input: Int) = Observable.just<LceState<String>>(
LceState.Content(input.toString(), true)
)

val source = Observable.just(
LceState.Loading(null, false),
LceState.Loading(1, true),
LceState.Content(2, true),
LceState.Error(null, false, error),
LceState.Error(3, true, error),
LceState.Terminated
)

source.flatMapLatestFlow(::mapper)
.test()
.assertNoErrors()
.assertValues(
LceState.Loading(null, false),
LceState.Loading("1", true),
LceState.Content("2", true),
LceState.Error(null, false, error),
LceState.Error("3", true, error),
LceState.Terminated
)
}

@Test
fun catchesMapperErrorInFlowMapper() {
val error = Exception("error")

fun mapper(input: Int) = Observable.error<LceState<String>>(error)

val source = Observable.just<LceState<Int>>(
LceState.Loading(null, false),
LceState.Loading(1, true)
)

source.flatMapLatestFlow(::mapper)
.test()
.assertNoErrors()
.assertValues(
LceState.Loading(null, false),
LceState.Error(null, false, error)
)
}

@Test
fun createsRefreshingStream() {
Expand Down

0 comments on commit d6ba59e

Please sign in to comment.