From 6b44bcef733160f52859b08257606158f41eef86 Mon Sep 17 00:00:00 2001 From: Ryan Lepinski Date: Wed, 4 Dec 2024 14:48:19 -0800 Subject: [PATCH] Rework channel to combine (#3296) * Rework channel to combine * Small things * Update AirshipAsyncChannel.swift * Update AirshipAsyncChannel.swift --------- Co-authored-by: David --- .../Source/AirshipAsyncChannel.swift | 150 +++++++++++++++++- Airship/AirshipCore/Source/Push.swift | 104 ++++-------- .../Tests/AirshipAsyncChannelTest.swift | 111 +++++++++++++ Airship/AirshipCore/Tests/PushTest.swift | 26 +++ .../Source/View/AirshipDebugView.swift | 12 +- .../Source/MessageCenterList.swift | 77 ++------- 6 files changed, 340 insertions(+), 140 deletions(-) diff --git a/Airship/AirshipCore/Source/AirshipAsyncChannel.swift b/Airship/AirshipCore/Source/AirshipAsyncChannel.swift index c438f9e11..08e6c040c 100644 --- a/Airship/AirshipCore/Source/AirshipAsyncChannel.swift +++ b/Airship/AirshipCore/Source/AirshipAsyncChannel.swift @@ -1,11 +1,11 @@ /* Copyright Airship and Contributors */ import Foundation +import Combine -/// Simple implemenation of a `channel` that allows multiple AsyncStream of the same data. +/// Simple implementation of a `channel` that allows multiple AsyncStreams of the same data. /// - Note: for internal use only. :nodoc: public actor AirshipAsyncChannel { - public enum BufferPolicy: Sendable { case unbounded case bufferingNewest(Int) @@ -63,3 +63,149 @@ public actor AirshipAsyncChannel { } } } + +public extension AirshipAsyncChannel { + + /// Makes a stream that is nonisolated from the actor by wrapping the + /// actor stream in an async stream. + /// - Parameters: + /// - bufferPolicy: The buffer policy. + /// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream. + /// - transform: Transforms the channel values to the stream value. If nil, it a value is mapped to nil it will finish the stream. + /// - Returns: An AsyncStream. + nonisolated func makeNonIsolatedStream( + bufferPolicy: BufferPolicy = .unbounded, + initialValue: (@Sendable () async -> R?)? = nil, + transform: @escaping @Sendable (T) async -> R? + ) -> AsyncStream { + return AsyncStream { [weak self] continuation in + let task = Task { [weak self] in + guard let stream = await self?.makeStream() else { + return + } + + if let initialValue { + guard let value = await initialValue() else { + continuation.finish() + return + } + + continuation.yield(value) + } + + for await update in stream.map(transform) { + if let update { + continuation.yield(update) + } else { + continuation.finish() + } + } + continuation.finish() + } + + continuation.onTermination = { _ in + task.cancel() + } + } + } + + /// Makes a stream that is nonisolated from the actor by wrapping the + /// actor stream in an async stream. + /// - Parameters: + /// - bufferPolicy: The buffer policy. + /// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream. + /// - transform: Transforms the channel values to the stream value. If nil, it a value is mapped to nil it will finish the stream. + /// - Returns: An AsyncStream. + nonisolated func makeNonIsolatedStream( + bufferPolicy: BufferPolicy = .unbounded, + initialValue: (@Sendable () async -> T)? = nil + ) -> AsyncStream { + return makeNonIsolatedStream( + bufferPolicy: bufferPolicy, + initialValue: initialValue, + transform: { $0 } + ) + } + + /// Makes a stream that is nonisolated from the actor by wrapping the + /// actor stream in an async stream. Values will only be emitted if they are different than the previous value. + /// - Parameters: + /// - bufferPolicy: The buffer policy. + /// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream. + /// - Returns: An AsyncStream. + nonisolated func makeNonIsolatedDedupingStream( + bufferPolicy: BufferPolicy = .unbounded, + initialValue: (@Sendable () async -> R?)? = nil, + transform: @escaping @Sendable (T) async -> R? + ) -> AsyncStream { + return AsyncStream { [weak self] continuation in + let task = Task { [weak self] in + guard let stream = await self?.makeStream() else { + return + } + + var last: R? = nil + + if let initialValue { + guard let value = await initialValue() else { + continuation.finish() + return + } + + continuation.yield(value) + last = value + } + + for await update in stream.map(transform) { + guard let update else { + continuation.finish() + return + } + + if update != last { + continuation.yield(update) + last = update + } + } + } + + continuation.onTermination = { _ in + task.cancel() + } + } + } + + /// Makes a stream that is nonisolated from the actor by wrapping the + /// actor stream in an async stream. Values will only be emitted if they are different than the previous value. + /// - Parameters: + /// - bufferPolicy: The buffer policy. + /// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream. + /// - Returns: An AsyncStream. + nonisolated func makeNonIsolatedDedupingStream( + bufferPolicy: BufferPolicy = .unbounded, + initialValue: (@Sendable () async -> T?)? = nil + ) -> AsyncStream where T: Equatable { + return makeNonIsolatedDedupingStream( + bufferPolicy: bufferPolicy, + initialValue: initialValue, + transform: { $0 } + ) + } +} + +public extension AsyncStream where Element : Sendable { + + /// Creates a combine publisher from an AsyncStream. + /// - Note: for internal use only. :nodoc: + var airshipPublisher: AnyPublisher{ + let subject = CurrentValueSubject(nil) + Task { [weak subject] in + for await update in self { + guard let subject else { return } + subject.send(update) + } + } + return subject.eraseToAnyPublisher() + } + +} diff --git a/Airship/AirshipCore/Source/Push.swift b/Airship/AirshipCore/Source/Push.swift index 4ad186485..3083afb1e 100644 --- a/Airship/AirshipCore/Source/Push.swift +++ b/Airship/AirshipCore/Source/Push.swift @@ -13,61 +13,32 @@ import UIKit /// This singleton provides an interface to the functionality provided by the Airship iOS Push API. final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable { - private let pushTokenSubject = PassthroughSubject() - private var pushTokenPublisher: AnyPublisher { - self.pushTokenSubject - .prepend(Future { promise in - Task { - return await promise(.success(self.deviceToken)) - } - }) - .eraseToAnyPublisher() - } + private let pushTokenChannel = AirshipAsyncChannel() + + private let notificationStatusChannel = AirshipAsyncChannel() - private let notificationStatusSubject = PassthroughSubject() public var notificationStatusPublisher: AnyPublisher { - notificationStatusSubject - .prepend(Future { promise in - Task { - return await promise(.success(self.notificationStatus)) - } - }) - .removeDuplicates() + return notificationStatusUpdates + .airshipPublisher + .compactMap { $0 } .eraseToAnyPublisher() } var notificationStatusUpdates: AsyncStream { - let publisher = self.notificationStatusPublisher - return AsyncStream { continuation in - let cancellable = publisher - .removeDuplicates() - .sink { update in - continuation.yield(update) - } - - continuation.onTermination = { _ in - cancellable.cancel() - } - } + return self.notificationStatusChannel.makeNonIsolatedDedupingStream( + initialValue: { [weak self] in await self?.notificationStatus } + ) } - - private static let pushNotificationsOptionsKey = - "UAUserPushNotificationsOptions" - private static let userPushNotificationsEnabledKey = - "UAUserPushNotificationsEnabled" - private static let backgroundPushNotificationsEnabledKey = - "UABackgroundPushNotificationsEnabled" - private static let requestExplicitPermissionWhenEphemeralKey = - "UAExtendedPushNotificationPermissionEnabled" - - + private static let pushNotificationsOptionsKey = "UAUserPushNotificationsOptions" + private static let userPushNotificationsEnabledKey = "UAUserPushNotificationsEnabled" + private static let backgroundPushNotificationsEnabledKey = "UABackgroundPushNotificationsEnabled" + private static let requestExplicitPermissionWhenEphemeralKey = "UAExtendedPushNotificationPermissionEnabled" private static let badgeSettingsKey = "UAPushBadge" private static let deviceTokenKey = "UADeviceToken" private static let quietTimeSettingsKey = "UAPushQuietTime" private static let quietTimeEnabledSettingsKey = "UAPushQuietTimeEnabled" private static let timeZoneSettingsKey = "UAPushTimeZone" - private static let typesAuthorizedKey = "UAPushTypesAuthorized" private static let authorizationStatusKey = "UAPushAuthorizationStatus" private static let userPromptedForNotificationsKey = "UAPushUserPromptedForNotifications" @@ -323,7 +294,6 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable { @MainActor public private(set) var deviceToken: String? { set { - guard let deviceToken = newValue else { self.dataStore.removeObject(forKey: AirshipPush.deviceTokenKey) self.updateNotificationStatus() @@ -357,6 +327,9 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable { forKey: AirshipPush.deviceTokenKey ) AirshipLogger.importantInfo("Device token: \(deviceToken)") + Task { + await self.pushTokenChannel.send(deviceToken) + } } catch { AirshipLogger.error("Unable to set device token") } @@ -596,7 +569,6 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable { private func waitForDeviceTokenRegistration() async { guard self.waitForDeviceToken, self.privacyManager.isEnabled(.push), - self.deviceToken == nil, self.apnsRegistrar.isRegisteredForRemoteNotifications else { return @@ -604,31 +576,27 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable { self.waitForDeviceToken = false - var subscription: AnyCancellable? - defer { - subscription?.cancel() + let updates = await pushTokenChannel.makeStream() + guard self.deviceToken == nil else { + return } - await withCheckedContinuation { continuation in - let cancelTask = Task { @MainActor in - try await Task.sleep( - nanoseconds: UInt64(AirshipPush.deviceTokenRegistrationWaitTime * 1_000_000_000) - ) - subscription?.cancel() - try Task.checkCancellation() - continuation.resume() + let waitTask = Task { + for await _ in updates { + return } + } - subscription = self.pushTokenPublisher - .receive(on: RunLoop.main) - .sink { token in - if (token != nil) { - continuation.resume() - cancelTask.cancel() - subscription?.cancel() - } - } + let cancelTask = Task { @MainActor in + try await Task.sleep( + nanoseconds: UInt64(AirshipPush.deviceTokenRegistrationWaitTime * 1_000_000_000) + ) + try Task.checkCancellation() + waitTask.cancel() } + + await waitTask.value + cancelTask.cancel() } @objc @@ -715,8 +683,8 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable { } private func updateNotificationStatus() { - Task { @MainActor in - self.notificationStatusSubject.send(await self.notificationStatus) + Task { + await self.notificationStatusChannel.send(await self.notificationStatus) } } @@ -1317,7 +1285,7 @@ public struct QuietTimeSettings: Sendable, Equatable { /// End minute public let endMinute: UInt - + var startString: String { return "\(String(format: "%02d", startHour)):\(String(format: "%02d", startMinute))" } @@ -1372,6 +1340,4 @@ public struct QuietTimeSettings: Sendable, Equatable { self.endHour = endParts[0] self.endMinute = endParts[1] } - - } diff --git a/Airship/AirshipCore/Tests/AirshipAsyncChannelTest.swift b/Airship/AirshipCore/Tests/AirshipAsyncChannelTest.swift index 69ab029db..3992997dd 100644 --- a/Airship/AirshipCore/Tests/AirshipAsyncChannelTest.swift +++ b/Airship/AirshipCore/Tests/AirshipAsyncChannelTest.swift @@ -47,4 +47,115 @@ final class AirshipAsyncChannelTest: XCTestCase { } } + func testNonIsolatedDedupingStreamMapped() async throws { + var updates = channel.makeNonIsolatedDedupingStream( + initialValue: { + "1" + }, + transform: { int in + "\(int)" + } + ).makeAsyncIterator() + + // Wait for first so we know the task is setup to listen for changes + let first = await updates.next() + XCTAssertEqual(first, "1") + + await channel.send(2) + await channel.send(2) + await channel.send(2) + + await channel.send(3) + await channel.send(3) + await channel.send(4) + + var received: [String] = [] + for _ in 0...2 { + received.append(await updates.next()!) + } + + XCTAssertEqual(["2", "3", "4"], received) + } + + func testNonIsolatedDedupingStream() async throws { + var updates = channel.makeNonIsolatedDedupingStream( + initialValue: { + 1 + } + ).makeAsyncIterator() + await channel.send(1) + + // Wait for first so we know the task is setup to listen for changes + let first = await updates.next() + XCTAssertEqual(first, 1) + + await channel.send(1) + await channel.send(1) + + await channel.send(2) + await channel.send(2) + await channel.send(3) + + var received: [Int] = [] + for _ in 0...1 { + received.append(await updates.next()!) + } + + XCTAssertEqual([2, 3], received) + } + + func testNonIsolatedStreamMapped() async throws { + var updates = channel.makeNonIsolatedStream( + initialValue: { + "1" + }, + transform: { int in + "\(int)" + } + ).makeAsyncIterator() + + // Wait for first so we know the task is setup to listen for changes + let first = await updates.next() + XCTAssertEqual(first, "1") + + await channel.send(1) + await channel.send(2) + await channel.send(2) + + await channel.send(3) + await channel.send(4) + + var received: [String] = [] + for _ in 0...4 { + received.append(await updates.next()!) + } + + XCTAssertEqual(["1", "2", "2", "3", "4"], received) + } + + func testNonIsolatedStream() async throws { + var updates = channel.makeNonIsolatedStream( + initialValue: { 1 } + ).makeAsyncIterator() + + // Wait for first so we know the task is setup to listen for changes + let first = await updates.next() + XCTAssertEqual(first, 1) + + await channel.send(1) + await channel.send(1) + await channel.send(1) + + await channel.send(2) + await channel.send(2) + await channel.send(3) + + var received: [Int] = [] + for _ in 0...5 { + received.append(await updates.next()!) + } + + XCTAssertEqual([1, 1, 1, 2, 2, 3], received) + } + } diff --git a/Airship/AirshipCore/Tests/PushTest.swift b/Airship/AirshipCore/Tests/PushTest.swift index 02e80ad39..808654446 100644 --- a/Airship/AirshipCore/Tests/PushTest.swift +++ b/Airship/AirshipCore/Tests/PushTest.swift @@ -1095,7 +1095,33 @@ class PushTest: XCTestCase { ), status ) + } + + @MainActor + func testChannelRegistrationWaitsForToken() async { + apnsRegistrar.isRegisteredForRemoteNotifications = true + + let startedCRATask = self.expectation(description: "Started CRA") + + Task { + await fulfillment(of: [startedCRATask]) + push.didRegisterForRemoteNotifications( + PushTest.validDeviceToken.hexData + ) + } + + let payload = await Task { + Task { @MainActor in + if #available(iOS 16.0, *) { + try await Task.sleep(for: .milliseconds(100)) + } + startedCRATask.fulfill() + } + return await self.channel.channelPayload + }.value + + XCTAssertNotNil(payload.channel.pushAddress) } } diff --git a/Airship/AirshipDebug/Source/View/AirshipDebugView.swift b/Airship/AirshipDebug/Source/View/AirshipDebugView.swift index dc725da48..506b0f254 100644 --- a/Airship/AirshipDebug/Source/View/AirshipDebugView.swift +++ b/Airship/AirshipDebug/Source/View/AirshipDebugView.swift @@ -385,18 +385,18 @@ private class AirshipDebugViewModel: ObservableObject { private func subscribeUpdates() { NotificationCenter.default .publisher(for: AirshipNotifications.ChannelCreated.name) + .receive(on: RunLoop.main) .sink(receiveValue: { _ in self.channelID = Airship.channel.identifier }) .store(in: &self.subscriptions) Airship.push.notificationStatusPublisher - .sink(receiveValue: { status in - Task { @MainActor in - self.isPushNotificationsOptedIn = status.isOptedIn - self.deviceToken = Airship.push.deviceToken - } - }) + .receive(on: RunLoop.main) + .sink { status in + self.isPushNotificationsOptedIn = status.isOptedIn + self.deviceToken = Airship.push.deviceToken + } .store(in: &self.subscriptions) Airship.contact.namedUserIDPublisher diff --git a/Airship/AirshipMessageCenter/Source/MessageCenterList.swift b/Airship/AirshipMessageCenter/Source/MessageCenterList.swift index a5b5a597e..aa05122c3 100644 --- a/Airship/AirshipMessageCenter/Source/MessageCenterList.swift +++ b/Airship/AirshipMessageCenter/Source/MessageCenterList.swift @@ -138,81 +138,32 @@ final class MessageCenterInbox: NSObject, MessageCenterInboxProtocol, Sendable { } public var messagePublisher: AnyPublisher<[MessageCenterMessage], Never> { - let messagesSubject = CurrentValueSubject<[MessageCenterMessage]?, Never>(nil) - let messageUpdates = self.messageUpdates - - Task { [messageUpdates, weak messagesSubject] in - for await update in messageUpdates { - guard let messagesSubject else { return } - messagesSubject.send(update) - } - } - - return messagesSubject.compactMap { $0 }.eraseToAnyPublisher() + return self.messageUpdates + .airshipPublisher + .compactMap { $0 } + .eraseToAnyPublisher() } var messageUpdates: AsyncStream<[MessageCenterMessage]> { - AsyncStream<[MessageCenterMessage]> { [weak self] continuation in - let task = Task { [weak self] in - guard let stream = await self?.updateChannel.makeStream() else { - return - } - - if let messages = await self?.messages { - continuation.yield(messages) - } - - for await update in stream { - if update != .refreshFailed, let messages = await self?.messages { - continuation.yield(messages) - } - } - } - - continuation.onTermination = { _ in - task.cancel() - } + return self.updateChannel.makeNonIsolatedDedupingStream { [weak self] in + await self?.messages + } transform: { [weak self] _ in + await self?.messages } } public var unreadCountPublisher: AnyPublisher { - let unreadCountSubject = CurrentValueSubject(nil) - let unreadCountUpdates = self.unreadCountUpdates - - Task { [unreadCountUpdates, weak unreadCountSubject] in - for await update in unreadCountUpdates { - guard let unreadCountSubject else { return } - unreadCountSubject.send(update) - } - } - - return unreadCountSubject + return self.unreadCountUpdates + .airshipPublisher .compactMap { $0 } - .removeDuplicates() .eraseToAnyPublisher() } var unreadCountUpdates: AsyncStream { - AsyncStream { [weak self] continuation in - let task = Task { [weak self] in - guard let stream = await self?.updateChannel.makeStream() else { - return - } - - if let count = await self?.unreadCount { - continuation.yield(count) - } - - for await update in stream { - if update != .refreshFailed, let unreadCount = await self?.unreadCount { - continuation.yield(unreadCount) - } - } - } - - continuation.onTermination = { _ in - task.cancel() - } + return self.updateChannel.makeNonIsolatedDedupingStream { [weak self] in + await self?.unreadCount + } transform: { [weak self] _ in + await self?.unreadCount } }