[#3886 3/4] Start saga for region replacement #5839

Merged 5 commits on Jun 6, 2024
Changes from 3 commits
133 changes: 99 additions & 34 deletions nexus/db-queries/src/db/datastore/volume.rs
@@ -1672,6 +1672,50 @@ impl DataStore {
}
}

/// Check if a region is present in a Volume Construction Request
fn region_in_vcr(
vcr: &VolumeConstructionRequest,
region: &SocketAddrV6,
) -> anyhow::Result<bool> {
Review comment (Contributor): It looks like this function can just return a bool instead of wrapping it in a Result. There are no errors possible here. That would simplify the callers.

Reply (Author): There's unfortunately a let parsed_target: SocketAddrV6 = target.parse()?; that prevents this :(

Reply (Contributor): Ah damn, I totally missed that. All good.
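
For context on the exchange above, here is a minimal standalone sketch (the address literal is made up) of the parse failure that forces the anyhow::Result<bool> signature:

```rust
use std::net::SocketAddrV6;

fn main() {
    // A well-formed target string parses cleanly...
    let ok: Result<SocketAddrV6, _> = "[fd00:1122:3344:101::1]:19000".parse();
    assert!(ok.is_ok());

    // ...but a malformed one yields Err(AddrParseError). Inside region_in_vcr
    // the `?` on target.parse() turns that into an anyhow::Error, which is why
    // the function can't simply return a bare bool.
    let bad: Result<SocketAddrV6, _> = "not-an-address".parse();
    assert!(bad.is_err());
}
```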

let mut parts: VecDeque<&VolumeConstructionRequest> = VecDeque::new();
parts.push_back(vcr);

let mut region_found = false;

while let Some(vcr_part) = parts.pop_front() {
match vcr_part {
VolumeConstructionRequest::Volume { sub_volumes, .. } => {
for sub_volume in sub_volumes {
parts.push_back(sub_volume);
}

// Skip looking at read-only parent, this function only looks
// for R/W regions
Review comment (Contributor): for now...

}

VolumeConstructionRequest::Url { .. } => {
// nothing required
}

VolumeConstructionRequest::Region { opts, .. } => {
for target in &opts.target {
let parsed_target: SocketAddrV6 = target.parse()?;
if parsed_target == *region {
region_found = true;
break;
}
}
}

VolumeConstructionRequest::File { .. } => {
// nothing required
}
}
}

Ok(region_found)
}

pub struct VolumeReplacementParams {
pub volume_id: Uuid,
pub region_id: Uuid,
@@ -1796,6 +1840,61 @@ impl DataStore {
.transaction(&conn, |conn| {
let err = err.clone();
async move {
// Grab the old volume first
let maybe_old_volume = {
volume_dsl::volume
.filter(volume_dsl::id.eq(existing.volume_id))
.select(Volume::as_select())
.first_async::<Volume>(&conn)
.await
.optional()
.map_err(|e| {
err.bail_retryable_or_else(e, |e| {
VolumeReplaceRegionError::Public(
public_error_from_diesel(
e,
ErrorHandler::Server,
)
)
})
})?
};

let old_volume = if let Some(old_volume) = maybe_old_volume {
old_volume
Review comment (Contributor): Any reason you don't do the

    let old_volume = match volume_dsl::volume.filter....
        Some(ov) => ov,
        _ => return Err..

pattern here like you do below?

Reply (Author): Not really, though it does reduce the indentation to bail early here.
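
For reference, a minimal sketch of the match-based early-bail shape the reviewer is describing, using only names that already appear in this diff; it is behaviorally equivalent to the if let / else below, written as a fragment meant to slot into the same spot in the transaction closure:

```rust
let old_volume = match maybe_old_volume {
    Some(old_volume) => old_volume,
    None => {
        // Existing volume was deleted, so the region replacement can't
        // proceed; bail out of the transaction retry loop.
        return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted));
    }
};
```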

} else {
// existing volume was deleted, so return an error, we
Review comment (Contributor): Suggested change:

    - // existing volume was deleted, so return an error, we
    + // Existing volume was deleted, so return an error. We

Reply (Author): Done in 85ca4f9.

// can't perform the region replacement now!
return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted));
Review comment (Contributor): Is the only possible outcome other than Some(old_volume) from the volume_dsl::volume... query a deleted volume? I guess the err.bail_retryable_or_else() will return the error directly if the query fails?

Reply (Author): As I understand it, bail here will stop the transaction retry from looping. The volume being deleted is fatal for the region replacement request's progress, so we bail here.
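
Purely as a conceptual illustration of the retry-versus-bail distinction described in this thread, here is a small self-contained sketch; the names (TxnError, retry_transaction) are hypothetical and not the real omicron/diesel helpers:

```rust
// Hypothetical sketch: a transaction retry wrapper that distinguishes
// retryable database errors from a fatal ("bailed") application error.
enum TxnError<E> {
    Retryable, // e.g. a serialization conflict: re-run the closure
    Fatal(E),  // the err.bail(..) case: stop retrying and surface the error
}

fn retry_transaction<T, E>(
    mut body: impl FnMut() -> Result<T, TxnError<E>>,
) -> Result<T, E> {
    loop {
        match body() {
            Ok(v) => return Ok(v),
            Err(TxnError::Retryable) => continue, // loop and try again
            Err(TxnError::Fatal(e)) => return Err(e), // bail: no more retries
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result: Result<u32, &str> = retry_transaction(|| {
        attempts += 1;
        if attempts < 3 {
            Err(TxnError::Retryable) // transient failure: retried
        } else {
            Ok(attempts) // succeeds on the third try
        }
    });
    assert_eq!(result, Ok(3));
}
```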

};

let old_vcr: VolumeConstructionRequest =
match serde_json::from_str(&old_volume.data()) {
Ok(vcr) => vcr,
Err(e) => {
return Err(err.bail(VolumeReplaceRegionError::SerdeError(e)));
},
};

Review comment (Contributor): Is this section just making sure that the replacement did not already happen in another saga, so that we don't try to redo work that was already done by someone else?

Reply (Author): This section makes the whole function idempotent, yeah, but not for another saga - only one saga can win the lock that happens at the beginning of the start saga, so it's only guarding against when a node is rerun.

// Does it look like this replacement already happened?
let old_region_in_vcr = match region_in_vcr(&old_vcr, &existing.region_addr) {
Ok(v) => v,
Err(e) => {
return Err(err.bail(VolumeReplaceRegionError::RegionReplacementError(e)));
},
};
let new_region_in_vcr = match region_in_vcr(&old_vcr, &replacement.region_addr) {
Ok(v) => v,
Err(e) => {
return Err(err.bail(VolumeReplaceRegionError::RegionReplacementError(e)));
},
};

if !old_region_in_vcr && new_region_in_vcr {
// It does seem like the replacement happened
return Ok(());
}

use db::schema::region::dsl as region_dsl;
use db::schema::volume::dsl as volume_dsl;

@@ -1838,40 +1937,6 @@
// Update the existing volume's construction request to
// replace the existing region's SocketAddrV6 with the
// replacement region's
let maybe_old_volume = {
volume_dsl::volume
.filter(volume_dsl::id.eq(existing.volume_id))
.select(Volume::as_select())
.first_async::<Volume>(&conn)
.await
.optional()
.map_err(|e| {
err.bail_retryable_or_else(e, |e| {
VolumeReplaceRegionError::Public(
public_error_from_diesel(
e,
ErrorHandler::Server,
)
)
})
})?
};

let old_volume = if let Some(old_volume) = maybe_old_volume {
old_volume
} else {
// existing volume was deleted, so return an error, we
// can't perform the region replacement now!
return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted));
};

let old_vcr: VolumeConstructionRequest =
match serde_json::from_str(&old_volume.data()) {
Ok(vcr) => vcr,
Err(e) => {
return Err(err.bail(VolumeReplaceRegionError::SerdeError(e)));
},
};

// Copy the old volume's VCR, changing out the old region
// for the new.
176 changes: 164 additions & 12 deletions nexus/src/app/background/region_replacement.rs
@@ -5,32 +5,36 @@
//! Background task for detecting regions that need replacing and beginning that
//! process
//!
//! TODO this is currently a placeholder for a future PR
//! This task's responsibility is to create region replacement requests when
//! physical disks are expunged, and trigger the region replacement start saga
//! for any requests that are in state "Requested". See the documentation there
//! for more information.

use super::common::BackgroundTask;
use crate::app::sagas::SagaRequest;
use crate::app::authn;
use crate::app::sagas;
use crate::app::RegionAllocationStrategy;
use futures::future::BoxFuture;
use futures::FutureExt;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::TypedUuid;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

pub struct RegionReplacementDetector {
_datastore: Arc<DataStore>,
_saga_request: Sender<SagaRequest>,
datastore: Arc<DataStore>,
saga_request: Sender<sagas::SagaRequest>,
}

impl RegionReplacementDetector {
pub fn new(
datastore: Arc<DataStore>,
saga_request: Sender<SagaRequest>,
saga_request: Sender<sagas::SagaRequest>,
) -> Self {
RegionReplacementDetector {
_datastore: datastore,
_saga_request: saga_request,
}
RegionReplacementDetector { datastore, saga_request }
}
}

@@ -43,15 +47,163 @@ impl BackgroundTask for RegionReplacementDetector {
let log = &opctx.log;
warn!(&log, "region replacement task started");

// TODO
let mut ok = 0;
let mut err = 0;

// Find regions on expunged physical disks
let regions_to_be_replaced =
match self.datastore.find_regions_on_expunged_physical_disks(
opctx
).await {
Ok(regions) => regions,

Err(e) => {
error!(&log, "find_regions_on_expunged_physical_disks failed: {e}");
err += 1;

return json!({
"region_replacement_started_ok": ok,
"region_replacement_started_err": err,
});
}
};

// Then create replacement requests for those if one doesn't exist
// yet.
for region in regions_to_be_replaced {
let maybe_request =
match self.datastore.lookup_region_replacement_request_by_old_region_id(
opctx,
TypedUuid::from_untyped_uuid(region.id()),
).await {
Ok(v) => v,

Err(e) => {
error!(&log, "error looking for existing region replacement requests for {}: {e}", region.id());
Review comment (Contributor): Does cargo fmt not try to break this into multiple lines?

Reply (Author): It doesn't!

Reply (Contributor): cargo fmt is so lazy when it comes to macros. I'd still manually shrink these lines, though. What happens is that one long line prevents cargo fmt from working properly across the rest of the file. I've had to fix this before and it's frustrating.

Reply (Author): Yeah, done in 5f5d732.
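
For what it's worth, the manually wrapped form (mirroring the multi-line error! calls later in this diff; just a formatting sketch, not new behavior) would look roughly like:

```rust
error!(
    &log,
    "error looking for existing region replacement requests for {}: {e}",
    region.id(),
);
```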

continue;
}
};

if maybe_request.is_none() {
match self.datastore.create_region_replacement_request_for_region(opctx, &region).await {
Review comment (Contributor): This should handle the case where another task is also running this code and that other task finds and inserts the replacement request just ahead of this one, right? I see we continue on error below, so I believe that if the race is possible, we still handle it.

Reply (Author): Yeah. The maybe_request.is_none() check above is a tiny optimization, but the reality is that this background task will wake up on the N Nexuses that are running, and they'll all try to insert the request record, and then all try to run the start saga for that record.

Ok(request_id) => {
info!(
&log,
"added region replacement request {request_id} for {} volume {}",
region.id(),
region.volume_id(),
);
}

Err(e) => {
error!(
&log,
"error adding region replacement request for region {} volume id {}: {e}",
region.id(),
region.volume_id(),
);
continue;
}
}
}
}

// Next, for each region replacement request in state "Requested", run the start saga.
match self.datastore.get_requested_region_replacements(opctx).await {
Ok(requests) => {
for request in requests {
let result = self.saga_request.send(sagas::SagaRequest::RegionReplacementStart {
params: sagas::region_replacement_start::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
request,
allocation_strategy: RegionAllocationStrategy::RandomWithDistinctSleds { seed: None },
},
}).await;

match result {
Ok(()) => {
ok += 1;
}

Err(e) => {
error!(&log, "sending region replacement request failed: {e}");
err += 1;
}
};
}
}

Err(e) => {
error!(&log, "query for region replacement requests failed: {e}");
}
}

warn!(&log, "region replacement task done");

json!({
"region_replacement_started_ok": 0,
"region_replacement_started_err": 0,
"region_replacement_started_ok": ok,
"region_replacement_started_err": err,
})
}
.boxed()
}
}

#[cfg(test)]
mod test {
Review comment (Contributor): Nice test!

Reply (Author): Thanks! :)

use super::*;
use nexus_db_model::RegionReplacement;
use nexus_test_utils_macros::nexus_test;
use tokio::sync::mpsc;
use uuid::Uuid;

type ControlPlaneTestContext =
nexus_test_utils::ControlPlaneTestContext<crate::Server>;

#[nexus_test(server = crate::Server)]
async fn test_add_region_replacement_causes_start(
cptestctx: &ControlPlaneTestContext,
) {
let nexus = &cptestctx.server.server_context().nexus;
let datastore = nexus.datastore();
let opctx = OpContext::for_tests(
cptestctx.logctx.log.clone(),
datastore.clone(),
);

let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1);
let mut task =
RegionReplacementDetector::new(datastore.clone(), saga_request_tx);

// Noop test
let result = task.activate(&opctx).await;
assert_eq!(
result,
json!({
"region_replacement_started_ok": 0,
"region_replacement_started_err": 0,
})
);

// Add a region replacement request for a fake region
let request = RegionReplacement::new(Uuid::new_v4(), Uuid::new_v4());

datastore
.insert_region_replacement_request(&opctx, request)
.await
.unwrap();

// Activate the task - it should pick that up and try to run the region
// replacement start saga
let result = task.activate(&opctx).await;
assert_eq!(
result,
json!({
"region_replacement_started_ok": 1,
"region_replacement_started_err": 0,
})
);

saga_request_rx.try_recv().unwrap();
}
}