Skip to content

Commit

Permalink
Keep download state until request completed
Browse files Browse the repository at this point in the history
  • Loading branch information
james58899 committed Dec 15, 2023
1 parent ef58de1 commit 272beef
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ struct Args {
proxy: Option<String>,
}

type DownloadState = RwLock<HashMap<[u8; 20], (watch::Receiver<Option<Arc<TempPath>>>, watch::Receiver<u64>)>>;
type DownloadState = RwLock<HashMap<[u8; 20], (watch::Receiver<Option<Arc<TempPath>>>, Arc<watch::Sender<u64>>)>>;

struct AppState {
runtime: Handle,
Expand Down
16 changes: 9 additions & 7 deletions src/route/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ async fn hath(
data.download_state.write().remove(&info.hash());
return HttpResponse::NotFound().body("An error has occurred. (404)");
}
(tempfile.unwrap().as_ref().unwrap().clone(), progress)
(tempfile.unwrap().as_ref().unwrap().clone(), progress.subscribe())
} else {
// Tracking download progress
let (temp_tx, temp_rx) = watch::channel(None); // Tempfile
let (tx, rx) = watch::channel(0); // Download progress
data.download_state.write().insert(info.hash(), (temp_rx.clone(), rx.clone()));
let tx = Arc::new(watch::channel(0).0); // Download progress
data.download_state.write().insert(info.hash(), (temp_rx.clone(), tx.clone()));

let temp_path = Arc::new(data.cache_manager.create_temp_file().await);
temp_tx.send_replace(Some(temp_path.clone()));
Expand All @@ -109,6 +109,7 @@ async fn hath(
};

// Download worker
let tx2: Arc<watch::Sender<u64>> = tx.clone();
let info2 = info.clone();
let temp_path2 = temp_path.clone();
data.runtime.clone().spawn(async move {
Expand Down Expand Up @@ -169,18 +170,19 @@ async fn hath(
}
hasher.update(data);
progress += write_size as u64;
tx.send_replace(progress);
tx2.send_replace(progress);
}
if progress == file_size {
if let Err(err) = file.flush().await {
error!("Proxy temp file flush fail: {}", err);
break 'retry;
}
let hash = hasher.finish();
tx2.send_replace(progress);
tx2.closed().await; // Wait all request done
data.download_state.write().remove(&info2.hash());
tx.send_replace(progress);
if hash == info2.hash() {
tx.closed().await; // Wait all request done
tx2.closed().await; // Wait again to avoid race conditions
data.cache_manager.import_cache(&info2, &temp_path2).await;
} else {
error!("Cache hash mismatch: expected: {:x?}, got: {:x?}", info2.hash(), hash);
Expand All @@ -194,7 +196,7 @@ async fn hath(
data.download_state.write().remove(&info2.hash());
});

(temp_path, rx)
(temp_path, tx.subscribe())
};

// Wait download start or 404
Expand Down

0 comments on commit 272beef

Please sign in to comment.