Skip to content

Commit

Permalink
Rework channel to combine (#3296)
Browse files Browse the repository at this point in the history
* Rework channel to combine

* Small things

* Update AirshipAsyncChannel.swift

* Update AirshipAsyncChannel.swift

---------

Co-authored-by: David <[email protected]>
  • Loading branch information
rlepinski and crow authored Dec 4, 2024
1 parent bb04510 commit 6b44bce
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 140 deletions.
150 changes: 148 additions & 2 deletions Airship/AirshipCore/Source/AirshipAsyncChannel.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/* Copyright Airship and Contributors */

import Foundation
import Combine

/// Simple implemenation of a `channel` that allows multiple AsyncStream of the same data.
/// Simple implementation of a `channel` that allows multiple AsyncStreams of the same data.
/// - Note: for internal use only. :nodoc:
public actor AirshipAsyncChannel<T: Sendable> {

public enum BufferPolicy: Sendable {
case unbounded
case bufferingNewest(Int)
Expand Down Expand Up @@ -63,3 +63,149 @@ public actor AirshipAsyncChannel<T: Sendable> {
}
}
}

public extension AirshipAsyncChannel {

/// Makes a stream that is nonisolated from the actor by wrapping the
/// actor stream in an async stream.
/// - Parameters:
/// - bufferPolicy: The buffer policy.
/// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream.
/// - transform: Transforms the channel values to the stream value. If nil, it a value is mapped to nil it will finish the stream.
/// - Returns: An AsyncStream.
nonisolated func makeNonIsolatedStream<R: Sendable>(
bufferPolicy: BufferPolicy = .unbounded,
initialValue: (@Sendable () async -> R?)? = nil,
transform: @escaping @Sendable (T) async -> R?
) -> AsyncStream<R> {
return AsyncStream<R> { [weak self] continuation in
let task = Task { [weak self] in
guard let stream = await self?.makeStream() else {
return
}

if let initialValue {
guard let value = await initialValue() else {
continuation.finish()
return
}

continuation.yield(value)
}

for await update in stream.map(transform) {
if let update {
continuation.yield(update)
} else {
continuation.finish()
}
}
continuation.finish()
}

continuation.onTermination = { _ in
task.cancel()
}
}
}

/// Makes a stream that is nonisolated from the actor by wrapping the
/// actor stream in an async stream.
/// - Parameters:
/// - bufferPolicy: The buffer policy.
/// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream.
/// - transform: Transforms the channel values to the stream value. If nil, it a value is mapped to nil it will finish the stream.
/// - Returns: An AsyncStream.
nonisolated func makeNonIsolatedStream(
bufferPolicy: BufferPolicy = .unbounded,
initialValue: (@Sendable () async -> T)? = nil
) -> AsyncStream<T> {
return makeNonIsolatedStream(
bufferPolicy: bufferPolicy,
initialValue: initialValue,
transform: { $0 }
)
}

/// Makes a stream that is nonisolated from the actor by wrapping the
/// actor stream in an async stream. Values will only be emitted if they are different than the previous value.
/// - Parameters:
/// - bufferPolicy: The buffer policy.
/// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream.
/// - Returns: An AsyncStream.
nonisolated func makeNonIsolatedDedupingStream<R: Sendable&Equatable>(
bufferPolicy: BufferPolicy = .unbounded,
initialValue: (@Sendable () async -> R?)? = nil,
transform: @escaping @Sendable (T) async -> R?
) -> AsyncStream<R> {
return AsyncStream<R> { [weak self] continuation in
let task = Task { [weak self] in
guard let stream = await self?.makeStream() else {
return
}

var last: R? = nil

if let initialValue {
guard let value = await initialValue() else {
continuation.finish()
return
}

continuation.yield(value)
last = value
}

for await update in stream.map(transform) {
guard let update else {
continuation.finish()
return
}

if update != last {
continuation.yield(update)
last = update
}
}
}

continuation.onTermination = { _ in
task.cancel()
}
}
}

/// Makes a stream that is nonisolated from the actor by wrapping the
/// actor stream in an async stream. Values will only be emitted if they are different than the previous value.
/// - Parameters:
/// - bufferPolicy: The buffer policy.
/// - initialValue: Optional initial value closure. If provided and the value is nil, it will finish the stream.
/// - Returns: An AsyncStream.
nonisolated func makeNonIsolatedDedupingStream(
bufferPolicy: BufferPolicy = .unbounded,
initialValue: (@Sendable () async -> T?)? = nil
) -> AsyncStream<T> where T: Equatable {
return makeNonIsolatedDedupingStream(
bufferPolicy: bufferPolicy,
initialValue: initialValue,
transform: { $0 }
)
}
}

public extension AsyncStream where Element : Sendable {

/// Creates a combine publisher from an AsyncStream.
/// - Note: for internal use only. :nodoc:
var airshipPublisher: AnyPublisher<Element?, Never>{
let subject = CurrentValueSubject<Element?, Never>(nil)
Task { [weak subject] in
for await update in self {
guard let subject else { return }
subject.send(update)
}
}
return subject.eraseToAnyPublisher()
}

}
104 changes: 35 additions & 69 deletions Airship/AirshipCore/Source/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,32 @@ import UIKit
/// This singleton provides an interface to the functionality provided by the Airship iOS Push API.
final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable {

private let pushTokenSubject = PassthroughSubject<String?, Never>()
private var pushTokenPublisher: AnyPublisher<String?, Never> {
self.pushTokenSubject
.prepend(Future { promise in
Task {
return await promise(.success(self.deviceToken))
}
})
.eraseToAnyPublisher()
}
private let pushTokenChannel = AirshipAsyncChannel<String>()

private let notificationStatusChannel = AirshipAsyncChannel<AirshipNotificationStatus>()

private let notificationStatusSubject = PassthroughSubject<AirshipNotificationStatus, Never>()
public var notificationStatusPublisher: AnyPublisher<AirshipNotificationStatus, Never> {
notificationStatusSubject
.prepend(Future { promise in
Task {
return await promise(.success(self.notificationStatus))
}
})
.removeDuplicates()
return notificationStatusUpdates
.airshipPublisher
.compactMap { $0 }
.eraseToAnyPublisher()
}

var notificationStatusUpdates: AsyncStream<AirshipNotificationStatus> {
let publisher = self.notificationStatusPublisher
return AsyncStream { continuation in
let cancellable = publisher
.removeDuplicates()
.sink { update in
continuation.yield(update)
}

continuation.onTermination = { _ in
cancellable.cancel()
}
}
return self.notificationStatusChannel.makeNonIsolatedDedupingStream(
initialValue: { [weak self] in await self?.notificationStatus }
)
}


private static let pushNotificationsOptionsKey =
"UAUserPushNotificationsOptions"
private static let userPushNotificationsEnabledKey =
"UAUserPushNotificationsEnabled"
private static let backgroundPushNotificationsEnabledKey =
"UABackgroundPushNotificationsEnabled"
private static let requestExplicitPermissionWhenEphemeralKey =
"UAExtendedPushNotificationPermissionEnabled"


private static let pushNotificationsOptionsKey = "UAUserPushNotificationsOptions"
private static let userPushNotificationsEnabledKey = "UAUserPushNotificationsEnabled"
private static let backgroundPushNotificationsEnabledKey = "UABackgroundPushNotificationsEnabled"
private static let requestExplicitPermissionWhenEphemeralKey = "UAExtendedPushNotificationPermissionEnabled"
private static let badgeSettingsKey = "UAPushBadge"
private static let deviceTokenKey = "UADeviceToken"
private static let quietTimeSettingsKey = "UAPushQuietTime"
private static let quietTimeEnabledSettingsKey = "UAPushQuietTimeEnabled"
private static let timeZoneSettingsKey = "UAPushTimeZone"

private static let typesAuthorizedKey = "UAPushTypesAuthorized"
private static let authorizationStatusKey = "UAPushAuthorizationStatus"
private static let userPromptedForNotificationsKey = "UAPushUserPromptedForNotifications"
Expand Down Expand Up @@ -323,7 +294,6 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable {
@MainActor
public private(set) var deviceToken: String? {
set {

guard let deviceToken = newValue else {
self.dataStore.removeObject(forKey: AirshipPush.deviceTokenKey)
self.updateNotificationStatus()
Expand Down Expand Up @@ -357,6 +327,9 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable {
forKey: AirshipPush.deviceTokenKey
)
AirshipLogger.importantInfo("Device token: \(deviceToken)")
Task {
await self.pushTokenChannel.send(deviceToken)
}
} catch {
AirshipLogger.error("Unable to set device token")
}
Expand Down Expand Up @@ -596,39 +569,34 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable {
private func waitForDeviceTokenRegistration() async {
guard self.waitForDeviceToken,
self.privacyManager.isEnabled(.push),
self.deviceToken == nil,
self.apnsRegistrar.isRegisteredForRemoteNotifications
else {
return
}

self.waitForDeviceToken = false

var subscription: AnyCancellable?
defer {
subscription?.cancel()
let updates = await pushTokenChannel.makeStream()
guard self.deviceToken == nil else {
return
}

await withCheckedContinuation { continuation in
let cancelTask = Task { @MainActor in
try await Task.sleep(
nanoseconds: UInt64(AirshipPush.deviceTokenRegistrationWaitTime * 1_000_000_000)
)
subscription?.cancel()
try Task.checkCancellation()
continuation.resume()
let waitTask = Task {
for await _ in updates {
return
}
}

subscription = self.pushTokenPublisher
.receive(on: RunLoop.main)
.sink { token in
if (token != nil) {
continuation.resume()
cancelTask.cancel()
subscription?.cancel()
}
}
let cancelTask = Task { @MainActor in
try await Task.sleep(
nanoseconds: UInt64(AirshipPush.deviceTokenRegistrationWaitTime * 1_000_000_000)
)
try Task.checkCancellation()
waitTask.cancel()
}

await waitTask.value
cancelTask.cancel()
}

@objc
Expand Down Expand Up @@ -715,8 +683,8 @@ final class AirshipPush: NSObject, AirshipPushProtocol, @unchecked Sendable {
}

private func updateNotificationStatus() {
Task { @MainActor in
self.notificationStatusSubject.send(await self.notificationStatus)
Task {
await self.notificationStatusChannel.send(await self.notificationStatus)
}
}

Expand Down Expand Up @@ -1317,7 +1285,7 @@ public struct QuietTimeSettings: Sendable, Equatable {
/// End minute
public let endMinute: UInt


var startString: String {
return "\(String(format: "%02d", startHour)):\(String(format: "%02d", startMinute))"
}
Expand Down Expand Up @@ -1372,6 +1340,4 @@ public struct QuietTimeSettings: Sendable, Equatable {
self.endHour = endParts[0]
self.endMinute = endParts[1]
}


}
Loading

0 comments on commit 6b44bce

Please sign in to comment.