Skip to content

Commit

Permalink
chore(bors): merge pull request openebs#718
Browse files Browse the repository at this point in the history
718: feat(app node): add support for app node registration and operations r=Abhinandan-Purkait a=Abhinandan-Purkait

App Node: Represents a CSI node or a node where io app/initiator would be situated.

Goals:
- App node resource is needed to issue commands to the node for operations like fsfreeze, unfreeze.
- App nodes will now register itself to the control-plane via the rest API.
- These app nodes would be used to issue fsfreeze, unfreeze as a part of snapshot creation to ensure filesystem consistency.

Changes in this PR:
- Adds the support for registration from rest api to control plane via grpc layer. This adds the grpc definitions and implementations for the same.
- Also adds the feature to get or list these app node from control-plane to rest via grpc, to be eventually used by the controller.

Co-authored-by: Abhinandan Purkait <[email protected]>
  • Loading branch information
mayastor-bors and Abhinandan-Purkait committed Jan 25, 2024
2 parents 3ea84a8 + 8d24aaa commit b2ea4e1
Show file tree
Hide file tree
Showing 32 changed files with 1,395 additions and 14 deletions.
17 changes: 17 additions & 0 deletions control-plane/agents/src/bin/core/app_node/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::controller::registry::Registry;
use agents::Service;
use grpc::operations::app_node::server::AppNodeServer;
use std::sync::Arc;

mod registry;
mod service;
mod specs;

pub(crate) fn configure(builder: Service) -> Service {
let registry = builder.shared_state::<Registry>().clone();
let new_service = Arc::new(service::Service::new(registry));
let app_node_server = AppNodeServer::new(new_service);
builder
.with_service(app_node_server.clone().into_v1_grpc_server())
.with_service(app_node_server.into_v1_registration_grpc_server())
}
42 changes: 42 additions & 0 deletions control-plane/agents/src/bin/core/app_node/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::controller::registry::Registry;
use agents::errors::SvcError;
use grpc::operations::{PaginatedResult, Pagination};
use stor_port::{
transport_api::ResourceKind,
types::v0::transport::{AppNode, AppNodeId, AppNodeState},
};

impl Registry {
/// Returns all app nodes from the registry.
pub(super) fn app_nodes(&self) -> Vec<AppNode> {
let app_node_specs = self.specs().app_node_specs();
let mut app_nodes = Vec::with_capacity(app_node_specs.len());
for spec in app_node_specs {
let state = AppNodeState::from(spec.clone());
app_nodes.push(AppNode::new(spec, Some(state)));
}
app_nodes
}
/// Returns the app nodes from the registry that match the given pagination.
pub(super) fn paginated_app_nodes(&self, pagination: &Pagination) -> PaginatedResult<AppNode> {
let app_node_specs = self.specs().paginated_app_node_specs(pagination);
let mut app_nodes = Vec::with_capacity(app_node_specs.len());
let last = app_node_specs.last();
for spec in app_node_specs.result() {
let state = AppNodeState::from(spec.clone());
app_nodes.push(AppNode::new(spec, Some(state)));
}
PaginatedResult::new(app_nodes, last)
}
/// Gets the app node from the registry with the given id.
pub(crate) fn app_node(&self, id: &AppNodeId) -> Result<AppNode, SvcError> {
let Some(spec) = self.specs().app_node_spec(id) else {
return Err(SvcError::NotFound {
kind: ResourceKind::AppNode,
id: id.to_string(),
});
};
let app_node_state = AppNodeState::from(spec.clone());
Ok(AppNode::new(spec, Some(app_node_state)))
}
}
111 changes: 111 additions & 0 deletions control-plane/agents/src/bin/core/app_node/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::controller::registry::Registry;
use agents::errors::SvcError;
use grpc::{
context::Context,
operations::{
app_node::traits::{AppNodeInfo, AppNodeOperations, AppNodeRegisterInfo},
Pagination,
},
};
use stor_port::{
transport_api::{v0::AppNodes, ReplyError},
types::v0::transport::{AppNode, DeregisterAppNode, Filter, RegisterAppNode},
};

/// App node service.
#[derive(Debug, Clone)]
pub(crate) struct Service {
registry: Registry,
}

impl Service {
/// Creates a new app node service.
pub(crate) fn new(registry: Registry) -> Self {
Self { registry }
}

/// Registers an app node.
async fn register(&self, registration: &RegisterAppNode) -> Result<(), SvcError> {
self.registry.register_app_node(registration).await?;
Ok(())
}

/// Deregisters an app node.
async fn deregister(&self, deregistration: &DeregisterAppNode) -> Result<(), SvcError> {
self.registry.deregister_app_node(deregistration).await?;
Ok(())
}

/// Gets an app node.
async fn get_app_node(&self, filter: Filter) -> Result<AppNode, SvcError> {
match filter {
Filter::AppNode(id) => {
let app_node = self.registry.app_node(&id)?;
Ok(app_node)
}
_ => Err(SvcError::InvalidFilter { filter }),
}
}

/// Gets all app nodes.
async fn list_app_nodes(&self, pagination: Option<Pagination>) -> Result<AppNodes, SvcError> {
let mut last_result = true;
let filtered_app_nodes = match pagination {
None => self.registry.app_nodes(),
Some(ref pagination) => {
let paginated_app_nodes = self.registry.paginated_app_nodes(pagination);
last_result = paginated_app_nodes.last();
paginated_app_nodes.result()
}
};

Ok(AppNodes {
entries: filtered_app_nodes,
next_token: match last_result {
true => None,
false => pagination
.clone()
.map(|p| p.starting_token() + p.max_entries()),
},
})
}
}

#[tonic::async_trait]
impl AppNodeOperations for Service {
async fn get(&self, req: Filter, _ctx: Option<Context>) -> Result<AppNode, ReplyError> {
let app_nodes = self.get_app_node(req).await?;
Ok(app_nodes)
}

async fn list(
&self,
pagination: Option<Pagination>,
_ctx: Option<Context>,
) -> Result<AppNodes, ReplyError> {
let app_nodes = self.list_app_nodes(pagination).await?;
Ok(app_nodes)
}

async fn register_app_node(
&self,
registration: &dyn AppNodeRegisterInfo,
_ctx: Option<Context>,
) -> Result<(), ReplyError> {
let req = registration.into();
let service = self.clone();
Context::spawn(async move { service.register(&req).await }).await??;
Ok(())
}

async fn deregister_app_node(
&self,
deregistration: &dyn AppNodeInfo,
_ctx: Option<Context>,
) -> Result<(), ReplyError> {
let req = deregistration.into();
let service = self.clone();
Context::spawn(async move { service.deregister(&req).await }).await??;
Ok(())
}
}
115 changes: 115 additions & 0 deletions control-plane/agents/src/bin/core/app_node/specs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use crate::controller::{
registry::Registry,
resources::operations_helper::{ResourceSpecs, ResourceSpecsLocked},
};
use agents::errors::SvcError;
use grpc::operations::{app_node::traits::AppNodeRegisterInfo, PaginatedResult, Pagination};
use stor_port::{
pstor::ObjectKey,
types::v0::{
store::app_node::{AppNodeSpec, AppNodeSpecKey},
transport::{AppNodeId, DeregisterAppNode, RegisterAppNode},
},
};

impl ResourceSpecs {
/// Get a copy of all app node specs.
pub(crate) fn app_node_specs(&self) -> Vec<AppNodeSpec> {
self.app_nodes.values().map(|v| v.lock().clone()).collect()
}
/// Get a subset of app node specs based on the pagination argument.
pub(crate) fn paginated_app_node_specs(
&self,
pagination: &Pagination,
) -> PaginatedResult<AppNodeSpec> {
let mut last_result = false;
let num_app_nodes = self.app_nodes.len() as u64;
let max_entries = pagination.max_entries();
let offset = std::cmp::min(pagination.starting_token(), num_app_nodes);

let length = match offset + max_entries >= num_app_nodes {
true => {
last_result = true;
num_app_nodes - offset
}
false => pagination.max_entries(),
};

PaginatedResult::new(self.app_nodes.paginate(offset, length), last_result)
}
}
impl ResourceSpecsLocked {
/// Get a subset of app nodes based on the pagination argument.
pub(crate) fn paginated_app_node_specs(
&self,
pagination: &Pagination,
) -> PaginatedResult<AppNodeSpec> {
let specs = self.read();
specs.paginated_app_node_specs(pagination)
}

/// Get a copy of all app node specs.
pub(crate) fn app_node_specs(&self) -> Vec<AppNodeSpec> {
let specs = self.read();
specs.app_node_specs()
}

/// Get an app node spec using id.
pub(crate) fn app_node_spec(&self, id: &AppNodeId) -> Option<AppNodeSpec> {
let specs = self.read();
specs.app_nodes.get(id).map(|spec| spec.lock().clone())
}

/// Remove the app node spec from registry.
fn remove_app_node_spec(&self, id: &AppNodeId) {
let mut specs = self.write();
specs.app_nodes.remove(id);
}

/// Create an app node spec for the incoming request from csi instance.
pub(crate) async fn register_app_node(
&self,
registry: &Registry,
req: &RegisterAppNode,
) -> Result<(), SvcError> {
let (changed, spec_to_persist) = {
let mut specs = self.write();
match specs.app_nodes.get(&req.app_node_id()) {
Some(app_node_rsc) => {
let mut app_node_spec = app_node_rsc.lock();
let changed = app_node_spec.endpoint != req.grpc_endpoint()
|| app_node_spec.labels != req.labels();

app_node_spec.endpoint = req.grpc_endpoint();
(changed, app_node_spec.clone())
}
None => {
let app_node = AppNodeSpec::new(
req.app_node_id().clone(),
req.grpc_endpoint(),
req.labels.clone(),
);
specs.app_nodes.insert(app_node.clone());
(true, app_node)
}
}
};
if changed {
registry.store_obj(&spec_to_persist).await?;
}
Ok(())
}

/// Delete an app node spec from the registry for the incoming request from csi instance.
pub(crate) async fn deregister_app_node(
&self,
registry: &Registry,
req: &DeregisterAppNode,
) -> Result<(), SvcError> {
registry
.delete_kv(&<DeregisterAppNode as Into<AppNodeSpecKey>>::into(req.clone()).key())
.await?;
self.remove_app_node_spec(&req.id);
Ok(())
}
}
20 changes: 19 additions & 1 deletion control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use stor_port::{
registry::{ControlPlaneService, CoreRegistryConfig, NodeRegistration},
volume::InitiatorAC,
},
transport::{HostNqn, NodeId},
transport::{DeregisterAppNode, HostNqn, NodeId, RegisterAppNode},
},
HostAccessControl,
};
Expand Down Expand Up @@ -501,4 +501,22 @@ impl Registry {
false => vec![],
}
}

/// Register a app node (ex: a csi node) with the control-plane.
pub(crate) async fn register_app_node(
&self,
app_node_spec: &RegisterAppNode,
) -> Result<(), SvcError> {
self.specs().register_app_node(self, app_node_spec).await?;
Ok(())
}

/// Deregister a app node (ex: a csi node) with the control-plane.
pub(crate) async fn deregister_app_node(
&self,
app_node: &DeregisterAppNode,
) -> Result<(), SvcError> {
self.specs().deregister_app_node(self, app_node).await?;
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::controller::resources::ResourceUid;
use stor_port::types::v0::{store::app_node::AppNodeSpec, transport::AppNodeId};

impl ResourceUid for AppNodeSpec {
type Uid = AppNodeId;
fn uid(&self) -> &Self::Uid {
&self.id
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use stor_port::types::v0::store::{
AsOperationSequencer, OperationMode, OperationSequenceState, OperationSequencer,
};

mod app_node;
mod migration;
mod nexus;
mod node;
Expand Down
Loading

0 comments on commit b2ea4e1

Please sign in to comment.