From 26036591ceda7676d14909c0efe5a2546b44e8fd Mon Sep 17 00:00:00 2001 From: Ryan Lepinski Date: Thu, 27 Jun 2024 17:15:29 -0700 Subject: [PATCH] [CHAN-6807] Refresh contact channels on push key and foreground (#1461) * [CHAN-6807] Refresh contact channels on push key and foreground * spots --- .../main/java/com/urbanairship/UAirship.java | 2 +- .../java/com/urbanairship/contacts/Contact.kt | 21 ++- .../contacts/ContactChannelsProvider.kt | 128 +++++++++++------- .../contacts/ContactChannelsProviderTest.kt | 39 ++++++ .../com/urbanairship/contacts/ContactTest.kt | 29 ++++ 5 files changed, 169 insertions(+), 50 deletions(-) diff --git a/urbanairship-core/src/main/java/com/urbanairship/UAirship.java b/urbanairship-core/src/main/java/com/urbanairship/UAirship.java index 24d53cfb6..73e7756d1 100644 --- a/urbanairship-core/src/main/java/com/urbanairship/UAirship.java +++ b/urbanairship-core/src/main/java/com/urbanairship/UAirship.java @@ -742,7 +742,7 @@ private void init() { this.channelCapture = new ChannelCapture(application, airshipConfigOptions, channel, preferenceDataStore, GlobalActivityMonitor.shared(application)); components.add(this.channelCapture); - this.contact = new Contact(application, preferenceDataStore, runtimeConfig, privacyManager, channel, localeManager, audienceOverridesProvider); + this.contact = new Contact(application, preferenceDataStore, runtimeConfig, privacyManager, channel, localeManager, audienceOverridesProvider, pushManager); components.add(this.contact); requestSession.setContactAuthTokenProvider(this.contact.getAuthTokenProvider()); diff --git a/urbanairship-core/src/main/java/com/urbanairship/contacts/Contact.kt b/urbanairship-core/src/main/java/com/urbanairship/contacts/Contact.kt index c138ad29d..7c3cb36b4 100644 --- a/urbanairship-core/src/main/java/com/urbanairship/contacts/Contact.kt +++ b/urbanairship-core/src/main/java/com/urbanairship/contacts/Contact.kt @@ -34,6 +34,9 @@ import com.urbanairship.job.JobDispatcher import com.urbanairship.job.JobInfo import com.urbanairship.job.JobResult import com.urbanairship.locale.LocaleManager +import com.urbanairship.push.PushListener +import com.urbanairship.push.PushManager +import com.urbanairship.push.PushMessage import com.urbanairship.util.CachedValue import com.urbanairship.util.Clock import com.urbanairship.util.SerialQueue @@ -66,6 +69,7 @@ public class Contact internal constructor( private val subscriptionListApiClient: SubscriptionListApiClient, private val contactManager: ContactManager, private val smsValidator: SmsValidator, + pushManager: PushManager, contactChannelsProvider: ContactChannelsProvider = ContactChannelsProvider( config, audienceOverridesProvider, @@ -81,6 +85,7 @@ public class Contact internal constructor( airshipChannel: AirshipChannel, localeManager: LocaleManager, audienceOverridesProvider: AudienceOverridesProvider, + pushManager: PushManager ) : this( context, preferenceDataStore, @@ -99,7 +104,8 @@ public class Contact internal constructor( localeManager, audienceOverridesProvider ), - AirshipSmsValidator(config) + AirshipSmsValidator(config), + pushManager ) /** @@ -193,9 +199,17 @@ public class Contact internal constructor( } lastResolvedDate = clock.currentTimeMillis() } + + contactChannelsProvider.refresh() } }) + pushManager.addInternalPushListener { message, _ -> + if (message.containsKey(CONTACT_UPDATE_PUSH_KEY)) { + contactChannelsProvider.refresh() + } + } + subscriptionsScope.launch { for (conflict in contactManager.conflictEvents) { contactConflictListener?.onConflict(conflict) @@ -590,6 +604,8 @@ public class Contact internal constructor( } } + + internal companion object { @VisibleForTesting @@ -614,6 +630,9 @@ public class Contact internal constructor( /** Default CRA max age. */ private val CRA_MAX_AGE = TimeUnit.MINUTES.toMillis(10) + + private val CONTACT_UPDATE_PUSH_KEY = "com.urbanairship.contact.update" + } private data class Subscriptions( diff --git a/urbanairship-core/src/main/java/com/urbanairship/contacts/ContactChannelsProvider.kt b/urbanairship-core/src/main/java/com/urbanairship/contacts/ContactChannelsProvider.kt index 79ce65cac..3032825a8 100644 --- a/urbanairship-core/src/main/java/com/urbanairship/contacts/ContactChannelsProvider.kt +++ b/urbanairship-core/src/main/java/com/urbanairship/contacts/ContactChannelsProvider.kt @@ -8,6 +8,7 @@ import com.urbanairship.config.AirshipRuntimeConfig import com.urbanairship.util.CachedValue import com.urbanairship.util.Clock import com.urbanairship.util.TaskSleeper +import java.util.UUID import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import kotlin.time.Duration @@ -18,7 +19,7 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow @@ -38,13 +39,11 @@ internal class ContactChannelsProvider( dispatcher: CoroutineDispatcher = AirshipDispatchers.newSerialDispatcher() ) { private val scope = CoroutineScope(dispatcher + SupervisorJob()) - private val cachedResponse = CachedValue>>(clock) - /// Map to cache address to channels to make matching easier private val addressToChannelIdMap = mutableMapOf() private val lock = ReentrantLock() - - + private val changeTokenFlow = MutableStateFlow(UUID.randomUUID()) + private val fetchCache = FetchCache(clock, maxCacheAge) internal constructor( config: AirshipRuntimeConfig, audienceOverridesProvider: AudienceOverridesProvider, @@ -55,61 +54,69 @@ internal class ContactChannelsProvider( contactUpdates = contactUpdates ) + fun refresh() { + changeTokenFlow.value = UUID.randomUUID() + } + @OptIn(ExperimentalCoroutinesApi::class) - val contactChannels: SharedFlow>> = contactUpdates.mapNotNull { - if (it?.isStable == true) { it.contactId } else { null } - }.flatMapLatest { contactId -> - val fetchUpdates = flow { - - var backoff: Duration = initialBackoff - var isFirstFetch = true - - while (true) { - val fetched = fetch(contactId) - backoff = if (fetched.isSuccess) { - emit(fetched) - taskSleeper.sleep(cachedResponse.remainingCacheTimeMillis().milliseconds) - initialBackoff - } else { - if (isFirstFetch) { + val contactChannels: SharedFlow>> by lazy { + var lastContactId: String? = null + + val stableContactIdUpdates = contactUpdates.mapNotNull { + if (it?.isStable == true) { it.contactId } else { null } + } + + combine(stableContactIdUpdates, changeTokenFlow) { contactId, changeToken -> + Pair(contactId, changeToken) + }.flatMapLatest { (contactId, changeToken) -> + val fetchUpdates = flow { + + var backoff: Duration = initialBackoff + + while (true) { + val fetched = fetch(contactId, changeToken) + backoff = if (fetched.isSuccess) { emit(fetched) + taskSleeper.sleep(fetchCache.remainingCacheTimeMillis) + initialBackoff + } else { + if (lastContactId != contactId) { + emit(fetched) + } + taskSleeper.sleep(backoff) + backoff.times(2).coerceAtMost(maxBackoff) } - taskSleeper.sleep(backoff) - backoff.times(2).coerceAtMost(maxBackoff) + lastContactId = contactId } - isFirstFetch = false } - } - val overridesUpdates = audienceOverridesProvider.updates.map { _ -> - audienceOverridesProvider.contactOverrides(contactId) - } + val overridesUpdates = audienceOverridesProvider.updates.map { _ -> + audienceOverridesProvider.contactOverrides(contactId) + } - combine(fetchUpdates, overridesUpdates) { fetchUpdate, overrides -> - fetchUpdate.fold(onSuccess = { - Result.success(applyOverrides(it, overrides.channels)) - }, onFailure = { - Result.failure(it) - }) - } - }.shareIn( - scope = scope, - started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 100), - replay = 1 - ) + combine(fetchUpdates, overridesUpdates) { fetchUpdate, overrides -> + fetchUpdate.fold(onSuccess = { + Result.success(applyOverrides(it, overrides.channels)) + }, onFailure = { + Result.failure(it) + }) + } + }.shareIn( + scope = scope, + started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 100), + replay = 1 + ) + } - private suspend fun fetch(contactId: String): Result> { - val cached = cachedResponse.get() - if (cached != null && cached.first == contactId) { - return Result.success(cached.second) + private suspend fun fetch(contactId: String, changeToken: UUID): Result> { + val cached = fetchCache.getCache(contactId, changeToken) + if (cached != null) { + return Result.success(cached) } val result = apiClient.fetch(contactId) if (result.isSuccessful && result.value != null) { - cachedResponse.set( - Pair(contactId, result.value), - clock.currentTimeMillis() + maxCacheAge.inWholeMilliseconds - ) + fetchCache.setCache(contactId, changeToken, result.value) return Result.success(result.value) } @@ -272,3 +279,28 @@ private val ContactChannel.channelId: String? } } } + + +private class FetchCache(private val clock: Clock, private val maxCacheAge: Duration) { + private val cachedResponse = CachedValue>>(clock) + + fun getCache(contactId: String, changeToken: UUID): List? { + val cached = cachedResponse.get() + if (cached != null && cached.first == contactId && cached.second == changeToken) { + return cached.third + } + + return null + } + + fun setCache(contactId: String, changeToken: UUID, value: List) { + cachedResponse.set( + Triple(contactId, changeToken, value), + clock.currentTimeMillis() + maxCacheAge.inWholeMilliseconds + ) + } + + val remainingCacheTimeMillis: Duration + get() = cachedResponse.remainingCacheTimeMillis().milliseconds + +} diff --git a/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactChannelsProviderTest.kt b/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactChannelsProviderTest.kt index e69c8d355..a104ef16e 100644 --- a/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactChannelsProviderTest.kt +++ b/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactChannelsProviderTest.kt @@ -183,6 +183,45 @@ public class ContactChannelsProviderTest { } } + @Test + public fun testManualRefresh(): TestResult = runTest(testDispatcher) { + val firstResponse = listOf( + makeRegisteredSmsChannel(), + ) + + val secondResponse = listOf( + makeRegisteredSmsChannel(), + makeRegisteredEmailChannel() + ) + + coEvery { apiClient.fetch("some-contact-id") } returnsMany listOf( + makeRequestResult(firstResponse), + makeRequestResult(secondResponse) + ) + + val channelOverrides = listOf( + ContactChannelMutation.Associate(makePendingSmsChannel()), + ContactChannelMutation.Associate(makePendingEmailChannel()) + ) + + coEvery { audienceOverridesProvider.contactOverrides("some-contact-id") } returns AudienceOverrides.Contact( + channels = channelOverrides + ) + + provider.contactChannels.test { + ensureAllEventsConsumed() + + contactUpdates.value = ContactIdUpdate("some-contact-id", namedUserId = null, isStable = true, resolveDateMs = 0) + + assertEquals(firstResponse + channelOverrides.map { it.channel }, this.awaitItem().getOrThrow()) + ensureAllEventsConsumed() + + provider.refresh() + assertEquals(secondResponse + channelOverrides.map { it.channel }, this.awaitItem().getOrThrow()) + ensureAllEventsConsumed() + } + } + @OptIn(ExperimentalCoroutinesApi::class) @Test public fun testRefreshFailsIgnored(): TestResult = runTest(testDispatcher) { diff --git a/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactTest.kt b/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactTest.kt index 2c5fa924e..029f3a28d 100644 --- a/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactTest.kt +++ b/urbanairship-core/src/test/java/com/urbanairship/contacts/ContactTest.kt @@ -18,6 +18,9 @@ import com.urbanairship.channel.SmsValidator import com.urbanairship.channel.TagGroupsMutation import com.urbanairship.http.RequestResult import com.urbanairship.json.JsonValue +import com.urbanairship.push.PushListener +import com.urbanairship.push.PushManager +import com.urbanairship.push.PushMessage import com.urbanairship.remoteconfig.ContactConfig import com.urbanairship.remoteconfig.RemoteConfig import io.mockk.coEvery @@ -83,6 +86,10 @@ public class ContactTest { private val context: Context = ApplicationProvider.getApplicationContext() private val preferenceDataStore = PreferenceDataStore.inMemoryStore(context) private val privacyManager = PrivacyManager(preferenceDataStore, PrivacyManager.Feature.ALL) + private val pushListeners = mutableListOf() + private val mockPushManager: PushManager = mockk { + every { this@mockk.addInternalPushListener(capture(pushListeners)) } just runs + } private val contact: Contact by lazy { Contact( @@ -97,6 +104,7 @@ public class ContactTest { mockSubscriptionListApiClient, mockContactManager, mockSmsValidator, + mockPushManager, mockChannelsContactProvider, testDispatcher ) @@ -288,6 +296,27 @@ public class ContactTest { assertEquals(2, count) } + @Test + public fun testForegroundRefreshesContactChannels(): TestResult = runTest { + // init + contact + verify(exactly = 0) { mockChannelsContactProvider.refresh() } + + testActivityMonitor.foreground() + verify { mockChannelsContactProvider.refresh() } + } + + @Test + public fun testPushRefreshesContact(): TestResult = runTest { + // init + contact + val mockPush = mockk { + every { this@mockk.containsKey("com.urbanairship.contact.update") } returns true + } + pushListeners.forEach { it.onPushReceived(mockPush, false) } + verify { mockChannelsContactProvider.refresh() } + } + @Test public fun testForegroundResolvesRemoteConfig(): TestResult = runTest { // init