From b6c5fefcd545430dca9d3a0781bf8b01597c117a Mon Sep 17 00:00:00 2001 From: gunishjain Date: Fri, 15 Nov 2024 15:48:37 +0530 Subject: [PATCH] Implement Pause Resume Support --- .../com/gunishjain/sample/MainActivity.kt | 2 +- .../internal/download/DownloadDispatcher.kt | 86 +++++++++++++------ .../grabbit/internal/download/DownloadTask.kt | 49 +++++------ .../internal/network/DefaultHttpClient.kt | 35 ++++---- .../grabbit/internal/network/HttpClient.kt | 1 + 5 files changed, 105 insertions(+), 68 deletions(-) diff --git a/app/src/main/java/com/gunishjain/sample/MainActivity.kt b/app/src/main/java/com/gunishjain/sample/MainActivity.kt index a4330d2..994c22b 100644 --- a/app/src/main/java/com/gunishjain/sample/MainActivity.kt +++ b/app/src/main/java/com/gunishjain/sample/MainActivity.kt @@ -148,7 +148,7 @@ fun DownloadUI() { startDownload( url = "https://www.learningcontainer.com/download/sample-50-mb-pdf-file/?wpdmdl=3675&refresh=6721f942bd70b1730279746", dirPath = downloadsDirectory, - fileName = "himnashunew.pdf" + fileName = "test.pdf" ) }) { Text("Download") diff --git a/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadDispatcher.kt b/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadDispatcher.kt index 840c823..2040106 100644 --- a/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadDispatcher.kt +++ b/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadDispatcher.kt @@ -1,17 +1,21 @@ package com.gunishjain.grabbit.internal.download +import android.util.Log import com.gunishjain.grabbit.internal.network.HttpClient import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.cancellation.CancellationException class DownloadDispatcher(private val httpClient: HttpClient) { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main) private val downloadTasks = ConcurrentHashMap() + private val activeJobs = ConcurrentHashMap() private fun executeOnMain(block: () -> Unit){ scope.launch { @@ -25,54 +29,82 @@ class DownloadDispatcher(private val httpClient: HttpClient) { downloadTasks[req.downloadId] = downloadTask val job = scope.launch { - execute(req) + execute(req,downloadTask) } + activeJobs[req.downloadId] = job req.job = job return req.downloadId } - private suspend fun execute(request: DownloadRequest) { - - DownloadTask(request,httpClient).run ( - onStart = { - executeOnMain { request.onStart() } - }, - onProgress = { - executeOnMain { request.onProgress(it) } - }, - onPause = { - executeOnMain { request.onPause() } - }, - onCompleted = { - executeOnMain { - request.onCompleted() + private suspend fun execute(request: DownloadRequest,task: DownloadTask) { + try { + task.run( + onStart = { + executeOnMain { request.onStart() } + }, + onProgress = { + executeOnMain { request.onProgress(it) } + }, + onPause = { + executeOnMain { request.onPause() } + }, + onCompleted = { + executeOnMain { + request.onCompleted() + } + cleanup(request.downloadId) + + }, + onError = { + executeOnMain { request.onError(it) } + cleanup(request.downloadId) } - downloadTasks.remove(request.downloadId) - - }, - onError = { - executeOnMain { request.onError(it) } - downloadTasks.remove(request.downloadId) - } - ) + ) + } catch (e: CancellationException){ + cleanup(request.downloadId) + throw e + } } fun pause(downloadId: Int) { - downloadTasks[downloadId]?.pauseDownload() + Log.d("DownloadDispatcher", "Attempting to pause download: $downloadId") + downloadTasks[downloadId]?.let { task -> + task.pauseDownload() + } ?: Log.e("DownloadDispatcher", "No download task found for id: $downloadId") } fun resume(downloadId: Int) { - downloadTasks[downloadId]?.resumeDownload() + Log.d("DownloadDispatcher", "Attempting to resume download: $downloadId") + downloadTasks[downloadId]?.let { task -> + task.resumeDownload() + } ?: Log.e("DownloadDispatcher", "No download task found for id: $downloadId") } fun cancel(req: DownloadRequest) { - req.job.cancel() + Log.d("DownloadDispatcher", "Attempting to cancel download: ${req.downloadId}") + downloadTasks[req.downloadId]?.let { task -> + task.cancelDownload() + cleanup(req.downloadId) + } + activeJobs[req.downloadId]?.cancel() } fun cancelAll() { + Log.d("DownloadDispatcher", "Cancelling all downloads") + downloadTasks.keys.forEach { downloadId -> + downloadTasks[downloadId]?.cancelDownload() + } + downloadTasks.clear() + activeJobs.values.forEach { it.cancel() } + activeJobs.clear() scope.cancel() } + private fun cleanup(downloadId: Int) { + downloadTasks.remove(downloadId) + activeJobs.remove(downloadId) + } + } \ No newline at end of file diff --git a/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadTask.kt b/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadTask.kt index 5c18b84..3b04091 100644 --- a/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadTask.kt +++ b/grabbit/src/main/java/com/gunishjain/grabbit/internal/download/DownloadTask.kt @@ -3,15 +3,12 @@ package com.gunishjain.grabbit.internal.download import android.util.Log import com.gunishjain.grabbit.internal.network.HttpClient import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.delay import kotlinx.coroutines.withContext -import okhttp3.internal.notify -import okhttp3.internal.wait import java.io.File import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.cancellation.CancellationException -import kotlin.coroutines.resume class DownloadTask(private val request: DownloadRequest,private val httpClient: HttpClient) { @@ -21,6 +18,8 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient: private var file = File(request.dirPath, request.fileName) private var downloadedBytes = AtomicLong(0L) // Use AtomicLong for thread safety private val pauseLock = Object() + private var eTag: String? = null // Store ETag for consistency checking + private var lastModified: String? = null suspend fun run( onStart: () -> Unit = {}, @@ -42,16 +41,16 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient: if (request.totalBytes <= 0) { request.totalBytes = httpClient.getFileSize(request.url) - Log.d("Download Task",request.totalBytes.toString()) + Log.d("Download Task", "Total bytes: ${request.totalBytes}") } while (!isCompleted) { if (isPaused.get()) { - Log.d("DownloadTask", "Download paused.") + Log.d("DownloadTask", "Download paused at byte: ${downloadedBytes.get()}") onPause() waitForResume() - Log.d("DownloadTask", "Download resumed.") - continue // Important: restart the loop after resume + Log.d("DownloadTask", "Download resumed from byte: ${downloadedBytes.get()}") + continue } try { @@ -60,7 +59,12 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient: url = request.url, file = file, startByte = downloadedBytes.get(), - timeout = request.connectTimeout + timeout = request.connectTimeout, + onHeadersReceived = { headers -> + eTag = headers["ETag"] + lastModified = headers["Last-Modified"] + true + } ) { currentBytes, totalBytes -> downloadedBytes.set(currentBytes) @@ -80,7 +84,7 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient: isCompleted = true onCompleted() - Log.d("DownloadTask", "Download completed.") + Log.d("DownloadTask", "Download completed successfully") break } catch (e: PauseException) { @@ -109,7 +113,7 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient: } fun pauseDownload() { - Log.d("DownloadTask", "pauseDownload() called.") + Log.d("DownloadTask", "pauseDownload() called. Current bytes: ${downloadedBytes.get()}") isPaused.set(true) } @@ -122,22 +126,19 @@ class DownloadTask(private val request: DownloadRequest,private val httpClient: } private suspend fun waitForResume() { - suspendCancellableCoroutine { continuation -> - synchronized(pauseLock) { - while (isPaused.get()) { - try { - Log.d("DownloadTask", "Waiting for resume.") - pauseLock.wait() - } catch (e: InterruptedException) { - continuation.resume(Unit) - return@synchronized - } - } - continuation.resume(Unit) - } + withContext(Dispatchers.IO) { + while (isPaused.get()) { + Log.d("DownloadTask", "Waiting for resume.") + delay(100) // Use delay in coroutines to suspend efficiently without blocking the thread + } } } + fun cancelDownload() { + Log.d("DownloadTask", "cancelDownload() called at byte: ${downloadedBytes.get()}") + isCompleted = true + } + private class PauseException : Exception() diff --git a/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/DefaultHttpClient.kt b/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/DefaultHttpClient.kt index 3b8f42f..49769ce 100644 --- a/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/DefaultHttpClient.kt +++ b/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/DefaultHttpClient.kt @@ -26,6 +26,7 @@ class DefaultHttpClient : HttpClient { file: File, startByte: Long, timeout: Int, + onHeadersReceived: (Map) -> Boolean, onProgress: (Long, Long) -> Unit ) = withContext(Dispatchers.IO){ @@ -45,36 +46,39 @@ class DefaultHttpClient : HttpClient { throw IOException("Unexpected response code: ${response.code}") } + // Extract and pass headers to callback + val headers = mutableMapOf() + response.headers.forEach { (name, value) -> + headers[name] = value + } + + // Allow caller to validate headers + if (!onHeadersReceived(headers)) { + throw IOException("Header validation failed") + } + // Get content length from header val contentLength = response.header("Content-Length")?.toLong() ?: -1L val totalBytes = if (contentLength != -1L) contentLength + startByte else -1L - Log.d("Default HTTPCLIENT", "Total File Size: $totalBytes") + Log.d("DefaultHttpClient", "Resuming download from byte: $startByte") - // Create parent directories if they don't exist - file.parentFile?.mkdirs() - // Use response body to write to file response.body?.let { body -> + file.parentFile?.mkdirs() - Log.d("HTTPCLIENT",body.toString()) val bufferedSink = file.sink(append = startByte > 0).buffer() val source = body.source() val buffer = Buffer() var downloadedBytes = startByte while (true) { - val read = source.read(buffer, 8192L) // Read chunks of 8KB + val read = source.read(buffer, 8192L) if (read == -1L) break bufferedSink.write(buffer, read) downloadedBytes += read - - // Report progress - -// Log.d("HTTPCLIENT",downloadedBytes.toString()) - onProgress(downloadedBytes, totalBytes) } @@ -82,14 +86,12 @@ class DefaultHttpClient : HttpClient { source.close() } ?: throw IOException("Response body is null") } - } catch (e: CancellationException) { throw e } catch (e: Exception) { - Log.e("HTTPCLEINT", "Error occurred: ${e.message}", e) + Log.e("DefaultHttpClient", "Error occurred: ${e.message}", e) throw IOException("Download failed: ${e.message}", e) } - } override suspend fun getFileSize(url: String): Long = withContext(Dispatchers.IO) { @@ -98,8 +100,9 @@ class DefaultHttpClient : HttpClient { .head() // Use HEAD request to get only headers .build() - val response = okHttpClient.newCall(request).execute() - response.header("Content-Length")?.toLong() ?: -1L + okHttpClient.newCall(request).execute().use { response -> + response.header("Content-Length")?.toLong() ?: -1L + } } } \ No newline at end of file diff --git a/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/HttpClient.kt b/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/HttpClient.kt index 30550ac..84dc0f9 100644 --- a/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/HttpClient.kt +++ b/grabbit/src/main/java/com/gunishjain/grabbit/internal/network/HttpClient.kt @@ -9,6 +9,7 @@ interface HttpClient { file: File, startByte: Long = 0, timeout: Int = 30000, + onHeadersReceived: (Map) -> Boolean = { true }, onProgress: (downloadedBytes: Long, totalBytes: Long) -> Unit = { _, _ -> } )