From c9dd612ad788ae4bd60043c0a698905aaebdf9fb Mon Sep 17 00:00:00 2001 From: Patrick Steiger Date: Mon, 20 Nov 2023 21:09:49 -0300 Subject: [PATCH] [RibCoroutineWorker] In `asWorker()`, keep scope alive until lifecycle completion. This fixes Rx subscriptions using `autoDispose(CoroutineScope)` immediately terminating. In order to properly support `autoDispose(CoroutineScope)` subscriptions, we must keep the `CoroutineScope` received in `onStart` alive as long as the `WorkerScopeProvider` lifecycle. `autoDispose` does *not* create a children coroutine: instead it installs a completion handler. Hence, outer scope will not have children to wait for completion and will terminate immediately. --- .../com/uber/rib/core/RibCoroutineWorker.kt | 17 +++++++++++----- .../uber/rib/core/RibCoroutineWorkerTest.kt | 20 +++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt b/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt index ff40ec9a7..4c1b192b0 100644 --- a/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt +++ b/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt @@ -246,12 +246,19 @@ internal constructor( ) : Worker { override fun onStart(lifecycle: WorkerScopeProvider) { - // We can start it undispatched because Worker binder will already call `onStart` in correct - // context, - // but we still want to pass in `coroutineDispatcher` to resume from suspensions in `onStart` in + // We start it undispatched to keep the behavior of immediate binding of Worker when + // WorkerBinder.bind is called. + // We still want to pass in `coroutineContext` to resume from suspensions in `onStart` in // correct context. - lifecycle.coroutineScope.launch(coroutineContext, start = CoroutineStart.UNDISPATCHED) { - supervisorScope { ribCoroutineWorker.onStart(this) } + lifecycle.coroutineScope.launch(coroutineContext, CoroutineStart.UNDISPATCHED) { + supervisorScope { + ribCoroutineWorker.onStart(this) + // Keep this scope alive until cancelled. + // This is particularly important for cases where we do not launch long-running coroutines + // with scope, but instead install some completion handler that we expect to be called at + // worker unbinding. This is the case with Rx subscriptions with 'autoDispose(scope)' + awaitCancellation() + } } } diff --git a/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt b/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt index 89c4b3be3..a802eb4d1 100644 --- a/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt +++ b/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt @@ -16,6 +16,8 @@ package com.uber.rib.core import com.google.common.truth.Truth.assertThat +import com.uber.autodispose.coroutinesinterop.autoDispose +import io.reactivex.subjects.PublishSubject import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlinx.coroutines.CancellationException @@ -46,6 +48,7 @@ import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import org.junit.Rule import org.junit.Test +import org.mockito.kotlin.mock private const val ON_START_DELAY_DURATION_MILLIS = 100L private const val INNER_COROUTINE_DELAY_DURATION_MILLIS = 200L @@ -180,6 +183,23 @@ class RibCoroutineWorkerTest { } } + @Test + fun asWorker_autoDisposeWithCoroutineScope_lateEmissionIsReceivedBySubscriber() = runTest { + val router = mock>() + val interactor = object : Interactor>() {} + val subject = PublishSubject.create() + var gotEmission = false + val ribCoroutineWorker = RibCoroutineWorker { + subject.autoDispose(this).subscribe { gotEmission = true } + } + val worker = ribCoroutineWorker.asWorker() + InteractorHelper.attach(interactor, Any(), router, null) + WorkerBinder.bind(interactor, worker) + runCurrent() + subject.onNext(Unit) + assertThat(gotEmission).isTrue() + } + @Test fun testHelperFunction() = runTest { // Sanity - assert initial state.