From be1ec04e470cbf600f01ee72a1d362937c800fd5 Mon Sep 17 00:00:00 2001 From: motorro Date: Tue, 14 Apr 2020 10:11:28 +0300 Subject: [PATCH] flatMapSingleData function Resolves: #21 --- .../com/motorro/rxlcemodel/base/LceModel.kt | 40 +++++++++++-- .../motorro/rxlcemodel/base/LceModelKtTest.kt | 60 +++++++++++++++++++ 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/base/src/main/kotlin/com/motorro/rxlcemodel/base/LceModel.kt b/base/src/main/kotlin/com/motorro/rxlcemodel/base/LceModel.kt index 8bdaf40d..50f58f5e 100644 --- a/base/src/main/kotlin/com/motorro/rxlcemodel/base/LceModel.kt +++ b/base/src/main/kotlin/com/motorro/rxlcemodel/base/LceModel.kt @@ -13,13 +13,14 @@ package com.motorro.rxlcemodel.base -import com.motorro.rxlcemodel.base.LceState.Loading +import com.motorro.rxlcemodel.base.LceState.* import com.motorro.rxlcemodel.base.service.CacheService import com.motorro.rxlcemodel.base.service.NetService import com.motorro.rxlcemodel.base.service.ServiceSet import com.motorro.rxlcemodel.base.service.UpdatingServiceSet import io.reactivex.Completable import io.reactivex.Observable +import io.reactivex.Single /** * A model interface to load data and transmit it to subscribers along with loading operation state @@ -122,9 +123,9 @@ fun LceModel.withUpdates(ser * @param predicate A predicate to check error state. If predicate returns true, the stream * is terminated with [LceState.Error.error] */ -fun Observable>.terminateOnError(predicate: (LceState.Error) -> Boolean): Observable> = map { state -> +fun Observable>.terminateOnError(predicate: (Error) -> Boolean): Observable> = map { state -> when { - state is LceState.Error && predicate(state) -> throw state.error + state is Error && predicate(state) -> throw state.error else -> state } } @@ -149,7 +150,7 @@ val Observable>.stopOnEmptyErrors: Observable Observable>.getData(terminateOnError: (LceState.Error) -> Boolean): Observable = +fun Observable>.getData(terminateOnError: (Error) -> Boolean): Observable = terminateOnError(terminateOnError) .switchMap { val data = it.data @@ -200,6 +201,37 @@ val Observable>.validData: Observable } .distinctUntilChanged() +/** + * Maps each [DATA_1] to single for [DATA_2] and merges back to LceState. + * If error occurs in [mapper] emits [LceState.Error]. + * Example: load some [DATA_2] from server using original [DATA_1] as a parameter. + * @param DATA_1 Source data type + * @param DATA_2 Resulting data type + * @param mapper Data mapper + */ +fun Observable>.flatMapSingleData(mapper: (data: DATA_1) -> Single): Observable> { + + fun dataMapper(data1: DATA_1, block: (DATA_2) -> LceState) = mapper(data1) + .map { block(it) } + .onErrorReturn { Error(null, false, it) } + + fun nullableMapper(data1: DATA_1?, block: (DATA_2?) -> LceState) = if (null == data1) { + Single.just(block(null)) + } else { + dataMapper(data1, block) + } + + @Suppress("RemoveExplicitTypeArguments") + return flatMapSingle> { state -> + when (state) { + is Loading -> nullableMapper(state.data) { Loading(it, state.dataIsValid, state.type) } + is Content -> dataMapper(state.data) { Content(it, state.dataIsValid) } + is Error -> nullableMapper(state.data) { Error(it, state.dataIsValid, state.error) } + is Terminated -> Single.just(Terminated()) + } + } +} + /** * Creates a model wrapper that converts [DATA_1] to [DATA_2] * @receiver Original model diff --git a/base/src/test/kotlin/com/motorro/rxlcemodel/base/LceModelKtTest.kt b/base/src/test/kotlin/com/motorro/rxlcemodel/base/LceModelKtTest.kt index fc23e06e..43bee410 100644 --- a/base/src/test/kotlin/com/motorro/rxlcemodel/base/LceModelKtTest.kt +++ b/base/src/test/kotlin/com/motorro/rxlcemodel/base/LceModelKtTest.kt @@ -15,9 +15,11 @@ package com.motorro.rxlcemodel.base import io.reactivex.Completable import io.reactivex.Observable +import io.reactivex.Single import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject import org.junit.Test +import java.io.IOException class LceModelKtTest { companion object { @@ -166,6 +168,64 @@ class LceModelKtTest { } } + @Test + fun flatMapsSingleData() { + val error = IOException("error") + + fun mapper(input: Int) = Single.just(input.toString()) + + val source = Observable.just( + LceState.Loading(null, false), + LceState.Loading(1, true), + LceState.Content(2, true), + LceState.Error(null, false, error), + LceState.Error(3, true, error), + LceState.Terminated() + ) + + source.flatMapSingleData(::mapper) + .test() + .assertNoErrors() + .assertValues( + LceState.Loading(null, false), + LceState.Loading("1", true), + LceState.Content("2", true), + LceState.Error(null, false, error), + LceState.Error("3", true, error), + LceState.Terminated() + ) + } + + @Test + fun returnsErrorIfSingleDataMapperFails() { + val error1 = IOException("error 1") + val error2 = IOException("error 2") + + @Suppress("UNUSED_PARAMETER") + fun mapper(input: Int) = Single.error(error2) + + val source = Observable.just( + LceState.Loading(null, false), + LceState.Loading(1, true), + LceState.Content(2, true), + LceState.Error(null, false, error1), + LceState.Error(3, true, error1), + LceState.Terminated() + ) + + source.flatMapSingleData(::mapper) + .test() + .assertNoErrors() + .assertValues( + LceState.Loading(null, false), + LceState.Error(null, false, error2), + LceState.Error(null, false, error2), + LceState.Error(null, false, error1), + LceState.Error(null, false, error2), + LceState.Terminated() + ) + } + @Test fun createsRefreshingStream() { val value1 = LceState.Content(1, true)