Skip to content

Commit

Permalink
Add volume event logging and improve watcher integration with enhance…
Browse files Browse the repository at this point in the history
…d state management and maintenance routines
  • Loading branch information
jamiepine committed Nov 3, 2024
1 parent 9a7fa46 commit f60ff48
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 320 deletions.
1 change: 1 addition & 0 deletions core/src/api/volumes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let mut event_bus_rx = node.volumes.subscribe();

while let Ok(event) = event_bus_rx.recv().await {
tracing::debug!("Volume event: {:?}", event);
yield event;
}
})
Expand Down
162 changes: 120 additions & 42 deletions core/src/volume/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use super::{
error::VolumeError,
types::{Volume, VolumeEvent, VolumeOptions},
volumes::Volumes,
watcher::{VolumeWatcher, WatcherState},
VolumeManagerContext, VolumeManagerState,
};
use crate::library::{Library, LibraryManagerEvent};
use async_channel as chan;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tracing::{error, info, trace};
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_CHANNEL_SIZE: usize = 128;
Expand Down Expand Up @@ -77,10 +79,10 @@ impl VolumeManagerActor {
options: VolumeOptions,
) -> Result<(Volumes, Self), VolumeError> {
let (message_tx, message_rx) = chan::bounded(DEFAULT_CHANNEL_SIZE);
let (event_tx, _) = broadcast::channel(DEFAULT_CHANNEL_SIZE);
let (event_tx, event_rx) = broadcast::channel(DEFAULT_CHANNEL_SIZE);

let manager = Volumes::new(message_tx, event_tx.clone());
let state = VolumeManagerState::new(options).await?;
let state = VolumeManagerState::new(options, event_tx.clone()).await?;

let actor = VolumeManagerActor {
state: Arc::new(RwLock::new(state)),
Expand All @@ -89,9 +91,22 @@ impl VolumeManagerActor {
ctx,
};

// Pass event_rx to start monitoring task immediately
actor.clone().start_event_monitoring(event_rx);

Ok((manager, actor))
}

fn start_event_monitoring(self, mut event_rx: broadcast::Receiver<VolumeEvent>) {
tokio::spawn(async move {
debug!("Starting volume event monitoring");
while let Ok(event) = event_rx.recv().await {
debug!("Volume event processed: {:?}", event);
}
warn!("Volume event monitoring ended");
});
}

/// Starts the VolumeManagerActor
pub async fn start(self) {
info!("Volume manager actor started");
Expand Down Expand Up @@ -164,25 +179,36 @@ impl VolumeManagerActor {
}
});

info!("Volume manager actor initialized");
}
// Start the volume watcher
let self_arc_watcher = Arc::clone(&self_arc);
tokio::spawn(async move {
let mut actor = self_arc_watcher.lock().await;
let state = actor.state.write().await;

// Create and start watcher for each volume
for (volume_id, volume) in &state.volumes {
if let Some(watcher) = state.watchers.get(volume_id) {
if let Err(e) = watcher.watcher.start().await {
error!(?e, "Failed to start watcher for volume {}", volume.name);
}
}
}
});

async fn scan_volumes(&mut self) -> Result<(), VolumeError> {
let mut state = self.state.write().await;
state.scan_volumes().await
info!("Volume manager actor initialized");
}

pub async fn initialize(&mut self, library: Arc<Library>) -> Result<(), VolumeError> {
// Use device_id from context instead of node
let device_pub_id = self.ctx.device_id.clone();

// Scan for system volumes first
{
let mut state = self.state.write().await;
state.scan_volumes().await?;
state.scan_volumes(device_pub_id.clone()).await?;
}

// Use device_id from context instead of node
let device_id = self.ctx.device_id.clone();

// Rest of initialize remains the same...
// Get volumes from library database
let db_volumes = library
.db
.volume()
Expand All @@ -202,11 +228,11 @@ impl VolumeManagerActor {

// Prepare updates
for db_volume in db_volumes {
let fingerprint = db_volume.generate_fingerprint(device_id.clone().into());
let fingerprint = db_volume.generate_fingerprint(device_pub_id.clone().into());

if let Some(system_volume) = current_volumes
.values()
.find(|v| v.generate_fingerprint(device_id.clone().into()) == fingerprint)
.find(|v| v.generate_fingerprint(device_pub_id.clone().into()) == fingerprint)
{
let merged = Volume::merge_with_db_volume(system_volume, &db_volume);
if let Some(pub_id) = &merged.pub_id {
Expand All @@ -222,11 +248,49 @@ impl VolumeManagerActor {
}
}

// Apply updates
// Apply updates and initialize watchers
{
let mut state = self.state.write().await;

// Update volumes
for (pub_id, volume) in updates {
state.volumes.insert(pub_id, volume);
state.volumes.insert(pub_id.clone(), volume);

// Create and start watcher if it doesn't exist
if !state.watchers.contains_key(&pub_id) {
let watcher = VolumeWatcher::new(self.event_tx.clone());
if let Err(e) = watcher.start().await {
error!(
?e,
"Failed to start watcher for volume {}",
hex::encode(&pub_id)
);
continue;
}

state.watchers.insert(
pub_id,
WatcherState {
watcher: Arc::new(watcher),
last_event: Instant::now(),
paused: false,
},
);
}
}

// Remove any watchers for volumes that no longer exist
let stale_watchers: Vec<_> = state
.watchers
.keys()
.filter(|id| !state.volumes.contains_key(*id))
.cloned()
.collect();

for volume_id in stale_watchers {
if let Some(watcher_state) = state.watchers.remove(&volume_id) {
watcher_state.watcher.stop().await;
}
}
}

Expand All @@ -237,6 +301,45 @@ impl VolumeManagerActor {

Ok(())
}

async fn perform_maintenance(&mut self) -> Result<(), VolumeError> {
let mut state = self.state.write().await;

// Pass device_id to maintenance
if let Err(e) = state.maintenance(self.ctx.device_id.clone()).await {
error!(?e, "Volume maintenance error");
}

// Rescan volumes periodically
if state.last_scan.elapsed() > Duration::from_secs(300) {
drop(state);
self.scan_volumes().await?;
state = self.state.write().await;
}

// Clean up stale watchers
let stale_watchers: Vec<_> = state
.watchers
.iter()
.filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600))
.map(|(id, _)| id.clone())
.collect();

for volume_id in stale_watchers {
if let Some(watcher_state) = state.watchers.get(&volume_id) {
watcher_state.watcher.stop().await;
}
state.watchers.remove(&volume_id);
}

Ok(())
}

async fn scan_volumes(&mut self) -> Result<(), VolumeError> {
let mut state = self.state.write().await;
state.scan_volumes(self.ctx.device_id.clone()).await
}

async fn handle_message(&mut self, msg: VolumeManagerMessage) -> Result<(), VolumeError> {
trace!("VolumeManagerActor received message: {:?}", msg);
match msg {
Expand Down Expand Up @@ -356,31 +459,6 @@ impl VolumeManagerActor {
Ok(())
}

async fn perform_maintenance(&mut self) -> Result<(), VolumeError> {
let mut state = self.state.write().await;

// Rescan volumes periodically
if state.last_scan.elapsed() > Duration::from_secs(300) {
drop(state);
self.scan_volumes().await?;
state = self.state.write().await;
}

// Clean up stale watchers
let stale_watchers: Vec<_> = state
.watchers
.iter()
.filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600))
.map(|(id, _)| id.clone())
.collect();

for volume_id in stale_watchers {
state.unwatch_volume(&volume_id).await?;
}

Ok(())
}

async fn handle_library_deletion(&mut self, library: Arc<Library>) -> Result<(), VolumeError> {
// Clean up volumes associated with deleted library
let _state = self.state.write().await;
Expand Down
Loading

0 comments on commit f60ff48

Please sign in to comment.