Skip to content

Commit

Permalink
use a dedicated type to represent task statuses
Browse files Browse the repository at this point in the history
  • Loading branch information
Trisfald committed Oct 1, 2024
1 parent 9269c37 commit 7ad602e
Showing 1 changed file with 66 additions and 46 deletions.
112 changes: 66 additions & 46 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,10 @@ fn split_shard_task(
resharder: FlatStorageResharderInner,
controller: FlatStorageResharderController,
) {
let success = split_shard_task_impl(resharder.clone(), controller.clone());
split_shard_task_postprocessing(resharder, success);
info!(target: "resharding", "flat storage shard split task finished, success: {success}");
if let Err(err) = controller.completion_sender.send(success) {
let task_status = split_shard_task_impl(resharder.clone(), controller.clone());
split_shard_task_postprocessing(resharder, task_status);
info!(target: "resharding", ?task_status, "flat storage shard split task finished");
if let Err(err) = controller.completion_sender.send(task_status) {
warn!(target: "resharding", ?err, "error notifying completion of flat storage shard split task")
};
}
Expand All @@ -349,9 +349,9 @@ fn get_parent_shard_and_status(
fn split_shard_task_impl(
resharder: FlatStorageResharderInner,
controller: FlatStorageResharderController,
) -> bool {
) -> FlatStorageReshardingTaskStatus {
if controller.is_interrupted() {
return false;
return FlatStorageReshardingTaskStatus::Interrupted;
}

/// Determines after how many key-values the process stops to
Expand Down Expand Up @@ -383,12 +383,12 @@ fn split_shard_task_impl(
shard_split_handle_key_value(key, value, &mut store_update, &status)
{
error!(target: "resharding", ?err, "failed to handle flat storage key");
return false;
return FlatStorageReshardingTaskStatus::Failed;
}
}
Some(Err(err)) => {
error!(target: "resharding", ?err, "failed to read flat storage value from parent shard");
return false;
return FlatStorageReshardingTaskStatus::Failed;
}
None => {
iter_exhausted = true;
Expand All @@ -399,7 +399,7 @@ fn split_shard_task_impl(
// Make a pause to commit and check if the routine should stop.
if let Err(err) = store_update.commit() {
error!(target: "resharding", ?err, "failed to commit store update");
return false;
return FlatStorageReshardingTaskStatus::Failed;
}

// TODO(Trisfald): metrics and logs
Expand All @@ -409,10 +409,10 @@ fn split_shard_task_impl(
break;
}
if controller.is_interrupted() {
return false;
return FlatStorageReshardingTaskStatus::Interrupted;
}
}
true
FlatStorageReshardingTaskStatus::Successful
}

/// Handles the inheritance of a key-value pair from parent shard to children shards.
Expand Down Expand Up @@ -508,42 +508,48 @@ fn shard_split_handle_key_value(

/// Performs post-processing of shard splitting after all key-values have been moved from parent to children.
/// `success` indicates whether or not the previous phase was successful.
fn split_shard_task_postprocessing(resharder: FlatStorageResharderInner, success: bool) {
let (parent_shard, status) = get_parent_shard_and_status(&resharder);
let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = status;
fn split_shard_task_postprocessing(
resharder: FlatStorageResharderInner,
task_status: FlatStorageReshardingTaskStatus,
) {
let (parent_shard, split_status) = get_parent_shard_and_status(&resharder);
let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = split_status;
let flat_store = resharder.runtime.store().flat_store();
info!(target: "resharding", ?parent_shard, ?success, ?status, "flat storage shard split task: post-processing");
info!(target: "resharding", ?parent_shard, ?task_status, ?split_status, "flat storage shard split task: post-processing");

let mut store_update = flat_store.store_update();
if success {
// Split shard completed successfully.
// Parent flat storage can be deleted from the FlatStoreManager.
resharder
.runtime
.get_flat_storage_manager()
.remove_flat_storage_for_shard(parent_shard, &mut store_update)
.unwrap();
store_update.remove_flat_storage(parent_shard);
// Children must perform catchup.
for child_shard in [left_child_shard, right_child_shard] {
match task_status {
FlatStorageReshardingTaskStatus::Successful => {
// Split shard completed successfully.
// Parent flat storage can be deleted from the FlatStoreManager.
resharder
.runtime
.get_flat_storage_manager()
.remove_flat_storage_for_shard(parent_shard, &mut store_update)
.unwrap();
store_update.remove_flat_storage(parent_shard);
// Children must perform catchup.
for child_shard in [left_child_shard, right_child_shard] {
store_update.set_flat_storage_status(
child_shard,
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
flat_head.hash,
)),
);
}
// TODO(trisfald): trigger catchup
}
FlatStorageReshardingTaskStatus::Failed | FlatStorageReshardingTaskStatus::Interrupted => {
// We got an error or an interrupt request.
// Reset parent.
store_update.set_flat_storage_status(
child_shard,
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
flat_head.hash,
)),
parent_shard,
FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }),
);
}
// TODO(trisfald): trigger catchup
} else {
// We got an error or an interrupt request.
// Reset parent.
store_update.set_flat_storage_status(
parent_shard,
FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }),
);
// Remove children shards leftovers.
for child_shard in [left_child_shard, right_child_shard] {
store_update.remove_flat_storage(child_shard);
// Remove children shards leftovers.
for child_shard in [left_child_shard, right_child_shard] {
store_update.remove_flat_storage(child_shard);
}
}
}
store_update.commit().unwrap();
Expand All @@ -559,6 +565,14 @@ pub enum FlatStorageReshardingEventStatus {
SplitShard(ShardUId, SplittingParentStatus),
}

/// Status of a flat storage resharding task.
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
pub enum FlatStorageReshardingTaskStatus {
Successful,
Failed,
Interrupted,
}

/// Helps control the flat storage resharder operation. More specifically,
/// it has a way to know when the background task is done or to interrupt it.
#[derive(Clone)]
Expand All @@ -567,9 +581,9 @@ pub struct FlatStorageResharderController {
handle: ReshardingHandle,
/// This object will be used to signal when the background task is completed.
/// A value of `true` means that the operation completed successfully.
completion_sender: Sender<bool>,
completion_sender: Sender<FlatStorageReshardingTaskStatus>,
/// Corresponding receiver for `completion_sender`.
pub completion_receiver: Receiver<bool>,
pub completion_receiver: Receiver<FlatStorageReshardingTaskStatus>,
}

impl FlatStorageResharderController {
Expand Down Expand Up @@ -894,7 +908,10 @@ mod tests {
.is_ok_and(|val| val.is_some()));

// Controller should signal that resharding ended.
assert_eq!(controller.completion_receiver.recv_timeout(Duration::from_secs(1)), Ok(true));
assert_eq!(
controller.completion_receiver.recv_timeout(Duration::from_secs(1)),
Ok(FlatStorageReshardingTaskStatus::Successful)
);

// Check final status of parent flat storage.
let parent = ShardUId { version: 3, shard_id: 1 };
Expand Down Expand Up @@ -946,7 +963,10 @@ mod tests {

// Check that resharding was effectively interrupted.
let flat_store = resharder.inner.runtime.store().flat_store();
assert_eq!(controller.completion_receiver.recv_timeout(Duration::from_secs(1)), Ok(false));
assert_eq!(
controller.completion_receiver.recv_timeout(Duration::from_secs(1)),
Ok(FlatStorageReshardingTaskStatus::Interrupted)
);
assert_eq!(
flat_store.get_flat_storage_status(parent_shard),
Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }))
Expand Down

0 comments on commit 7ad602e

Please sign in to comment.