Skip to content

Commit

Permalink
[CHAN-6807] Refresh contact channels on push key and foreground (#1461)
Browse files Browse the repository at this point in the history
* [CHAN-6807] Refresh contact channels on push key and foreground

* spots
  • Loading branch information
rlepinski authored Jun 28, 2024
1 parent d75f7b5 commit 2603659
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ private void init() {
this.channelCapture = new ChannelCapture(application, airshipConfigOptions, channel, preferenceDataStore, GlobalActivityMonitor.shared(application));
components.add(this.channelCapture);

this.contact = new Contact(application, preferenceDataStore, runtimeConfig, privacyManager, channel, localeManager, audienceOverridesProvider);
this.contact = new Contact(application, preferenceDataStore, runtimeConfig, privacyManager, channel, localeManager, audienceOverridesProvider, pushManager);
components.add(this.contact);
requestSession.setContactAuthTokenProvider(this.contact.getAuthTokenProvider());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import com.urbanairship.job.JobDispatcher
import com.urbanairship.job.JobInfo
import com.urbanairship.job.JobResult
import com.urbanairship.locale.LocaleManager
import com.urbanairship.push.PushListener
import com.urbanairship.push.PushManager
import com.urbanairship.push.PushMessage
import com.urbanairship.util.CachedValue
import com.urbanairship.util.Clock
import com.urbanairship.util.SerialQueue
Expand Down Expand Up @@ -66,6 +69,7 @@ public class Contact internal constructor(
private val subscriptionListApiClient: SubscriptionListApiClient,
private val contactManager: ContactManager,
private val smsValidator: SmsValidator,
pushManager: PushManager,
contactChannelsProvider: ContactChannelsProvider = ContactChannelsProvider(
config,
audienceOverridesProvider,
Expand All @@ -81,6 +85,7 @@ public class Contact internal constructor(
airshipChannel: AirshipChannel,
localeManager: LocaleManager,
audienceOverridesProvider: AudienceOverridesProvider,
pushManager: PushManager
) : this(
context,
preferenceDataStore,
Expand All @@ -99,7 +104,8 @@ public class Contact internal constructor(
localeManager,
audienceOverridesProvider
),
AirshipSmsValidator(config)
AirshipSmsValidator(config),
pushManager
)

/**
Expand Down Expand Up @@ -193,9 +199,17 @@ public class Contact internal constructor(
}
lastResolvedDate = clock.currentTimeMillis()
}

contactChannelsProvider.refresh()
}
})

pushManager.addInternalPushListener { message, _ ->
if (message.containsKey(CONTACT_UPDATE_PUSH_KEY)) {
contactChannelsProvider.refresh()
}
}

subscriptionsScope.launch {
for (conflict in contactManager.conflictEvents) {
contactConflictListener?.onConflict(conflict)
Expand Down Expand Up @@ -590,6 +604,8 @@ public class Contact internal constructor(
}
}



internal companion object {

@VisibleForTesting
Expand All @@ -614,6 +630,9 @@ public class Contact internal constructor(

/** Default CRA max age. */
private val CRA_MAX_AGE = TimeUnit.MINUTES.toMillis(10)

private val CONTACT_UPDATE_PUSH_KEY = "com.urbanairship.contact.update"

}

private data class Subscriptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.urbanairship.config.AirshipRuntimeConfig
import com.urbanairship.util.CachedValue
import com.urbanairship.util.Clock
import com.urbanairship.util.TaskSleeper
import java.util.UUID
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.time.Duration
Expand All @@ -18,7 +19,7 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
Expand All @@ -38,13 +39,11 @@ internal class ContactChannelsProvider(
dispatcher: CoroutineDispatcher = AirshipDispatchers.newSerialDispatcher()
) {
private val scope = CoroutineScope(dispatcher + SupervisorJob())
private val cachedResponse = CachedValue<Pair<String, List<ContactChannel>>>(clock)

/// Map to cache address to channels to make matching easier
private val addressToChannelIdMap = mutableMapOf<String, String>()
private val lock = ReentrantLock()


private val changeTokenFlow = MutableStateFlow(UUID.randomUUID())
private val fetchCache = FetchCache(clock, maxCacheAge)
internal constructor(
config: AirshipRuntimeConfig,
audienceOverridesProvider: AudienceOverridesProvider,
Expand All @@ -55,61 +54,69 @@ internal class ContactChannelsProvider(
contactUpdates = contactUpdates
)

fun refresh() {
changeTokenFlow.value = UUID.randomUUID()
}

@OptIn(ExperimentalCoroutinesApi::class)
val contactChannels: SharedFlow<Result<List<ContactChannel>>> = contactUpdates.mapNotNull {
if (it?.isStable == true) { it.contactId } else { null }
}.flatMapLatest { contactId ->
val fetchUpdates = flow {

var backoff: Duration = initialBackoff
var isFirstFetch = true

while (true) {
val fetched = fetch(contactId)
backoff = if (fetched.isSuccess) {
emit(fetched)
taskSleeper.sleep(cachedResponse.remainingCacheTimeMillis().milliseconds)
initialBackoff
} else {
if (isFirstFetch) {
val contactChannels: SharedFlow<Result<List<ContactChannel>>> by lazy {
var lastContactId: String? = null

val stableContactIdUpdates = contactUpdates.mapNotNull {
if (it?.isStable == true) { it.contactId } else { null }
}

combine(stableContactIdUpdates, changeTokenFlow) { contactId, changeToken ->
Pair(contactId, changeToken)
}.flatMapLatest { (contactId, changeToken) ->
val fetchUpdates = flow {

var backoff: Duration = initialBackoff

while (true) {
val fetched = fetch(contactId, changeToken)
backoff = if (fetched.isSuccess) {
emit(fetched)
taskSleeper.sleep(fetchCache.remainingCacheTimeMillis)
initialBackoff
} else {
if (lastContactId != contactId) {
emit(fetched)
}
taskSleeper.sleep(backoff)
backoff.times(2).coerceAtMost(maxBackoff)
}
taskSleeper.sleep(backoff)
backoff.times(2).coerceAtMost(maxBackoff)
lastContactId = contactId
}
isFirstFetch = false
}
}

val overridesUpdates = audienceOverridesProvider.updates.map { _ ->
audienceOverridesProvider.contactOverrides(contactId)
}
val overridesUpdates = audienceOverridesProvider.updates.map { _ ->
audienceOverridesProvider.contactOverrides(contactId)
}

combine(fetchUpdates, overridesUpdates) { fetchUpdate, overrides ->
fetchUpdate.fold(onSuccess = {
Result.success(applyOverrides(it, overrides.channels))
}, onFailure = {
Result.failure(it)
})
}
}.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 100),
replay = 1
)
combine(fetchUpdates, overridesUpdates) { fetchUpdate, overrides ->
fetchUpdate.fold(onSuccess = {
Result.success(applyOverrides(it, overrides.channels))
}, onFailure = {
Result.failure(it)
})
}
}.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 100),
replay = 1
)
}

private suspend fun fetch(contactId: String): Result<List<ContactChannel>> {
val cached = cachedResponse.get()
if (cached != null && cached.first == contactId) {
return Result.success(cached.second)
private suspend fun fetch(contactId: String, changeToken: UUID): Result<List<ContactChannel>> {
val cached = fetchCache.getCache(contactId, changeToken)
if (cached != null) {
return Result.success(cached)
}

val result = apiClient.fetch(contactId)
if (result.isSuccessful && result.value != null) {
cachedResponse.set(
Pair(contactId, result.value),
clock.currentTimeMillis() + maxCacheAge.inWholeMilliseconds
)
fetchCache.setCache(contactId, changeToken, result.value)
return Result.success(result.value)
}

Expand Down Expand Up @@ -272,3 +279,28 @@ private val ContactChannel.channelId: String?
}
}
}


private class FetchCache(private val clock: Clock, private val maxCacheAge: Duration) {
private val cachedResponse = CachedValue<Triple<String, UUID, List<ContactChannel>>>(clock)

fun getCache(contactId: String, changeToken: UUID): List<ContactChannel>? {
val cached = cachedResponse.get()
if (cached != null && cached.first == contactId && cached.second == changeToken) {
return cached.third
}

return null
}

fun setCache(contactId: String, changeToken: UUID, value: List<ContactChannel>) {
cachedResponse.set(
Triple(contactId, changeToken, value),
clock.currentTimeMillis() + maxCacheAge.inWholeMilliseconds
)
}

val remainingCacheTimeMillis: Duration
get() = cachedResponse.remainingCacheTimeMillis().milliseconds

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,45 @@ public class ContactChannelsProviderTest {
}
}

@Test
public fun testManualRefresh(): TestResult = runTest(testDispatcher) {
val firstResponse = listOf(
makeRegisteredSmsChannel(),
)

val secondResponse = listOf(
makeRegisteredSmsChannel(),
makeRegisteredEmailChannel()
)

coEvery { apiClient.fetch("some-contact-id") } returnsMany listOf(
makeRequestResult(firstResponse),
makeRequestResult(secondResponse)
)

val channelOverrides = listOf(
ContactChannelMutation.Associate(makePendingSmsChannel()),
ContactChannelMutation.Associate(makePendingEmailChannel())
)

coEvery { audienceOverridesProvider.contactOverrides("some-contact-id") } returns AudienceOverrides.Contact(
channels = channelOverrides
)

provider.contactChannels.test {
ensureAllEventsConsumed()

contactUpdates.value = ContactIdUpdate("some-contact-id", namedUserId = null, isStable = true, resolveDateMs = 0)

assertEquals(firstResponse + channelOverrides.map { it.channel }, this.awaitItem().getOrThrow())
ensureAllEventsConsumed()

provider.refresh()
assertEquals(secondResponse + channelOverrides.map { it.channel }, this.awaitItem().getOrThrow())
ensureAllEventsConsumed()
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
public fun testRefreshFailsIgnored(): TestResult = runTest(testDispatcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import com.urbanairship.channel.SmsValidator
import com.urbanairship.channel.TagGroupsMutation
import com.urbanairship.http.RequestResult
import com.urbanairship.json.JsonValue
import com.urbanairship.push.PushListener
import com.urbanairship.push.PushManager
import com.urbanairship.push.PushMessage
import com.urbanairship.remoteconfig.ContactConfig
import com.urbanairship.remoteconfig.RemoteConfig
import io.mockk.coEvery
Expand Down Expand Up @@ -83,6 +86,10 @@ public class ContactTest {
private val context: Context = ApplicationProvider.getApplicationContext()
private val preferenceDataStore = PreferenceDataStore.inMemoryStore(context)
private val privacyManager = PrivacyManager(preferenceDataStore, PrivacyManager.Feature.ALL)
private val pushListeners = mutableListOf<PushListener>()
private val mockPushManager: PushManager = mockk {
every { this@mockk.addInternalPushListener(capture(pushListeners)) } just runs
}

private val contact: Contact by lazy {
Contact(
Expand All @@ -97,6 +104,7 @@ public class ContactTest {
mockSubscriptionListApiClient,
mockContactManager,
mockSmsValidator,
mockPushManager,
mockChannelsContactProvider,
testDispatcher
)
Expand Down Expand Up @@ -288,6 +296,27 @@ public class ContactTest {
assertEquals(2, count)
}

@Test
public fun testForegroundRefreshesContactChannels(): TestResult = runTest {
// init
contact
verify(exactly = 0) { mockChannelsContactProvider.refresh() }

testActivityMonitor.foreground()
verify { mockChannelsContactProvider.refresh() }
}

@Test
public fun testPushRefreshesContact(): TestResult = runTest {
// init
contact
val mockPush = mockk<PushMessage> {
every { this@mockk.containsKey("com.urbanairship.contact.update") } returns true
}
pushListeners.forEach { it.onPushReceived(mockPush, false) }
verify { mockChannelsContactProvider.refresh() }
}

@Test
public fun testForegroundResolvesRemoteConfig(): TestResult = runTest {
// init
Expand Down

0 comments on commit 2603659

Please sign in to comment.