-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#3886 3/4] Start saga for region replacement #5839
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1672,6 +1672,50 @@ impl DataStore { | |
} | ||
} | ||
|
||
/// Check if a region is present in a Volume Construction Request | ||
fn region_in_vcr( | ||
vcr: &VolumeConstructionRequest, | ||
region: &SocketAddrV6, | ||
) -> anyhow::Result<bool> { | ||
let mut parts: VecDeque<&VolumeConstructionRequest> = VecDeque::new(); | ||
parts.push_back(vcr); | ||
|
||
let mut region_found = false; | ||
|
||
while let Some(vcr_part) = parts.pop_front() { | ||
match vcr_part { | ||
VolumeConstructionRequest::Volume { sub_volumes, .. } => { | ||
for sub_volume in sub_volumes { | ||
parts.push_back(sub_volume); | ||
} | ||
|
||
// Skip looking at read-only parent, this function only looks | ||
// for R/W regions | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for now... |
||
} | ||
|
||
VolumeConstructionRequest::Url { .. } => { | ||
// nothing required | ||
} | ||
|
||
VolumeConstructionRequest::Region { opts, .. } => { | ||
for target in &opts.target { | ||
let parsed_target: SocketAddrV6 = target.parse()?; | ||
if parsed_target == *region { | ||
region_found = true; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
VolumeConstructionRequest::File { .. } => { | ||
// nothing required | ||
} | ||
} | ||
} | ||
|
||
Ok(region_found) | ||
} | ||
|
||
pub struct VolumeReplacementParams { | ||
pub volume_id: Uuid, | ||
pub region_id: Uuid, | ||
|
@@ -1796,6 +1840,61 @@ impl DataStore { | |
.transaction(&conn, |conn| { | ||
let err = err.clone(); | ||
async move { | ||
// Grab the old volume first | ||
let maybe_old_volume = { | ||
volume_dsl::volume | ||
.filter(volume_dsl::id.eq(existing.volume_id)) | ||
.select(Volume::as_select()) | ||
.first_async::<Volume>(&conn) | ||
.await | ||
.optional() | ||
.map_err(|e| { | ||
err.bail_retryable_or_else(e, |e| { | ||
VolumeReplaceRegionError::Public( | ||
public_error_from_diesel( | ||
e, | ||
ErrorHandler::Server, | ||
) | ||
) | ||
}) | ||
})? | ||
}; | ||
|
||
let old_volume = if let Some(old_volume) = maybe_old_volume { | ||
old_volume | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason you don't do the
pattern here like you do below? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really, though it does reduce the indentation to bail early here. |
||
} else { | ||
// Existing volume was deleted, so return an error. We | ||
// can't perform the region replacement now! | ||
return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the only possible outcome other than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I understand it, |
||
}; | ||
|
||
let old_vcr: VolumeConstructionRequest = | ||
match serde_json::from_str(&old_volume.data()) { | ||
Ok(vcr) => vcr, | ||
Err(e) => { | ||
return Err(err.bail(VolumeReplaceRegionError::SerdeError(e))); | ||
}, | ||
}; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this section just making sure that the replacement did not already happen in another There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This section makes the whole function idempotent, yeah, but not for another saga - only one saga can win the lock that happens at the beginning of the start saga, so it's only guarding against when a node is rerun. |
||
// Does it look like this replacement already happened? | ||
let old_region_in_vcr = match region_in_vcr(&old_vcr, &existing.region_addr) { | ||
Ok(v) => v, | ||
Err(e) => { | ||
return Err(err.bail(VolumeReplaceRegionError::RegionReplacementError(e))); | ||
}, | ||
}; | ||
let new_region_in_vcr = match region_in_vcr(&old_vcr, &replacement.region_addr) { | ||
Ok(v) => v, | ||
Err(e) => { | ||
return Err(err.bail(VolumeReplaceRegionError::RegionReplacementError(e))); | ||
}, | ||
}; | ||
|
||
if !old_region_in_vcr && new_region_in_vcr { | ||
// It does seem like the replacement happened | ||
return Ok(()); | ||
} | ||
|
||
use db::schema::region::dsl as region_dsl; | ||
use db::schema::volume::dsl as volume_dsl; | ||
|
||
|
@@ -1838,40 +1937,6 @@ impl DataStore { | |
// Update the existing volume's construction request to | ||
// replace the existing region's SocketAddrV6 with the | ||
// replacement region's | ||
let maybe_old_volume = { | ||
volume_dsl::volume | ||
.filter(volume_dsl::id.eq(existing.volume_id)) | ||
.select(Volume::as_select()) | ||
.first_async::<Volume>(&conn) | ||
.await | ||
.optional() | ||
.map_err(|e| { | ||
err.bail_retryable_or_else(e, |e| { | ||
VolumeReplaceRegionError::Public( | ||
public_error_from_diesel( | ||
e, | ||
ErrorHandler::Server, | ||
) | ||
) | ||
}) | ||
})? | ||
}; | ||
|
||
let old_volume = if let Some(old_volume) = maybe_old_volume { | ||
old_volume | ||
} else { | ||
// existing volume was deleted, so return an error, we | ||
// can't perform the region replacement now! | ||
return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted)); | ||
}; | ||
|
||
let old_vcr: VolumeConstructionRequest = | ||
match serde_json::from_str(&old_volume.data()) { | ||
Ok(vcr) => vcr, | ||
Err(e) => { | ||
return Err(err.bail(VolumeReplaceRegionError::SerdeError(e))); | ||
}, | ||
}; | ||
|
||
// Copy the old volume's VCR, changing out the old region | ||
// for the new. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this function can just return a bool instead of wrapping it in a
Result
. There are no errors possible here. That would simplify the callers.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's unfortunately a
let parsed_target: SocketAddrV6 = target.parse()?;
that prevents this :(There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah damn, I totally missed that. All good.