Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VIT-6048: Fix incremental sync not progressing #104

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions VitalClient/src/main/java/io/tryvital/client/utils/logger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package io.tryvital.client.utils

import android.util.Log

private const val VITAL_LOGGER = "vital-logger"
const val VITAL_LOGGER = "vital-logger"

class VitalLogger private constructor() {
@Volatile
var enabled: Boolean = false

fun info(message: () -> String) {
inline fun info(crossinline message: () -> String) {
if (enabled) {
Log.i(VITAL_LOGGER, message())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import androidx.health.connect.client.records.Vo2MaxRecord
import androidx.health.connect.client.records.WeightRecord
import io.tryvital.client.services.data.IngestibleTimeseriesResource
import io.tryvital.client.services.data.SampleType
import io.tryvital.client.utils.VitalLogger
import io.tryvital.vitalhealthconnect.SupportedSleepApps
import io.tryvital.vitalhealthconnect.ext.toDate
import io.tryvital.vitalhealthconnect.model.HCQuantitySample
Expand Down Expand Up @@ -485,6 +486,7 @@ internal class HealthConnectRecordProcessor(

// between is start-inclusive, end-exclusive
val numberOfDays = ChronoUnit.DAYS.between(startDate, endDate.plusDays(1)).toInt()
VitalLogger.getOrCreate().info { "activity: summarizing $numberOfDays ($startDate ... $endDate)" }

val summaryAggregators = Array(numberOfDays) { offset ->
async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ internal data class ResourceSyncWorkerInput(
@JsonClass(generateAdapter = false)
internal sealed class ResourceSyncState {
@JsonClass(generateAdapter = true)
data class Historical(val start: Date, val end: Date) : ResourceSyncState()
data class Historical(val start: Date, val end: Date) : ResourceSyncState() {
override fun toString(): String = "historical(${start.toInstant()} ..< ${end.toInstant()})"
}

@JsonClass(generateAdapter = true)
data class Incremental(val changesToken: String, val lastSync: Date) : ResourceSyncState()
data class Incremental(val changesToken: String, val lastSync: Date) : ResourceSyncState() {
override fun toString(): String = "incremental($changesToken at ${lastSync.toInstant()})"
}

companion object {
val adapterFactory: PolymorphicJsonAdapterFactory<ResourceSyncState>
Expand Down Expand Up @@ -148,7 +153,7 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
val state = sharedPreferences.getJson(input.resource.syncStateKey)
?: initialState(timeZone)

vitalLogger.logI("ResourceSyncWorker: starting for ${input.resource}; state = $state")
vitalLogger.logI("${input.resource}: begin at $state")

when (state) {
is ResourceSyncState.Historical -> historicalBackfill(state, timeZone)
Expand Down Expand Up @@ -185,6 +190,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
// we want to monitor, probably due to permission changes.
// Treat this as if the changesToken has expired.
if (recordTypesToMonitor != monitoringTypes) {
vitalLogger.info { "${input.resource}: types to monitor have changed" }

return genericBackfill(
stage = DataStage.Daily,
start = state.lastSync.toInstant(),
Expand All @@ -200,7 +207,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
changes = client.getChanges(token)

if (changes.changesTokenExpired) {
vitalLogger.info { "incremental: changesToken expired; " }
vitalLogger.info { "${input.resource}: changesToken expired" }

return genericBackfill(
stage = DataStage.Daily,
start = state.lastSync.toInstant(),
Expand All @@ -209,28 +217,34 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
)
}

val delta = processChangesResponse(
resource = input.resource,
responses = changes,
timeZone = timeZone,
currentDevice = Build.MODEL,
reader = recordReader,
processor = recordProcessor,
)
if (changes.changes.isNotEmpty()) {
vitalLogger.info { "${input.resource}: found ${changes.changes.count()} changes" }

uploadResources(
delta,
uploader = recordUploader,
stage = DataStage.Daily,
timeZoneId = timeZone.id,
userId = userId,
)
val delta = processChangesResponse(
resource = input.resource,
responses = changes,
timeZone = timeZone,
currentDevice = Build.MODEL,
reader = recordReader,
processor = recordProcessor,
)

uploadResources(
delta,
uploader = recordUploader,
stage = DataStage.Daily,
timeZoneId = timeZone.id,
userId = userId,
)
} else {
vitalLogger.info { "${input.resource}: found no change" }
}

// Since we have successfully uploaded this batch of changes,
// save the next change token, in case we get rate limited on the
// next `getChanges(token)` call.
setIncremental(token = state.changesToken)
token = state.changesToken
setIncremental(token = changes.nextChangesToken)
token = changes.nextChangesToken

} while (changes.hasMore)
}
Expand All @@ -255,6 +269,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
DataStage.Daily -> Pair(null, null)
}

vitalLogger.info { "${input.resource}: generic backfill $start ..< $end" }

// TODO: Chunk by days
val allData = mutableListOf<ProcessedResourceData>()
allData += readResourceByTimeRange(
Expand All @@ -273,6 +289,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
changes = client.getChanges(token)
check(!changes.changesTokenExpired)

vitalLogger.info { "${input.resource}: found ${changes.changes.count()} new changes after range request" }

token = changes.nextChangesToken

allData += processChangesResponse(
Expand Down Expand Up @@ -324,6 +342,8 @@ internal class ResourceSyncWorker(appContext: Context, workerParams: WorkerParam
sharedPreferences.edit()
.putJson<ResourceSyncState>(input.resource.syncStateKey, newState)
.apply()

vitalLogger.info { "${input.resource}: updated to $newState" }
}
}

Expand Down
Loading