Skip to content

Commit

Permalink
Exposing unique subscription handling for custom flow operations (#560)
Browse files Browse the repository at this point in the history
  • Loading branch information
podarsmarty authored Oct 6, 2021
1 parent 35e50bb commit d2f01a2
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KProperty1

/**
Expand All @@ -38,8 +36,6 @@ abstract class BaseMvRxViewModel<S : MavericksState>(
) : MavericksViewModel<S>(initialState) {
private val tag by lazy { javaClass.simpleName }
private val disposables = CompositeDisposable()
private val lastDeliveredStates = ConcurrentHashMap<String, Any>()
private val activeSubscriptions = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())

/**
* Define a [LifecycleOwner] to control subscriptions between [BaseMvRxViewModel]s. This only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
Expand Down Expand Up @@ -673,7 +672,7 @@ class ViewModelSubscriberTest : BaseTest() {
}

@Test
fun testSubscribeNotCalledOnStartIfNoUpdateOccurredInStop() {
fun testUniqueOnlySubscribeNotCalledOnStartIfNoUpdateOccurredInStop() {
owner.lifecycle.currentState = Lifecycle.State.STARTED

var callCount = 0
Expand Down
10 changes: 7 additions & 3 deletions mvrx/src/main/kotlin/com/airbnb/mvrx/DeliveryMode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import kotlin.reflect.KProperty1
* See: [RedeliverOnStart], [UniqueOnly].
*/
sealed class DeliveryMode {
abstract val subscriptionId: String

internal fun appendPropertiesToId(vararg properties: KProperty1<*, *>): DeliveryMode {
return when (this) {
is RedeliverOnStart -> RedeliverOnStart
Expand All @@ -21,16 +23,18 @@ sealed class DeliveryMode {
*
* Likewise, when a [MavericksView] resubscribes after a configuration change the most recent update will always be emitted.
*/
object RedeliverOnStart : DeliveryMode()
object RedeliverOnStart : DeliveryMode() {
override val subscriptionId: String = RedeliverOnStart::javaClass.name
}

/**
* The subscription will receive the most recent state update when transitioning from locked to unlocked states (stopped -> started),
* only if the state has changed while locked.
* only if the state has changed while locked. This will include the initial state as a state update.
*
* Likewise, when a [MavericksView] resubscribes after a configuration change the most recent update will only be emitted
* if the state has changed while locked.
*
* @param subscriptionId A uniqueIdentifier for this subscription. It is an error for two unique only subscriptions to
* have the same id.
*/
class UniqueOnly(val subscriptionId: String) : DeliveryMode()
class UniqueOnly(override val subscriptionId: String) : DeliveryMode()
73 changes: 73 additions & 0 deletions mvrx/src/main/kotlin/com/airbnb/mvrx/FlowExtensions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.airbnb.mvrx

import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.yield
import java.util.concurrent.ConcurrentHashMap

internal fun <T : Any?> Flow<T>.collectLatest(
lifecycleOwner: LifecycleOwner,
lastDeliveredStates: ConcurrentHashMap<String, Any?>,
activeSubscriptions: MutableSet<String>,
deliveryMode: DeliveryMode,
action: suspend (T) -> Unit
): Job {
val flow = when {
MavericksTestOverrides.FORCE_DISABLE_LIFECYCLE_AWARE_OBSERVER -> this
deliveryMode is UniqueOnly -> {
this.assertOneActiveSubscription(lifecycleOwner, activeSubscriptions, deliveryMode.subscriptionId)
.dropWhile { lastDeliveredStates.containsKey(deliveryMode.subscriptionId) && it == lastDeliveredStates[deliveryMode.subscriptionId] }
.flowWhenStarted(lifecycleOwner)
.distinctUntilChanged()
.onEach { lastDeliveredStates[deliveryMode.subscriptionId] = it }
}
else -> flowWhenStarted(lifecycleOwner)
}

val scope = lifecycleOwner.lifecycleScope + Mavericks.viewModelConfigFactory.subscriptionCoroutineContextOverride
return scope.launch(start = CoroutineStart.UNDISPATCHED) {
// Use yield to ensure flow collect coroutine is dispatched rather than invoked immediately.
// This is necessary when Dispatchers.Main.immediate is used in scope.
// Coroutine is launched with start = CoroutineStart.UNDISPATCHED to perform dispatch only once.
yield()
flow.collectLatest(action)
}
}

@Suppress("EXPERIMENTAL_API_USAGE")
internal fun <T : Any?> Flow<T>.assertOneActiveSubscription(lifecycleOwner: LifecycleOwner, activeSubscriptions: MutableSet<String>, subscriptionId: String): Flow<T> {
val observer = object : DefaultLifecycleObserver {
override fun onCreate(owner: LifecycleOwner) {
if (activeSubscriptions.contains(subscriptionId)) error(duplicateSubscriptionMessage(subscriptionId))
activeSubscriptions += subscriptionId
}

override fun onDestroy(owner: LifecycleOwner) {
activeSubscriptions.remove(subscriptionId)
}
}

lifecycleOwner.lifecycle.addObserver(observer)
return onCompletion {
activeSubscriptions.remove(subscriptionId)
lifecycleOwner.lifecycle.removeObserver(observer)
}
}

private fun duplicateSubscriptionMessage(subscriptionId: String) = """
Subscribing with a duplicate subscription id: $subscriptionId.
If you have multiple uniqueOnly subscriptions in a MvRx view that listen to the same properties
you must use a custom subscription id. If you are using a custom MvRxView, make sure you are using the proper
lifecycle owner. See BaseMvRxFragment for an example.
""".trimIndent()
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kotlinx.coroutines.yield
* It's possible because lifecycle state updated in the main thread
* 2. Flow completes when either [this] flow completes or lifecycle is destroyed
*/
fun <T : Any> Flow<T>.flowWhenStarted(owner: LifecycleOwner): Flow<T> = flow {
fun <T : Any?> Flow<T>.flowWhenStarted(owner: LifecycleOwner): Flow<T> = flow {
coroutineScope {
val startedChannel = startedChannel(owner.lifecycle)
val flowChannel = produce { collect { send(it) } }
Expand Down Expand Up @@ -82,7 +82,7 @@ private fun startedChannel(owner: Lifecycle): Channel<Boolean> {
return channel
}

private inline fun <T : Any> SelectBuilder<Unit>.onReceive(
private inline fun <T : Any?> SelectBuilder<Unit>.onReceive(
channel: ReceiveChannel<T>,
crossinline onClosed: () -> Unit,
noinline onReceive: suspend (value: T) -> Unit
Expand Down
58 changes: 46 additions & 12 deletions mvrx/src/main/kotlin/com/airbnb/mvrx/MavericksView.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.ViewModelProvider
import androidx.lifecycle.ViewModelStoreOwner
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlin.reflect.KProperty1

// Set of [MavericksView identity hash codes that have a pending invalidate.
Expand Down Expand Up @@ -42,11 +44,14 @@ interface MavericksView : LifecycleOwner {
* Accessing mvrxViewId before calling super.onCreate() will cause a crash.
*/
val mvrxViewId: String
get() = mavericksViewInternalViewModel.mavericksViewId

val mavericksViewInternalViewModel: MavericksViewInternalViewModel
get() = when (this) {
is ViewModelStoreOwner -> ViewModelProvider(this).get(MavericksViewIdViewModel::class.java).mavericksViewId
is ViewModelStoreOwner -> ViewModelProvider(this).get(MavericksViewInternalViewModel::class.java)
else -> error(
"If your MavericksView is not a ViewModelStoreOwner, you must implement mvrxViewId " +
"and return a string that is unique to this view and persistent across its entire lifecycle."
"If your MavericksView is not a ViewModelStoreOwner, you must implement mavericksViewInternalViewModel " +
"and return a MavericksViewInternalViewModel that is unique to this view and persistent across its entire lifecycle."
)
}

Expand Down Expand Up @@ -90,7 +95,7 @@ interface MavericksView : LifecycleOwner {
* in this fragment with exact same properties (i.e. two subscribes, or two selectSubscribes with the same properties).
*/
fun uniqueOnly(customId: String? = null): UniqueOnly {
return UniqueOnly(listOfNotNull(mvrxViewId, customId).joinToString("_"))
return UniqueOnly(listOfNotNull(mvrxViewId, UniqueOnly::class.simpleName, customId).joinToString("_"))
}

/**
Expand All @@ -104,7 +109,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState> MavericksViewModel<S>.onEach(deliveryMode: DeliveryMode = RedeliverOnStart, action: suspend (S) -> Unit) =
Expand All @@ -122,7 +127,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState, A> MavericksViewModel<S>.onEach(
Expand All @@ -142,7 +147,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart]
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted..
*/
fun <S : MavericksState, A, B> MavericksViewModel<S>.onEach(
Expand All @@ -163,7 +168,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState, A, B, C> MavericksViewModel<S>.onEach(
Expand All @@ -185,7 +190,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState, A, B, C, D> MavericksViewModel<S>.onEach(
Expand All @@ -208,7 +213,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState, A, B, C, D, E> MavericksViewModel<S>.onEach(
Expand All @@ -232,7 +237,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState, A, B, C, D, E, F> MavericksViewModel<S>.onEach(
Expand All @@ -257,7 +262,7 @@ interface MavericksView : LifecycleOwner {
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* Default: [RedeliverOnStart].
* @param action supports cooperative cancellation. The previous action will be cancelled if it as not completed before
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <S : MavericksState, A, B, C, D, E, F, G> MavericksViewModel<S>.onEach(
Expand Down Expand Up @@ -295,4 +300,33 @@ interface MavericksView : LifecycleOwner {
onFail: (suspend (Throwable) -> Unit)? = null,
onSuccess: (suspend (T) -> Unit)? = null
) = _internalSF(subscriptionLifecycleOwner, asyncProp, deliveryMode, onFail, onSuccess)

/**
* Subscribes to the given Flow within the coroutine scope of the `subscriptionLifecycleOwner`, starting the flow only when the lifecycle
* is started, and executing with the coroutine context of Mavericks' `subscriptionCoroutineContextOverride`. This can be utilized to create
* customized subscriptions, for example, to drop the first element in the flow before continuing. This is intended to be used with
* `viewmodel.stateflow`.
*
* @param deliveryMode If [UniqueOnly], when this [MavericksView] goes from a stopped to started lifecycle a value
* will only be emitted if the value has changed. This is useful for transient views that should only
* be shown once (toasts, poptarts), or logging. Most other views should use [RedeliverOnStart], as when a view is destroyed
* and recreated the previous state is necessary to recreate the view.
*
* Use [uniqueOnly] to automatically create a [UniqueOnly] mode with a unique id for this view.
*
* @param action supports cooperative cancellation. The previous action will be cancelled if it is not completed before
* the next one is emitted.
*/
fun <T : Any?> Flow<T>.collectLatest(
deliveryMode: DeliveryMode,
action: suspend (T) -> Unit
): Job = mavericksViewInternalViewModel.let {
collectLatest(
subscriptionLifecycleOwner,
it.lastDeliveredStates,
it.activeSubscriptions,
deliveryMode,
action
)
}
}
17 changes: 0 additions & 17 deletions mvrx/src/main/kotlin/com/airbnb/mvrx/MavericksViewIdViewModel.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.airbnb.mvrx

import androidx.lifecycle.SavedStateHandle
import androidx.lifecycle.ViewModel
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

class MavericksViewInternalViewModel(state: SavedStateHandle) : ViewModel() {
internal val lastDeliveredStates: ConcurrentHashMap<String, Any?> = ConcurrentHashMap<String, Any?>()
internal val activeSubscriptions: MutableSet<String> = mutableSetOf()
internal val mavericksViewId = state[PERSISTED_VIEW_ID_KEY] ?: generateUniqueId().also { id ->
state[PERSISTED_VIEW_ID_KEY] = id
}

private fun generateUniqueId() = "MavericksView_" + UUID.randomUUID().toString()

companion object {
private const val PERSISTED_VIEW_ID_KEY = "mavericks:persisted_view_id"
}
}
Loading

0 comments on commit d2f01a2

Please sign in to comment.