Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Volume Health Information #946

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions control-plane/agents/src/bin/core/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub(crate) mod io_engine;
/// Various policies' definitions(e.g. rebuild policy)
pub(crate) mod policies;
/// in-memory cache of persistent store entries, loaded by key prefix
mod pstor_cache;
/// reconciliation logic
pub(crate) mod reconciler;
/// registry with node and all its resources
Expand Down
51 changes: 51 additions & 0 deletions control-plane/agents/src/bin/core/controller/pstor_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use agents::errors::SvcError;
use snafu::ResultExt;
use std::collections::BTreeMap;
use stor_port::{
pstor,
pstor::{error::DeserialiseValue, Error, ObjectKey, StorableObject, StoreWatchReceiver},
};

/// In-memory snapshot of persistent store entries.
///
/// The snapshot is taken once by [`PStorCache::new`]; `get_obj` lookups are then answered
/// from this map without going back to the store.
#[derive(Clone)]
pub(crate) struct PStorCache {
/// Raw JSON values, indexed by the string key they were returned with by the store.
entries: BTreeMap<String, serde_json::Value>,
}

impl PStorCache {
    /// Build a cache holding every entry the persistent store returns for `prefix`.
    ///
    /// The entries are fetched with `get_values_paged_all`, using `page_size` as the page
    /// size. Any error reported by the store is propagated to the caller.
    pub(crate) async fn new(
        pstor: &mut impl pstor::StoreKv,
        page_size: i64,
        prefix: &str,
    ) -> Result<Self, SvcError> {
        let fetched = pstor.get_values_paged_all(prefix, page_size).await?;
        let entries: BTreeMap<_, _> = fetched.into_iter().collect();
        Ok(Self { entries })
    }
}

#[async_trait::async_trait]
impl pstor::StoreObj for PStorCache {
async fn put_obj<O: StorableObject>(&mut self, _object: &O) -> Result<(), Error> {
unimplemented!()
}

async fn get_obj<O: StorableObject>(&mut self, key: &O::Key) -> Result<O, Error> {
let key = key.key();
match self.entries.get(&key) {
Some(kv) => Ok(
serde_json::from_value(kv.clone()).context(DeserialiseValue {
value: kv.to_string(),
})?,
),
None => Err(Error::MissingEntry { key }),
}
}

async fn watch_obj<K: ObjectKey>(&mut self, _key: &K) -> Result<StoreWatchReceiver, Error> {
unimplemented!()
}
}
78 changes: 70 additions & 8 deletions control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
use super::{resources::operations_helper::*, wrapper::NodeWrapper};
use crate::{
controller::{
pstor_cache::PStorCache,
reconciler::ReconcilerControl,
resources::VolumeHealthWatcher,
task_poller::{PollEvent, PollTriggerEvent},
wrapper::InternalOps,
},
Expand All @@ -36,7 +38,7 @@ use stor_port::{
},
types::v0::{
store::{
nexus_persistence::delete_all_v1_nexus_info,
nexus_persistence::{delete_all_v1_nexus_info, NexusInfo},
registry::{ControlPlaneService, CoreRegistryConfig, NodeRegistration},
volume::InitiatorAC,
},
Expand Down Expand Up @@ -74,6 +76,7 @@ pub(crate) struct RegistryInner<S: Store> {
nodes: NodesMapLocked,
/// spec (aka desired state) of the various resources.
specs: ResourceSpecsLocked,
health: VolumeHealthWatcher,
/// period to refresh the cache.
cache_period: std::time::Duration,
store: Arc<Mutex<S>>,
Expand Down Expand Up @@ -165,6 +168,7 @@ impl Registry {
inner: Arc::new(RegistryInner {
nodes: Default::default(),
specs: ResourceSpecsLocked::new(),
health: VolumeHealthWatcher::new(&store),
cache_period,
store: Arc::new(Mutex::new(store.clone())),
store_timeout,
Expand Down Expand Up @@ -322,6 +326,10 @@ impl Registry {
pub(crate) fn specs(&self) -> &ResourceSpecsLocked {
&self.specs
}
/// Get a reference to the volume health watcher.
/// The watcher is created together with the registry and is started by `init_health`.
pub(crate) fn health(&self) -> &VolumeHealthWatcher {
&self.health
}

/// Serialized write to the persistent store.
pub(crate) async fn store_obj<O: StorableObject>(&self, object: &O) -> Result<(), SvcError> {
Expand Down Expand Up @@ -386,6 +394,32 @@ impl Registry {
}
}

/// Serialized delete to the persistent store, with a prefix.
pub(crate) async fn delete_kv_prefix<K: StoreKey>(&self, key: &K) -> Result<(), SvcError> {
let store = self.store.clone();
match tokio::time::timeout(self.store_timeout, async move {
let mut store = store.lock().await;
let key = key.to_string();
Self::op_with_threshold(async move { store.delete_values_prefix(&key).await }).await
})
.await
{
Ok(result) => match result {
Ok(_) => Ok(()),
// already deleted, no problem
Err(StoreError::MissingEntry { .. }) => {
tracing::warn!("Entry with key {} missing from store.", key.to_string());
Ok(())
}
Err(error) => Err(SvcError::from(error)),
},
Err(_) => Err(SvcError::from(StoreError::Timeout {
operation: "Delete".to_string(),
timeout: self.store_timeout,
})),
}
}

async fn op_with_threshold<F, O>(future: F) -> O
where
F: Future<Output = O>,
Expand Down Expand Up @@ -437,14 +471,42 @@ impl Registry {

/// Initialise the registry from the persistent store: load the resource specs first,
/// then bring up the volume health information.
async fn init(&self) -> Result<(), SvcError> {
    // Keep the store guard in its own scope so it is released before `init_health`,
    // which takes the same lock again.
    {
        let mut guard = self.store.lock().await;
        self.specs
            .init(
                &mut *guard,
                self.legacy_prefix_present,
                self.etcd_max_page_size,
            )
            .await?;
    }

    self.init_health().await
}

/// Start the volume health watcher and seed it with the health information that is
/// already present in the persistent store.
///
/// The resource specs are expected to be loaded already (`init` does this before calling
/// here), so they are not initialised a second time.
async fn init_health(&self) -> Result<(), SvcError> {
    // Start watching before taking the snapshot. The snapshot is only applied through
    // `if_empty_insert`, so anything the watcher has already recorded is left untouched.
    self.health.init().await?;

    let mut store = self.store.lock().await;
    let mut pstor_cache = PStorCache::new(
        store.deref_mut(),
        self.etcd_max_page_size,
        self.health.key_prefix(),
    )
    .await?;

    for volume in self.specs.volumes_rsc() {
        // Volumes without a health info key have nothing to look up.
        let Some(nexus_info_key) = volume.immutable_ref().health_info_key() else {
            continue;
        };
        // Best effort: entries that are missing or fail to deserialise are skipped.
        let Ok(mut info) = pstor_cache.get_obj::<NexusInfo>(&nexus_info_key).await else {
            continue;
        };
        // The ids are filled in from the key and the volume spec rather than taken from
        // the deserialised value.
        info.uuid = nexus_info_key.nexus_id().clone();
        info.volume_uuid = Some(volume.uuid().clone());
        self.health.if_empty_insert(info);
    }
    Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub(crate) mod operations_helper;
/// Generic resources map.
pub(crate) mod resource_map;

pub(crate) use volume::VolumeHealthWatcher;

impl<T: OperationSequencer + Sized, R> Drop for OperationGuard<T, R> {
fn drop(&mut self) {
self.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,23 +974,10 @@ impl ResourceSpecsLocked {
where
T: DeserializeOwned,
{
let specs: Vec<Result<T, serde_json::Error>> = values
.iter()
.map(|v| serde_json::from_value(v.clone()))
.collect();

let mut result = vec![];
for spec in specs {
match spec {
Ok(s) => {
result.push(s);
}
Err(e) => {
return Err(e);
}
}
}
Ok(result)
values
.into_iter()
.map(serde_json::from_value)
.collect::<Result<Vec<_>, serde_json::Error>>()
}

/// Populate the resource specs with data from the persistent store.
Expand All @@ -1015,7 +1002,7 @@ impl ResourceSpecsLocked {
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_values = store_entries.iter().map(|e| e.1.clone()).collect();
let store_values = store_entries.into_iter().map(|e| e.1).collect();

let mut resource_specs = self.0.write();
match spec_type {
Expand All @@ -1024,6 +1011,7 @@ impl ResourceSpecsLocked {
Self::deserialise_specs::<VolumeSpec>(store_values).context(Deserialise {
obj_type: StorableObjectType::VolumeSpec,
})?;
// Add the volume watch here for all the specs OR after this call.
let ag_specs = get_affinity_group_specs(&specs);
resource_specs.volumes.populate(specs);
// Load the ag specs in memory, ag specs are not persisted in memory so we don't
Expand Down
107 changes: 104 additions & 3 deletions control-plane/agents/src/bin/core/controller/resources/volume/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
mod snapshot;

use super::{ResourceMutex, ResourceUid};
use stor_port::types::v0::{
store::volume::{AffinityGroupSpec, VolumeSpec},
transport::VolumeId,
use parking_lot::Mutex;
use std::{collections::BTreeMap, sync::Arc};
use stor_port::{
pstor,
types::v0::{
store::{
nexus_persistence::{NexusInfo, NexusInfoKey},
volume::{AffinityGroupSpec, VolumeSpec},
},
transport::VolumeId,
},
};

impl ResourceMutex<VolumeSpec> {
Expand Down Expand Up @@ -55,3 +63,96 @@ macro_rules! volume_span {
};
}
crate::impl_trace_span!(volume_span, VolumeSpec);

use pstor::{StoreKv, StoreKvWatcher};
use stor_port::types::v0::transport::NexusId;

/// Keeps an in-memory view of the `NexusInfo` entries of the persistent store, updated
/// through a store watcher callback.
pub(crate) struct VolumeHealthWatcher {
/// Store watcher whose callback updates `health`.
watcher: Box<dyn StoreKvWatcher>,
/// Key prefix of the nexus info entries, as returned by `NexusInfoKey::key_prefix`.
key_prefix: String,
/// Latest known `NexusInfo`, indexed by the uuid of the nexus it belongs to.
health: Arc<Mutex<BTreeMap<uuid::Uuid, Arc<NexusInfo>>>>,
}
impl std::fmt::Debug for VolumeHealthWatcher {
    /// Prints only the type name; none of the fields are included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("VolumeHealthWatcher")
    }
}

impl VolumeHealthWatcher {
    /// Create a new `Self`.
    ///
    /// Registers a callback with the store's kv watcher which keeps the shared health map
    /// in sync with the updates it receives. Nothing is watched until [`Self::init`] is
    /// called.
    pub(crate) fn new(store: &impl StoreKv) -> Self {
        let health = Arc::new(Mutex::new(BTreeMap::new()));

        let health_cln = health.clone();
        let watcher = store.kv_watcher(move |arg| {
            // Whatever happens with a single update, keep the watch going.
            let cnt = pstor::WatchResult::Continue;

            // An update with an empty value is handled as the removal of that key.
            if arg.value.is_empty() {
                match NexusInfoKey::parse_id(arg.updated_key) {
                    Ok(id) => {
                        tracing::debug!(key=%arg.updated_key, %id, "Removing key");
                        health_cln.lock().remove(id.uuid());
                    }
                    Err(error) => {
                        tracing::warn!(key=%arg.updated_key, error, "Received unexpected PStor Update");
                    }
                }
                return cnt;
            }

            let Ok(nexus_info) = serde_json::from_str::<NexusInfo>(arg.value) else {
                tracing::error!(
                    key = arg.updated_key,
                    value = arg.value,
                    "Failed to parse health value information"
                );
                return cnt;
            };

            // The ids are derived from the updated key via `with_key`.
            match nexus_info.with_key(arg.updated_key) {
                Ok(Some(info)) => {
                    tracing::debug!(?info, "Updating Health info");
                    health_cln.lock().insert(*info.uuid, Arc::new(info));
                }
                Ok(None) => {
                    tracing::warn!(key=%arg.updated_key, "Received unexpected PStor Update");
                }
                Err(error) => tracing::warn!(key=%arg.updated_key, %error, "Failed to parse uuids"),
            }

            cnt
        });

        Self {
            watcher: Box::new(watcher),
            key_prefix: NexusInfoKey::key_prefix(),
            health,
        }
    }

    /// Get the health key prefix.
    pub(crate) fn key_prefix(&self) -> &str {
        &self.key_prefix
    }

    /// Start the watcher.
    /// All registered pstor key updates will be propagated via the callback.
    ///
    /// The watch is registered on the stored key prefix, i.e. the same value which
    /// [`Self::key_prefix`] hands out.
    pub(crate) async fn init(&self) -> Result<(), agents::errors::SvcError> {
        self.watcher
            .watch(pstor::WatchKey::new(self.key_prefix.clone()), ())?;
        Ok(())
    }

    /// If the health info hasn't been added yet, insert it.
    /// An entry which is already present for the same uuid is left untouched.
    pub(crate) fn if_empty_insert(&self, info: NexusInfo) {
        // Single lookup through the entry API; the uuid is copied out before `info` is
        // moved into the closure.
        self.health
            .lock()
            .entry(*info.uuid)
            .or_insert_with(|| Arc::new(info));
    }

    /// Get the volume health info for the given target.
    /// Returns `None` when nothing is recorded for the target's uuid.
    pub(crate) fn health(&self, target: &NexusId) -> Option<Arc<NexusInfo>> {
        self.health.lock().get(target.uuid()).cloned()
    }

    /// Expose a retain interface, allowing clean up of objects.
    /// Entries for which `retain` returns `false` are removed.
    pub(crate) fn retain<F: FnMut(&uuid::Uuid, &mut Arc<NexusInfo>) -> bool>(&self, retain: F) {
        self.health.lock().retain(retain);
    }
}
Loading