Skip to content

Commit

Permalink
#303 Refactor round storage methods to return actions
Browse files Browse the repository at this point in the history
+ remove_contributor_unsafe
+ remove_locks_unsafe
+ remove_chunk_contributions_unsafe
  • Loading branch information
kellpossible committed Aug 30, 2021
1 parent 06659f4 commit c93c1b6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 68 deletions.
20 changes: 8 additions & 12 deletions phase1-coordinator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2293,12 +2293,11 @@ impl Coordinator {
let mut round = Self::load_current_round(&self.storage)?;

// Remove the contributor from the round self.
round.remove_contributor_unsafe(
&mut self.storage,
self.storage.perform_actions(round.remove_contributor_unsafe(
&replace_action.dropped_contributor,
&replace_action.locked_chunks,
&replace_action.tasks,
)?;
)?)?;

// Assign a replacement contributor to the dropped tasks for the current round.
round.add_replacement_contributor_unsafe(replace_action.replacement_contributor.clone())?;
Expand All @@ -2324,18 +2323,14 @@ impl Coordinator {
warn!("Removing locked chunks and all impacted contributions");

// Remove the lock from the specified chunks.
round.remove_locks_unsafe(
&mut self.storage,
&remove_action.dropped_verifier,
&remove_action.locked_chunks,
self.storage.perform_actions(
round.remove_locks_unsafe(&remove_action.dropped_verifier, &remove_action.locked_chunks)?,
)?;
warn!("Removed locked chunks");

// Remove the contributions from the specified chunks.
round.remove_chunk_contributions_unsafe(
&mut self.storage,
&remove_action.dropped_verifier,
&remove_action.tasks,
self.storage.perform_actions(
round.remove_chunk_contributions_unsafe(&remove_action.dropped_verifier, &remove_action.tasks)?,
)?;
warn!("Removed impacted contributions");

Expand Down Expand Up @@ -2453,7 +2448,8 @@ impl Coordinator {
let mut round = Self::load_round(&mut self.storage, current_round_height)?;

tracing::debug!("Resetting round and applying storage changes");
self.storage.process(round.reset(&reset_action.remove_participants))?;
self.storage
.perform_action(round.reset(&reset_action.remove_participants))?;

// Clear all files
self.storage.clear_round_files(current_round_height)?;
Expand Down
107 changes: 52 additions & 55 deletions phase1-coordinator/src/objects/round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,19 +798,18 @@ impl Round {
/// Remove a contributor from the round.
pub(crate) fn remove_contributor_unsafe(
&mut self,
storage: &mut Disk,
contributor: &Participant,
locked_chunks: &[u64],
tasks: &[Task],
) -> Result<(), CoordinatorError> {
) -> Result<Vec<StorageAction>, CoordinatorError> {
warn!("Removing locked chunks and all impacted contributions");

// Remove the lock from the specified chunks.
self.remove_locks_unsafe(storage, contributor, locked_chunks)?;
let mut actions = self.remove_locks_unsafe(contributor, locked_chunks)?;
warn!("Removed locked chunks");

// Remove the contributions from the specified chunks.
self.remove_chunk_contributions_unsafe(storage, contributor, tasks)?;
actions.append(&mut self.remove_chunk_contributions_unsafe(contributor, tasks)?);
warn!("Removed impacted contributions");

self.contributor_ids = self
Expand All @@ -820,7 +819,7 @@ impl Round {
.filter(|participant| participant != contributor)
.collect();

Ok(())
Ok(actions)
}

///
Expand All @@ -835,10 +834,9 @@ impl Round {
#[inline]
pub(crate) fn remove_locks_unsafe(
&mut self,
storage: &mut Disk,
participant: &Participant,
locked_chunks: &[u64],
) -> Result<(), CoordinatorError> {
) -> Result<Vec<StorageAction>, CoordinatorError> {
// Sanity check that the participant holds the lock for each specified chunk.
let locked_chunks: Vec<_> = locked_chunks
.par_iter()
Expand All @@ -854,8 +852,9 @@ impl Round {

// Remove the response locator for a contributor, and remove the next challenge locator
// for both a contributor and verifier.
for chunk_id in locked_chunks.into_iter() {
match participant {
let actions = locked_chunks.into_iter().map(|chunk_id| {
let mut actions: Vec<StorageAction> = Vec::new();
match &participant {
Participant::Contributor(_) => {
// Check that the participant is an *authorized* contributor
// for the current round.
Expand Down Expand Up @@ -887,25 +886,22 @@ impl Round {
next_contribution_id,
false,
));
if storage.exists(&response_locator) {
storage.remove(&response_locator)?;
}

actions.push(StorageAction::RemoveIfExists(RemoveAction::new(response_locator)));

// Removing contribution file signature for pending task
let response_signature_locator = Locator::ContributionFileSignature(
let unverified_response_signature_locator = Locator::ContributionFileSignature(
ContributionSignatureLocator::new(current_round_height, *chunk_id, next_contribution_id, false),
);
if storage.exists(&response_signature_locator) {
storage.remove(&response_signature_locator)?;
}

actions.push(StorageAction::RemoveIfExists(RemoveAction::new(unverified_response_signature_locator)));

// Removing contribution file signature for verified task
let response_signature_locator = Locator::ContributionFileSignature(
let verified_response_signature_locator = Locator::ContributionFileSignature(
ContributionSignatureLocator::new(current_round_height, *chunk_id, next_contribution_id, true),
);
if storage.exists(&response_signature_locator) {
storage.remove(&response_signature_locator)?;
}

actions.push(StorageAction::RemoveIfExists(RemoveAction::new(verified_response_signature_locator)));

// TODO: revisit the logic of removing challenges
// https://github.com/AleoHQ/aleo-setup/issues/250
Expand Down Expand Up @@ -934,11 +930,10 @@ impl Round {
true,
))
};

// Don't remove initial challenge
if storage.exists(&contribution_file)
&& chunk.current_contribution()?.get_contributor().is_some()
{
storage.remove(&contribution_file)?;
if chunk.current_contribution()?.get_contributor().is_some() {
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(contribution_file)));
}
}
}
Expand All @@ -960,9 +955,7 @@ impl Round {
)),
};

if storage.exists(&response_locator) {
storage.remove(&response_locator)?;
}
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(response_locator)));

let response_locator_signature = match is_final_contribution {
true => Locator::ContributionFileSignature(ContributionSignatureLocator::new(
Expand All @@ -979,19 +972,27 @@ impl Round {
)),
};

if storage.exists(&response_locator_signature) {
storage.remove(&response_locator_signature)?;
}
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(response_locator_signature)));
}
};

warn!("Removing the lock for chunk {} from {}", chunk_id, participant);

// Remove the lock for each given chunk ID.
chunk.set_lock_holder_unsafe(None);
}

Ok(())
Ok(actions)
})
// flat map the results so they can be collected into a single Vec
.flat_map(|result| {
match result {
Ok(ok) => ok.into_iter().map(|action| Ok(action)).collect(),
Err(err) => vec![Err(err)],
}
})
.collect::<Result<Vec<StorageAction>, CoordinatorError>>()?;

Ok(actions)
}

///
Expand All @@ -1009,20 +1010,19 @@ impl Round {
///
#[tracing::instrument(
level = "error",
skip(self, storage, tasks),
skip(self, tasks),
fields(round = self.round_height())
)]
pub(crate) fn remove_chunk_contributions_unsafe(
&mut self,
storage: &mut Disk,
participant: &Participant,
tasks: &[Task],
) -> Result<(), CoordinatorError> {
) -> Result<Vec<StorageAction>, CoordinatorError> {
// Check if the participant is a verifier. As verifications are not dependent
// on each other, no further update is necessary in the round state.
if participant.is_verifier() {
warn!("Skipping removal of contributions as {} is a verifier", participant);
return Ok(());
return Ok(Vec::new());
}

// Check that the participant is in the current contributors ID.
Expand All @@ -1032,41 +1032,30 @@ impl Round {
}

// Remove the given contribution from each chunk in the current round.
for task in tasks {
tasks.iter().map(|task| {
let mut actions: Vec<StorageAction> = Vec::new();
let chunk = self.chunk_mut(task.chunk_id())?;
if let Ok(contribution) = chunk.get_contribution(task.contribution_id()) {
warn!("Removing task {:?}", task.to_tuple());

// Remove the unverified contribution file, if it exists.
if let Some(locator) = contribution.get_contributed_location() {
let path = storage.to_locator(&locator)?;
if storage.exists(&path) {
storage.remove(&path)?;
}
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone())));
}

// Remove the contribution signature file, if it exists.
if let Some(locator) = contribution.get_contributed_signature_location() {
let path = storage.to_locator(&locator)?;
if storage.exists(&path) {
storage.remove(&path)?;
}
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone())));
}

// Remove the verified contribution file, if it exists.
if let Some(locator) = contribution.get_verified_location() {
let path = storage.to_locator(&locator)?;
if storage.exists(&path) {
storage.remove(&path)?;
}
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone())));
}

// Remove the verified contribution file signature, if it exists.
if let Some(locator) = contribution.get_verified_signature_location() {
let path = storage.to_locator(&locator)?;
if storage.exists(&path) {
storage.remove(&path)?;
}
actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone())));
}

// Remove the given contribution and all subsequent contributions.
Expand All @@ -1081,9 +1070,17 @@ impl Round {
chunk,
);
}
}

Ok(())
Ok(actions)
})
// flat map the results so they can be collected into a single Vec
.flat_map(|result| {
match result {
Ok(ok) => ok.into_iter().map(|action| Ok(action)).collect(),
Err(err) => vec![Err(err)],
}
})
.collect::<Result<Vec<StorageAction>, CoordinatorError>>()
}

///
Expand Down
20 changes: 19 additions & 1 deletion phase1-coordinator/src/storage/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,15 +440,33 @@ impl Disk {
Ok(size)
}

pub fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> {
/// Perform a [StorageAction] to mutate something in storage.
pub fn perform_action(&mut self, action: StorageAction) -> Result<(), CoordinatorError> {
match action {
StorageAction::Remove(remove_action) => {
let locator = remove_action.try_into_locator(self)?;
self.remove(&locator)
}
StorageAction::Update(update_action) => self.update(&update_action.locator, update_action.object),
StorageAction::RemoveIfExists(remove_action) => {
let locator = remove_action.try_into_locator(self)?;
if self.exists(&locator) {
self.remove(&locator)
} else {
Ok(())
}
}
}
}

/// Convenience method to run [Self::perform_action] over
/// something that can provide an [Iterator] of [StorageAction].
pub fn perform_actions(
&mut self,
actions: impl IntoIterator<Item = StorageAction>,
) -> Result<(), CoordinatorError> {
actions.into_iter().map(|action| self.perform_action(action)).collect()
}
}

impl StorageLocator for Disk {
Expand Down
5 changes: 5 additions & 0 deletions phase1-coordinator/src/storage/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,12 @@ pub struct UpdateAction {
/// [Storage::process()].
#[non_exhaustive]
pub enum StorageAction {
/// Remove an item in storage, will fail if the item does not yet
/// exist.
Remove(RemoveAction),
/// Remove an item in storage if it exists.
RemoveIfExists(RemoveAction),
/// Update an item in storage.
Update(UpdateAction),
}

Expand Down

0 comments on commit c93c1b6

Please sign in to comment.