From 2e0061f9ea672d4aa4ab8bcc7cf21a73f6c65f90 Mon Sep 17 00:00:00 2001 From: Diwakar Sharma Date: Mon, 5 Feb 2024 11:34:43 +0000 Subject: [PATCH 1/3] feat(volume/resize): enforce cluster capacity limit check during volume resize Signed-off-by: Diwakar Sharma --- .../src/bin/core/tests/volume/resize.rs | 128 ++++++++++++++++-- .../agents/src/bin/core/volume/service.rs | 24 +++- .../agents/src/bin/core/volume/specs.rs | 18 +++ .../grpc/src/operations/volume/traits.rs | 18 +-- control-plane/rest/service/src/v0/volumes.rs | 2 +- .../src/types/v0/transport/volume.rs | 6 +- 6 files changed, 167 insertions(+), 29 deletions(-) diff --git a/control-plane/agents/src/bin/core/tests/volume/resize.rs b/control-plane/agents/src/bin/core/tests/volume/resize.rs index a3633a779..129004834 100644 --- a/control-plane/agents/src/bin/core/tests/volume/resize.rs +++ b/control-plane/agents/src/bin/core/tests/volume/resize.rs @@ -4,11 +4,17 @@ use grpc::operations::{ volume::traits::VolumeOperations, }; use std::time::Duration; -use stor_port::types::v0::transport::{ - CreateVolume, DestroyVolume, Filter, PublishVolume, ResizeVolume, Volume, VolumeId, - VolumeShareProtocol, +use stor_port::{ + transport_api::ReplyErrorKind, + types::v0::transport::{ + CreateVolume, DestroyVolume, Filter, PublishVolume, ResizeVolume, VolumeShareProtocol, + }, }; +const SIZE: u64 = 50 * 1024 * 1024; // 50MiB +const EXPANDED_SIZE: u64 = 2 * SIZE; // 100MiB +const CAPACITY_LIMIT_DIFF: u64 = 20 * 1024 * 1024; // 20MiB + /// Validate that the size of volume and replicas are as per expected_size /// Return true if validation is successful, otherwise false. async fn validate_resized_volume( @@ -46,9 +52,9 @@ async fn validate_resized_volume( } #[tokio::test] -async fn resize_unpublished() { +async fn resize_unpublished_and_published() { let cluster = ClusterBuilder::builder() - .with_rest(true) + .with_rest(false) .with_agents(vec!["core"]) .with_io_engines(3) .with_pool(0, "malloc:///p1?size_mb=200") @@ -56,6 +62,7 @@ async fn resize_unpublished() { .with_pool(2, "malloc:///p1?size_mb=200") .with_cache_period("1s") .with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1)) + .with_options(|o| o.with_isolated_io_engine(true)) .build() .await .unwrap(); @@ -85,8 +92,8 @@ async fn resize_unpublished() { .resize( &ResizeVolume { uuid: volume.uuid().clone(), - requested_size: 2 * volume.spec().size, - capacity_limit: None, + requested_size: EXPANDED_SIZE, + cluster_capacity_limit: None, }, None, ) @@ -125,8 +132,8 @@ async fn resize_published(cluster: &Cluster) { .create( &CreateVolume { uuid: "df3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(), - size: 50 * 1024 * 1024, - replicas: 3, + size: SIZE, + replicas: 1, thin: false, ..Default::default() }, @@ -152,8 +159,8 @@ async fn resize_published(cluster: &Cluster) { .resize( &ResizeVolume { uuid: volume.uuid().clone(), - requested_size: 2 * volume.spec().size, - capacity_limit: None, + requested_size: EXPANDED_SIZE, + cluster_capacity_limit: None, }, None, ) @@ -177,7 +184,7 @@ async fn resize_published(cluster: &Cluster) { #[tokio::test] async fn resize_on_no_capacity_pool() { let cluster = ClusterBuilder::builder() - .with_rest(true) + .with_rest(false) .with_agents(vec!["core"]) .with_io_engines(3) .with_pool(0, "malloc:///p1?size_mb=200") @@ -185,6 +192,7 @@ async fn resize_on_no_capacity_pool() { .with_pool(2, "malloc:///p1?size_mb=100") .with_cache_period("1s") .with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1)) + .with_options(|o| o.with_isolated_io_engine(true)) .build() .await .unwrap(); @@ -195,7 +203,7 @@ async fn resize_on_no_capacity_pool() { .create( &CreateVolume { uuid: "de3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(), - size: 50 * 1024 * 1024, + size: SIZE, replicas: 3, thin: false, ..Default::default() @@ -209,8 +217,8 @@ async fn resize_on_no_capacity_pool() { .resize( &ResizeVolume { uuid: volume.uuid().clone(), - requested_size: 2 * volume.spec().size, - capacity_limit: None, + requested_size: EXPANDED_SIZE, + cluster_capacity_limit: None, }, None, ) @@ -228,3 +236,93 @@ async fn resize_on_no_capacity_pool() { // TODO: Add reclaim monitor validations for replicas that got resized as part // of this failed volume resize. } + +#[tokio::test] +async fn resize_with_cluster_capacity_limit() { + let cluster = ClusterBuilder::builder() + .with_rest(false) + .with_agents(vec!["core"]) + .with_io_engines(2) + .with_pool(0, "malloc:///p1?size_mb=200") + .with_pool(1, "malloc:///p1?size_mb=200") + .with_cache_period("1s") + .with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1)) + .with_options(|o| o.with_isolated_io_engine(true)) + .build() + .await + .unwrap(); + + let vol_cli = cluster.grpc_client().volume(); + + // resize exceeding the capacity limit + grpc_resize_volume_with_limit( + &vol_cli, + Some(EXPANDED_SIZE - CAPACITY_LIMIT_DIFF), + Some(ReplyErrorKind::CapacityLimitExceeded {}), + ) + .await; + + // resize within the capacity limit + grpc_resize_volume_with_limit(&vol_cli, Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF), None).await; +} + +async fn grpc_resize_volume_with_limit( + volume_client: &dyn VolumeOperations, + capacity: Option, + expected_error: Option, +) { + let volume = volume_client + .create( + &CreateVolume { + uuid: "de3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(), + size: SIZE, + replicas: 2, + thin: false, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + let result = volume_client + .resize( + &ResizeVolume { + uuid: volume.uuid().clone(), + requested_size: EXPANDED_SIZE, + cluster_capacity_limit: capacity, + }, + None, + ) + .await; + + match result { + Ok(resized_volume) => { + assert!(resized_volume.spec().uuid == volume.spec().uuid); + assert!(resized_volume.spec().size == EXPANDED_SIZE); + volume_client + .destroy( + &DestroyVolume { + uuid: resized_volume.uuid().try_into().unwrap(), + }, + None, + ) + .await + .unwrap(); + assert!(expected_error.is_none()); + } + Err(e) => { + assert_eq!(expected_error, Some(e.kind)); // wrong error + // Volume not needed anymore. + volume_client + .destroy( + &DestroyVolume { + uuid: volume.uuid().try_into().unwrap(), + }, + None, + ) + .await + .unwrap(); + } + } +} diff --git a/control-plane/agents/src/bin/core/volume/service.rs b/control-plane/agents/src/bin/core/volume/service.rs index 05b038030..7ae641417 100644 --- a/control-plane/agents/src/bin/core/volume/service.rs +++ b/control-plane/agents/src/bin/core/volume/service.rs @@ -47,6 +47,7 @@ use stor_port::{ pub(super) struct Service { registry: Registry, create_volume_limiter: std::sync::Arc, + capacity_limit_borrow: std::sync::Arc>, } #[tonic::async_trait] @@ -243,6 +244,7 @@ impl Service { create_volume_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new( registry.create_volume_limit(), )), + capacity_limit_borrow: std::sync::Arc::new(parking_lot::RwLock::new(0)), registry, } } @@ -578,6 +580,26 @@ impl Service { #[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.uuid))] pub(super) async fn resize_volume(&self, request: &ResizeVolume) -> Result { let mut volume = self.specs().volume(&request.uuid).await?; - volume.resize(&self.registry, request).await + + let Some(limit) = request.cluster_capacity_limit() else { + return volume.resize(&self.registry, request).await; + }; + + let required = request.requested_size - volume.as_ref().size; + *self.capacity_limit_borrow.write() += required; + // If there is a defined system wide capacity limit, ensure we don't breach that. + let current = *self.capacity_limit_borrow.read(); + self.specs() + .check_capacity_limit_for_resize(limit, current) + .map_err(|e| { + *self.capacity_limit_borrow.write() -= required; + e + })?; + + let resize_ret = volume.resize(&self.registry, request).await; + // Reset the capacity limit that we consumed and will now be accounted in the system's + // current total. + *self.capacity_limit_borrow.write() -= required; + resize_ret } } diff --git a/control-plane/agents/src/bin/core/volume/specs.rs b/control-plane/agents/src/bin/core/volume/specs.rs index f01f23ab4..0b839529e 100644 --- a/control-plane/agents/src/bin/core/volume/specs.rs +++ b/control-plane/agents/src/bin/core/volume/specs.rs @@ -816,6 +816,24 @@ impl ResourceSpecsLocked { } } + pub(crate) fn check_capacity_limit_for_resize( + &self, + cluster_capacity_limit: u64, + current_borrowed_limit: u64, + ) -> Result<(), SvcError> { + let specs = self.write(); + let total: u64 = specs.volumes.values().map(|v| v.lock().size).sum(); + let forthcoming_total = current_borrowed_limit + total; + tracing::trace!(current_borrowed_limit=%current_borrowed_limit, total=%total, forthcoming_total=%forthcoming_total, "Cluster capacity limit checks "); + if forthcoming_total > cluster_capacity_limit { + return Err(SvcError::CapacityLimitExceeded { + cluster_capacity_limit, + excess: forthcoming_total - cluster_capacity_limit, + }); + } + Ok(()) + } + /// Worker that reconciles dirty VolumeSpecs's with the persistent store. /// This is useful when volume operations are performed but we fail to /// update the spec with the persistent store. diff --git a/control-plane/grpc/src/operations/volume/traits.rs b/control-plane/grpc/src/operations/volume/traits.rs index 179d78951..06806e080 100644 --- a/control-plane/grpc/src/operations/volume/traits.rs +++ b/control-plane/grpc/src/operations/volume/traits.rs @@ -1127,7 +1127,7 @@ impl From<&dyn DestroyVolumeInfo> for DestroyVolumeRequest { pub struct ValidatedResizeVolumeRequest { uuid: VolumeId, requested_size: u64, - capacity_limit: Option, + cluster_capacity_limit: Option, } /// Trait to be implemented for ResizeVolume operation. pub trait ResizeVolumeInfo: Send + Sync + std::fmt::Debug { @@ -1136,7 +1136,7 @@ pub trait ResizeVolumeInfo: Send + Sync + std::fmt::Debug { /// Requested new size of the volume, in bytes fn req_size(&self) -> u64; /// Total capacity limit for all volumes, in bytes - fn capacity_limit(&self) -> Option; + fn cluster_capacity_limit(&self) -> Option; } impl ResizeVolumeInfo for ResizeVolume { @@ -1148,8 +1148,8 @@ impl ResizeVolumeInfo for ResizeVolume { self.requested_size } - fn capacity_limit(&self) -> Option { - self.capacity_limit + fn cluster_capacity_limit(&self) -> Option { + self.cluster_capacity_limit } } @@ -1159,7 +1159,7 @@ impl ValidateRequestTypes for ResizeVolumeRequest { Ok(ValidatedResizeVolumeRequest { uuid: VolumeId::try_from(StringValue(Some(self.uuid)))?, requested_size: self.requested_size, - capacity_limit: self.capacity_limit, + cluster_capacity_limit: self.capacity_limit, }) } } @@ -1169,7 +1169,7 @@ impl From<&dyn ResizeVolumeInfo> for ResizeVolume { Self { uuid: data.uuid(), requested_size: data.req_size(), - capacity_limit: data.capacity_limit(), + cluster_capacity_limit: data.cluster_capacity_limit(), } } } @@ -1179,7 +1179,7 @@ impl From<&dyn ResizeVolumeInfo> for ResizeVolumeRequest { Self { uuid: data.uuid().to_string(), requested_size: data.req_size(), - capacity_limit: data.capacity_limit(), + capacity_limit: data.cluster_capacity_limit(), } } } @@ -1193,8 +1193,8 @@ impl ResizeVolumeInfo for ValidatedResizeVolumeRequest { self.requested_size } - fn capacity_limit(&self) -> Option { - self.capacity_limit + fn cluster_capacity_limit(&self) -> Option { + self.cluster_capacity_limit } } diff --git a/control-plane/rest/service/src/v0/volumes.rs b/control-plane/rest/service/src/v0/volumes.rs index 114447bb6..09828aac7 100644 --- a/control-plane/rest/service/src/v0/volumes.rs +++ b/control-plane/rest/service/src/v0/volumes.rs @@ -205,7 +205,7 @@ impl apis::actix_server::Volumes for RestApi { &ResizeVolume { uuid: volume_id.into(), requested_size: resize_volume_body.size as u64, - capacity_limit: None, + cluster_capacity_limit: None, }, None, ) diff --git a/control-plane/stor-port/src/types/v0/transport/volume.rs b/control-plane/stor-port/src/types/v0/transport/volume.rs index 14679a35d..a9fff9083 100644 --- a/control-plane/stor-port/src/types/v0/transport/volume.rs +++ b/control-plane/stor-port/src/types/v0/transport/volume.rs @@ -477,15 +477,15 @@ pub struct ResizeVolume { /// The requested new size of the volume in bytes. pub requested_size: u64, /// Total capacity limit of all volumes' provisioning. - pub capacity_limit: Option, + pub cluster_capacity_limit: Option, } impl ResizeVolume { /// Create a new `ResizeVolume` request. - pub fn new(uuid: VolumeId, requested_size: u64, capacity_limit: Option) -> Self { + pub fn new(uuid: VolumeId, requested_size: u64, cluster_capacity_limit: Option) -> Self { Self { uuid, requested_size, - capacity_limit, + cluster_capacity_limit, } } } From 8dea6d272ea0dd30a33eb7adf3866029a6d9b7f4 Mon Sep 17 00:00:00 2001 From: Diwakar Sharma Date: Mon, 5 Feb 2024 11:38:22 +0000 Subject: [PATCH 2/3] test(volume/resize): add test for concurrent cluster capacity limit checks Signed-off-by: Diwakar Sharma --- .../src/bin/core/tests/volume/resize.rs | 131 +++++++++++++++++- 1 file changed, 128 insertions(+), 3 deletions(-) diff --git a/control-plane/agents/src/bin/core/tests/volume/resize.rs b/control-plane/agents/src/bin/core/tests/volume/resize.rs index 129004834..98c9ad716 100644 --- a/control-plane/agents/src/bin/core/tests/volume/resize.rs +++ b/control-plane/agents/src/bin/core/tests/volume/resize.rs @@ -5,12 +5,15 @@ use grpc::operations::{ }; use std::time::Duration; use stor_port::{ - transport_api::ReplyErrorKind, + transport_api::{ReplyError, ReplyErrorKind}, types::v0::transport::{ - CreateVolume, DestroyVolume, Filter, PublishVolume, ResizeVolume, VolumeShareProtocol, + CreateVolume, DestroyVolume, Filter, PublishVolume, ResizeVolume, Volume, VolumeId, + VolumeShareProtocol, }, }; +use uuid::Uuid; + const SIZE: u64 = 50 * 1024 * 1024; // 50MiB const EXPANDED_SIZE: u64 = 2 * SIZE; // 100MiB const CAPACITY_LIMIT_DIFF: u64 = 20 * 1024 * 1024; // 20MiB @@ -233,6 +236,28 @@ async fn resize_on_no_capacity_pool() { let vol_obj = &v_arr.entries[0]; // Size shouldn't have changed. assert!(vol_obj.spec().size == volume.spec().size); + + // try a resize again, this time setting cluster capacity limit. + let _ = vol_cli + .resize( + &ResizeVolume { + uuid: volume.uuid().clone(), + requested_size: EXPANDED_SIZE, + cluster_capacity_limit: Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF), + }, + None, + ) + .await + .expect_err("Expected error due to insufficient pool capacity"); + + let v_arr = vol_cli + .get(Filter::Volume(volume.spec().uuid), false, None, None) + .await + .unwrap(); + let vol_obj = &v_arr.entries[0]; + // Size shouldn't have changed. + assert!(vol_obj.spec().size == volume.spec().size); + // TODO: Add reclaim monitor validations for replicas that got resized as part // of this failed volume resize. } @@ -264,6 +289,79 @@ async fn resize_with_cluster_capacity_limit() { // resize within the capacity limit grpc_resize_volume_with_limit(&vol_cli, Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF), None).await; + // resize a new volume, but reduce the limit set previously. The limit balance + // calculations are expected to work based on reduced limit value now. + grpc_resize_volume_with_limit( + &vol_cli, + Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF / 2), + None, + ) + .await; +} + +// Take 600MiB pool. Create five volumes of 50MiB each, totalling a usage of 250MiB. +// Set cluster capacity limit to 400MiB and expand all five volumes to 100MiB. +// Since remaining limit is 150MiB, three volumes should successfully expand and +// two must fail to expand. +#[tokio::test] +async fn resize_with_cluster_capacity_limit_concurrent() { + let cluster = ClusterBuilder::builder() + .with_rest(false) + .with_agents(vec!["core"]) + .with_io_engines(2) + .with_pool(0, "malloc:///p1?size_mb=600") + .with_cache_period("1s") + .with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1)) + .with_options(|o| o.with_isolated_io_engine(true)) + .build() + .await + .unwrap(); + + let num_volumes = 5; + let mut success = 0; + let mut failure = 0; + let vol_cli = cluster.grpc_client().volume(); + let volume_ids = create_volumes(&vol_cli, 5).await; + let mut results = Vec::with_capacity(num_volumes); + + // Create a channel to collect results from the concurrent tasks + let (tx, mut rx) = tokio::sync::mpsc::channel::>(num_volumes); + // For each task + let refc_vol_cli = std::sync::Arc::new(vol_cli); + + for volume_id in volume_ids { + let refp_vol_cli = std::sync::Arc::clone(&refc_vol_cli); + let tx = tx.clone(); + tokio::spawn(async move { + let hdl = refp_vol_cli + .resize( + &ResizeVolume { + uuid: volume_id.try_into().unwrap(), + requested_size: EXPANDED_SIZE, + cluster_capacity_limit: Some(4 * EXPANDED_SIZE), + }, + None, + ) + .await; + + // Send the result to the channel + tx.send(hdl).await.unwrap(); + }); + } + // Collect results from the channel + for _ in 0 .. num_volumes { + results.push(rx.recv().await.unwrap()); + } + + results.iter().for_each(|r| { + if r.is_ok() { + success += 1; + } else { + failure += 1; + } + }); + assert_eq!(success, 3); + assert_eq!(failure, 2); } async fn grpc_resize_volume_with_limit( @@ -271,10 +369,12 @@ async fn grpc_resize_volume_with_limit( capacity: Option, expected_error: Option, ) { + let vol_uuid = Uuid::new_v4(); + let volume = volume_client .create( &CreateVolume { - uuid: "de3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(), + uuid: vol_uuid.try_into().unwrap(), size: SIZE, replicas: 2, thin: false, @@ -326,3 +426,28 @@ async fn grpc_resize_volume_with_limit( } } } + +// Creates count number of volumes, and return the uuid of volume to be resized. +async fn create_volumes(volume_client: &dyn VolumeOperations, count: u64) -> Vec { + let mut volumes = Vec::with_capacity(count as usize); + for _ in 0 .. count { + let vol_uuid = Uuid::new_v4(); + let volume = volume_client + .create( + &CreateVolume { + uuid: vol_uuid.try_into().unwrap(), + size: SIZE, + replicas: 1, + thin: false, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + volumes.push(volume.uuid().try_into().unwrap()) + } + + volumes +} From 5f68705f583761d6ad4c010e353487fb0790e929 Mon Sep 17 00:00:00 2001 From: Diwakar Sharma Date: Thu, 15 Feb 2024 12:12:52 +0000 Subject: [PATCH 3/3] feat(volume/resize): add volume resize to rest plugin Signed-off-by: Diwakar Sharma --- control-plane/plugin/src/operations.rs | 5 +++ control-plane/plugin/src/resources/error.rs | 6 ++++ control-plane/plugin/src/resources/volume.rs | 33 ++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/control-plane/plugin/src/operations.rs b/control-plane/plugin/src/operations.rs index 645cf1eb8..211dd2ed9 100644 --- a/control-plane/plugin/src/operations.rs +++ b/control-plane/plugin/src/operations.rs @@ -85,6 +85,11 @@ pub trait Get { pub trait Scale { type ID; async fn scale(id: &Self::ID, replica_count: u8, output: &utils::OutputFormat) -> PluginResult; + async fn resize( + id: &Self::ID, + requested_size: u64, + output: &utils::OutputFormat, + ) -> PluginResult; } /// Replica topology trait. diff --git a/control-plane/plugin/src/resources/error.rs b/control-plane/plugin/src/resources/error.rs index 225540a40..e0c7b4984 100644 --- a/control-plane/plugin/src/resources/error.rs +++ b/control-plane/plugin/src/resources/error.rs @@ -74,6 +74,12 @@ pub enum Error { id: String, source: openapi::tower::client::Error, }, + /// Error when resize volume request fails. + #[snafu(display("Failed to resize volume {id}. Error {source}"))] + ResizeVolumeError { + id: String, + source: openapi::tower::client::Error, + }, /// Error when set volume property request fails. #[snafu(display("Failed to set volume {id} property, Error {source}"))] ScaleVolumePropertyError { diff --git a/control-plane/plugin/src/resources/volume.rs b/control-plane/plugin/src/resources/volume.rs index 8bc9cf1ac..a58ebe4f1 100644 --- a/control-plane/plugin/src/resources/volume.rs +++ b/control-plane/plugin/src/resources/volume.rs @@ -190,6 +190,39 @@ impl Scale for Volume { } Ok(()) } + + async fn resize( + id: &Self::ID, + requested_size: u64, + output: &utils::OutputFormat, + ) -> PluginResult { + let req = openapi::models::ResizeVolumeBody { + size: requested_size as usize, + }; + match RestClient::client() + .volumes_api() + .put_volume_size(id, req) + .await + { + Ok(volume) => match output { + OutputFormat::Yaml | OutputFormat::Json => { + // Print json or yaml based on output format. + utils::print_table(output, volume.into_body()); + } + OutputFormat::None => { + // In case the output format is not specified, show a success message. + println!("Volume {id} resized successfully 🚀") + } + }, + Err(source) => { + return Err(Error::ScaleVolumeError { + id: id.to_string(), + source, + }); + } + } + Ok(()) + } } #[async_trait(?Send)]