Skip to content

Commit

Permalink
update fingerprint handling, and clean up unused code in volume manag…
Browse files Browse the repository at this point in the history
…ement
  • Loading branch information
jamiepine committed Nov 5, 2024
1 parent 4bc2b68 commit 4a5c4ce
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 216 deletions.
20 changes: 6 additions & 14 deletions core/src/api/volumes.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
use super::{utils::library, Ctx, R};
use crate::library::Library;
use crate::volume::{VolumeEvent, VolumeFingerprint};
use rspc::alpha::AlphaRouter;
use serde::Deserialize;
use specta::Type;
use std::path::PathBuf;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[derive(Type, Deserialize)]
pub struct TrackVolumeInput {
pub volume_id: VolumeFingerprint,
}

pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.router()
Expand All @@ -30,21 +21,22 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
)
.procedure(
"track",
R.with2(library())
.mutation(|(node, library), input: TrackVolumeInput| async move {
R.with2(library()).mutation(
|(node, library), fingerprint: VolumeFingerprint| async move {
tracing::debug!(
"Handling track volume request for volume_id={:?}",
input.volume_id
fingerprint
);

node.volumes
.track_volume(input.volume_id, library)
.track_volume(fingerprint, library)
.await
.map_err(|e| {
tracing::error!("Failed to track volume: {:?}", e);
e.into()
})
}),
},
),
)
.procedure(
"listForLibrary",
Expand Down
32 changes: 6 additions & 26 deletions core/src/volume/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ use super::{
watcher::VolumeWatcher,
VolumeManagerContext, VolumeManagerState,
};
use crate::volume::types::{VolumeFingerprint, VolumePubId};
use crate::volume::types::VolumeFingerprint;
use crate::{
library::{self, Library, LibraryManagerEvent},
library::{Library, LibraryManagerEvent},
volume::MountType,
};
use async_channel as chan;
use sd_core_sync::DevicePubId;
use sd_prisma::prisma::volume;
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::registry;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_CHANNEL_SIZE: usize = 128;

#[derive(Debug)]
Expand Down Expand Up @@ -80,7 +78,7 @@ impl VolumeManagerActor {
options: VolumeOptions,
) -> Result<(Volumes, Self), VolumeError> {
let (message_tx, message_rx) = chan::bounded(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = broadcast::channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, _) = broadcast::channel(DEFAULT_CHANNEL_SIZE);

let manager = Volumes::new(message_tx, event_tx.clone());
let state =
Expand Down Expand Up @@ -279,10 +277,6 @@ impl VolumeManagerActor {
let state = self.state.clone();
let state = state.write().await;
let mut registry = state.registry.write().await;
let mut library_registry = state.library_registry.write().await;

// Initialize library in state
library_registry.register_library(library.id);

let db_device = library
.db
Expand Down Expand Up @@ -312,25 +306,11 @@ impl VolumeManagerActor {
.find(|db_vol| VolumeFingerprint::new(&device_id, db_vol) == *fingerprint)
{
// Update existing volume
let updated = Volume::merge_with_db_volume(volume, db_volume);
let updated = Volume::merge_with_db(volume, db_volume);
registry.register_volume(updated.clone());

// Map to library
library_registry.track_volume(
library.id,
fingerprint.clone(),
VolumePubId(db_volume.pub_id.clone().unwrap()),
);
} else if volume.mount_type == MountType::System {
// Create new system volume in database
let created = volume.create(&library.db, device_id.to_db()).await?;

// Map to library
library_registry.track_volume(
library.id,
fingerprint.clone(),
VolumePubId(created.pub_id.unwrap()),
);
}
}

Expand Down Expand Up @@ -398,7 +378,7 @@ impl VolumeManagerActor {
self.state
.read()
.await
.get_volumes_for_library(library.id)
.get_volumes_for_library(library)
.await
}

Expand Down
9 changes: 4 additions & 5 deletions core/src/volume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@
//!
//! This module provides functionality for detecting, monitoring, and managing storage volumes
//! across different platforms.
use std::sync::Arc;

// Internal modules
//! Volumes use a fingerprint to identify them as they sometimes are not persisted in the database
//!
pub(crate) mod actor;
mod error;
mod os;
mod speed;
mod state;
// mod statistics;
mod types;
mod volumes;
mod watcher;
use crate::library::LibraryManagerEvent;
use crate::util::mpscrr;
// Core type exports

pub use {
actor::VolumeManagerActor,
error::VolumeError,
Expand Down Expand Up @@ -59,6 +57,7 @@ pub use os::linux::get_volumes;
#[cfg(target_os = "macos")]
pub use os::macos::get_volumes;
#[cfg(target_os = "windows")]
pub use os::windows::get_volumes;

// Internal utilities
pub(crate) mod util {
Expand Down
6 changes: 6 additions & 0 deletions core/src/volume/speed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ impl SpeedTest for Volume {
event_tx: Option<&Sender<VolumeEvent>>,
) -> Result<SpeedTestResult, VolumeError> {
let config = config.unwrap_or_default();

// if volume is not mounted or not writable, return an error
if !self.is_mounted || self.read_only {
return Err(VolumeError::Cancelled);
}

debug!("Starting speed test with config: {:?}", config);

let test_location = TestLocation::new(&self.mount_point, &self.mount_type).await?;
Expand Down
149 changes: 48 additions & 101 deletions core/src/volume/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use crate::{
};

use sd_core_sync::DevicePubId;
use std::{clone, collections::HashSet};
use std::collections::HashSet;
use std::{collections::HashMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error};

use super::{VolumeError, VolumeOptions};
use super::{MountType, VolumeError, VolumeOptions};
// Core volume registry
pub struct VolumeRegistry {
volumes: HashMap<VolumeFingerprint, Volume>,
Expand Down Expand Up @@ -61,66 +61,9 @@ impl VolumeRegistry {
}
}

// Library volume mapping
#[derive(Default)]
pub struct LibraryVolumeRegistry {
// LibraryId -> (VolumeFingerprint -> VolumePubId)
mappings: HashMap<LibraryId, HashMap<VolumeFingerprint, VolumePubId>>,
}

impl LibraryVolumeRegistry {
pub fn new() -> Self {
Self {
mappings: HashMap::new(),
}
}

pub fn register_library(&mut self, library_id: LibraryId) {
self.mappings.entry(library_id).or_default();
}

pub fn track_volume(
&mut self,
library_id: LibraryId,
fingerprint: VolumeFingerprint,
pub_id: VolumePubId,
) {
if let Some(mapping) = self.mappings.get_mut(&library_id) {
mapping.insert(fingerprint, pub_id);
}
}

pub fn get_volume_id(
&self,
library_id: &LibraryId,
fingerprint: &VolumeFingerprint,
) -> Option<&VolumePubId> {
self.mappings.get(library_id)?.get(fingerprint)
}

///
pub fn untrack_volume(&mut self, library_id: &LibraryId, fingerprint: &VolumeFingerprint) {
if let Some(mapping) = self.mappings.get_mut(library_id) {
mapping.remove(fingerprint);
}
}

// Removes all mappings for a library
pub fn remove_library(
&mut self,
library_id: &LibraryId,
) -> Vec<(VolumeFingerprint, VolumePubId)> {
self.mappings
.remove(library_id)
.map(|m| m.into_iter().collect())
.unwrap_or_default()
}
}

// Main state manager
pub struct VolumeManagerState {
pub registry: Arc<RwLock<VolumeRegistry>>,
pub library_registry: Arc<RwLock<LibraryVolumeRegistry>>,
options: VolumeOptions,
event_tx: broadcast::Sender<VolumeEvent>,
last_scan: Instant,
Expand All @@ -134,7 +77,6 @@ impl VolumeManagerState {
) -> Self {
Self {
registry: Arc::new(RwLock::new(VolumeRegistry::new(device_id))),
library_registry: Arc::new(RwLock::new(LibraryVolumeRegistry::new())),
options,
event_tx,
last_scan: Instant::now(),
Expand Down Expand Up @@ -163,11 +105,14 @@ impl VolumeManagerState {
let event_tx = self.event_tx.clone();
drop(registry);

tokio::spawn(async move {
if let Err(e) = volume_clone.speed_test(None, Some(&event_tx)).await {
error!(?e, "Failed to perform speed test for volume");
}
});
// Spawn a background task to perform the speed test only for system volumes
if volume.mount_type == MountType::System {
tokio::spawn(async move {
if let Err(e) = volume_clone.speed_test(None, Some(&event_tx)).await {
error!(?e, "Failed to perform speed test for volume");
}
});
}

registry = self.registry.write().await;
}
Expand All @@ -184,39 +129,27 @@ impl VolumeManagerState {
Ok(())
}

pub async fn register_with_library(
&self,
library_id: LibraryId,
volume: &Volume,
library: Arc<Library>,
) -> Result<(), VolumeError> {
let device_id = self.registry.read().await.device_id.clone();
let fingerprint = VolumeFingerprint::new(&device_id, volume);

// Create in database
volume.create(&library.db, device_id.to_db()).await?;
// pub async fn register_with_library(
// &self,
// library_id: LibraryId,
// volume: &Volume,
// library: Arc<Library>,
// ) -> Result<(), VolumeError> {
// let device_id = self.registry.read().await.device_id.clone();
// let fingerprint = VolumeFingerprint::new(&device_id, volume);

// Track the relationship
self.library_registry.write().await.track_volume(
library_id,
fingerprint,
VolumePubId::from(volume.pub_id.clone().unwrap()),
);
// // Create in database
// volume.create(&library.db, device_id.to_db()).await?;

Ok(())
}
// // Track the relationship
// self.library_registry.write().await.track_volume(
// library_id,
// fingerprint,
// VolumePubId::from(volume.pub_id.clone().unwrap()),
// );

pub async fn get_volume_pub_id(
&self,
library_id: &LibraryId,
fingerprint: &VolumeFingerprint,
) -> Option<VolumePubId> {
self.library_registry
.read()
.await
.get_volume_id(library_id, fingerprint)
.cloned()
}
// Ok(())
// }

pub async fn get_volume(&self, fingerprint: &VolumeFingerprint) -> Option<Volume> {
self.registry.read().await.get_volume(fingerprint).cloned()
Expand All @@ -233,20 +166,34 @@ impl VolumeManagerState {

pub async fn get_volumes_for_library(
&self,
library_id: LibraryId,
library: Arc<Library>,
) -> Result<Vec<Volume>, VolumeError> {
let registry = self.registry.read().await;
let library_registry = self.library_registry.read().await;

let mut volumes = Vec::new();

let device_id = &self.registry.read().await.device_id;

let db_volumes = library
.db
.volume()
.find_many(vec![])
.exec()
.await?
.into_iter()
.map(Volume::from)
.map(|v| {
// TODO: maybe just store the fingerprint at this point?
let fingerprint = VolumeFingerprint::new(device_id, &v);
(fingerprint, v)
})
.collect::<HashMap<VolumeFingerprint, Volume>>();

for (fingerprint, volume) in registry.volumes() {
debug!("Processing volume: {:?}", volume);
let mut volume = volume.clone();

// Update volume with library-specific pub_id if available
if let Some(pub_id) = library_registry.get_volume_id(&library_id, fingerprint) {
volume.pub_id = Some(pub_id.clone().into());
if let Some(db_volume) = db_volumes.get(&fingerprint) {
volume = Volume::merge_with_db(&volume, db_volume);
}

volumes.push(volume);
Expand Down
Loading

0 comments on commit 4a5c4ce

Please sign in to comment.