Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mapNotNull): add Observable.mapNotNull, Flowable.mapNotNull #251

Open
wants to merge 6 commits into
base: 3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ val examplesImplementation by configurations.getting {
}

dependencies {
api("io.reactivex.rxjava3:rxjava:3.0.6")
api("io.reactivex.rxjava3:rxjava:3.1.1")
implementation(kotlin("stdlib"))

testImplementation("org.funktionale:funktionale-partials:1.0.0-final")
Expand Down
12 changes: 12 additions & 0 deletions src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.*
import io.reactivex.rxjava3.kotlin.internal.FlowableMapNotNullFlowable
import org.reactivestreams.Publisher


Expand Down Expand Up @@ -201,3 +202,14 @@ fun <A : Any, B : Any> Flowable<Pair<A, B>>.toMultimap(): Single<MutableMap<A, M
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any> Iterable<Publisher<T>>.concatAll(): Flowable<T> = Flowable.concat(this)

/**
* Returns an [Flowable] that emits the non-`null` results
* of applying the given [transform] function to each element of the original Observable.
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
fun <T : Any, R : Any> Flowable<T>.mapNotNull(transform: (T) -> R?): Flowable<R> =
FlowableMapNotNullFlowable(this, transform)

Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package io.reactivex.rxjava3.kotlin.internal

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber
import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber
import io.reactivex.rxjava3.operators.ConditionalSubscriber
import org.reactivestreams.Subscriber

internal class FlowableMapNotNullFlowable<T : Any, R : Any>(
@JvmField
internal val source: Flowable<T>,
@JvmField
internal val transform: (T) -> R?,
) : Flowable<R>() {
override fun subscribeActual(subscriber: Subscriber<in R>) {
if (subscriber is ConditionalSubscriber<*>) {
source.subscribe(
MapNotNullConditionalSubscriber<T, R>(
subscriber as ConditionalSubscriber<in R>,
transform
)
)
} else {
source.subscribe(MapNotNullSubscriber(subscriber, transform))
}
}

internal class MapNotNullSubscriber<T : Any, R : Any>(
downstream: Subscriber<in R>,
@JvmField
internal val transform: (T) -> R?
) : BasicFuseableSubscriber<T, R>(downstream),
ConditionalSubscriber<T> {
override fun onNext(t: T) {
if (!tryOnNext(t)) {
upstream.request(1)
}
}

override fun tryOnNext(t: T): Boolean {
if (done) {
return true
}
if (sourceMode != NONE) {
downstream.onNext(null)
return true
}

val result = try {
transform(t)
} catch (ex: Throwable) {
fail(ex)
return true
}

if (result !== null) {
downstream.onNext(result)
return true
}
return false
}

override fun requestFusion(mode: Int): Int = transitiveBoundaryFusion(mode)

@Throws(Throwable::class)
override fun poll(): R? {
while (true) {
val item = qs.poll() ?: return null
val result = transform(item)
if (result != null) {
return result
}

if (sourceMode == ASYNC) {
qs.request(1)
}
}
}
}

internal class MapNotNullConditionalSubscriber<T : Any, R : Any>(
downstream: ConditionalSubscriber<in R>,
@JvmField
internal val transform: (T) -> R?,
) : BasicFuseableConditionalSubscriber<T, R>(downstream) {
override fun onNext(t: T) {
if (!tryOnNext(t)) {
upstream.request(1)
}
}

override fun tryOnNext(t: T): Boolean {
if (done) {
return true
}
if (sourceMode != NONE) {
downstream.onNext(null)
return true
}

val result = try {
transform(t)
} catch (ex: Throwable) {
fail(ex)
return true
}

return if (result != null) {
downstream.tryOnNext(result)
} else {
false
}
}

override fun requestFusion(mode: Int): Int {
return transitiveBoundaryFusion(mode)
}

@Throws(Throwable::class)
override fun poll(): R? {
while (true) {
val item = qs.poll() ?: return null
val result = transform(item)
if (result != null) {
return result
}
if (sourceMode == ASYNC) {
qs.request(1)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.reactivex.rxjava3.kotlin.internal

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver

internal class MapNotNullObservable<T : Any, R : Any>(
@JvmField
internal val source: Observable<T>,
@JvmField
internal val transform: (T) -> R?
) : Observable<R>() {
override fun subscribeActual(observer: Observer<in R>) {
source.subscribe(MapOptionalObserver(observer, transform))
}

internal class MapOptionalObserver<T : Any, R : Any>(
downstream: Observer<in R>,
@JvmField
internal val transform: (T) -> R?
) : BasicFuseableObserver<T, R>(downstream) {
override fun onNext(t: T) {
if (done) {
return
}
if (sourceMode != NONE) {
downstream.onNext(null)
return
}
val result = try {
transform(t)
} catch (ex: Throwable) {
fail(ex)
return
}
if (result !== null) {
downstream.onNext(result)
}
}

override fun requestFusion(mode: Int): Int {
return transitiveBoundaryFusion(mode)
}

@Throws(Throwable::class)
override fun poll(): R? {
while (true) {
val item = qd.poll() ?: return null
val result = transform(item)
if (result !== null) {
return result
}
}
}
}
}
11 changes: 11 additions & 0 deletions src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableSource
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.kotlin.internal.MapNotNullObservable


@CheckReturnValue
Expand Down Expand Up @@ -177,3 +178,13 @@ fun <A : Any, B : Any> Observable<Pair<A, B>>.toMultimap(): Single<MutableMap<A,
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any> Iterable<ObservableSource<T>>.concatAll(): Observable<T> =
Observable.concat(this)

/**
* Returns an [Observable] that emits the non-`null` results
* of applying the given [transform] function to each element of the original Observable.
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any, R : Any> Observable<T>.mapNotNull(transform: (T) -> R?): Observable<R> =
MapNotNullObservable(this, transform)

48 changes: 44 additions & 4 deletions src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,15 @@ class FlowableTest {

Flowables.zip(
Flowable.just("Alpha", "Beta", "Gamma"),
Flowable.range(1,4),
Flowable.just(100,200,300)
Flowable.range(1, 4),
Flowable.just(100, 200, 300)
).subscribe(testSubscriber)

testSubscriber.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300))
testSubscriber.assertValues(Triple("Alpha", 1, 100), Triple("Beta", 2, 200), Triple("Gamma", 3, 300))
}

@Test fun testConcatAll() {
@Test
fun testConcatAll() {
(0 until 10)
.map { Flowable.just(it) }
.concatAll()
Expand All @@ -231,4 +232,43 @@ class FlowableTest {
Assert.assertEquals((0 until 10).toList(), result)
}
}

@Test
fun testMapNotNull() {
// map many items
Flowable.just(1, 2, 3, 4, 5)
.mapNotNull { v -> v.takeIf { it % 2 == 0 } }
.test()
.assertValues(2, 4)
.assertComplete()

// map an error Flowable
Flowable.error<Int>(RuntimeException())
.mapNotNull { v -> v.takeIf { it % 2 == 0 } }
.test()
.assertError(RuntimeException::class.java)

// map many items, concat with an error Flowable
Flowable.just(1, 2, 3, 4, 5)
.concatWith(Flowable.error(RuntimeException()))
.mapNotNull { v -> v.takeIf { it % 2 == 0 } }
.test()
.assertValues(2, 4)
.assertError(RuntimeException::class.java)

// test Fuseable
Flowable.just(1)
.map { it * 2 }
.mapNotNull<Int, String> { null }
.filter { true }
.test()
.assertNoValues()
.assertComplete()

// transform throws
Flowable.just(1)
.mapNotNull { throw RuntimeException() }
.test()
.assertError(RuntimeException::class.java)
}
}
48 changes: 44 additions & 4 deletions src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,15 @@ class ObservableTest {

Observables.zip(
Observable.just("Alpha", "Beta", "Gamma"),
Observable.range(1,4),
Observable.just(100,200,300)
Observable.range(1, 4),
Observable.just(100, 200, 300)
).subscribe(testObserver)

testObserver.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300))
testObserver.assertValues(Triple("Alpha", 1, 100), Triple("Beta", 2, 200), Triple("Gamma", 3, 300))
}

@Test fun testConcatAll() {
@Test
fun testConcatAll() {
var counter = 0
(0 until 10)
.map { Observable.just(counter++, counter++, counter++) }
Expand All @@ -271,4 +272,43 @@ class ObservableTest {
Assert.assertEquals((0 until 30).toList(), result)
}
}

@Test
fun testMapNotNull() {
// map many items
Observable.just(1, 2, 3, 4, 5)
.mapNotNull { v -> v.takeIf { it % 2 == 0 } }
.test()
.assertValues(2, 4)
.assertComplete()

// map an error Observable
Observable.error<Int>(RuntimeException())
.mapNotNull { v -> v.takeIf { it % 2 == 0 } }
.test()
.assertError(RuntimeException::class.java)

// map many items, concat with an error Observable
Observable.just(1, 2, 3, 4, 5)
.concatWith(Observable.error(RuntimeException()))
.mapNotNull { v -> v.takeIf { it % 2 == 0 } }
.test()
.assertValues(2, 4)
.assertError(RuntimeException::class.java)

// test Fuseable
Observable.just(1)
.map { it * 2 }
.mapNotNull<Int, String> { null }
.filter { true }
.test()
.assertNoValues()
.assertComplete()

// transform throws
Observable.just(1)
.mapNotNull { throw RuntimeException() }
.test()
.assertError(RuntimeException::class.java)
}
}