Skip to content

Commit

Permalink
Modifications to login managers and AuthService'
Browse files Browse the repository at this point in the history
  • Loading branch information
this-Aditya committed Nov 5, 2024
1 parent 1d75529 commit bbb724f
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.radarbase.android.util.NetworkConnectedReceiver
import org.radarbase.kotlin.coroutines.launchJoin
import org.radarbase.producer.AuthenticationException
import org.slf4j.LoggerFactory
import java.net.ConnectException
import java.util.concurrent.TimeUnit

@Keep
Expand All @@ -43,7 +44,10 @@ abstract class AuthService : LifecycleService(), LoginListener {
private val executor: CoroutineTaskExecutor =
CoroutineTaskExecutor(AuthService::class.simpleName!!, Dispatchers.Default)

val serviceMutex: Mutex = Mutex(false)
private val serviceMutex: Mutex = Mutex(false)
private val registryTweakMutex: Mutex = Mutex(false)
private val authUpdateMutex: Mutex = Mutex(false)

private val _authStateFailures: MutableSharedFlow<AuthLoginListener.AuthStateFailure> = MutableSharedFlow(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
Expand All @@ -59,6 +63,7 @@ abstract class AuthService : LifecycleService(), LoginListener {
onBufferOverflow = BufferOverflow.DROP_OLDEST
)

private var sourceRegistrationStarted: Boolean = false
private val authStateFailures: Flow<AuthLoginListener.AuthStateFailure> = _authStateFailures
private val authStateSuccess: Flow<AuthLoginListener.AuthStateSuccess> = _authStateSuccess
private val authStateLogout: Flow<AuthLoginListener.AuthStateLogout> = _authStateLogout
Expand All @@ -67,6 +72,8 @@ abstract class AuthService : LifecycleService(), LoginListener {
SharedPreferencesAuthSerialization(this)
}

private val successHandlers: MutableList<suspend () -> Unit> = mutableListOf()

@Volatile
private var isInLoginActivity: Boolean = false

Expand Down Expand Up @@ -118,31 +125,34 @@ abstract class AuthService : LifecycleService(), LoginListener {
}
launch(Dispatchers.Default) {
authStateFailures
.filter { (_, exception) -> exception !is AuthenticationException }
.flatMapLatest { (_, exception) ->
logger.error("Failed to get authentication state", exception)
val endTime = SystemClock.uptimeMillis() + refreshDelay.nextDelay()
authState
.transformWhile { auth ->
emit(endTime to auth)
auth.isValid
.collectLatest { (manager, ex) ->
logger.info("Login failed: {}", ex.toString())
when (ex) {
is AuthenticationException -> {
callListeners {
it.listener.loginFailed(manager, ex)
}
}
is ConnectException -> {
isConnected = false
executor.delay(refreshDelay.nextDelay(), ::refresh)
}
else -> {
executor.delay(refreshDelay.nextDelay(), ::refresh)
}
}
.collectLatest { (endTime, auth) ->
if (auth.isValid) {
refreshDelay.reset()
} else {
delay(endTime - SystemClock.uptimeMillis())
refresh()
}
}
}
launch(Dispatchers.Default) {
authStateSuccess
.collect {

.collectLatest { successState: AuthLoginListener.AuthStateSuccess ->
logger.info("Log-in succeeded")
isConnected = true
refreshDelay.reset()
callListeners(sinceUpdate = latestAppAuth.lastUpdate) {
it.listener.loginSucceeded(successState.manager, latestAppAuth)
}
}

}
}
}
Expand Down Expand Up @@ -188,25 +198,26 @@ abstract class AuthService : LifecycleService(), LoginListener {
if (isConnected) {
refresh()
} else {
val currentAuthState = authState.value
if (currentAuthState.isValid || currentAuthState.relevantManagers.any {
it.isRefreshable(
currentAuthState
updateState { appAuth ->
if (appAuth.isValid || appAuth.relevantManagers.any {
it.isRefreshable(
appAuth
)
}) {
logger.info("Retrieving active authentication state without refreshing")
callListeners(sinceUpdate = appAuth.lastUpdate) {
it.listener.loginSucceeded(null, appAuth)
}
} else {
logger.error(
"Failed to retrieve authentication state without refreshing",
IllegalStateException(
"Cannot refresh authentication state $appAuth: not online and no" +
"applicable authentication manager."
)
)
}) {
logger.info("Retrieving active authentication state without refreshing")
callListeners(sinceUpdate = currentAuthState.lastUpdate) {
it.listener.loginSucceeded(null, currentAuthState)
startLogin()
}
} else {
logger.error(
"Failed to retrieve authentication state without refreshing",
IllegalStateException(
"Cannot refresh authentication state $currentAuthState: not online and no" +
"applicable authentication manager."
)
)
startLogin()
}
}
}
Expand Down Expand Up @@ -241,10 +252,6 @@ abstract class AuthService : LifecycleService(), LoginListener {
it.lastUpdated = sinceUpdate
call(it)
}
// obsoleteListeners.forEach { listener ->
// listener.lastUpdated = sinceUpdate
// call(listener)
// }
}
}

Expand All @@ -259,12 +266,12 @@ abstract class AuthService : LifecycleService(), LoginListener {

return LoginListenerRegistry(loginListener)
.also {
serviceMutex.withLock { listeners += it }
registryTweakMutex.withLock { listeners += it }
}
}

suspend fun removeLoginListener(registry: LoginListenerRegistry) {
serviceMutex.withLock {
registryTweakMutex.withLock {
logger.debug(
"Removing login listener #{}: {} (starting with {} listeners)",
registry.id,
Expand All @@ -284,23 +291,24 @@ abstract class AuthService : LifecycleService(), LoginListener {

override fun loginSucceeded(manager: LoginManager?, authState: AppAuthState) {
_authStateSuccess.tryEmit(AuthLoginListener.AuthStateSuccess(manager))
_authState.tryEmit(authState)
_authState.value = authState
}

override fun logoutSucceeded(manager: LoginManager?, authState: AppAuthState) {
_authStateLogout.tryEmit(AuthLoginListener.AuthStateLogout(manager))
_authState.tryEmit(authState)
_authState.value = authState
}

private val AppAuthState.relevantManagers: List<LoginManager>
get() = loginManagers.filter { it.appliesTo(this) }

private val AppAuthState.manager: LoginManager?
get() = relevantManagers.takeIf { it.size == 1 }?.first()
private val AppAuthState.manager: LoginManager
get() = relevantManagers.first()

override fun onDestroy() {
super.onDestroy()
loginManagers.forEach { it.onDestroy() }
executor.stop()
}

suspend fun update(manager: LoginManager) {
Expand All @@ -321,52 +329,83 @@ abstract class AuthService : LifecycleService(), LoginListener {
private suspend fun <T> updateState(
update: suspend AppAuthState.Builder.(AppAuthState) -> T
): T {
var result: T? = null
do {
val currentValue = authState.value
val newValue = currentValue.alter {
result = this.update(currentValue)
}
} while (!_authState.compareAndSet(currentValue, newValue))
@Suppress("UNCHECKED_CAST")
return result as T
logger.info("Trying to tweak auth state")
authUpdateMutex.withLock {
var result: T? = null
do {
logger.debug("Updating auth state")
val currentValue = authState.value
val newValue = currentValue.alter {
result = this.update(currentValue)
}
} while (!_authState.compareAndSet(currentValue, newValue))
@Suppress("UNCHECKED_CAST")
return result as T
}
}

private fun applyState(function: AppAuthState.() -> Unit) {
latestAppAuth.apply(function)
val updatedState = latestAppAuth.apply(function)
_authState.value = updatedState
}

suspend fun invalidate(token: String?, disableRefresh: Boolean) {
updateState { auth ->
logger.info("Invalidating authentication state")
if (token != null && token != auth.token) return@updateState
executor.execute {
updateState { auth ->
logger.info("Invalidating authentication state")
if (token != null && token != auth.token) return@updateState

auth.relevantManagers.forEach {
it.invalidate(this@updateState, disableRefresh)
}
if (disableRefresh) {
clear()
} else {
invalidate()
auth.relevantManagers.forEach {
it.invalidate(this@updateState, disableRefresh)
}
if (disableRefresh) {
clear()
} else {
invalidate()
}
}
}
}

suspend fun registerSource(
source: SourceMetadata,
success: (AppAuthState, SourceMetadata) -> Unit,
failure: (Exception?) -> Unit
): SourceMetadata {
return try {
updateState { state ->
checkNotNull(state.manager) { "Missing auth manager for source $source" }
.registerSource(this, source)
}
} catch (ex: Exception) {
updateState {
sourceMetadata.removeAll(source::matches)
success: suspend (AppAuthState, SourceMetadata) -> Unit,
failure: suspend (Exception?) -> Unit
) {
logger.info(
"Registering source with SourceType: {}, and SourceId: {}",
source.type,
source.sourceId
)

updateState { auth ->
auth.relevantManagers.any { manager ->
manager.registerSource(
this,
source,
{ newAppAuth, newSource ->
if (newAppAuth != auth) {
_authState.value = newAppAuth
}
successHandlers += { success(newAppAuth, newSource) }
if (!sourceRegistrationStarted) {
executor.delay(1_000L) {
doRefresh()
successHandlers.forEach { it() }
successHandlers.clear()
sourceRegistrationStarted = false
}
sourceRegistrationStarted = true
}
},
{ ex ->
updateState {
sourceMetadata.removeAll(source::matches)
}
authSerialization.store(auth)
failure(ex)
})
}
throw ex
}
}

Expand All @@ -376,6 +415,13 @@ abstract class AuthService : LifecycleService(), LoginListener {
}

inner class AuthServiceBinder: Binder() {

suspend fun addLoginListener(loginListener: LoginListener) =
this@AuthService.addLoginListener(loginListener)

suspend fun removeLoginListener(registry: LoginListenerRegistry) =
this@AuthService.removeLoginListener(registry)

val managers: List<LoginManager>
get() = loginManagers

Expand All @@ -398,8 +444,8 @@ abstract class AuthService : LifecycleService(), LoginListener {

suspend fun registerSource(
source: SourceMetadata,
success: (AppAuthState, SourceMetadata) -> Unit, failure: (Exception?) -> Unit
): SourceMetadata =
success: suspend (AppAuthState, SourceMetadata) -> Unit, failure: (Exception?) -> Unit
) =
this@AuthService.registerSource(source, success, failure)

fun updateSource(source: SourceMetadata, success: (AppAuthState, SourceMetadata) -> Unit, failure: (Exception?) -> Unit) =
Expand All @@ -408,7 +454,7 @@ abstract class AuthService : LifecycleService(), LoginListener {
suspend fun unregisterSources(sources: Iterable<SourceMetadata>) =
this@AuthService.unregisterSources(sources)

fun invalidate(token: String?, disableRefresh: Boolean) = this@AuthService.invalidate(token, disableRefresh)
suspend fun invalidate(token: String?, disableRefresh: Boolean) = this@AuthService.invalidate(token, disableRefresh)

var isInLoginActivity: Boolean
get() = this@AuthService.isInLoginActivity
Expand All @@ -417,16 +463,19 @@ abstract class AuthService : LifecycleService(), LoginListener {
}
}

private fun updateSource(source: SourceMetadata, success: (AppAuthState, SourceMetadata) -> Unit, failure: (Exception?) -> Unit) {
updateState {

}
loginManagers
handler.execute {
relevantManagers.any { manager ->
manager.updateSource(latestAppAuth, source, success, failure)
private fun updateSource(
source: SourceMetadata,
success: (AppAuthState, SourceMetadata) -> Unit,
failure: (Exception?) -> Unit
) {
executor.execute {
updateState {
it.relevantManagers.any { manager ->
manager.updateSource(this, source, success, failure)
}
}
}
loginManagers
}

sealed interface AuthLoginListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class ManagementPortalLoginManager(
override suspend fun refresh(authState: AppAuthState.Builder): Boolean {
authState.attributes[MP_REFRESH_TOKEN_PROPERTY] ?: return false
val client = client ?: return true
if (mutex.tryLock()) {
mutex.withLock {
try {
val subjectParser = SubjectTokenParser(client, authState)

Expand All @@ -134,8 +134,6 @@ class ManagementPortalLoginManager(
} catch (exception: Exception) {
logger.error("Failed to receive access token", exception)
throw exception
} finally {
mutex.unlock()
}
}
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class NetworkConnectedReceiver(
context.getSystemService(Context.CONNECTIVITY_SERVICE) as? ConnectivityManager
) { "No connectivity manager available" }

val isMonitoring: AtomicBoolean = AtomicBoolean(false)
private val isMonitoring: AtomicBoolean = AtomicBoolean(false)

var state: Flow<NetworkState>? = null

Expand Down

0 comments on commit bbb724f

Please sign in to comment.