Skip to content

Commit

Permalink
Fix observable not disposing store subscription, change dispatcher to…
Browse files Browse the repository at this point in the history
… main.immediate for Dispatcher
  • Loading branch information
Pablo Orgaz committed Jan 2, 2020
1 parent c7a80df commit 624d636
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 1,038 deletions.
43 changes: 25 additions & 18 deletions app/src/main/java/org/sample/SampleActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ import com.mini.android.FluxActivity
import com.minikorp.grove.ConsoleLogTree
import com.minikorp.grove.Grove
import kotlinx.android.synthetic.main.home_activity.*
import kotlinx.coroutines.Job
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mini.Action
import mini.BaseSaga
import mini.Dispatcher
import mini.LoggerMiddleware
import mini.MiniGen
import mini.ObjectDiff
import mini.Reducer
import mini.Saga
import mini.SagaAction
import mini.Store

class SampleActivity : FluxActivity() {
Expand All @@ -38,20 +41,23 @@ class SampleActivity : FluxActivity() {
demo_text.text = "${it.loginState} - ${it.user}"
}.track()



Grove.plant(ConsoleLogTree())
dispatcher.addMiddleware(LoggerMiddleware(stores,
diffFunction = { a, b -> ObjectDiff.computeDiff(a, b) },
logger = { p, tag, msg ->
Grove.tag(tag).log(p) { msg }
}))

val job = dispatch(LoginAction()) {
Grove.d { "Login complete!" }
}

lifecycleScope.launch {
delay(2000)
//job.cancel()
//Perform login

container.setOnClickListener {
val job = lifecycleScope.launch {
dispatcher.dispatch(LoginAction())
Grove.d { "Login complete!" }
}
}
}
}
Expand All @@ -63,10 +69,7 @@ interface ActionInterface {
}

@Action
class ActionTwo(override val text: String) : ActionInterface

@Action
class LoginAction : SagaAction
class LoginAction : BaseSaga()

@Action
class LoginStartAction
Expand All @@ -84,12 +87,16 @@ class DummyStore(private val dispatcher: Dispatcher) : Store<DummyState>() {

@Saga suspend fun onLogin(action: LoginAction) {
dispatcher.dispatch(LoginStartAction())
try {
delay(5000) //Login for 5 seconds
dispatcher.dispatch(LoginCompleteAction("success"))
} catch (ex: Throwable) {
//Job was cancelled, so we can't dispatch on the same context, start new one
dispatcher.dispatch(LoginCompleteAction("failure"), Job())
withContext(Dispatchers.IO + SupervisorJob()) {
try {
delay(5000) //Login for 5 seconds
dispatcher.dispatch(LoginCompleteAction("success"))
} catch (ex: Throwable) {
withContext(NonCancellable) {
//Job was cancelled or failed, so we can't dispatch on the same context, start new one
dispatcher.dispatch(LoginCompleteAction("failure"))
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions app/src/main/res/layout/home_activity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<FrameLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:id="@+id/container"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:layout_editor_absoluteY="25dp">
Expand Down
10 changes: 10 additions & 0 deletions mini-android/src/main/java/com/mini/android/FluxViewModel.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.mini.android

import androidx.annotation.CallSuper
import androidx.lifecycle.LiveData
import androidx.lifecycle.ViewModel
import mini.CloseableTracker
import mini.DefaultCloseableTracker
import mini.StateContainer

abstract class FluxViewModel : ViewModel(),
CloseableTracker by DefaultCloseableTracker() {
Expand All @@ -13,4 +15,12 @@ abstract class FluxViewModel : ViewModel(),
super.onCleared()
clearCloseables()
}
}

fun <T> LiveData<T>.asStateContainer(defaultValue: T? = null): StateContainer<T> {
return object : StateContainer<T> {
override val state: T
get() = value ?: defaultValue
?: throw NullPointerException("No default value provided.")
}
}
7 changes: 6 additions & 1 deletion mini-common/src/main/java/mini/BaseAction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,9 @@ package mini
* Utility base action, to use in place of [Action] annotation.
*/
@Action
abstract class BaseAction
abstract class BaseAction

/**
* Utility base action, to use in place of [Action] annotation.
*/
abstract class BaseSaga : SagaAction
2 changes: 1 addition & 1 deletion mini-common/src/main/java/mini/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private typealias DispatchCallback = suspend (Any) -> Unit
*
*/
class Dispatcher(private val actionTypes: Map<KClass<*>, List<KClass<*>>>,
private val actionDispatchContext: CoroutineContext = Dispatchers.Main,
private val actionDispatchContext: CoroutineContext = Dispatchers.Main.immediate,
private val strictMode: Boolean = false) {

private val subscriptionCaller: Chain = object : Chain {
Expand Down
11 changes: 6 additions & 5 deletions mini-common/src/main/java/mini/LoggerMiddleware.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface SilentAction
/**
* Action logging for stores.
*/
class LoggerMiddleware(stores: Collection<Store<*>>,
class LoggerMiddleware(stores: Collection<StateContainer<*>>,
private val tag: String = "MiniLog",
private val diffFunction: ((a: Any?, b: Any?) -> String)? = null,
private val logger: (priority: Int, tag: String, msg: String) -> Unit) :
Expand All @@ -23,6 +23,7 @@ class LoggerMiddleware(stores: Collection<Store<*>>,

override suspend fun intercept(action: Any, chain: Chain): Any {
if (action is SilentAction) chain.proceed(action) //Do nothing

val isSaga = action is SagaAction
val beforeStates: Array<Any?> = Array(stores.size) { Unit }
val afterStates: Array<Any?> = Array(stores.size) { Unit }
Expand Down Expand Up @@ -51,10 +52,10 @@ class LoggerMiddleware(stores: Collection<Store<*>>,
val oldState = beforeStates[i]
val newState = afterStates[i]
if (oldState !== newState) {
val line = "${stores[i].javaClass.simpleName}: $newState"
logger(Log.VERBOSE, tag, "$prelude$line")
diffFunction?.invoke(oldState, newState)?.let {
logger(Log.DEBUG, tag, "$prelude$it")
val line = "$prelude${stores[i].javaClass.simpleName}"
logger(Log.VERBOSE, tag, "$line: $newState")
diffFunction?.invoke(oldState, newState)?.let { diff ->
logger(Log.DEBUG, tag, "$line: $diff")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions mini-common/src/main/java/mini/ObjectDiff.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object ObjectDiff {
is String, is Number -> {
diffMap[propertyName] = "$a => $b"
}
is Map<*, *> -> {
is Map<*, *> -> {
b as Map<*, *>
val out = DiffMap()
b.entries.forEach { (k, v) ->
Expand All @@ -48,7 +48,7 @@ object ObjectDiff {

diffMap[propertyName] = out
}
else -> {
else -> {
if (a::class.isData) {
val props = a::class.memberProperties
val out = DiffMap()
Expand Down
8 changes: 4 additions & 4 deletions mini-common/src/main/java/mini/Resource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ open class Resource<out T> @PublishedApi internal constructor(val value: Any?) {
fun getOrNull(): T? =
when {
isSuccess -> value as T?
else -> null
else -> null
}

fun exceptionOrNull(): Throwable? =
when (value) {
is Failure -> value.exception
else -> null
else -> null
}

companion object {
Expand Down Expand Up @@ -74,8 +74,8 @@ class Task(value: Any?) : Resource<Unit>(value) {
isSuccess -> "Success"
isFailure -> "Failure"
isLoading -> "Loading"
isIdle -> "Idle"
else -> value.toString()
isIdle -> "Idle"
else -> value.toString()
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions mini-common/src/main/java/mini/StateContainer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package mini

/**
* Common interface for state containers.
*/
interface StateContainer<S> {

val state: S
}
4 changes: 2 additions & 2 deletions mini-common/src/main/java/mini/Store.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.util.*
/**
* State holder.
*/
abstract class Store<S> : Closeable {
abstract class Store<S> : Closeable, StateContainer<S> {

companion object {
val NO_STATE = Any()
Expand Down Expand Up @@ -48,7 +48,7 @@ abstract class Store<S> : Closeable {
return StoreSubscription(this, fn)
}

val state: S
override val state: S
get() {
if (_state === NO_STATE) {
synchronized(this) {
Expand Down
14 changes: 3 additions & 11 deletions mini-rx/src/main/java/mini/rx/RxEx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.processors.PublishProcessor
import io.reactivex.subjects.PublishSubject
import mini.Resource
import mini.Store
Expand Down Expand Up @@ -65,20 +64,13 @@ class DefaultSubscriptionTracker : SubscriptionTracker {
}
}

fun <S> Store<S>.flowable(hotStart: Boolean = true): Flowable<S> {
val processor = PublishProcessor.create<S>()
val subscription = subscribe(hotStart = false) {
processor.offer(it)
}
return processor.doOnTerminate { subscription.close() }
.let { if (hotStart) it.startWith(state) else it }
}

fun <S> Store<S>.observable(hotStart: Boolean = true): Observable<S> {
val subject = PublishSubject.create<S>()
val subscription = subscribe(hotStart = false) {
subject.onNext(it)
}
return subject.doOnTerminate { subscription.close() }
return subject
.doOnDispose { subscription.close() }
.doOnTerminate { subscription.close() }
.let { if (hotStart) it.startWith(state) else it }
}
29 changes: 10 additions & 19 deletions mini-rx/src/test/kotlin/mini/rx/RxExTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,38 @@ import org.amshove.kluent.`should be equal to`
import org.junit.Test

class RxExTest {
@Test
fun `flowable sends initial state`() {
val store = SampleStore()
store.updateState("abc") //Set before subscribe
var sentState = ""
store.flowable().subscribe {
sentState = it
}
sentState `should be equal to` "abc"
}

@Test
fun `flowable sends updates`() {
fun `observable sends initial state`() {
val store = SampleStore()
store.updateState("abc") //Set before subscribe
var sentState = ""
store.flowable().subscribe {
store.observable().subscribe {
sentState = it
}
store.updateState("abc") //Set before subscribe
sentState `should be equal to` "abc"
}

@Test
fun `observable sends initial state`() {
fun `observable sends updates`() {
val store = SampleStore()
store.updateState("abc") //Set before subscribe
var sentState = ""
store.observable().subscribe {
sentState = it
}
store.updateState("abc") //Set before subscribe
sentState `should be equal to` "abc"
}

@Test
fun `observable sends updates`() {
fun `observable completes`() {
val store = SampleStore()
var sentState = ""
store.observable().subscribe {
val disposable = store.observable(hotStart = false).subscribe {
sentState = it
}
store.updateState("abc") //Set before subscribe
sentState `should be equal to` "abc"
disposable.dispose() //Clear it
store.updateState("abc")
sentState `should be equal to` "" //No change should be made
}
}
Loading

0 comments on commit 624d636

Please sign in to comment.