From 60e1e61861d205261dad611a0751b69891f9d7ef Mon Sep 17 00:00:00 2001 From: DziyanaT Date: Thu, 31 Aug 2023 16:35:35 +0200 Subject: [PATCH 1/7] feat: save correct resource usage data to the db --- controller/src/client.rs | 2 +- controller/src/main.rs | 28 ++++++++++++++++++------ controller/src/routes/instances.rs | 2 +- controller/src/types/instance_status.rs | 25 +++++++++++---------- controller/src/types/workload_request.rs | 3 ++- 5 files changed, 39 insertions(+), 21 deletions(-) diff --git a/controller/src/client.rs b/controller/src/client.rs index 3d3c3a9..d84bcb8 100644 --- a/controller/src/client.rs +++ b/controller/src/client.rs @@ -1,5 +1,5 @@ -use orka_proto::scheduler_controller::{scheduling_service_client::SchedulingServiceClient, self}; use orka_proto::scheduler_controller::SchedulingRequest; +use orka_proto::scheduler_controller::{self, scheduling_service_client::SchedulingServiceClient}; use tonic::transport::Channel; use tonic::Streaming; diff --git a/controller/src/main.rs b/controller/src/main.rs index fd6010d..7a52c70 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -4,16 +4,18 @@ mod routes; mod store; mod types; - -use orka_proto::scheduler_controller::{WorkloadInstance, self}; +use orka_proto::scheduler_controller::{self, WorkloadInstance}; use store::kv_manager::{KeyValueBatch, KeyValueStore, DB_BATCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use axum::Router; -use orka_proto::scheduler_controller::scheduling_service_server::{SchedulingService}; -use orka_proto::scheduler_controller::{workload_status::Status as DeploymentStatus, SchedulingRequest, WorkloadStatus}; +use orka_proto::scheduler_controller::scheduling_service_server::SchedulingService; +use orka_proto::scheduler_controller::workload_status::Resources; +use orka_proto::scheduler_controller::{ + workload_status::Status as DeploymentStatus, SchedulingRequest, WorkloadStatus, +}; use std::net::SocketAddr; use std::thread; use std::time::Duration; @@ -67,7 +69,11 @@ impl SchedulingService for Scheduler { code: 0, message: Some("The workload is waiting".to_string()), }), - ..Default::default() + resource_usage: Some(Resources { + cpu: 2, + memory: 3, + disk: 4, + }), }, WorkloadStatus { instance_id: workload.instance_id.clone(), @@ -75,7 +81,11 @@ impl SchedulingService for Scheduler { code: 1, message: Some("The workload is running".to_string()), }), - ..Default::default() + resource_usage: Some(Resources { + cpu: 2, + memory: 3, + disk: 4, + }), }, WorkloadStatus { instance_id: workload.instance_id, @@ -83,7 +93,11 @@ impl SchedulingService for Scheduler { code: 2, message: Some("The workload is terminated".to_string()), }), - ..Default::default() + resource_usage: Some(Resources { + cpu: 2, + memory: 3, + disk: 4, + }), }, ]; diff --git a/controller/src/routes/instances.rs b/controller/src/routes/instances.rs index cf80601..5c091c0 100644 --- a/controller/src/routes/instances.rs +++ b/controller/src/routes/instances.rs @@ -1,4 +1,3 @@ -use orka_proto::scheduler_controller::{SchedulingRequest, Workload, WorkloadInstance}; use crate::client::Client; use crate::errors::ApiError; use crate::store::kv_manager::{KeyValueStore, DB_BATCH}; @@ -7,6 +6,7 @@ use crate::types::instance_status::InstanceStatus; use axum::extract::Path; use axum::Json; use log::{error, trace}; +use orka_proto::scheduler_controller::{SchedulingRequest, Workload, WorkloadInstance}; use serde_json::{self, json, Value}; use validator::Validate; diff --git a/controller/src/types/instance_status.rs b/controller/src/types/instance_status.rs index a92d511..a6fd7d6 100644 --- a/controller/src/types/instance_status.rs +++ b/controller/src/types/instance_status.rs @@ -19,11 +19,7 @@ impl From<&WorkloadStatus> for InstanceStatus { InstanceStatus { name: (*status.instance_id).to_string(), status_code: InstanceStatusCode::from(status.status.clone()), - resource_usage: InstanceResources { - cpu: 1, - memory: 1, - disk: 1, - }, + resource_usage: InstanceResources::from(status.resource_usage.clone()), } } } @@ -37,12 +33,19 @@ pub struct InstanceResources { pub disk: i32, } -impl From for InstanceResources { - fn from(res: Resources) -> Self { - InstanceResources { - cpu: res.cpu, - memory: res.memory, - disk: res.disk, +impl From> for InstanceResources { + fn from(resources: Option) -> Self { + match resources { + Some(res) => InstanceResources { + cpu: res.cpu, + memory: res.memory, + disk: res.disk, + }, + None => InstanceResources { + cpu: 0, + memory: 0, + disk: 0, + }, } } } diff --git a/controller/src/types/workload_request.rs b/controller/src/types/workload_request.rs index 71093ce..75d2243 100644 --- a/controller/src/types/workload_request.rs +++ b/controller/src/types/workload_request.rs @@ -3,7 +3,8 @@ use uuid::Uuid; use validator::{Validate, ValidationError}; use orka_proto::scheduler_controller::{ - workload::{Resources, Type}, self, + self, + workload::{Resources, Type}, }; #[derive(Debug, Validate, Deserialize, Serialize, Clone)] From 4aa5ba7ee1c311fbef9dd3822e4b1246c5a0b786 Mon Sep 17 00:00:00 2001 From: Mauran Date: Thu, 31 Aug 2023 17:51:48 +0200 Subject: [PATCH 2/7] fix(instance): send error instead of panick Signed-off-by: Mauran --- controller/src/errors.rs | 4 ++++ controller/src/routes/instances.rs | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/controller/src/errors.rs b/controller/src/errors.rs index 3eab702..450ad57 100644 --- a/controller/src/errors.rs +++ b/controller/src/errors.rs @@ -20,6 +20,9 @@ pub enum ApiError { #[error("Scheduling error")] SchedulingError(#[from] tonic::Status), + + #[error("Instance not created")] + InstanceNotCreated{message: String}, } impl IntoResponse for ApiError { @@ -32,6 +35,7 @@ impl IntoResponse for ApiError { ApiError::SerializationError(e) => (StatusCode::BAD_REQUEST, e.to_string()), ApiError::DatabaseError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), ApiError::SchedulingError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), + ApiError::InstanceNotCreated{message} => (StatusCode::BAD_REQUEST, message.to_string()), }; let payload = json!({ diff --git a/controller/src/routes/instances.rs b/controller/src/routes/instances.rs index 5c091c0..befaaa0 100644 --- a/controller/src/routes/instances.rs +++ b/controller/src/routes/instances.rs @@ -5,7 +5,7 @@ use crate::types::instance_request::InstanceRequest; use crate::types::instance_status::InstanceStatus; use axum::extract::Path; use axum::Json; -use log::{error, trace}; +use log::{trace, warn, error}; use orka_proto::scheduler_controller::{SchedulingRequest, Workload, WorkloadInstance}; use serde_json::{self, json, Value}; use validator::Validate; @@ -94,8 +94,13 @@ pub async fn post_instance(body: String) -> anyhow::Result, ApiError let mut stream = client.schedule_workload(request).await?; + stream.message().await.map_err(|e|{ + warn!("Error while creating the instance: {:?}", e); + ApiError::InstanceNotCreated{message: format!("Instance not created {:?}", e)} + })?; + tokio::spawn(async move { - while let Some(status) = stream.message().await.unwrap() { + while let Ok(Some(status)) = stream.message().await { trace!("STATUS={:?}", status); let result = DB_BATCH.lock().unwrap().batch.set( &status.instance_id, From 6dcacb31736e9d18a90853bb73f8a5699fbca89e Mon Sep 17 00:00:00 2001 From: DziyanaT Date: Fri, 1 Sep 2023 09:44:54 +0200 Subject: [PATCH 3/7] wip: fixing db ownership error Signed-off-by: Mauran --- controller/src/main.rs | 5 +++-- controller/src/routes/instances.rs | 12 ++++++------ controller/src/routes/workloads.rs | 8 ++++---- controller/src/store/kv_manager.rs | 8 +++++--- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/controller/src/main.rs b/controller/src/main.rs index 7a52c70..fdda314 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -4,8 +4,9 @@ mod routes; mod store; mod types; +use store::kv_manager::DB_STORE; use orka_proto::scheduler_controller::{self, WorkloadInstance}; -use store::kv_manager::{KeyValueBatch, KeyValueStore, DB_BATCH}; +use store::kv_manager::{KeyValueBatch, DB_BATCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -173,7 +174,7 @@ async fn main() -> Result<(), Box> { loop { thread::sleep(Duration::from_secs(5)); let kv_batch = DB_BATCH.lock(); - let kv_store = KeyValueStore::new(); + let kv_store = DB_STORE.lock(); match kv_batch { Ok(mut kvbatch) => { match kv_store { diff --git a/controller/src/routes/instances.rs b/controller/src/routes/instances.rs index befaaa0..0122a8f 100644 --- a/controller/src/routes/instances.rs +++ b/controller/src/routes/instances.rs @@ -1,6 +1,6 @@ use crate::client::Client; use crate::errors::ApiError; -use crate::store::kv_manager::{KeyValueStore, DB_BATCH}; +use crate::store::kv_manager::{DB_BATCH, DB_STORE}; use crate::types::instance_request::InstanceRequest; use crate::types::instance_status::InstanceStatus; use axum::extract::Path; @@ -11,7 +11,7 @@ use serde_json::{self, json, Value}; use validator::Validate; pub async fn get_instances(_body: String) -> anyhow::Result, ApiError> { - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); let instance_list = kv_store.select_instances()?; Ok(Json(json!({ "instances": instance_list }))) } @@ -19,7 +19,7 @@ pub async fn get_instances(_body: String) -> anyhow::Result, ApiErro pub async fn get_specific_instance( Path(id): Path, ) -> anyhow::Result, ApiError> { - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); let instance = kv_store.instances_bucket()?.get(&id)?; match instance { None => Ok(Json(json!({"description": "Instance not found"}))), @@ -36,7 +36,7 @@ pub async fn delete_instance(Path(id): Path) -> anyhow::Result anyhow::Result, ApiError // Validate the request json_body.validate()?; - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); let workload_request = kv_store.workloads_bucket()?.get(&json_body.workload_id)?; match workload_request { diff --git a/controller/src/routes/workloads.rs b/controller/src/routes/workloads.rs index f36fab6..c90b8cb 100644 --- a/controller/src/routes/workloads.rs +++ b/controller/src/routes/workloads.rs @@ -10,7 +10,7 @@ use validator::Validate; pub async fn get_workloads(_body: String) -> anyhow::Result, ApiError> { // Init the database - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); let workloads = kv_store.select_workloads()?; @@ -20,7 +20,7 @@ pub async fn get_workloads(_body: String) -> anyhow::Result, ApiErro pub async fn get_specific_workload( Path(id): Path, ) -> anyhow::Result, ApiError> { - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); let workload = kv_store.workloads_bucket()?.get(&id)?; @@ -33,14 +33,14 @@ pub async fn get_specific_workload( } pub async fn delete_workload(Path(id): Path) -> anyhow::Result, ApiError> { - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); kv_store.workloads_bucket()?.remove(&id)?; Ok(Json(json!({"description": "Workload deleted"}))) } pub async fn post_workload(body: String) -> anyhow::Result, ApiError> { // Init the database - let kv_store = KeyValueStore::new()?; + let kv_store = DB_STORE.lock().unwrap(); // Create a new Workload Request object out of the body let json_body: WorkloadRequest = serde_json::from_str(&body)?; diff --git a/controller/src/store/kv_manager.rs b/controller/src/store/kv_manager.rs index fbc9f78..0f785b1 100644 --- a/controller/src/store/kv_manager.rs +++ b/controller/src/store/kv_manager.rs @@ -40,19 +40,21 @@ impl DerefMut for KeyValueBatch { } } +pub static DB_STORE: Lazy> = Lazy::new(|| Mutex::new(KeyValueStore::new())); + pub struct KeyValueStore { store: Store, } impl KeyValueStore { - pub fn new() -> Result { + pub fn new() -> Self { // Configure the database let cfg = Config::new("./db/controller"); // Open the key/value store - let store = Store::new(cfg)?; + let store = Store::new(cfg).unwrap(); - Ok(Self { store }) + Self { store } } pub fn workloads_bucket(&self) -> Result>, Error> { From 28d9958f5deeeca790dcb089e37f1d85c83bc146 Mon Sep 17 00:00:00 2001 From: DziyanaT Date: Fri, 1 Sep 2023 10:06:05 +0200 Subject: [PATCH 4/7] fix: add Arc to DB_STORE type Signed-off-by: Mauran --- controller/src/errors.rs | 4 ++-- controller/src/main.rs | 2 +- controller/src/routes/instances.rs | 19 ++++++++++++++----- controller/src/store/kv_manager.rs | 11 +++++++++-- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/controller/src/errors.rs b/controller/src/errors.rs index 450ad57..f82c2c4 100644 --- a/controller/src/errors.rs +++ b/controller/src/errors.rs @@ -22,7 +22,7 @@ pub enum ApiError { SchedulingError(#[from] tonic::Status), #[error("Instance not created")] - InstanceNotCreated{message: String}, + InstanceNotCreated { message: String }, } impl IntoResponse for ApiError { @@ -35,7 +35,7 @@ impl IntoResponse for ApiError { ApiError::SerializationError(e) => (StatusCode::BAD_REQUEST, e.to_string()), ApiError::DatabaseError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), ApiError::SchedulingError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), - ApiError::InstanceNotCreated{message} => (StatusCode::BAD_REQUEST, message.to_string()), + ApiError::InstanceNotCreated { message } => (StatusCode::BAD_REQUEST, message), }; let payload = json!({ diff --git a/controller/src/main.rs b/controller/src/main.rs index fdda314..875817e 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -4,8 +4,8 @@ mod routes; mod store; mod types; -use store::kv_manager::DB_STORE; use orka_proto::scheduler_controller::{self, WorkloadInstance}; +use store::kv_manager::DB_STORE; use store::kv_manager::{KeyValueBatch, DB_BATCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; diff --git a/controller/src/routes/instances.rs b/controller/src/routes/instances.rs index 0122a8f..dcc8e40 100644 --- a/controller/src/routes/instances.rs +++ b/controller/src/routes/instances.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::client::Client; use crate::errors::ApiError; use crate::store::kv_manager::{DB_BATCH, DB_STORE}; @@ -5,7 +7,7 @@ use crate::types::instance_request::InstanceRequest; use crate::types::instance_status::InstanceStatus; use axum::extract::Path; use axum::Json; -use log::{trace, warn, error}; +use log::{error, trace, warn}; use orka_proto::scheduler_controller::{SchedulingRequest, Workload, WorkloadInstance}; use serde_json::{self, json, Value}; use validator::Validate; @@ -74,8 +76,13 @@ pub async fn post_instance(body: String) -> anyhow::Result, ApiError // Validate the request json_body.validate()?; - let kv_store = DB_STORE.lock().unwrap(); - let workload_request = kv_store.workloads_bucket()?.get(&json_body.workload_id)?; + let kv_store = Arc::clone(&DB_STORE); + let workload_request = kv_store + .lock() + .unwrap() + .workloads_bucket()? + .get(&json_body.workload_id)?; + drop(kv_store); match workload_request { None => Ok(Json(json!({"description": "Workload not found"}))), @@ -94,9 +101,11 @@ pub async fn post_instance(body: String) -> anyhow::Result, ApiError let mut stream = client.schedule_workload(request).await?; - stream.message().await.map_err(|e|{ + stream.message().await.map_err(|e| { warn!("Error while creating the instance: {:?}", e); - ApiError::InstanceNotCreated{message: format!("Instance not created {:?}", e)} + ApiError::InstanceNotCreated { + message: format!("Instance not created {:?}", e), + } })?; tokio::spawn(async move { diff --git a/controller/src/store/kv_manager.rs b/controller/src/store/kv_manager.rs index 0f785b1..ea29571 100644 --- a/controller/src/store/kv_manager.rs +++ b/controller/src/store/kv_manager.rs @@ -1,6 +1,6 @@ use std::{ ops::{Deref, DerefMut}, - sync::Mutex, + sync::{Arc, Mutex}, }; use crate::types::{instance_status::InstanceStatus, workload_request::WorkloadRequest}; @@ -40,7 +40,8 @@ impl DerefMut for KeyValueBatch { } } -pub static DB_STORE: Lazy> = Lazy::new(|| Mutex::new(KeyValueStore::new())); +pub static DB_STORE: Lazy>> = + Lazy::new(|| Arc::new(Mutex::new(KeyValueStore::new()))); pub struct KeyValueStore { store: Store, @@ -85,3 +86,9 @@ impl KeyValueStore { Ok(instances) } } + +impl Default for KeyValueStore { + fn default() -> Self { + Self::new() + } +} From 14764f664522d0982ad3289cd9ac4f28dbab1929 Mon Sep 17 00:00:00 2001 From: DziyanaT Date: Fri, 1 Sep 2023 10:41:05 +0200 Subject: [PATCH 5/7] feat: match status code Signed-off-by: Mauran --- controller/src/types/instance_status.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/controller/src/types/instance_status.rs b/controller/src/types/instance_status.rs index a6fd7d6..0a0e287 100644 --- a/controller/src/types/instance_status.rs +++ b/controller/src/types/instance_status.rs @@ -52,7 +52,7 @@ impl From> for InstanceResources { #[derive(Debug, Validate, Deserialize, Serialize, Clone)] pub struct InstanceStatusCode { - pub code: i32, + pub code: Code, pub message: Option, } @@ -60,13 +60,32 @@ impl From> for InstanceStatusCode { fn from(status: Option) -> Self { match status { Some(st) => InstanceStatusCode { - code: st.code, + code: Code::from_i32(st.code), message: st.message, }, None => InstanceStatusCode { - code: 0, + code: Code::WAITING, message: Some(String::from("No status found")), }, } } } + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub enum Code { + WAITING = 0, + RUNNING = 1, + TERMINATED = 2, +} + +impl Code { + fn from_i32(value: i32) -> Code { + match value { + 0 => Code::WAITING, + 1 => Code::RUNNING, + 2 => Code::TERMINATED, + _ => panic!("Unknown value: {}", value), + } + } +} + From c736367a5f16002e0d8b400663962cd0324b562e Mon Sep 17 00:00:00 2001 From: DziyanaT Date: Fri, 1 Sep 2023 12:10:23 +0200 Subject: [PATCH 6/7] fix: status code mapping Signed-off-by: Mauran --- controller/src/types/instance_status.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/controller/src/types/instance_status.rs b/controller/src/types/instance_status.rs index 0a0e287..9662899 100644 --- a/controller/src/types/instance_status.rs +++ b/controller/src/types/instance_status.rs @@ -53,7 +53,7 @@ impl From> for InstanceResources { #[derive(Debug, Validate, Deserialize, Serialize, Clone)] pub struct InstanceStatusCode { pub code: Code, - pub message: Option, + pub message: String, } impl From> for InstanceStatusCode { @@ -61,11 +61,15 @@ impl From> for InstanceStatusCode { match status { Some(st) => InstanceStatusCode { code: Code::from_i32(st.code), - message: st.message, + message: if st.message.is_some() { + st.message.unwrap() + } else { + String::from("") + }, }, None => InstanceStatusCode { - code: Code::WAITING, - message: Some(String::from("No status found")), + code: Code::Waiting, + message: String::from("No status found"), }, } } @@ -73,19 +77,18 @@ impl From> for InstanceStatusCode { #[derive(Debug, Deserialize, Serialize, Clone)] pub enum Code { - WAITING = 0, - RUNNING = 1, - TERMINATED = 2, + Waiting, + Running, + Terminated, } impl Code { fn from_i32(value: i32) -> Code { match value { - 0 => Code::WAITING, - 1 => Code::RUNNING, - 2 => Code::TERMINATED, + 0 => Code::Waiting, + 1 => Code::Running, + 2 => Code::Terminated, _ => panic!("Unknown value: {}", value), } } } - From 585b6f896efc0fc412817426992139b64e038c80 Mon Sep 17 00:00:00 2001 From: Mauran Date: Thu, 31 Aug 2023 17:51:48 +0200 Subject: [PATCH 7/7] fix(instance): send error instead of panick Signed-off-by: Mauran --- controller/src/main.rs | 2 +- controller/src/routes/instances.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/controller/src/main.rs b/controller/src/main.rs index 875817e..fdda314 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -4,8 +4,8 @@ mod routes; mod store; mod types; -use orka_proto::scheduler_controller::{self, WorkloadInstance}; use store::kv_manager::DB_STORE; +use orka_proto::scheduler_controller::{self, WorkloadInstance}; use store::kv_manager::{KeyValueBatch, DB_BATCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; diff --git a/controller/src/routes/instances.rs b/controller/src/routes/instances.rs index dcc8e40..3992a2d 100644 --- a/controller/src/routes/instances.rs +++ b/controller/src/routes/instances.rs @@ -7,7 +7,7 @@ use crate::types::instance_request::InstanceRequest; use crate::types::instance_status::InstanceStatus; use axum::extract::Path; use axum::Json; -use log::{error, trace, warn}; +use log::{trace, warn, error}; use orka_proto::scheduler_controller::{SchedulingRequest, Workload, WorkloadInstance}; use serde_json::{self, json, Value}; use validator::Validate;