Skip to content

Commit

Permalink
Refactor volume management to streamline event handling, state update…
Browse files Browse the repository at this point in the history
…s, and improve watcher interactions within the system
  • Loading branch information
jamiepine committed Nov 3, 2024
1 parent f60ff48 commit 4c866e6
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 248 deletions.
1 change: 0 additions & 1 deletion core/src/api/volumes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let mut event_bus_rx = node.volumes.subscribe();

while let Ok(event) = event_bus_rx.recv().await {
tracing::debug!("Volume event: {:?}", event);
yield event;
}
})
Expand Down
125 changes: 50 additions & 75 deletions core/src/volume/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::{
};
use crate::library::{Library, LibraryManagerEvent};
use async_channel as chan;
use sd_prisma::prisma::album::pub_id;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tokio::time::Instant;
Expand All @@ -31,14 +32,6 @@ pub enum VolumeManagerMessage {
volume: Volume,
ack: oneshot::Sender<Result<(), VolumeError>>,
},
WatchVolume {
volume_id: Vec<u8>,
ack: oneshot::Sender<Result<(), VolumeError>>,
},
UnwatchVolume {
volume_id: Vec<u8>,
ack: oneshot::Sender<Result<(), VolumeError>>,
},
MountVolume {
volume_id: Vec<u8>,
ack: oneshot::Sender<Result<(), VolumeError>>,
Expand Down Expand Up @@ -101,14 +94,50 @@ impl VolumeManagerActor {
tokio::spawn(async move {
debug!("Starting volume event monitoring");
while let Ok(event) = event_rx.recv().await {
debug!("Volume event processed: {:?}", event);
debug!("Volume event received: {:?}", event);

match event {
VolumeEvent::VolumeAdded(mut volume) => {
if let Some(pub_id) = volume.pub_id.take() {
self.state.write().await.volumes.insert(pub_id, volume);
}
}
VolumeEvent::VolumeRemoved(mut volume) => {
if let Some(pub_id) = volume.pub_id.take() {
self.state.write().await.volumes.remove(&pub_id);
}
}
VolumeEvent::VolumeUpdated { old, new } => todo!(),
VolumeEvent::VolumeSpeedTested {
id,
read_speed,
write_speed,
} => {
self.state
.write()
.await
.volumes
.get_mut(&id)
.unwrap()
.read_speed_mbps = Some(read_speed);
self.state
.write()
.await
.volumes
.get_mut(&id)
.unwrap()
.write_speed_mbps = Some(write_speed);
}
VolumeEvent::VolumeMountChanged { id, is_mounted } => todo!(),
VolumeEvent::VolumeError { id, error } => todo!(),
}
}
warn!("Volume event monitoring ended");
});
}

/// Starts the VolumeManagerActor
pub async fn start(self) {
pub async fn start(self, device_pub_id: Vec<u8>) {
info!("Volume manager actor started");
let self_arc = Arc::new(Mutex::new(self));

Expand Down Expand Up @@ -179,19 +208,14 @@ impl VolumeManagerActor {
}
});

// Start the volume watcher
let self_arc_watcher = Arc::clone(&self_arc);
let event_tx = self_arc.lock().await.event_tx.clone();

tokio::spawn(async move {
let mut actor = self_arc_watcher.lock().await;
let state = actor.state.write().await;

// Create and start watcher for each volume
for (volume_id, volume) in &state.volumes {
if let Some(watcher) = state.watchers.get(volume_id) {
if let Err(e) = watcher.watcher.start().await {
error!(?e, "Failed to start watcher for volume {}", volume.name);
}
}
// start one watcher
let watcher = VolumeWatcher::new(event_tx);
if let Err(e) = watcher.start(device_pub_id.clone(), self_arc.clone()).await {
error!(?e, "Failed to start watcher for volumes");
return;
}
});

Expand Down Expand Up @@ -255,42 +279,6 @@ impl VolumeManagerActor {
// Update volumes
for (pub_id, volume) in updates {
state.volumes.insert(pub_id.clone(), volume);

// Create and start watcher if it doesn't exist
if !state.watchers.contains_key(&pub_id) {
let watcher = VolumeWatcher::new(self.event_tx.clone());
if let Err(e) = watcher.start().await {
error!(
?e,
"Failed to start watcher for volume {}",
hex::encode(&pub_id)
);
continue;
}

state.watchers.insert(
pub_id,
WatcherState {
watcher: Arc::new(watcher),
last_event: Instant::now(),
paused: false,
},
);
}
}

// Remove any watchers for volumes that no longer exist
let stale_watchers: Vec<_> = state
.watchers
.keys()
.filter(|id| !state.volumes.contains_key(*id))
.cloned()
.collect();

for volume_id in stale_watchers {
if let Some(watcher_state) = state.watchers.remove(&volume_id) {
watcher_state.watcher.stop().await;
}
}
}

Expand All @@ -317,21 +305,6 @@ impl VolumeManagerActor {
state = self.state.write().await;
}

// Clean up stale watchers
let stale_watchers: Vec<_> = state
.watchers
.iter()
.filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600))
.map(|(id, _)| id.clone())
.collect();

for volume_id in stale_watchers {
if let Some(watcher_state) = state.watchers.get(&volume_id) {
watcher_state.watcher.stop().await;
}
state.watchers.remove(&volume_id);
}

Ok(())
}

Expand Down Expand Up @@ -369,8 +342,6 @@ impl VolumeManagerActor {
ack,
} => todo!(),
VolumeManagerMessage::UpdateVolume { volume, ack } => todo!(),
VolumeManagerMessage::WatchVolume { volume_id, ack } => todo!(),
VolumeManagerMessage::UnwatchVolume { volume_id, ack } => todo!(),
VolumeManagerMessage::MountVolume { volume_id, ack } => todo!(),
VolumeManagerMessage::UnmountVolume { volume_id, ack } => todo!(),
VolumeManagerMessage::SpeedTest { volume_id, ack } => todo!(),
Expand All @@ -387,6 +358,10 @@ impl VolumeManagerActor {
Ok(self.state.read().await.volumes.values().cloned().collect())
}

pub async fn get_volumes(&self) -> Vec<Volume> {
self.state.read().await.volumes.values().cloned().collect()
}

async fn handle_list_library_volumes(
&self,
library: Arc<Library>,
Expand Down
142 changes: 28 additions & 114 deletions core/src/volume/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use uuid::Uuid;
pub struct VolumeManagerState {
/// All tracked volumes
pub volumes: HashMap<Vec<u8>, Volume>,
/// Active watchers
pub watchers: HashMap<Vec<u8>, WatcherState>,
/// Volume manager options
pub options: VolumeOptions,
/// Event broadcaster
Expand All @@ -36,115 +34,67 @@ impl VolumeManagerState {
) -> Result<Self, VolumeError> {
Ok(Self {
volumes: HashMap::new(),
watchers: HashMap::new(),
options,
event_tx,
last_scan: Instant::now(),
})
}

/// Scans the system for volumes and updates the state
/// This happens on startup, and during the volume manager's maintenance task
pub async fn scan_volumes(&mut self, device_pub_id: Vec<u8>) -> Result<(), VolumeError> {
debug!("Scanning for volumes...");
let detected_volumes = super::os::get_volumes().await?;
debug!("Found {} volumes during scan", detected_volumes.len());

let current_volumes = self.volumes.clone();
let mut new_state = HashMap::new();
let current_volumes = self.volumes.clone(); // Copy of current state
let mut new_state = HashMap::new(); // New state to build with detected volumes

// Process detected volumes
// Process each detected volume
for mut volume in detected_volumes {
// Skip virtual volumes if configured
if !self.options.include_virtual && matches!(volume.mount_type, MountType::Virtual) {
continue;
}

// Generate fingerprint for the new volume
// Generate a unique fingerprint to identify the volume
let fingerprint = volume.generate_fingerprint(device_pub_id.clone());

// Try to find existing volume by fingerprint
let existing_volume = current_volumes.values().find(|existing| {
// Check if this volume is already tracked in the current volumes
if let Some(existing) = current_volumes.values().find(|existing| {
existing.generate_fingerprint(device_pub_id.clone()) == fingerprint
});

let volume_id = if let Some(existing) = existing_volume {
// Keep existing ID
existing
.pub_id
.clone()
.unwrap_or_else(|| Uuid::now_v7().as_bytes().to_vec())
} else {
// Generate new ID only for truly new volumes
Uuid::now_v7().as_bytes().to_vec()
};

volume.pub_id = Some(volume_id.clone());

match current_volumes.get(&volume_id) {
Some(existing) if existing != &volume => {
new_state.insert(volume_id.clone(), volume.clone());
}) {
// Compare current and detected volume properties
if existing == &volume {
// If nothing has changed, just add to the new state and skip VolumeAdded
new_state.insert(existing.pub_id.clone().unwrap(), existing.clone());
continue;
} else {
// If properties have changed, update with the new properties and emit an update event
self.emit_event(VolumeEvent::VolumeUpdated {
old: existing.clone(),
new: volume,
new: volume.clone(),
})
.await;
}
None => {
new_state.insert(volume_id.clone(), volume.clone());
self.emit_event(VolumeEvent::VolumeAdded(volume)).await;
}
_ => {
new_state.insert(volume_id.clone(), volume);
}
} else {
// If the volume is genuinely new, assign an ID and emit a VolumeAdded event
let volume_id = Uuid::now_v7().as_bytes().to_vec();
volume.pub_id = Some(volume_id.clone());
self.emit_event(VolumeEvent::VolumeAdded(volume.clone()))
.await;
}

// Insert volume into new state (whether new or updated)
new_state.insert(volume.pub_id.clone().unwrap(), volume);
}

// Find removed volumes
// Identify and handle removed volumes
for (id, volume) in &current_volumes {
if !new_state.contains_key(id) {
self.watchers.remove(id);
self.emit_event(VolumeEvent::VolumeRemoved(volume.clone()))
.await;
}
}

// Update state
// Update the volume manager's state with the new volume list
self.volumes = new_state;
self.last_scan = Instant::now();
Ok(())
}

/// Starts watching a volume
#[instrument(skip(self))]
pub async fn watch_volume(&mut self, volume_id: Vec<u8>) -> Result<(), VolumeError> {
if self.watchers.contains_key(&volume_id) {
debug!("Already watching volume {:?}", hex::encode(&volume_id));
return Ok(());
}

let watcher = Arc::new(VolumeWatcher::new(self.event_tx.clone()));
watcher.start().await?;

self.watchers.insert(
volume_id.clone(),
WatcherState {
watcher,
last_event: Instant::now(),
paused: false,
},
);

debug!("Started watching volume {}", hex::encode(&volume_id));
Ok(())
}

/// Stops watching a volume
#[instrument(skip(self))]
pub async fn unwatch_volume(&mut self, volume_id: &[u8]) -> Result<(), VolumeError> {
if let Some(state) = self.watchers.remove(volume_id) {
state.watcher.stop().await;
debug!("Stopped watching volume {}", hex::encode(volume_id));
}
self.last_scan = Instant::now(); // Update the last scan time
Ok(())
}

Expand Down Expand Up @@ -198,30 +148,6 @@ impl VolumeManagerState {
Ok(())
}

/// Temporarily pauses a volume watcher
#[instrument(skip(self))]
pub async fn pause_watcher(&mut self, volume_id: &[u8]) -> Result<(), VolumeError> {
if let Some(state) = self.watchers.get_mut(volume_id) {
if !state.paused {
state.paused = true;
debug!("Paused watcher for volume {}", hex::encode(volume_id));
}
}
Ok(())
}

/// Resumes a paused volume watcher
#[instrument(skip(self))]
pub async fn resume_watcher(&mut self, volume_id: &[u8]) -> Result<(), VolumeError> {
if let Some(state) = self.watchers.get_mut(volume_id) {
if state.paused {
state.paused = false;
debug!("Resumed watcher for volume {}", hex::encode(volume_id));
}
}
Ok(())
}

/// Helper to emit events
async fn emit_event(&self, event: VolumeEvent) {
if let Err(e) = self.event_tx.send(event) {
Expand All @@ -236,18 +162,6 @@ impl VolumeManagerState {
self.scan_volumes(device_pub_id).await?;
}

// Clean up stale watchers
let stale_watchers: Vec<_> = self
.watchers
.iter()
.filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600))
.map(|(id, _)| id.clone())
.collect();

for volume_id in stale_watchers {
self.unwatch_volume(&volume_id).await?;
}

Ok(())
}
}
Expand Down
Loading

0 comments on commit 4c866e6

Please sign in to comment.