Skip to content

Commit

Permalink
VIT-7076: Buffer, dedup and batch execute background delivery callouts
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Aug 2, 2024
1 parent 4b85938 commit 4260f32
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ public enum SyncContextTag: Int, Codable {
struct BackgroundDeliveryPayload: CustomStringConvertible {
let resources: [RemappedVitalResource]
let completion: (Completion) -> Void
let tags: Set<SyncContextTag>

var description: String {
"\(resources.map(\.wrapped.logDescription).joined(separator: ",")) \(tags.map(String.init(describing:)).joined(separator: ","))"
"\(resources.map(\.wrapped.logDescription).joined(separator: ",")))"
}

enum Completion {
Expand Down
100 changes: 70 additions & 30 deletions Sources/VitalHealthKit/HealthKit/VitalHealthKitClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public extension VitalHealthKitClient {
let task: TaskHandle?
let resources: Set<RemappedVitalResource>
let objectTypes: Set<HKObjectType>
let streamContinuation: AsyncStream<BackgroundDeliveryPayload>.Continuation
let streamContinuation: AsyncStream<BackgroundDeliveryStage>.Continuation

func cancel() {
streamContinuation.finish()
Expand Down Expand Up @@ -273,8 +273,8 @@ extension VitalHealthKitClient {
/// Submit BGProcessingTasks
scope.task { await self.submitProcessingTasks() }

let stream: AsyncStream<BackgroundDeliveryPayload>
let streamContinuation: AsyncStream<BackgroundDeliveryPayload>.Continuation
let stream: AsyncStream<BackgroundDeliveryStage>
let streamContinuation: AsyncStream<BackgroundDeliveryStage>.Continuation

if #available(iOS 15.0, *) {
(stream, streamContinuation) = bundledBackgroundObservers(for: bundles)
Expand All @@ -283,14 +283,40 @@ extension VitalHealthKitClient {
}

let payloadListener = scope.task(priority: .userInitiated) {
for await payload in stream {
if Task.isCancelled {
payload.completion(.completed)
continue
}
var bufferedPayloads: [BackgroundDeliveryPayload] = []
var timer: Task<Void, Never>?

defer {
// If there is any accidential leftover payload, make the completion callback anyway.
bufferedPayloads.forEach { $0.completion(.completed) }
timer?.cancel()
}

for await stage in stream {
switch stage {
case let .received(payload):
if Task.isCancelled {
payload.completion(.completed)
continue
}

bufferedPayloads.append(payload)
VitalLogger.healthKit.info("buffered: \(payload)", source: "BgDelivery")

if timer == nil {
timer = Task(priority: .high) {
// Throttle by 16ms
// Catch as many parallel HealthKit observer callouts as possible.
try? await Task.sleep(nanoseconds: NSEC_PER_MSEC * 16)
streamContinuation.yield(.evaluate)
}
}

case .evaluate:
timer = nil
let payloads = bufferedPayloads
bufferedPayloads = []

// Allow multiple resource sync to run concurrently.
self.scope.task(priority: .userInitiated) {
// Task is not cancelled — we must call the HealthKit completion handler irrespective of
// the sync process outcome. This is to avoid triggering the "strike on 3rd missed delivery"
// rule of HealthKit background delivery.
Expand All @@ -300,19 +326,30 @@ extension VitalHealthKitClient {
// behaviour adds little to no value in maintaining data freshness.
//
// (except for the task cancellation redelivery expectation stated above).
defer { payload.completion(.completed) }
defer { payloads.forEach { $0.completion(.completed) } }

VitalLogger.healthKit.info("received: \(payload)", source: "BgDelivery")
let prioritizedResources = Set(payloads.flatMap(\.resources))
.sorted(by: { $0.wrapped.priority < $1.wrapped.priority })
let syncsConcurrently = AppStateTracker.shared.state.status == .foreground

await withTaskGroup(of: Void.self) { group in
for resource in payload.resources {
group.addTask {
await self.sync(resource, payload.tags)
VitalLogger.healthKit.info(
"dequeued: \(prioritizedResources); will sync \(syncsConcurrently ? "concurrent" : "serially")",
source: "BgDelivery"
)

if syncsConcurrently {
await withTaskGroup(of: Void.self) { group in
for resource in prioritizedResources {
group.addTask {
await self.sync(resource, [.healthKit])
}
}
}
} else {
for resource in prioritizedResources {
await self.sync(resource, [.healthKit])
}
}
} onCancel: {
payload.completion(.completed)
}
}
}
Expand Down Expand Up @@ -450,11 +487,11 @@ extension VitalHealthKitClient {
@available(iOS 15.0, *)
private func bundledBackgroundObservers(
for typesBundle: Set<[HKSampleType]>
) -> (AsyncStream<BackgroundDeliveryPayload>, AsyncStream<BackgroundDeliveryPayload>.Continuation) {
) -> (AsyncStream<BackgroundDeliveryStage>, AsyncStream<BackgroundDeliveryStage>.Continuation) {

var _continuation: AsyncStream<BackgroundDeliveryPayload>.Continuation!
var _continuation: AsyncStream<BackgroundDeliveryStage>.Continuation!

let stream = AsyncStream<BackgroundDeliveryPayload> { continuation in
let stream = AsyncStream<BackgroundDeliveryStage> { continuation in
_continuation = continuation

var queries: [HKObserverQuery] = []
Expand Down Expand Up @@ -498,12 +535,11 @@ extension VitalHealthKitClient {
if completion == .completed {
handler()
}
},
tags: [.healthKit]
}
)
VitalLogger.healthKit.info("notified: \(payload)", source: "HealthKit")

continuation.yield(payload)
continuation.yield(.received(payload))

SyncProgressStore.shared.recordSystem(
remapped,
Expand Down Expand Up @@ -531,10 +567,10 @@ extension VitalHealthKitClient {

private func backgroundObservers(
for sampleTypes: some Sequence<HKSampleType>
) -> (AsyncStream<BackgroundDeliveryPayload>, AsyncStream<BackgroundDeliveryPayload>.Continuation) {
var _continuation: AsyncStream<BackgroundDeliveryPayload>.Continuation!
) -> (AsyncStream<BackgroundDeliveryStage>, AsyncStream<BackgroundDeliveryStage>.Continuation) {
var _continuation: AsyncStream<BackgroundDeliveryStage>.Continuation!

let stream = AsyncStream<BackgroundDeliveryPayload> { continuation in
let stream = AsyncStream<BackgroundDeliveryStage> { continuation in
_continuation = continuation

var queries: [HKObserverQuery] = []
Expand All @@ -559,12 +595,11 @@ extension VitalHealthKitClient {
if completion == .completed {
handler()
}
},
tags: [.healthKit]
}
)
VitalLogger.healthKit.info("notified: \(payload)", source: "HealthKit")

continuation.yield(payload)
continuation.yield(.received(payload))

SyncProgressStore.shared.recordSystem(
remapped,
Expand Down Expand Up @@ -1120,6 +1155,11 @@ extension VitalHealthKitClient {
}
}

enum BackgroundDeliveryStage {
case received(BackgroundDeliveryPayload)
case evaluate
}

enum PipelineStage {
case read(uncommittedAnchors: [StoredAnchor] = [])
case upload(ProcessedResourceData?, [StoredAnchor], hasMore: Bool)
Expand Down
4 changes: 3 additions & 1 deletion Sources/VitalHealthKit/UI/ForEachVitalResource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ public struct ForEachVitalResource: View {
HStack {
if let sync = resource.latestSync {
icon(for: sync.lastStatus)
.frame(width: 22, height: 22)
}

VStack(alignment: .leading) {
Text("\(key.rawValue)")
if let tags = resource.latestSync?.tags {
Text(verbatim: "\(tags.map(String.init(describing:)).joined(separator: ", "))")
Text(verbatim: "\(tags.map(String.init(describing:)).sorted().joined(separator: ", "))")
.foregroundColor(Color.secondary)
.font(Font.subheadline)
}
Expand Down Expand Up @@ -201,6 +202,7 @@ private struct ResourceProgressDetailView: View {
} label: {
HStack(alignment: .center) {
icon(for: sync.lastStatus)
.frame(width: 22, height: 22)

VStack(alignment: .leading) {
if sync.lastStatus.isInProgress {
Expand Down

0 comments on commit 4260f32

Please sign in to comment.