Skip to content

Commit

Permalink
Handle IO errors (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
chromy authored Sep 18, 2024
1 parent f5dfc68 commit 929b9f2
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 34 deletions.
60 changes: 46 additions & 14 deletions reaper/reaper/src/main/kotlin/com/emergetools/reaper/ReaperImpl.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package com.emergetools.reaper

import android.system.ErrnoException
import java.io.DataOutputStream
import java.io.File
import java.io.FileOutputStream
import java.io.IOException
import java.util.UUID

// The Reaper report currently in progress:
private class Report(
val stream: FileOutputStream,
val dataStream: DataOutputStream,
val path: File
val file: File,
val id: String,
) {
// All the hashes we have written so far.
val written = mutableSetOf<Long>()
Expand All @@ -31,6 +34,7 @@ internal class ReaperImpl(
fun listCurrentReports(): Collection<String>
fun listPendingReports(): Collection<String>
fun requestUpload(apiKey: String, baseUrl: String, isDebug: Boolean)
fun sendError(message: String)
fun d(message: String)
fun e(message: String)
fun <T> trace(name: String, block: () -> T): T
Expand All @@ -56,24 +60,50 @@ internal class ReaperImpl(
delegate.e("No report to flush")
return@trace
} else {
delegate.d("Flushing report ${report.path.absolutePath}")
delegate.d("Flushing report ${report.file.absolutePath}")
}

// Hashes observed since the last flush() this will normally be a mixture of hashes we
// already saw and new hashes.
tracker.flush {
it.forEach { hash ->
if (!report.written.contains(hash)) {
report.dataStream.writeLong(hash)
report.written.add(hash)
try {
tracker.flush {
it.forEach { hash ->
if (!report.written.contains(hash)) {
report.dataStream.writeLong(hash)
report.written.add(hash)
}
}
}

// Flush the underlying file.
report.dataStream.flush()
report.stream.flush()
} catch (e: ErrnoException) {
stopReaperDueToIoError(e)
} catch (e: IOException) {
stopReaperDueToIoError(e)
}
}
}

private fun stopReaperDueToIoError(e: Exception) {
val report = this.report
this.report = null
if (report != null) {
val id = report.id
report.dataStream.close()
report.stream.close()
delegate.deleteReport(id)
}

// Flush the underlying file.
report.dataStream.flush()
report.stream.flush()
for (id in delegate.listPendingReports()) {
delegate.deleteReport(id)
}
for (id in delegate.listCurrentReports()) {
delegate.deleteReport(id)
}

delegate.sendError(e.toString())
}

/**
Expand Down Expand Up @@ -105,8 +135,8 @@ internal class ReaperImpl(
@Synchronized
fun startReport(): String? {
return delegate.trace("Reaper#startReport") {
val shortUuid = UUID.randomUUID().toString().substring(0, REPORT_SUFFIX_SIZE)
val file = delegate.startReport(shortUuid)
val id = UUID.randomUUID().toString().substring(0, REPORT_SUFFIX_SIZE)
val file = delegate.startReport(id)
if (file == null) {
delegate.e("Failed to open Reaper report for writing")
return@trace null
Expand All @@ -118,7 +148,7 @@ internal class ReaperImpl(
delegate.e("Failed to open Reaper report stream for writing $absolutePath")
return@trace null
}
report = Report(stream, DataOutputStream(stream), file)
report = Report(stream, DataOutputStream(stream), file = file, id = id)

delegate.d(
"Reaper report started. report=$absolutePath backend=$baseUrl tracker=${tracker.name}"
Expand All @@ -139,7 +169,9 @@ internal class ReaperImpl(
delegate.e("No report to finalize")
return@trace
} else {
delegate.d("Finalizing report ${report.path.absolutePath}")
report.dataStream.close()
report.stream.close()
delegate.d("Finalizing report ${report.file.absolutePath}")
}

// Move report to pending and schedule upload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import android.os.Build
import android.os.Handler
import android.os.HandlerThread
import android.os.Looper
import android.system.ErrnoException
import android.util.Log
import androidx.annotation.MainThread
import androidx.lifecycle.ProcessLifecycleOwner
Expand All @@ -21,6 +22,8 @@ import androidx.work.OneTimeWorkRequest
import androidx.work.WorkManager
import androidx.work.workDataOf
import java.io.File
import java.io.FileNotFoundException
import java.io.IOException

private const val DEFERRED_START_MS = 5L * 1000L
private const val FLUSH_PERIOD_MS = 60L * 1000L
Expand Down Expand Up @@ -94,7 +97,7 @@ internal object ReaperInternal {
lambda(theImpl)
}
} catch (e: Exception) {
reportError(context, e.toString())
sendError(context, e.toString())
}
}

Expand Down Expand Up @@ -150,12 +153,23 @@ internal object ReaperInternal {

private class ReaperImplDelegate(private val context: Context) : ReaperImpl.Delegate {

override fun startReport(id: String): File {
override fun startReport(id: String): File? {
ensureDirectories(context)
val name = getReportName(id)
val file = File(getCurrentDir(context), name)
file.delete()
file.createNewFile()
try {
file.delete()
file.createNewFile()
} catch (e: IOException) {
e("Failed to create report $e")
return null
} catch (e: FileNotFoundException) {
e("Failed to create report $e")
return null
} catch (e: ErrnoException) {
e("Failed to create report $e")
return null
}
return file
}

Expand All @@ -171,7 +185,7 @@ private class ReaperImplDelegate(private val context: Context) : ReaperImpl.Dele
ensureDirectories(context)
val name = getReportName(id)
val current = File(getCurrentDir(context), name)
val pending = File(getCurrentDir(context), name)
val pending = File(getPendingDir(context), name)
current.renameTo(pending)
}

Expand Down Expand Up @@ -221,6 +235,10 @@ private class ReaperImplDelegate(private val context: Context) : ReaperImpl.Dele
WorkManager.getInstance(context).enqueue(uploadWorkRequest)
}

override fun sendError(message: String) {
sendError(context, message)
}

override fun d(message: String) {
Log.d(TAG, message)
}
Expand Down Expand Up @@ -269,6 +287,7 @@ private fun fatalError(context: Context, message: String) {

@MainThread
private fun mainThreadStart(context: Context, impl: ReaperImpl) {
Log.d(TAG, "Reaper#mainThreadStart")
androidTrace("Reaper#mainThreadStart") {
val thread = HandlerThread("ReaperWorker")
thread.start()
Expand Down Expand Up @@ -299,18 +318,12 @@ private fun wrap(context: Context, lambda: () -> Unit) {
try {
lambda()
} catch (e: Exception) {
reportError(context, e.toString())
sendError(context, e.toString())
}
}

private fun workerThreadStart(context: Context, looper: Looper, impl: ReaperImpl) {
Log.d(TAG, "Reaper#workerThreadStart")
val observer = OnStopLifecycleObserver {
Handler(looper).post {
impl.finalizeReport()
}
}
ProcessLifecycleOwner.get().lifecycle.addObserver(observer)
androidTrace("Reaper#workerThreadStart") {
// If Reaper was running previously but we did not get a chance to finalize the report we may
// end up with reports 'stuck' in current which will never be finalized. Schedule those for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class ReaperReportErrorWorker(
}
}

fun reportError(ctx: Context, message: String) {
fun sendError(ctx: Context, message: String) {
val metaData = ctx.packageManager.getApplicationInfo(
ctx.packageName,
PackageManager.GET_META_DATA
Expand All @@ -138,7 +138,7 @@ fun reportError(ctx: Context, message: String) {
val baseUrl = getBaseUrl(metaData)
val apiKey = getApiKey(metaData)

Log.e(TAG, "ReaperError: backend`=$baseUrl message=$message")
Log.e(TAG, "ReaperError: backend=$baseUrl message=$message")
val data = workDataOf(
ReaperReportErrorWorker.EXTRA_API_KEY to apiKey,
ReaperReportErrorWorker.EXTRA_BASE_URL to baseUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ internal class ReaperReportUploadWorker(
"Upload failed, backend or apiKey unknown. backend=$baseUrl apiKey=$apiKey"
)
} else {
reportError(applicationContext, response.toString())
sendError(applicationContext, response.toString())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ private class FakeDelegate : ReaperImpl.Delegate {
// Empty
}

override fun sendError(message: String) {
// Empty
}

override fun d(message: String) {
// Empty
}
Expand All @@ -55,7 +59,6 @@ private class FakeDelegate : ReaperImpl.Delegate {
class ReaperImplTest {
private lateinit var tracker: HashTracker
private lateinit var delegate: FakeDelegate
private lateinit var impl: ReaperImpl

@BeforeEach
fun init() {
Expand All @@ -69,14 +72,14 @@ class ReaperImplTest {
delegate.startReport("$i")
delegate.markReportPending("$i")
}
impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
val impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
impl.sweepReports()
assertThat(delegate.listPendingReports()).isEmpty()
}

@Test
fun `uploads reports containing hashes`() {
impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
val impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")

impl.startReport()
tracker.logMethodEntry(-1L)
Expand All @@ -101,7 +104,7 @@ class ReaperImplTest {
@Test
fun `reports hashes prior to construction in the first report`() {
tracker.logMethodEntry(0x0102030405060708)
impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
val impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
impl.startReport()
impl.finalizeReport()

Expand All @@ -125,7 +128,7 @@ class ReaperImplTest {
fun `sweeping finalizes pre-existing reports`() {
delegate.startReport("previous1")
delegate.startReport("previous2")
impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
val impl = ReaperImpl(tracker = tracker, delegate = delegate, apiKey = "<apikey>")
impl.sweepReports()

assertThat(delegate.listPendingReports()).containsExactly("previous1", "previous2")
Expand Down

0 comments on commit 929b9f2

Please sign in to comment.