Skip to content

Commit

Permalink
Changed flowWhenStarted to emit value on flow coroutine context (#678)
Browse files Browse the repository at this point in the history
* Changed flowWhenStarted to emit value on flow coroutine context

* Replaced data classes to NULL token

* Fixed null value handling in channel & Added nullable flow test code

---------

Co-authored-by: roy.tk <[email protected]>
  • Loading branch information
vulpeszerda and roy.tk authored May 22, 2023
1 parent 7c03368 commit 5174519
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 25 deletions.
53 changes: 28 additions & 25 deletions mvrx/src/main/kotlin/com/airbnb/mvrx/MavericksLifecycleAwareFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combineTransform
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.SelectBuilder
Expand All @@ -31,32 +30,34 @@ fun <T : Any?> Flow<T>.flowWhenStarted(owner: LifecycleOwner): Flow<T> = flow {
val startedChannel = startedChannel(owner.lifecycle)
val flowChannel = produce { collect { send(it) } }

val transform: suspend (Boolean, T) -> Unit = { started, value ->
if (started) {
emit(value)
}
}

val nullValue = Any()
var started: Boolean? = null
var flowValue: T? = null
var flowResult: Any? = nullValue
var isClosed = false

while (!isClosed) {
select<Unit> {
onReceive(startedChannel, { flowChannel.cancel(); isClosed = true }) {
started = it
if (flowValue !== null) {
@Suppress("UNCHECKED_CAST")
transform(it, flowValue as T)
val result = select {
onReceive(startedChannel, { flowChannel.cancel(); isClosed = true; nullValue }) { value ->
started = value
if (flowResult != nullValue && value) {
flowResult
} else {
nullValue
}
}
onReceive(flowChannel, { isClosed = true }) {
flowValue = it
if (started !== null) {
transform(started as Boolean, it)
onReceive(flowChannel, { isClosed = true; nullValue }) { value ->
flowResult = value
if (started == true) {
value
} else {
nullValue
}
}
}
if (result != nullValue) {
@Suppress("UNCHECKED_CAST")
emit(result as T)
}
}
}
}
Expand All @@ -83,14 +84,16 @@ private fun startedChannel(owner: Lifecycle): Channel<Boolean> {
return channel
}

private inline fun <T : Any?> SelectBuilder<Unit>.onReceive(
private inline fun <T : Any?, R : Any?> SelectBuilder<R>.onReceive(
channel: ReceiveChannel<T>,
crossinline onClosed: () -> Unit,
noinline onReceive: suspend (value: T) -> Unit
crossinline onClosed: () -> R,
noinline onReceive: suspend (value: T) -> R
) {
channel.onReceiveCatching {
val result = it.getOrNull()
if (result === null) onClosed()
else onReceive(result)
channel.onReceiveCatching { result ->
if (result.isClosed) {
onClosed()
} else {
onReceive(result.getOrThrow())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,17 @@ class MavericksLifecycleAwareFlowKtTest : BaseTest() {

flowOf(1).flowWhenStarted(owner).collect()
}

@Test
fun testNullableFlow() = runTest(UnconfinedTestDispatcher()) {
val flow = flowOf<Int?>(null, null)
val owner = TestLifecycleOwner()
val values = mutableListOf<Int?>()
owner.lifecycle.currentState = Lifecycle.State.STARTED
val job = flow.flowWhenStarted(owner).onEach {
values += it
}.launchIn(this)
assertEquals(listOf(null, null), values)
job.cancel()
}
}

0 comments on commit 5174519

Please sign in to comment.