Skip to content

Commit

Permalink
VIT-6048: Fix incremental sync not progressing
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Mar 22, 2024
1 parent e12a0a3 commit f93fe1c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 23 deletions.
4 changes: 2 additions & 2 deletions VitalClient/src/main/java/io/tryvital/client/utils/logger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package io.tryvital.client.utils

import android.util.Log

private const val VITAL_LOGGER = "vital-logger"
const val VITAL_LOGGER = "vital-logger"

class VitalLogger private constructor() {
@Volatile
var enabled: Boolean = false

fun info(message: () -> String) {
inline fun info(crossinline message: () -> String) {
if (enabled) {
Log.i(VITAL_LOGGER, message())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import androidx.health.connect.client.records.Vo2MaxRecord
import androidx.health.connect.client.records.WeightRecord
import io.tryvital.client.services.data.IngestibleTimeseriesResource
import io.tryvital.client.services.data.SampleType
import io.tryvital.client.utils.VitalLogger
import io.tryvital.vitalhealthconnect.SupportedSleepApps
import io.tryvital.vitalhealthconnect.ext.toDate
import io.tryvital.vitalhealthconnect.model.HCQuantitySample
Expand Down Expand Up @@ -485,6 +486,7 @@ internal class HealthConnectRecordProcessor(

// between is start-inclusive, end-exclusive
val numberOfDays = ChronoUnit.DAYS.between(startDate, endDate.plusDays(1)).toInt()
VitalLogger.getOrCreate().info { "activity: summarizing $numberOfDays ($startDate ... $endDate)" }

val summaryAggregators = Array(numberOfDays) { offset ->
async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ internal data class ResourceSyncWorkerInput(
@JsonClass(generateAdapter = false)
internal sealed class ResourceSyncState {
@JsonClass(generateAdapter = true)
data class Historical(val start: Date, val end: Date) : ResourceSyncState()
data class Historical(val start: Date, val end: Date) : ResourceSyncState() {
override fun toString(): String = "historical(${start.toInstant()} ..< ${end.toInstant()})"
}

@JsonClass(generateAdapter = true)
data class Incremental(val changesToken: String, val lastSync: Date) : ResourceSyncState()
data class Incremental(val changesToken: String, val lastSync: Date) : ResourceSyncState() {
override fun toString(): String = "incremental($changesToken at ${lastSync.toInstant()})"
}

companion object {
val adapterFactory: PolymorphicJsonAdapterFactory<ResourceSyncState>
Expand Down Expand Up @@ -148,7 +153,7 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
val state = sharedPreferences.getJson(input.resource.syncStateKey)
?: initialState(timeZone)

vitalLogger.logI("ResourceSyncWorker: starting for ${input.resource}; state = $state")
vitalLogger.logI("${input.resource}: begin at $state")

when (state) {
is ResourceSyncState.Historical -> historicalBackfill(state, timeZone)
Expand Down Expand Up @@ -185,6 +190,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
// we want to monitor, probably due to permission changes.
// Treat this as if the changesToken has expired.
if (recordTypesToMonitor != monitoringTypes) {
vitalLogger.info { "${input.resource}: types to monitor have changed" }

return genericBackfill(
stage = DataStage.Daily,
start = state.lastSync.toInstant(),
Expand All @@ -200,7 +207,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
changes = client.getChanges(token)

if (changes.changesTokenExpired) {
vitalLogger.info { "incremental: changesToken expired; " }
vitalLogger.info { "${input.resource}: changesToken expired" }

return genericBackfill(
stage = DataStage.Daily,
start = state.lastSync.toInstant(),
Expand All @@ -209,28 +217,34 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
)
}

val delta = processChangesResponse(
resource = input.resource,
responses = changes,
timeZone = timeZone,
currentDevice = Build.MODEL,
reader = recordReader,
processor = recordProcessor,
)
if (changes.changes.isNotEmpty()) {
vitalLogger.info { "${input.resource}: found ${changes.changes.count()} changes" }

uploadResources(
delta,
uploader = recordUploader,
stage = DataStage.Daily,
timeZoneId = timeZone.id,
userId = userId,
)
val delta = processChangesResponse(
resource = input.resource,
responses = changes,
timeZone = timeZone,
currentDevice = Build.MODEL,
reader = recordReader,
processor = recordProcessor,
)

uploadResources(
delta,
uploader = recordUploader,
stage = DataStage.Daily,
timeZoneId = timeZone.id,
userId = userId,
)
} else {
vitalLogger.info { "${input.resource}: found no change" }
}

// Since we have successfully uploaded this batch of changes,
// save the next change token, in case we get rate limited on the
// next `getChanges(token)` call.
setIncremental(token = state.changesToken)
token = state.changesToken
setIncremental(token = changes.nextChangesToken)
token = changes.nextChangesToken

} while (changes.hasMore)
}
Expand All @@ -255,6 +269,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
DataStage.Daily -> Pair(null, null)
}

vitalLogger.info { "${input.resource}: generic backfill $start ..< $end" }

// TODO: Chunk by days
val allData = mutableListOf<ProcessedResourceData>()
allData += readResourceByTimeRange(
Expand All @@ -273,6 +289,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
changes = client.getChanges(token)
check(!changes.changesTokenExpired)

vitalLogger.info { "${input.resource}: found ${changes.changes.count()} new changes after range request" }

token = changes.nextChangesToken

allData += processChangesResponse(
Expand Down Expand Up @@ -324,6 +342,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
sharedPreferences.edit()
.putJson<ResourceSyncState>(input.resource.syncStateKey, newState)
.apply()

vitalLogger.info { "${input.resource}: updated to $newState" }
}
}

Expand Down

0 comments on commit f93fe1c

Please sign in to comment.