diff --git a/build.gradle.kts b/build.gradle.kts index 02f63c0..fe6bf9b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -39,7 +39,7 @@ val examplesImplementation by configurations.getting { } dependencies { - api("io.reactivex.rxjava3:rxjava:3.0.6") + api("io.reactivex.rxjava3:rxjava:3.1.1") implementation(kotlin("stdlib")) testImplementation("org.funktionale:funktionale-partials:1.0.0-final") diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt index c9e1f3e..efac8b8 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.functions.* +import io.reactivex.rxjava3.kotlin.internal.FlowableMapNotNullFlowable import org.reactivestreams.Publisher @@ -201,3 +202,14 @@ fun Flowable>.toMultimap(): Single Iterable>.concatAll(): Flowable = Flowable.concat(this) + +/** + * Returns an [Flowable] that emits the non-`null` results + * of applying the given [transform] function to each element of the original Observable. + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +@BackpressureSupport(BackpressureKind.PASS_THROUGH) +fun Flowable.mapNotNull(transform: (T) -> R?): Flowable = + FlowableMapNotNullFlowable(this, transform) + diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt new file mode 100644 index 0000000..5491155 --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt @@ -0,0 +1,133 @@ +package io.reactivex.rxjava3.kotlin.internal + +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber +import io.reactivex.rxjava3.operators.ConditionalSubscriber +import org.reactivestreams.Subscriber + +internal class FlowableMapNotNullFlowable( + @JvmField + internal val source: Flowable, + @JvmField + internal val transform: (T) -> R?, +) : Flowable() { + override fun subscribeActual(subscriber: Subscriber) { + if (subscriber is ConditionalSubscriber<*>) { + source.subscribe( + MapNotNullConditionalSubscriber( + subscriber as ConditionalSubscriber, + transform + ) + ) + } else { + source.subscribe(MapNotNullSubscriber(subscriber, transform)) + } + } + + internal class MapNotNullSubscriber( + downstream: Subscriber, + @JvmField + internal val transform: (T) -> R? + ) : BasicFuseableSubscriber(downstream), + ConditionalSubscriber { + override fun onNext(t: T) { + if (!tryOnNext(t)) { + upstream.request(1) + } + } + + override fun tryOnNext(t: T): Boolean { + if (done) { + return true + } + if (sourceMode != NONE) { + downstream.onNext(null) + return true + } + + val result = try { + transform(t) + } catch (ex: Throwable) { + fail(ex) + return true + } + + if (result !== null) { + downstream.onNext(result) + return true + } + return false + } + + override fun requestFusion(mode: Int): Int = transitiveBoundaryFusion(mode) + + @Throws(Throwable::class) + override fun poll(): R? { + while (true) { + val item = qs.poll() ?: return null + val result = transform(item) + if (result != null) { + return result + } + + if (sourceMode == ASYNC) { + qs.request(1) + } + } + } + } + + internal class MapNotNullConditionalSubscriber( + downstream: ConditionalSubscriber, + @JvmField + internal val transform: (T) -> R?, + ) : BasicFuseableConditionalSubscriber(downstream) { + override fun onNext(t: T) { + if (!tryOnNext(t)) { + upstream.request(1) + } + } + + override fun tryOnNext(t: T): Boolean { + if (done) { + return true + } + if (sourceMode != NONE) { + downstream.onNext(null) + return true + } + + val result = try { + transform(t) + } catch (ex: Throwable) { + fail(ex) + return true + } + + return if (result != null) { + downstream.tryOnNext(result) + } else { + false + } + } + + override fun requestFusion(mode: Int): Int { + return transitiveBoundaryFusion(mode) + } + + @Throws(Throwable::class) + override fun poll(): R? { + while (true) { + val item = qs.poll() ?: return null + val result = transform(item) + if (result != null) { + return result + } + if (sourceMode == ASYNC) { + qs.request(1) + } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt new file mode 100644 index 0000000..fdc1a1f --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt @@ -0,0 +1,56 @@ +package io.reactivex.rxjava3.kotlin.internal + +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Observer +import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver + +internal class MapNotNullObservable( + @JvmField + internal val source: Observable, + @JvmField + internal val transform: (T) -> R? +) : Observable() { + override fun subscribeActual(observer: Observer) { + source.subscribe(MapOptionalObserver(observer, transform)) + } + + internal class MapOptionalObserver( + downstream: Observer, + @JvmField + internal val transform: (T) -> R? + ) : BasicFuseableObserver(downstream) { + override fun onNext(t: T) { + if (done) { + return + } + if (sourceMode != NONE) { + downstream.onNext(null) + return + } + val result = try { + transform(t) + } catch (ex: Throwable) { + fail(ex) + return + } + if (result !== null) { + downstream.onNext(result) + } + } + + override fun requestFusion(mode: Int): Int { + return transitiveBoundaryFusion(mode) + } + + @Throws(Throwable::class) + override fun poll(): R? { + while (true) { + val item = qd.poll() ?: return null + val result = transform(item) + if (result !== null) { + return result + } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt index ed246db..c61c964 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt @@ -7,6 +7,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.ObservableSource import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.kotlin.internal.MapNotNullObservable @CheckReturnValue @@ -177,3 +178,13 @@ fun Observable>.toMultimap(): Single Iterable>.concatAll(): Observable = Observable.concat(this) + +/** + * Returns an [Observable] that emits the non-`null` results + * of applying the given [transform] function to each element of the original Observable. + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun Observable.mapNotNull(transform: (T) -> R?): Observable = + MapNotNullObservable(this, transform) + diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt index da615ec..5c88366 100755 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt @@ -215,14 +215,15 @@ class FlowableTest { Flowables.zip( Flowable.just("Alpha", "Beta", "Gamma"), - Flowable.range(1,4), - Flowable.just(100,200,300) + Flowable.range(1, 4), + Flowable.just(100, 200, 300) ).subscribe(testSubscriber) - testSubscriber.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300)) + testSubscriber.assertValues(Triple("Alpha", 1, 100), Triple("Beta", 2, 200), Triple("Gamma", 3, 300)) } - @Test fun testConcatAll() { + @Test + fun testConcatAll() { (0 until 10) .map { Flowable.just(it) } .concatAll() @@ -231,4 +232,43 @@ class FlowableTest { Assert.assertEquals((0 until 10).toList(), result) } } + + @Test + fun testMapNotNull() { + // map many items + Flowable.just(1, 2, 3, 4, 5) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertComplete() + + // map an error Flowable + Flowable.error(RuntimeException()) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertError(RuntimeException::class.java) + + // map many items, concat with an error Flowable + Flowable.just(1, 2, 3, 4, 5) + .concatWith(Flowable.error(RuntimeException())) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertError(RuntimeException::class.java) + + // test Fuseable + Flowable.just(1) + .map { it * 2 } + .mapNotNull { null } + .filter { true } + .test() + .assertNoValues() + .assertComplete() + + // transform throws + Flowable.just(1) + .mapNotNull { throw RuntimeException() } + .test() + .assertError(RuntimeException::class.java) + } } diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt index 2ed8cae..4e1f09b 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt @@ -254,14 +254,15 @@ class ObservableTest { Observables.zip( Observable.just("Alpha", "Beta", "Gamma"), - Observable.range(1,4), - Observable.just(100,200,300) + Observable.range(1, 4), + Observable.just(100, 200, 300) ).subscribe(testObserver) - testObserver.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300)) + testObserver.assertValues(Triple("Alpha", 1, 100), Triple("Beta", 2, 200), Triple("Gamma", 3, 300)) } - @Test fun testConcatAll() { + @Test + fun testConcatAll() { var counter = 0 (0 until 10) .map { Observable.just(counter++, counter++, counter++) } @@ -271,4 +272,43 @@ class ObservableTest { Assert.assertEquals((0 until 30).toList(), result) } } + + @Test + fun testMapNotNull() { + // map many items + Observable.just(1, 2, 3, 4, 5) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertComplete() + + // map an error Observable + Observable.error(RuntimeException()) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertError(RuntimeException::class.java) + + // map many items, concat with an error Observable + Observable.just(1, 2, 3, 4, 5) + .concatWith(Observable.error(RuntimeException())) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertError(RuntimeException::class.java) + + // test Fuseable + Observable.just(1) + .map { it * 2 } + .mapNotNull { null } + .filter { true } + .test() + .assertNoValues() + .assertComplete() + + // transform throws + Observable.just(1) + .mapNotNull { throw RuntimeException() } + .test() + .assertError(RuntimeException::class.java) + } }