Skip to content

Commit

Permalink
Merge branch 'expose-flexible-ipc-message-handling'
Browse files Browse the repository at this point in the history
  • Loading branch information
albin-mullvad committed Oct 16, 2023
2 parents dc70ef8 + 989f57e commit d32b6f8
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import net.mullvad.mullvadvpn.repository.ChangelogRepository
import net.mullvad.mullvadvpn.repository.DeviceRepository
import net.mullvad.mullvadvpn.repository.PrivacyDisclaimerRepository
import net.mullvad.mullvadvpn.repository.SettingsRepository
import net.mullvad.mullvadvpn.ui.serviceconnection.MessageHandler
import net.mullvad.mullvadvpn.ui.serviceconnection.ServiceConnectionManager
import net.mullvad.mullvadvpn.ui.serviceconnection.SplitTunneling
import net.mullvad.mullvadvpn.util.ChangelogDataProvider
Expand All @@ -40,6 +41,7 @@ import org.koin.android.ext.koin.androidApplication
import org.koin.android.ext.koin.androidContext
import org.koin.androidx.viewmodel.dsl.viewModel
import org.koin.core.qualifier.named
import org.koin.dsl.bind
import org.koin.dsl.module
import org.koin.dsl.onClose

Expand All @@ -59,7 +61,7 @@ val uiModule = module {
SplitTunneling(messenger, dispatcher)
}

single { ServiceConnectionManager(androidContext()) }
single { ServiceConnectionManager(androidContext()) } bind MessageHandler::class
single { InetAddressValidator.getInstance() }
single { androidContext().resources }
single { androidContext().assets }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,97 +10,93 @@ import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.withContext
import net.mullvad.mullvadvpn.lib.ipc.Event
import net.mullvad.mullvadvpn.lib.ipc.Request
import net.mullvad.mullvadvpn.model.AccountCreationResult
import net.mullvad.mullvadvpn.model.AccountExpiry
import net.mullvad.mullvadvpn.model.AccountHistory
import net.mullvad.mullvadvpn.model.LoginResult
import net.mullvad.mullvadvpn.ui.serviceconnection.ServiceConnectionManager
import net.mullvad.mullvadvpn.ui.serviceconnection.accountDataSource
import net.mullvad.mullvadvpn.util.flatMapReadyConnectionOrDefault
import net.mullvad.mullvadvpn.ui.serviceconnection.MessageHandler
import net.mullvad.mullvadvpn.ui.serviceconnection.events

class AccountRepository(
private val serviceConnectionManager: ServiceConnectionManager,
val dispatcher: CoroutineDispatcher = Dispatchers.IO
private val messageHandler: MessageHandler,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
private val _cachedCreatedAccount = MutableStateFlow<String?>(null)
val cachedCreatedAccount = _cachedCreatedAccount.asStateFlow()

private val accountCreationEvents: SharedFlow<AccountCreationResult> =
serviceConnectionManager.connectionState
.flatMapReadyConnectionOrDefault(flowOf()) { state ->
state.container.accountDataSource.accountCreationResult
}
messageHandler
.events<Event.AccountCreationEvent>()
.map { it.result }
.onEach {
_cachedCreatedAccount.value = (it as? AccountCreationResult.Success)?.accountToken
}
.shareIn(CoroutineScope(dispatcher), SharingStarted.WhileSubscribed())

val accountExpiryState: StateFlow<AccountExpiry> =
serviceConnectionManager.connectionState
.flatMapReadyConnectionOrDefault(flowOf()) { state ->
state.container.accountDataSource.accountExpiry
}
messageHandler
.events<Event.AccountExpiryEvent>()
.map { it.expiry }
.stateIn(
CoroutineScope(dispatcher),
SharingStarted.WhileSubscribed(),
AccountExpiry.Missing
)

val accountHistory: StateFlow<AccountHistory> =
serviceConnectionManager.connectionState
.flatMapReadyConnectionOrDefault(flowOf()) { state ->
state.container.accountDataSource.accountHistory
}
messageHandler
.events<Event.AccountHistoryEvent>()
.map { it.history }
.onStart { fetchAccountHistory() }
.stateIn(
CoroutineScope(dispatcher),
SharingStarted.WhileSubscribed(),
AccountHistory.Missing
)

private val loginEvents: SharedFlow<Event.LoginEvent> =
serviceConnectionManager.connectionState
.flatMapReadyConnectionOrDefault(flowOf()) { state ->
state.container.accountDataSource.loginEvents
}
private val loginEvents: SharedFlow<LoginResult> =
messageHandler
.events<Event.LoginEvent>()
.map { it.result }
.shareIn(CoroutineScope(dispatcher), SharingStarted.WhileSubscribed())

suspend fun createAccount(): AccountCreationResult =
withContext(dispatcher) {
val deferred = async { accountCreationEvents.first() }
serviceConnectionManager.accountDataSource()?.createAccount()
messageHandler.trySendRequest(Request.CreateAccount)
deferred.await()
}

suspend fun login(accountToken: String): LoginResult =
withContext(Dispatchers.IO) {
val deferred = async { loginEvents.first().result }
serviceConnectionManager.accountDataSource()?.login(accountToken)
val deferred = async { loginEvents.first() }
messageHandler.trySendRequest(Request.Login(accountToken))
deferred.await()
}

fun logout() {
clearCreatedAccountCache()
serviceConnectionManager.accountDataSource()?.logout()
messageHandler.trySendRequest(Request.Logout)
}

fun fetchAccountExpiry() {
serviceConnectionManager.accountDataSource()?.fetchAccountExpiry()
messageHandler.trySendRequest(Request.FetchAccountExpiry)
}

fun fetchAccountHistory() {
serviceConnectionManager.accountDataSource()?.fetchAccountHistory()
messageHandler.trySendRequest(Request.FetchAccountHistory)
}

fun clearAccountHistory() {
serviceConnectionManager.accountDataSource()?.clearAccountHistory()
messageHandler.trySendRequest(Request.ClearAccountHistory)
}

private fun clearCreatedAccountCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import net.mullvad.mullvadvpn.BuildConfig
Expand Down Expand Up @@ -312,6 +313,7 @@ open class MainActivity : FragmentActivity() {
private suspend fun isExpired(timeoutMillis: Long): Boolean {
return withTimeoutOrNull(timeoutMillis) {
accountRepository.accountExpiryState
.onSubscription { accountRepository.fetchAccountExpiry() }
.filter { it is AccountExpiry.Available }
.map { it.date()?.isBeforeNow }
.first()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package net.mullvad.mullvadvpn.ui.serviceconnection

import kotlin.reflect.KClass
import kotlinx.coroutines.flow.Flow
import net.mullvad.mullvadvpn.lib.ipc.Event
import net.mullvad.mullvadvpn.lib.ipc.Request

interface MessageHandler {
fun <R : Event> events(klass: KClass<R>): Flow<R>

fun trySendRequest(request: Request): Boolean
}

inline fun <reified R : Event> MessageHandler.events(): Flow<R> {
return this.events(R::class)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import android.os.Looper
import android.os.Messenger
import android.os.RemoteException
import android.util.Log
import kotlinx.coroutines.flow.filterIsInstance
import net.mullvad.mullvadvpn.lib.ipc.DispatchingHandler
import net.mullvad.mullvadvpn.lib.ipc.Event
import net.mullvad.mullvadvpn.lib.ipc.Request
import net.mullvad.mullvadvpn.lib.ipc.extensions.trySendRequest
import org.koin.core.component.KoinComponent

// Container of classes that communicate with the service through an active connection
Expand All @@ -21,7 +23,8 @@ class ServiceConnectionContainer(
private val dispatcher =
DispatchingHandler(Looper.getMainLooper()) { message -> Event.fromMessage(message) }

val accountDataSource = ServiceConnectionAccountDataSource(connection, dispatcher)
val events = dispatcher.parsedMessages.filterIsInstance<Event>()

val authTokenCache = AuthTokenCache(connection, dispatcher)
val connectionProxy = ConnectionProxy(connection, dispatcher)
val deviceDataSource = ServiceConnectionDeviceDataSource(connection, dispatcher)
Expand Down Expand Up @@ -49,6 +52,10 @@ class ServiceConnectionContainer(
registerListener(connection)
}

fun trySendRequest(request: Request, logErrors: Boolean): Boolean {
return connection.trySendRequest(request, logErrors = logErrors)
}

fun onDestroy() {
unregisterListener()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@ import android.content.Intent
import android.os.IBinder
import android.os.Messenger
import android.util.Log
import kotlin.reflect.KClass
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterIsInstance
import net.mullvad.mullvadvpn.lib.endpoint.ApiEndpointConfiguration
import net.mullvad.mullvadvpn.lib.endpoint.BuildConfig
import net.mullvad.mullvadvpn.lib.endpoint.putApiEndpointConfigurationExtra
import net.mullvad.mullvadvpn.lib.ipc.Event
import net.mullvad.mullvadvpn.lib.ipc.Request
import net.mullvad.mullvadvpn.service.MullvadVpnService
import net.mullvad.mullvadvpn.util.flatMapReadyConnectionOrDefault
import net.mullvad.talpid.util.EventNotifier

class ServiceConnectionManager(private val context: Context) {
class ServiceConnectionManager(private val context: Context) : MessageHandler {
private val _connectionState =
MutableStateFlow<ServiceConnectionState>(ServiceConnectionState.Disconnected)

Expand All @@ -27,6 +34,9 @@ class ServiceConnectionManager(private val context: Context) {
var isBound = false
private var vpnPermissionRequestHandler: (() -> Unit)? = null

private val events =
connectionState.flatMapReadyConnectionOrDefault(emptyFlow()) { it.container.events }

private val serviceConnection =
object : android.content.ServiceConnection {
override fun onServiceConnected(className: ComponentName, binder: IBinder) {
Expand Down Expand Up @@ -82,6 +92,15 @@ class ServiceConnectionManager(private val context: Context) {
}
}

override fun <E : Event> events(klass: KClass<E>): Flow<E> {
return events.filterIsInstance(klass)
}

override fun trySendRequest(request: Request): Boolean {
return connectionState.value.readyContainer()?.trySendRequest(request, logErrors = false)
?: false
}

fun onDestroy() {
_connectionState.value.readyContainer()?.onDestroy()
serviceNotifier.unsubscribeAll()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package net.mullvad.mullvadvpn.ui.serviceconnection

fun ServiceConnectionManager.accountDataSource() =
this.connectionState.value.readyContainer()?.accountDataSource

fun ServiceConnectionManager.appVersionInfoCache() =
this.connectionState.value.readyContainer()?.appVersionInfoCache

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class AccountViewModel(
@Suppress("konsist.ensure public properties use permitted names")
val enterTransitionEndAction = _enterTransitionEndAction.asSharedFlow()

init {
accountRepository.fetchAccountExpiry()
}

fun onManageAccountClick() {
viewModelScope.launch {
_uiSideEffect.tryEmit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ import android.util.Log
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.reflect.KClass
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow

class DispatchingHandler<T : Any>(looper: Looper, private val extractor: (Message) -> T?) :
Handler(looper), MessageDispatcher<T> {
private val handlers = HashMap<KClass<out T>, (T) -> Unit>()
private val lock = ReentrantReadWriteLock()

private val _parsedMessages = MutableSharedFlow<T>(extraBufferCapacity = 1)
val parsedMessages = _parsedMessages.asSharedFlow()

@Deprecated("Use parsedMessages instead.")
override fun <V : T> registerHandler(variant: KClass<V>, handler: (V) -> Unit) {
lock.writeLock().withLock {
handlers.put(variant) { instance -> @Suppress("UNCHECKED_CAST") handler(instance as V) }
Expand All @@ -27,6 +33,7 @@ class DispatchingHandler<T : Any>(looper: Looper, private val extractor: (Messag
val handler = handlers.get(instance::class)

handler?.invoke(instance)
_parsedMessages.tryEmit(instance)
} else {
Log.e("mullvad", "Dispatching handler received an unexpected message")
}
Expand Down

0 comments on commit d32b6f8

Please sign in to comment.