Skip to content

Commit

Permalink
Implement Pause Resume Support
Browse files Browse the repository at this point in the history
  • Loading branch information
gunishjain committed Nov 15, 2024
1 parent 5ace8b2 commit b6c5fef
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 68 deletions.
2 changes: 1 addition & 1 deletion app/src/main/java/com/gunishjain/sample/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fun DownloadUI() {
startDownload(
url = "https://www.learningcontainer.com/download/sample-50-mb-pdf-file/?wpdmdl=3675&refresh=6721f942bd70b1730279746",
dirPath = downloadsDirectory,
fileName = "himnashunew.pdf"
fileName = "test.pdf"
)
}) {
Text("Download")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package com.gunishjain.grabbit.internal.download

import android.util.Log
import com.gunishjain.grabbit.internal.network.HttpClient
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.cancellation.CancellationException

class DownloadDispatcher(private val httpClient: HttpClient) {

private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
private val downloadTasks = ConcurrentHashMap<Int, DownloadTask>()
private val activeJobs = ConcurrentHashMap<Int, Job>()

private fun executeOnMain(block: () -> Unit){
scope.launch {
Expand All @@ -25,54 +29,82 @@ class DownloadDispatcher(private val httpClient: HttpClient) {
downloadTasks[req.downloadId] = downloadTask

val job = scope.launch {
execute(req)
execute(req,downloadTask)
}
activeJobs[req.downloadId] = job
req.job = job
return req.downloadId
}

private suspend fun execute(request: DownloadRequest) {

DownloadTask(request,httpClient).run (
onStart = {
executeOnMain { request.onStart() }
},
onProgress = {
executeOnMain { request.onProgress(it) }
},
onPause = {
executeOnMain { request.onPause() }
},
onCompleted = {
executeOnMain {
request.onCompleted()
private suspend fun execute(request: DownloadRequest,task: DownloadTask) {
try {
task.run(
onStart = {
executeOnMain { request.onStart() }
},
onProgress = {
executeOnMain { request.onProgress(it) }
},
onPause = {
executeOnMain { request.onPause() }
},
onCompleted = {
executeOnMain {
request.onCompleted()
}
cleanup(request.downloadId)

},
onError = {
executeOnMain { request.onError(it) }
cleanup(request.downloadId)
}
downloadTasks.remove(request.downloadId)

},
onError = {
executeOnMain { request.onError(it) }
downloadTasks.remove(request.downloadId)
}
)
)
} catch (e: CancellationException){
cleanup(request.downloadId)
throw e
}

}

fun pause(downloadId: Int) {
downloadTasks[downloadId]?.pauseDownload()
Log.d("DownloadDispatcher", "Attempting to pause download: $downloadId")
downloadTasks[downloadId]?.let { task ->
task.pauseDownload()
} ?: Log.e("DownloadDispatcher", "No download task found for id: $downloadId")
}

fun resume(downloadId: Int) {
downloadTasks[downloadId]?.resumeDownload()
Log.d("DownloadDispatcher", "Attempting to resume download: $downloadId")
downloadTasks[downloadId]?.let { task ->
task.resumeDownload()
} ?: Log.e("DownloadDispatcher", "No download task found for id: $downloadId")
}

fun cancel(req: DownloadRequest) {
req.job.cancel()
Log.d("DownloadDispatcher", "Attempting to cancel download: ${req.downloadId}")
downloadTasks[req.downloadId]?.let { task ->
task.cancelDownload()
cleanup(req.downloadId)
}
activeJobs[req.downloadId]?.cancel()
}

fun cancelAll() {
Log.d("DownloadDispatcher", "Cancelling all downloads")
downloadTasks.keys.forEach { downloadId ->
downloadTasks[downloadId]?.cancelDownload()
}
downloadTasks.clear()
activeJobs.values.forEach { it.cancel() }
activeJobs.clear()
scope.cancel()
}

private fun cleanup(downloadId: Int) {
downloadTasks.remove(downloadId)
activeJobs.remove(downloadId)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package com.gunishjain.grabbit.internal.download
import android.util.Log
import com.gunishjain.grabbit.internal.network.HttpClient
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import okhttp3.internal.notify
import okhttp3.internal.wait
import java.io.File
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.resume

class DownloadTask(private val request: DownloadRequest,private val httpClient: HttpClient) {

Expand All @@ -21,6 +18,8 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient:
private var file = File(request.dirPath, request.fileName)
private var downloadedBytes = AtomicLong(0L) // Use AtomicLong for thread safety
private val pauseLock = Object()
private var eTag: String? = null // Store ETag for consistency checking
private var lastModified: String? = null

suspend fun run(
onStart: () -> Unit = {},
Expand All @@ -42,16 +41,16 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient:

if (request.totalBytes <= 0) {
request.totalBytes = httpClient.getFileSize(request.url)
Log.d("Download Task",request.totalBytes.toString())
Log.d("Download Task", "Total bytes: ${request.totalBytes}")
}

while (!isCompleted) {
if (isPaused.get()) {
Log.d("DownloadTask", "Download paused.")
Log.d("DownloadTask", "Download paused at byte: ${downloadedBytes.get()}")
onPause()
waitForResume()
Log.d("DownloadTask", "Download resumed.")
continue // Important: restart the loop after resume
Log.d("DownloadTask", "Download resumed from byte: ${downloadedBytes.get()}")
continue
}

try {
Expand All @@ -60,7 +59,12 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient:
url = request.url,
file = file,
startByte = downloadedBytes.get(),
timeout = request.connectTimeout
timeout = request.connectTimeout,
onHeadersReceived = { headers ->
eTag = headers["ETag"]
lastModified = headers["Last-Modified"]
true
}
) { currentBytes, totalBytes ->
downloadedBytes.set(currentBytes)

Expand All @@ -80,7 +84,7 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient:

isCompleted = true
onCompleted()
Log.d("DownloadTask", "Download completed.")
Log.d("DownloadTask", "Download completed successfully")
break

} catch (e: PauseException) {
Expand Down Expand Up @@ -109,7 +113,7 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient:
}

fun pauseDownload() {
Log.d("DownloadTask", "pauseDownload() called.")
Log.d("DownloadTask", "pauseDownload() called. Current bytes: ${downloadedBytes.get()}")
isPaused.set(true)
}

Expand All @@ -122,22 +126,19 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient:
}

private suspend fun waitForResume() {
suspendCancellableCoroutine<Unit> { continuation ->
synchronized(pauseLock) {
while (isPaused.get()) {
try {
Log.d("DownloadTask", "Waiting for resume.")
pauseLock.wait()
} catch (e: InterruptedException) {
continuation.resume(Unit)
return@synchronized
}
}
continuation.resume(Unit)
}
withContext(Dispatchers.IO) {
while (isPaused.get()) {
Log.d("DownloadTask", "Waiting for resume.")
delay(100) // Use delay in coroutines to suspend efficiently without blocking the thread
}
}
}

fun cancelDownload() {
Log.d("DownloadTask", "cancelDownload() called at byte: ${downloadedBytes.get()}")
isCompleted = true
}

private class PauseException : Exception()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DefaultHttpClient : HttpClient {
file: File,
startByte: Long,
timeout: Int,
onHeadersReceived: (Map<String, String>) -> Boolean,
onProgress: (Long, Long) -> Unit
) = withContext(Dispatchers.IO){

Expand All @@ -45,51 +46,52 @@ class DefaultHttpClient : HttpClient {
throw IOException("Unexpected response code: ${response.code}")
}

// Extract and pass headers to callback
val headers = mutableMapOf<String, String>()
response.headers.forEach { (name, value) ->
headers[name] = value
}

// Allow caller to validate headers
if (!onHeadersReceived(headers)) {
throw IOException("Header validation failed")
}

// Get content length from header
val contentLength = response.header("Content-Length")?.toLong() ?: -1L
val totalBytes = if (contentLength != -1L) contentLength + startByte else -1L


Log.d("Default HTTPCLIENT", "Total File Size: $totalBytes")
Log.d("DefaultHttpClient", "Resuming download from byte: $startByte")

// Create parent directories if they don't exist
file.parentFile?.mkdirs()

// Use response body to write to file
response.body?.let { body ->
file.parentFile?.mkdirs()

Log.d("HTTPCLIENT",body.toString())
val bufferedSink = file.sink(append = startByte > 0).buffer()
val source = body.source()
val buffer = Buffer()
var downloadedBytes = startByte

while (true) {
val read = source.read(buffer, 8192L) // Read chunks of 8KB
val read = source.read(buffer, 8192L)
if (read == -1L) break

bufferedSink.write(buffer, read)
downloadedBytes += read

// Report progress

// Log.d("HTTPCLIENT",downloadedBytes.toString())

onProgress(downloadedBytes, totalBytes)
}

bufferedSink.close()
source.close()
} ?: throw IOException("Response body is null")
}

} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
Log.e("HTTPCLEINT", "Error occurred: ${e.message}", e)
Log.e("DefaultHttpClient", "Error occurred: ${e.message}", e)
throw IOException("Download failed: ${e.message}", e)
}

}

override suspend fun getFileSize(url: String): Long = withContext(Dispatchers.IO) {
Expand All @@ -98,8 +100,9 @@ class DefaultHttpClient : HttpClient {
.head() // Use HEAD request to get only headers
.build()

val response = okHttpClient.newCall(request).execute()
response.header("Content-Length")?.toLong() ?: -1L
okHttpClient.newCall(request).execute().use { response ->
response.header("Content-Length")?.toLong() ?: -1L
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ interface HttpClient {
file: File,
startByte: Long = 0,
timeout: Int = 30000,
onHeadersReceived: (Map<String, String>) -> Boolean = { true },
onProgress: (downloadedBytes: Long, totalBytes: Long) -> Unit = { _, _ -> }
)

Expand Down

0 comments on commit b6c5fef

Please sign in to comment.