From 2a24142454ddf99f34fc104bb75e0411ad73a647 Mon Sep 17 00:00:00 2001 From: hoc081098 Date: Sat, 4 Feb 2023 16:42:58 +0700 Subject: [PATCH 1/6] feat(mapNotNull): add `Observable.mapNotNull` extension --- .../kotlin/internal/MapNotNullObserver.kt | 33 +++++++++++++++++++ .../io/reactivex/rxjava3/kotlin/observable.kt | 11 +++++++ .../rxjava3/kotlin/ObservableTest.kt | 18 +++++++--- 3 files changed, 58 insertions(+), 4 deletions(-) create mode 100644 src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt new file mode 100644 index 0000000..a4aa26e --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt @@ -0,0 +1,33 @@ +package io.reactivex.rxjava3.kotlin.internal + +import io.reactivex.rxjava3.core.Observer +import io.reactivex.rxjava3.disposables.Disposable +import io.reactivex.rxjava3.internal.disposables.DisposableHelper + +internal class MapNotNullObserver( + private val downstream: Observer, + private val transform: (T) -> R? +) : Observer, Disposable { + private var upstream: Disposable? = null + + override fun onSubscribe(d: Disposable) { + if (DisposableHelper.validate(upstream, d)) { + upstream = d + downstream.onSubscribe(this) + } + } + + override fun onNext(t: T) { + transform(t)?.let(downstream::onNext) + } + + override fun onError(e: Throwable): Unit = downstream.onError(e) + + override fun onComplete(): Unit = downstream.onComplete() + + override fun dispose() { + upstream!!.dispose() + } + + override fun isDisposed(): Boolean = upstream!!.isDisposed +} \ No newline at end of file diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt index ed246db..d121d70 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt @@ -7,6 +7,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.ObservableSource import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.kotlin.internal.MapNotNullObserver @CheckReturnValue @@ -177,3 +178,13 @@ fun Observable>.toMultimap(): Single Iterable>.concatAll(): Observable = Observable.concat(this) + +/** + * Returns an [Observable] that emits the non-`null` results + * of applying the given [transform] function to each element of the original Observable. + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun Observable.mapNotNull(transform: (T) -> R?): Observable = + lift { MapNotNullObserver(it, transform) } + diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt index 2ed8cae..58e6a9e 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt @@ -254,14 +254,15 @@ class ObservableTest { Observables.zip( Observable.just("Alpha", "Beta", "Gamma"), - Observable.range(1,4), - Observable.just(100,200,300) + Observable.range(1, 4), + Observable.just(100, 200, 300) ).subscribe(testObserver) - testObserver.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300)) + testObserver.assertValues(Triple("Alpha", 1, 100), Triple("Beta", 2, 200), Triple("Gamma", 3, 300)) } - @Test fun testConcatAll() { + @Test + fun testConcatAll() { var counter = 0 (0 until 10) .map { Observable.just(counter++, counter++, counter++) } @@ -271,4 +272,13 @@ class ObservableTest { Assert.assertEquals((0 until 30).toList(), result) } } + + @Test + fun testMapNotNull() { + Observable.just(1, 2, 3, 4, 5) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertComplete() + } } From b143fe3cc10322e2143240d7cd3e6a7df9bba8a3 Mon Sep 17 00:00:00 2001 From: hoc081098 Date: Sat, 4 Feb 2023 17:32:40 +0700 Subject: [PATCH 2/6] feat(mapNotNull): add `Flowable.mapNotNull` extension --- .../io/reactivex/rxjava3/kotlin/flowable.kt | 12 ++ .../kotlin/internal/MapNotNullObserver.kt | 11 +- .../kotlin/internal/MapNotNullSubscriber.kt | 129 ++++++++++++++++++ .../reactivex/rxjava3/kotlin/FlowableTest.kt | 48 ++++++- .../rxjava3/kotlin/ObservableTest.kt | 30 ++++ 5 files changed, 225 insertions(+), 5 deletions(-) create mode 100644 src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt index c9e1f3e..efac8b8 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/flowable.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.functions.* +import io.reactivex.rxjava3.kotlin.internal.FlowableMapNotNullFlowable import org.reactivestreams.Publisher @@ -201,3 +202,14 @@ fun Flowable>.toMultimap(): Single Iterable>.concatAll(): Flowable = Flowable.concat(this) + +/** + * Returns an [Flowable] that emits the non-`null` results + * of applying the given [transform] function to each element of the original Observable. + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +@BackpressureSupport(BackpressureKind.PASS_THROUGH) +fun Flowable.mapNotNull(transform: (T) -> R?): Flowable = + FlowableMapNotNullFlowable(this, transform) + diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt index a4aa26e..f389ff5 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt @@ -2,6 +2,7 @@ package io.reactivex.rxjava3.kotlin.internal import io.reactivex.rxjava3.core.Observer import io.reactivex.rxjava3.disposables.Disposable +import io.reactivex.rxjava3.exceptions.Exceptions import io.reactivex.rxjava3.internal.disposables.DisposableHelper internal class MapNotNullObserver( @@ -18,7 +19,15 @@ internal class MapNotNullObserver( } override fun onNext(t: T) { - transform(t)?.let(downstream::onNext) + val v = try { + transform(t) + } catch (e: Throwable) { + Exceptions.throwIfFatal(e) + upstream!!.dispose() + onError(e) + return + } + v?.let(downstream::onNext) } override fun onError(e: Throwable): Unit = downstream.onError(e) diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt new file mode 100644 index 0000000..8f502ed --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt @@ -0,0 +1,129 @@ +package io.reactivex.rxjava3.kotlin.internal + +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber +import org.reactivestreams.Subscriber + +internal class FlowableMapNotNullFlowable( + private val source: Flowable, + private val transform: (T) -> R?, +) : Flowable() { + override fun subscribeActual(subscriber: Subscriber) { + if (subscriber is ConditionalSubscriber<*>) { + source.subscribe( + MapNotNullConditionalSubscriber( + subscriber as ConditionalSubscriber, + transform + ) + ) + } else { + source.subscribe(MapNotNullSubscriber(subscriber, transform)) + } + } + + internal class MapNotNullSubscriber( + downstream: Subscriber, + val transform: (T) -> R? + ) : BasicFuseableSubscriber(downstream), + ConditionalSubscriber { + override fun onNext(t: T) { + if (!tryOnNext(t)) { + upstream.request(1) + } + } + + override fun tryOnNext(t: T): Boolean { + if (done) { + return true + } + if (sourceMode != NONE) { + downstream.onNext(null) + return true + } + + val result = try { + transform(t) + } catch (ex: Throwable) { + fail(ex) + return true + } + + if (result !== null) { + downstream.onNext(result) + return true + } + return false + } + + override fun requestFusion(mode: Int): Int = transitiveBoundaryFusion(mode) + + @Throws(Throwable::class) + override fun poll(): R? { + while (true) { + val item = qs.poll() ?: return null + val result = transform(item) + if (result != null) { + return result + } + + if (sourceMode == ASYNC) { + qs.request(1) + } + } + } + } + + internal class MapNotNullConditionalSubscriber( + downstream: ConditionalSubscriber, + val transform: (T) -> R?, + ) : BasicFuseableConditionalSubscriber(downstream) { + override fun onNext(t: T) { + if (!tryOnNext(t)) { + upstream.request(1) + } + } + + override fun tryOnNext(t: T): Boolean { + if (done) { + return true + } + if (sourceMode != NONE) { + downstream.onNext(null) + return true + } + + val result = try { + transform(t) + } catch (ex: Throwable) { + fail(ex) + return true + } + + return if (result != null) { + downstream.tryOnNext(result) + } else { + false + } + } + + override fun requestFusion(mode: Int): Int { + return transitiveBoundaryFusion(mode) + } + + @Throws(Throwable::class) + override fun poll(): R? { + while (true) { + val item = qs.poll() ?: return null + val result = transform(item) + if (result != null) { + return result + } + if (sourceMode == ASYNC) { + qs.request(1) + } + } + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt index da615ec..5c88366 100755 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableTest.kt @@ -215,14 +215,15 @@ class FlowableTest { Flowables.zip( Flowable.just("Alpha", "Beta", "Gamma"), - Flowable.range(1,4), - Flowable.just(100,200,300) + Flowable.range(1, 4), + Flowable.just(100, 200, 300) ).subscribe(testSubscriber) - testSubscriber.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300)) + testSubscriber.assertValues(Triple("Alpha", 1, 100), Triple("Beta", 2, 200), Triple("Gamma", 3, 300)) } - @Test fun testConcatAll() { + @Test + fun testConcatAll() { (0 until 10) .map { Flowable.just(it) } .concatAll() @@ -231,4 +232,43 @@ class FlowableTest { Assert.assertEquals((0 until 10).toList(), result) } } + + @Test + fun testMapNotNull() { + // map many items + Flowable.just(1, 2, 3, 4, 5) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertComplete() + + // map an error Flowable + Flowable.error(RuntimeException()) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertError(RuntimeException::class.java) + + // map many items, concat with an error Flowable + Flowable.just(1, 2, 3, 4, 5) + .concatWith(Flowable.error(RuntimeException())) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertError(RuntimeException::class.java) + + // test Fuseable + Flowable.just(1) + .map { it * 2 } + .mapNotNull { null } + .filter { true } + .test() + .assertNoValues() + .assertComplete() + + // transform throws + Flowable.just(1) + .mapNotNull { throw RuntimeException() } + .test() + .assertError(RuntimeException::class.java) + } } diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt index 58e6a9e..4e1f09b 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableTest.kt @@ -275,10 +275,40 @@ class ObservableTest { @Test fun testMapNotNull() { + // map many items Observable.just(1, 2, 3, 4, 5) .mapNotNull { v -> v.takeIf { it % 2 == 0 } } .test() .assertValues(2, 4) .assertComplete() + + // map an error Observable + Observable.error(RuntimeException()) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertError(RuntimeException::class.java) + + // map many items, concat with an error Observable + Observable.just(1, 2, 3, 4, 5) + .concatWith(Observable.error(RuntimeException())) + .mapNotNull { v -> v.takeIf { it % 2 == 0 } } + .test() + .assertValues(2, 4) + .assertError(RuntimeException::class.java) + + // test Fuseable + Observable.just(1) + .map { it * 2 } + .mapNotNull { null } + .filter { true } + .test() + .assertNoValues() + .assertComplete() + + // transform throws + Observable.just(1) + .mapNotNull { throw RuntimeException() } + .test() + .assertError(RuntimeException::class.java) } } From 9ded19ab66c8edbfec02fd00dd4db31b1180a029 Mon Sep 17 00:00:00 2001 From: hoc081098 Date: Sat, 4 Feb 2023 17:36:41 +0700 Subject: [PATCH 3/6] rxjava 3.1.1 --- build.gradle.kts | 2 +- .../reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 02f63c0..fe6bf9b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -39,7 +39,7 @@ val examplesImplementation by configurations.getting { } dependencies { - api("io.reactivex.rxjava3:rxjava:3.0.6") + api("io.reactivex.rxjava3:rxjava:3.1.1") implementation(kotlin("stdlib")) testImplementation("org.funktionale:funktionale-partials:1.0.0-final") diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt index 8f502ed..e314769 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt @@ -1,9 +1,9 @@ package io.reactivex.rxjava3.kotlin.internal import io.reactivex.rxjava3.core.Flowable -import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber +import io.reactivex.rxjava3.operators.ConditionalSubscriber import org.reactivestreams.Subscriber internal class FlowableMapNotNullFlowable( From d228241b5cb0def1089bb1a9ce13477aefb0405a Mon Sep 17 00:00:00 2001 From: hoc081098 Date: Sat, 4 Feb 2023 17:37:42 +0700 Subject: [PATCH 4/6] rename files --- .../{MapNotNullSubscriber.kt => FlowableMapNotNullFlowable.kt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/{MapNotNullSubscriber.kt => FlowableMapNotNullFlowable.kt} (100%) diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt similarity index 100% rename from src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullSubscriber.kt rename to src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt From 2a6f53b53b3031b1c1df5f7ba57e7d220332058a Mon Sep 17 00:00:00 2001 From: hoc081098 Date: Sat, 4 Feb 2023 17:43:30 +0700 Subject: [PATCH 5/6] update --- .../internal/FlowableMapNotNullFlowable.kt | 4 +- .../kotlin/internal/MapNotNullObservable.kt | 53 +++++++++++++++++++ .../kotlin/internal/MapNotNullObserver.kt | 42 --------------- .../io/reactivex/rxjava3/kotlin/observable.kt | 4 +- 4 files changed, 57 insertions(+), 46 deletions(-) create mode 100644 src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt delete mode 100644 src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt index e314769..f598f05 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt @@ -25,7 +25,7 @@ internal class FlowableMapNotNullFlowable( internal class MapNotNullSubscriber( downstream: Subscriber, - val transform: (T) -> R? + internal val transform: (T) -> R? ) : BasicFuseableSubscriber(downstream), ConditionalSubscriber { override fun onNext(t: T) { @@ -77,7 +77,7 @@ internal class FlowableMapNotNullFlowable( internal class MapNotNullConditionalSubscriber( downstream: ConditionalSubscriber, - val transform: (T) -> R?, + internal val transform: (T) -> R?, ) : BasicFuseableConditionalSubscriber(downstream) { override fun onNext(t: T) { if (!tryOnNext(t)) { diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt new file mode 100644 index 0000000..e1bb833 --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt @@ -0,0 +1,53 @@ +package io.reactivex.rxjava3.kotlin.internal + +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Observer +import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver + +internal class MapNotNullObservable( + private val source: Observable, + private val transform: (T) -> R? +) : Observable() { + override fun subscribeActual(observer: Observer) { + source.subscribe(MapOptionalObserver(observer, transform)) + } + + internal class MapOptionalObserver( + downstream: Observer, + internal val transform: (T) -> R? + ) : BasicFuseableObserver(downstream) { + override fun onNext(t: T) { + if (done) { + return + } + if (sourceMode != NONE) { + downstream.onNext(null) + return + } + val result = try { + transform(t) + } catch (ex: Throwable) { + fail(ex) + return + } + if (result !== null) { + downstream.onNext(result) + } + } + + override fun requestFusion(mode: Int): Int { + return transitiveBoundaryFusion(mode) + } + + @Throws(Throwable::class) + override fun poll(): R? { + while (true) { + val item = qd.poll() ?: return null + val result = transform(item) + if (result !== null) { + return result + } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt deleted file mode 100644 index f389ff5..0000000 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObserver.kt +++ /dev/null @@ -1,42 +0,0 @@ -package io.reactivex.rxjava3.kotlin.internal - -import io.reactivex.rxjava3.core.Observer -import io.reactivex.rxjava3.disposables.Disposable -import io.reactivex.rxjava3.exceptions.Exceptions -import io.reactivex.rxjava3.internal.disposables.DisposableHelper - -internal class MapNotNullObserver( - private val downstream: Observer, - private val transform: (T) -> R? -) : Observer, Disposable { - private var upstream: Disposable? = null - - override fun onSubscribe(d: Disposable) { - if (DisposableHelper.validate(upstream, d)) { - upstream = d - downstream.onSubscribe(this) - } - } - - override fun onNext(t: T) { - val v = try { - transform(t) - } catch (e: Throwable) { - Exceptions.throwIfFatal(e) - upstream!!.dispose() - onError(e) - return - } - v?.let(downstream::onNext) - } - - override fun onError(e: Throwable): Unit = downstream.onError(e) - - override fun onComplete(): Unit = downstream.onComplete() - - override fun dispose() { - upstream!!.dispose() - } - - override fun isDisposed(): Boolean = upstream!!.isDisposed -} \ No newline at end of file diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt index d121d70..c61c964 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/observable.kt @@ -7,7 +7,7 @@ import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.ObservableSource import io.reactivex.rxjava3.core.Single -import io.reactivex.rxjava3.kotlin.internal.MapNotNullObserver +import io.reactivex.rxjava3.kotlin.internal.MapNotNullObservable @CheckReturnValue @@ -186,5 +186,5 @@ fun Iterable>.concatAll(): Observable = @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Observable.mapNotNull(transform: (T) -> R?): Observable = - lift { MapNotNullObserver(it, transform) } + MapNotNullObservable(this, transform) From 43ea4edc544a151e137d1f381cc4d78b4006a325 Mon Sep 17 00:00:00 2001 From: hoc081098 Date: Sat, 4 Feb 2023 17:47:22 +0700 Subject: [PATCH 6/6] doen --- .../rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt | 8 ++++++-- .../rxjava3/kotlin/internal/MapNotNullObservable.kt | 7 +++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt index f598f05..5491155 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/FlowableMapNotNullFlowable.kt @@ -7,8 +7,10 @@ import io.reactivex.rxjava3.operators.ConditionalSubscriber import org.reactivestreams.Subscriber internal class FlowableMapNotNullFlowable( - private val source: Flowable, - private val transform: (T) -> R?, + @JvmField + internal val source: Flowable, + @JvmField + internal val transform: (T) -> R?, ) : Flowable() { override fun subscribeActual(subscriber: Subscriber) { if (subscriber is ConditionalSubscriber<*>) { @@ -25,6 +27,7 @@ internal class FlowableMapNotNullFlowable( internal class MapNotNullSubscriber( downstream: Subscriber, + @JvmField internal val transform: (T) -> R? ) : BasicFuseableSubscriber(downstream), ConditionalSubscriber { @@ -77,6 +80,7 @@ internal class FlowableMapNotNullFlowable( internal class MapNotNullConditionalSubscriber( downstream: ConditionalSubscriber, + @JvmField internal val transform: (T) -> R?, ) : BasicFuseableConditionalSubscriber(downstream) { override fun onNext(t: T) { diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt index e1bb833..fdc1a1f 100644 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/internal/MapNotNullObservable.kt @@ -5,8 +5,10 @@ import io.reactivex.rxjava3.core.Observer import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver internal class MapNotNullObservable( - private val source: Observable, - private val transform: (T) -> R? + @JvmField + internal val source: Observable, + @JvmField + internal val transform: (T) -> R? ) : Observable() { override fun subscribeActual(observer: Observer) { source.subscribe(MapOptionalObserver(observer, transform)) @@ -14,6 +16,7 @@ internal class MapNotNullObservable( internal class MapOptionalObserver( downstream: Observer, + @JvmField internal val transform: (T) -> R? ) : BasicFuseableObserver(downstream) { override fun onNext(t: T) {