Skip to content

Commit

Permalink
chore(bors): merge pull request #747
Browse files Browse the repository at this point in the history
747: check cluster capacity limit during volume resize, and also allow volume resize via rest plugin r=dsharma-dc a=dsharma-dc

- Enforce cluster capacity limit checks during volume resize.
- Allow volume resize operation via REST plugin.

Co-authored-by: Diwakar Sharma <[email protected]>
  • Loading branch information
mayastor-bors and dsharma-dc committed Feb 26, 2024
2 parents 3d15cf6 + 5f68705 commit 550e4df
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 29 deletions.
253 changes: 238 additions & 15 deletions control-plane/agents/src/bin/core/tests/volume/resize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@ use grpc::operations::{
volume::traits::VolumeOperations,
};
use std::time::Duration;
use stor_port::types::v0::transport::{
CreateVolume, DestroyVolume, Filter, PublishVolume, ResizeVolume, Volume, VolumeId,
VolumeShareProtocol,
use stor_port::{
transport_api::{ReplyError, ReplyErrorKind},
types::v0::transport::{
CreateVolume, DestroyVolume, Filter, PublishVolume, ResizeVolume, Volume, VolumeId,
VolumeShareProtocol,
},
};

use uuid::Uuid;

const SIZE: u64 = 50 * 1024 * 1024; // 50MiB
const EXPANDED_SIZE: u64 = 2 * SIZE; // 100MiB
const CAPACITY_LIMIT_DIFF: u64 = 20 * 1024 * 1024; // 20MiB

/// Validate that the size of volume and replicas are as per expected_size
/// Return true if validation is successful, otherwise false.
async fn validate_resized_volume(
Expand Down Expand Up @@ -46,16 +55,17 @@ async fn validate_resized_volume(
}

#[tokio::test]
async fn resize_unpublished() {
async fn resize_unpublished_and_published() {
let cluster = ClusterBuilder::builder()
.with_rest(true)
.with_rest(false)
.with_agents(vec!["core"])
.with_io_engines(3)
.with_pool(0, "malloc:///p1?size_mb=200")
.with_pool(1, "malloc:///p1?size_mb=200")
.with_pool(2, "malloc:///p1?size_mb=200")
.with_cache_period("1s")
.with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1))
.with_options(|o| o.with_isolated_io_engine(true))
.build()
.await
.unwrap();
Expand Down Expand Up @@ -85,8 +95,8 @@ async fn resize_unpublished() {
.resize(
&ResizeVolume {
uuid: volume.uuid().clone(),
requested_size: 2 * volume.spec().size,
capacity_limit: None,
requested_size: EXPANDED_SIZE,
cluster_capacity_limit: None,
},
None,
)
Expand Down Expand Up @@ -125,8 +135,8 @@ async fn resize_published(cluster: &Cluster) {
.create(
&CreateVolume {
uuid: "df3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(),
size: 50 * 1024 * 1024,
replicas: 3,
size: SIZE,
replicas: 1,
thin: false,
..Default::default()
},
Expand All @@ -152,8 +162,8 @@ async fn resize_published(cluster: &Cluster) {
.resize(
&ResizeVolume {
uuid: volume.uuid().clone(),
requested_size: 2 * volume.spec().size,
capacity_limit: None,
requested_size: EXPANDED_SIZE,
cluster_capacity_limit: None,
},
None,
)
Expand All @@ -177,14 +187,15 @@ async fn resize_published(cluster: &Cluster) {
#[tokio::test]
async fn resize_on_no_capacity_pool() {
let cluster = ClusterBuilder::builder()
.with_rest(true)
.with_rest(false)
.with_agents(vec!["core"])
.with_io_engines(3)
.with_pool(0, "malloc:///p1?size_mb=200")
.with_pool(1, "malloc:///p1?size_mb=200")
.with_pool(2, "malloc:///p1?size_mb=100")
.with_cache_period("1s")
.with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1))
.with_options(|o| o.with_isolated_io_engine(true))
.build()
.await
.unwrap();
Expand All @@ -195,7 +206,7 @@ async fn resize_on_no_capacity_pool() {
.create(
&CreateVolume {
uuid: "de3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(),
size: 50 * 1024 * 1024,
size: SIZE,
replicas: 3,
thin: false,
..Default::default()
Expand All @@ -209,8 +220,8 @@ async fn resize_on_no_capacity_pool() {
.resize(
&ResizeVolume {
uuid: volume.uuid().clone(),
requested_size: 2 * volume.spec().size,
capacity_limit: None,
requested_size: EXPANDED_SIZE,
cluster_capacity_limit: None,
},
None,
)
Expand All @@ -225,6 +236,218 @@ async fn resize_on_no_capacity_pool() {
let vol_obj = &v_arr.entries[0];
// Size shouldn't have changed.
assert!(vol_obj.spec().size == volume.spec().size);

// try a resize again, this time setting cluster capacity limit.
let _ = vol_cli
.resize(
&ResizeVolume {
uuid: volume.uuid().clone(),
requested_size: EXPANDED_SIZE,
cluster_capacity_limit: Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF),
},
None,
)
.await
.expect_err("Expected error due to insufficient pool capacity");

let v_arr = vol_cli
.get(Filter::Volume(volume.spec().uuid), false, None, None)
.await
.unwrap();
let vol_obj = &v_arr.entries[0];
// Size shouldn't have changed.
assert!(vol_obj.spec().size == volume.spec().size);

// TODO: Add reclaim monitor validations for replicas that got resized as part
// of this failed volume resize.
}

/// Exercise volume resize against a cluster-wide capacity limit:
/// a resize that would breach the limit must fail, while resizes that fit
/// within the limit must succeed.
#[tokio::test]
async fn resize_with_cluster_capacity_limit() {
    let cluster = ClusterBuilder::builder()
        .with_rest(false)
        .with_agents(vec!["core"])
        .with_io_engines(2)
        .with_pool(0, "malloc:///p1?size_mb=200")
        .with_pool(1, "malloc:///p1?size_mb=200")
        .with_cache_period("1s")
        .with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1))
        .with_options(|o| o.with_isolated_io_engine(true))
        .build()
        .await
        .unwrap();

    let client = cluster.grpc_client().volume();

    // A limit smaller than the requested expansion must be rejected.
    grpc_resize_volume_with_limit(
        &client,
        Some(EXPANDED_SIZE - CAPACITY_LIMIT_DIFF),
        Some(ReplyErrorKind::CapacityLimitExceeded {}),
    )
    .await;

    // A limit with headroom above the requested expansion must succeed.
    grpc_resize_volume_with_limit(&client, Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF), None).await;

    // Resize a new volume, but with a tighter limit than the previous call.
    // The limit balance calculations must now work off the reduced value.
    grpc_resize_volume_with_limit(
        &client,
        Some(EXPANDED_SIZE + CAPACITY_LIMIT_DIFF / 2),
        None,
    )
    .await;
}

// Take 600MiB pool. Create five volumes of 50MiB each, totalling a usage of 250MiB.
// Set cluster capacity limit to 400MiB and expand all five volumes to 100MiB.
// Since the remaining limit is 150MiB, three volumes should successfully expand and
// two must fail to expand.
#[tokio::test]
async fn resize_with_cluster_capacity_limit_concurrent() {
    let cluster = ClusterBuilder::builder()
        .with_rest(false)
        .with_agents(vec!["core"])
        .with_io_engines(2)
        .with_pool(0, "malloc:///p1?size_mb=600")
        .with_cache_period("1s")
        .with_reconcile_period(Duration::from_secs(1), Duration::from_secs(1))
        .with_options(|o| o.with_isolated_io_engine(true))
        .build()
        .await
        .unwrap();

    let num_volumes = 5;
    let vol_cli = cluster.grpc_client().volume();
    // Derive the created-volume count from num_volumes (previously a duplicated
    // literal `5`) so the two values cannot drift apart.
    let volume_ids = create_volumes(&vol_cli, num_volumes as u64).await;
    let mut results = Vec::with_capacity(num_volumes);

    // Create a channel to collect results from the concurrent tasks.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Result<Volume, ReplyError>>(num_volumes);
    // The client handle is shared by all spawned resize tasks.
    let refc_vol_cli = std::sync::Arc::new(vol_cli);

    for volume_id in volume_ids {
        let refp_vol_cli = std::sync::Arc::clone(&refc_vol_cli);
        let tx = tx.clone();
        tokio::spawn(async move {
            let hdl = refp_vol_cli
                .resize(
                    &ResizeVolume {
                        uuid: volume_id.try_into().unwrap(),
                        requested_size: EXPANDED_SIZE,
                        // 4 * EXPANDED_SIZE == 400MiB cluster capacity limit.
                        cluster_capacity_limit: Some(4 * EXPANDED_SIZE),
                    },
                    None,
                )
                .await;

            // Send the result to the channel.
            tx.send(hdl).await.unwrap();
        });
    }
    // Collect one result per spawned task from the channel.
    for _ in 0 .. num_volumes {
        results.push(rx.recv().await.unwrap());
    }

    // Tally outcomes with a filter count instead of mutable counters.
    let success = results.iter().filter(|r| r.is_ok()).count();
    let failure = results.len() - success;
    // 400MiB limit - 250MiB already provisioned = 150MiB headroom, so only
    // three of the five 50MiB expansions can be granted.
    assert_eq!(success, 3);
    assert_eq!(failure, 2);
}

/// Create a two-replica volume, resize it to `EXPANDED_SIZE` under the given
/// cluster capacity limit, and assert the outcome matches `expected_error`
/// (`None` means the resize must succeed). The volume is destroyed afterwards.
async fn grpc_resize_volume_with_limit(
    volume_client: &dyn VolumeOperations,
    capacity: Option<u64>,
    expected_error: Option<ReplyErrorKind>,
) {
    let vol_uuid = Uuid::new_v4();

    let volume = volume_client
        .create(
            &CreateVolume {
                uuid: vol_uuid.try_into().unwrap(),
                size: SIZE,
                replicas: 2,
                thin: false,
                ..Default::default()
            },
            None,
        )
        .await
        .unwrap();

    let outcome = volume_client
        .resize(
            &ResizeVolume {
                uuid: volume.uuid().clone(),
                requested_size: EXPANDED_SIZE,
                cluster_capacity_limit: capacity,
            },
            None,
        )
        .await;

    match outcome {
        Ok(resized) => {
            // The returned volume must be the same volume, now at the new size.
            assert!(resized.spec().uuid == volume.spec().uuid);
            assert!(resized.spec().size == EXPANDED_SIZE);
            let uuid = resized.uuid().try_into().unwrap();
            volume_client
                .destroy(&DestroyVolume { uuid }, None)
                .await
                .unwrap();
            // A successful resize is only correct when no error was expected.
            assert!(expected_error.is_none());
        }
        Err(error) => {
            // The failure must be of the expected kind.
            assert_eq!(expected_error, Some(error.kind));
            // Volume not needed anymore.
            let uuid = volume.uuid().try_into().unwrap();
            volume_client
                .destroy(&DestroyVolume { uuid }, None)
                .await
                .unwrap();
        }
    }
}

// Creates `count` volumes (single replica, thick, SIZE bytes each) and returns
// the uuids of all created volumes.
// NOTE: the previous comment claimed only "the uuid of volume to be resized"
// was returned, but every created volume's uuid is collected.
async fn create_volumes(volume_client: &dyn VolumeOperations, count: u64) -> Vec<Uuid> {
    let mut volumes = Vec::with_capacity(count as usize);
    for _ in 0 .. count {
        let vol_uuid = Uuid::new_v4();
        let volume = volume_client
            .create(
                &CreateVolume {
                    uuid: vol_uuid.try_into().unwrap(),
                    size: SIZE,
                    replicas: 1,
                    thin: false,
                    ..Default::default()
                },
                None,
            )
            .await
            .unwrap();

        volumes.push(volume.uuid().try_into().unwrap());
    }

    volumes
}
24 changes: 23 additions & 1 deletion control-plane/agents/src/bin/core/volume/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use stor_port::{
/// Core-agent volume service state.
pub(super) struct Service {
    // Shared registry backing volume specs and operations.
    registry: Registry,
    // Semaphore sized from `registry.create_volume_limit()`; bounds in-flight
    // volume create operations.
    create_volume_limiter: std::sync::Arc<tokio::sync::Semaphore>,
    // Capacity (bytes) currently "borrowed" by in-flight resize operations;
    // used by `resize_volume` to enforce the cluster capacity limit across
    // concurrent resizes, and released once a resize completes or fails.
    capacity_limit_borrow: std::sync::Arc<parking_lot::RwLock<u64>>,
}

#[tonic::async_trait]
Expand Down Expand Up @@ -243,6 +244,7 @@ impl Service {
create_volume_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new(
registry.create_volume_limit(),
)),
capacity_limit_borrow: std::sync::Arc::new(parking_lot::RwLock::new(0)),
registry,
}
}
Expand Down Expand Up @@ -578,6 +580,26 @@ impl Service {
/// Resize (expand) a volume, honouring the optional cluster-wide capacity
/// limit carried by the request.
///
/// When a limit is set, the extra capacity is "borrowed" from
/// `capacity_limit_borrow` for the duration of the resize so that concurrent
/// resizes are accounted for together, and returned afterwards (on success the
/// consumed capacity shows up in the system's current total instead).
#[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.uuid))]
pub(super) async fn resize_volume(&self, request: &ResizeVolume) -> Result<Volume, SvcError> {
    let mut volume = self.specs().volume(&request.uuid).await?;

    // Without a cluster capacity limit there is nothing to account for here.
    let Some(limit) = request.cluster_capacity_limit() else {
        return volume.resize(&self.registry, request).await;
    };

    // Extra capacity this resize needs. `saturating_sub` guards against a
    // shrink request (requested < current) underflowing u64 — previously this
    // was a plain subtraction, which would panic in debug builds or wrap to a
    // huge value in release and falsely trip the capacity check. The resize
    // operation itself validates such requests downstream.
    let required = request
        .requested_size
        .saturating_sub(volume.as_ref().size);
    // Borrow the capacity and read the new running total under a single write
    // lock, so a concurrent resize cannot interleave between the update and
    // the read (previously these were two separate lock acquisitions).
    let current = {
        let mut borrowed = self.capacity_limit_borrow.write();
        *borrowed += required;
        *borrowed
    };
    // If there is a defined system wide capacity limit, ensure we don't breach that.
    self.specs()
        .check_capacity_limit_for_resize(limit, current)
        .map_err(|error| {
            // The resize was refused: return the borrowed capacity.
            *self.capacity_limit_borrow.write() -= required;
            error
        })?;

    let resize_ret = volume.resize(&self.registry, request).await;
    // Reset the capacity limit that we consumed and will now be accounted in the system's
    // current total.
    *self.capacity_limit_borrow.write() -= required;
    resize_ret
}
}
Loading

0 comments on commit 550e4df

Please sign in to comment.