diff --git a/.github/workflows/controller.yaml b/.github/workflows/controller.yaml index 97fecd17..6888c8bb 100644 --- a/.github/workflows/controller.yaml +++ b/.github/workflows/controller.yaml @@ -10,4 +10,4 @@ jobs: rustci: uses: "./.github/workflows/rust-ci.yaml" with: - directory: "controller" + directory: "controller" \ No newline at end of file diff --git a/.github/workflows/rust-ci.yaml b/.github/workflows/rust-ci.yaml index 69046aaa..56690b52 100644 --- a/.github/workflows/rust-ci.yaml +++ b/.github/workflows/rust-ci.yaml @@ -38,4 +38,4 @@ jobs: run: cargo fmt --check - name: Run lint - run: cargo clippy -- -D warnings + run: cargo clippy -- -D warnings \ No newline at end of file diff --git a/controller/.gitignore b/controller/.gitignore index 9f970225..2c7428ad 100644 --- a/controller/.gitignore +++ b/controller/.gitignore @@ -1 +1,2 @@ -target/ \ No newline at end of file +target/ +db/ diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 6c1935fe..20960682 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -10,15 +10,21 @@ tokio = { version = "1", features = ["full"] } axum = "0.6.19" tokio-stream = "0.1.6" serde_json = "1.0.104" -serde = {version = "1.0", features = ["derive"] } +serde = { version = "1.0", features = ["derive"] } validator = { version = "0.16.1", features = ["derive"] } anyhow = "1.0.75" thiserror = "1.0.47" pretty_env_logger = "0.5.0" log = "0.4.20" +kv = { version = "0.24.0", features = ["json-value"] } +dotenv = "0.15.0" +once_cell = "1.18.0" +orka-proto = { path = "../proto" } -[dependencies.syn] -version = "2.0.28" - -[build-dependencies] -tonic-build = "0.9" +[dependencies.uuid] +version = "1.4.1" +features = [ + "v4", # Lets you generate random UUIDs + "fast-rng", # Use a faster (but still sufficiently random) RNG + "macro-diagnostics", # Enable better diagnostics for compile-time UUIDs +] diff --git a/controller/README.md b/controller/README.md index d91e3219..3b2acef2 100644 --- a/controller/README.md +++ b/controller/README.md @@ -32,7 +32,7 @@ $ export PATH="$PATH:$HOME/.local/bin" The `orka` controller uses the [pretty_env_logger](https://docs.rs/pretty_env_logger/latest/pretty_env_logger/) crate to log all helpful information. The log level can be set by setting the `RUST_LOG` environment variable. For example, to set the log level to `trace` you can run the following command : ``` -export RUST_LOG=trace +export RUST_LOG=info ``` @@ -40,12 +40,8 @@ export RUST_LOG=trace ### Run the server and client GRPC -1. Run the server GRPC : +1. Run the server the controller : ``` -cargo run --bin server +cargo run --bin orka-controller ``` -2. Run the client GRPC : -``` -cargo run --bin client -``` \ No newline at end of file diff --git a/controller/build.rs b/controller/build.rs deleted file mode 100644 index efb8e816..00000000 --- a/controller/build.rs +++ /dev/null @@ -1,4 +0,0 @@ -fn main() -> Result<(), Box> { - tonic_build::compile_protos("../proto/scheduler.proto")?; - Ok(()) -} diff --git a/controller/src/client.rs b/controller/src/client.rs index d4f1bedb..d84bcb84 100644 --- a/controller/src/client.rs +++ b/controller/src/client.rs @@ -1,11 +1,9 @@ -use scheduler::scheduling_service_client::SchedulingServiceClient; -use scheduler::SchedulingRequest; +use orka_proto::scheduler_controller::SchedulingRequest; +use orka_proto::scheduler_controller::{self, scheduling_service_client::SchedulingServiceClient}; use tonic::transport::Channel; -use log::trace; +use tonic::Streaming; -pub mod scheduler { - tonic::include_proto!("orkascheduler"); -} +use orka_proto::scheduler_controller::{WorkloadInstance, WorkloadStatus}; pub struct Client { client: SchedulingServiceClient, @@ -20,16 +18,29 @@ impl Client { pub async fn schedule_workload( &mut self, scheduling_request: SchedulingRequest, - ) -> Result<(), Box> { - let request = scheduling_request; + ) -> Result, tonic::Status> { + let response = self.client.schedule(scheduling_request).await?; - let response = self.client.schedule(request).await?; + let stream = response.into_inner(); - let mut stream = response.into_inner(); + Ok(stream) + } - while let Some(status) = stream.message().await? { - trace!("STATUS={:?}", status); - } - Ok(()) + pub async fn stop_instance( + &mut self, + instance: WorkloadInstance, + ) -> Result { + let response = self.client.stop(instance).await?; + + Ok(response.into_inner()) } -} \ No newline at end of file + + pub async fn destroy_instance( + &mut self, + instance: WorkloadInstance, + ) -> Result { + let response = self.client.destroy(instance).await?; + + Ok(response.into_inner()) + } +} diff --git a/controller/src/errors.rs b/controller/src/errors.rs index 30adc3c5..f82c2c48 100644 --- a/controller/src/errors.rs +++ b/controller/src/errors.rs @@ -14,6 +14,15 @@ pub enum ApiError { #[error("Serialization error")] SerializationError(#[from] serde_json::Error), + + #[error("Database error")] + DatabaseError(#[from] kv::Error), + + #[error("Scheduling error")] + SchedulingError(#[from] tonic::Status), + + #[error("Instance not created")] + InstanceNotCreated { message: String }, } impl IntoResponse for ApiError { @@ -24,6 +33,9 @@ impl IntoResponse for ApiError { } ApiError::ClientConnectError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), ApiError::SerializationError(e) => (StatusCode::BAD_REQUEST, e.to_string()), + ApiError::DatabaseError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), + ApiError::SchedulingError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()), + ApiError::InstanceNotCreated { message } => (StatusCode::BAD_REQUEST, message), }; let payload = json!({ diff --git a/controller/src/lib.rs b/controller/src/lib.rs new file mode 100644 index 00000000..e91e5578 --- /dev/null +++ b/controller/src/lib.rs @@ -0,0 +1,5 @@ +pub mod client; +pub mod errors; +pub mod routes; +pub mod store; +pub mod types; diff --git a/controller/src/main.rs b/controller/src/main.rs index e6c02592..875817e1 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -1,23 +1,32 @@ mod client; mod errors; mod routes; +mod store; mod types; -use crate::client::scheduler; - +use orka_proto::scheduler_controller::{self, WorkloadInstance}; +use store::kv_manager::DB_STORE; +use store::kv_manager::{KeyValueBatch, DB_BATCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{Request, Response, Status}; use axum::Router; -use scheduler::scheduling_service_server::{SchedulingService, SchedulingServiceServer}; -use scheduler::{SchedulingRequest, WorkloadStatus}; +use orka_proto::scheduler_controller::scheduling_service_server::SchedulingService; +use orka_proto::scheduler_controller::workload_status::Resources; +use orka_proto::scheduler_controller::{ + workload_status::Status as DeploymentStatus, SchedulingRequest, WorkloadStatus, +}; use std::net::SocketAddr; +use std::thread; +use std::time::Duration; use tokio::task; use axum::routing::{delete, post}; -use log::info; -use routes::instances::{delete_instance, get_instances, get_specific_instance, post_instance}; +use log::{error, info}; +use routes::instances::{ + delete_instance, delete_instance_force, get_instances, get_specific_instance, post_instance, +}; use routes::workloads::{delete_workload, get_specific_workload, get_workloads, post_workload}; #[derive(Debug, Default)] @@ -27,33 +36,69 @@ pub struct Scheduler {} impl SchedulingService for Scheduler { type ScheduleStream = ReceiverStream>; + async fn stop( + &self, + request: Request, + ) -> Result, Status> { + info!("{:?}", request); + Ok(Response::new(scheduler_controller::Empty {})) + } + + async fn destroy( + &self, + request: Request, + ) -> Result, Status> { + info!("{:?}", request); + Ok(Response::new(scheduler_controller::Empty {})) + } + async fn schedule( &self, request: Request, ) -> Result, Status> { - info!("Got a request: {:?}", request); + info!("{:?}", request); let (sender, receiver) = mpsc::channel(4); + let workload = request.into_inner().workload.unwrap(); + tokio::spawn(async move { let fake_statuses_response = vec![ WorkloadStatus { - name: "Workload 1".to_string(), - status_code: 0, - message: "Your workload is WAITING".to_string(), - ..Default::default() + instance_id: workload.instance_id.clone(), + status: Some(DeploymentStatus { + code: 0, + message: Some("The workload is waiting".to_string()), + }), + resource_usage: Some(Resources { + cpu: 2, + memory: 3, + disk: 4, + }), }, WorkloadStatus { - name: "Workload 1".to_string(), - status_code: 1, - message: "Your workload is RUNNING".to_string(), - ..Default::default() + instance_id: workload.instance_id.clone(), + status: Some(DeploymentStatus { + code: 1, + message: Some("The workload is running".to_string()), + }), + resource_usage: Some(Resources { + cpu: 2, + memory: 3, + disk: 4, + }), }, WorkloadStatus { - name: "Workload 2".to_string(), - status_code: 2, - message: "Your workload is TERMINATED".to_string(), - ..Default::default() + instance_id: workload.instance_id, + status: Some(DeploymentStatus { + code: 2, + message: Some("The workload is terminated".to_string()), + }), + resource_usage: Some(Resources { + cpu: 2, + memory: 3, + disk: 4, + }), }, ]; @@ -84,18 +129,18 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); // Initialize grpc - let grpc_addr = "[::1]:50051".parse()?; - let scheduler = Scheduler::default(); - - // Spawn the gRPC server as a tokio task - let grpc_thread = task::spawn(async move { - info!("gRPC server running at: {}", grpc_addr); - Server::builder() - .add_service(SchedulingServiceServer::new(scheduler)) - .serve(grpc_addr) - .await - .unwrap(); - }); + // let grpc_addr = "[::1]:50051".parse()?; + // let scheduler = Scheduler::default(); + + // // Spawn the gRPC server as a tokio task + // let grpc_thread = task::spawn(async move { + // info!("gRPC server running at: {}", grpc_addr); + // Server::builder() + // .add_service(SchedulingServiceServer::new(scheduler)) + // .serve(grpc_addr) + // .await + // .unwrap(); + // }); // Initialize http let http_addr = SocketAddr::from(([127, 0, 0, 1], 3000)); @@ -109,6 +154,10 @@ async fn main() -> Result<(), Box> { .route( "/instances/:id", delete(delete_instance).get(get_specific_instance), + ) + .route( + "/instances/:id/force", + delete(delete_instance_force).get(get_specific_instance), ); // Spawn the HTTP server as a tokio task @@ -120,8 +169,34 @@ async fn main() -> Result<(), Box> { .unwrap(); }); - // Wait for both servers to finish - tokio::try_join!(grpc_thread, http_thread)?; + // Create a thread to sync the batch with the database + let db_thread = task::spawn(async move { + loop { + thread::sleep(Duration::from_secs(5)); + let kv_batch = DB_BATCH.lock(); + let kv_store = DB_STORE.lock(); + match kv_batch { + Ok(mut kvbatch) => { + match kv_store { + Ok(store) => { + store + .instances_bucket() + .unwrap() + .batch(kvbatch.batch.clone()) + .unwrap(); + // Clear batch after update + *kvbatch = KeyValueBatch::new(); + } + Err(e) => error!("{}", e), + } + } + Err(e) => error!("{}", e), + } + } + }); + + // Wait for both servers and a db thread to finish + tokio::try_join!(http_thread, db_thread)?; Ok(()) } diff --git a/controller/src/routes/instances.rs b/controller/src/routes/instances.rs index 620ce5fe..dcc8e405 100644 --- a/controller/src/routes/instances.rs +++ b/controller/src/routes/instances.rs @@ -1,36 +1,130 @@ +use std::sync::Arc; + +use crate::client::Client; use crate::errors::ApiError; +use crate::store::kv_manager::{DB_BATCH, DB_STORE}; use crate::types::instance_request::InstanceRequest; +use crate::types::instance_status::InstanceStatus; +use axum::extract::Path; use axum::Json; +use log::{error, trace, warn}; +use orka_proto::scheduler_controller::{SchedulingRequest, Workload, WorkloadInstance}; use serde_json::{self, json, Value}; use validator::Validate; pub async fn get_instances(_body: String) -> anyhow::Result, ApiError> { - tokio::spawn(async move { - // TODO: Implement => retrieve A JSON array of instances ids - }); - Ok(Json(json!({"instances": "[]"}))) + let kv_store = DB_STORE.lock().unwrap(); + let instance_list = kv_store.select_instances()?; + Ok(Json(json!({ "instances": instance_list }))) } -pub async fn get_specific_instance(_body: String) -> anyhow::Result, ApiError> { - tokio::spawn(async move { - // TODO: Implement => retrieve the instance needed from hashmap - }); - Ok(Json(json!({"description": "A instance description file"}))) +pub async fn get_specific_instance( + Path(id): Path, +) -> anyhow::Result, ApiError> { + let kv_store = DB_STORE.lock().unwrap(); + let instance = kv_store.instances_bucket()?.get(&id)?; + match instance { + None => Ok(Json(json!({"description": "Instance not found"}))), + Some(instance_status) => Ok(Json(json!({"description": instance_status.as_ref()}))), + } } -pub async fn delete_instance(_body: String) -> anyhow::Result, ApiError> { - tokio::spawn(async move { - // TODO: Implement => remove instance from hashmap and stops it using it's id - }); - Ok(Json(json!({"description": "Deleted"}))) +pub async fn delete_instance(Path(id): Path) -> anyhow::Result, ApiError> { + let mut client = Client::new().await?; + + let instance = WorkloadInstance { + instance_id: (*id).to_string(), + }; + + client.stop_instance(instance).await?; + + let kv_store = DB_STORE.lock().unwrap(); + + let instance = kv_store.instances_bucket()?.remove(&id)?; + + match instance { + Some(_inst) => Ok(Json(json!({"description": "Deleted"}))), + None => Ok(Json(json!({"description": "Instance not found"}))), + } +} + +pub async fn delete_instance_force( + Path(id): Path, +) -> anyhow::Result, ApiError> { + let mut client = Client::new().await?; + + let instance = WorkloadInstance { + instance_id: (*id).to_string(), + }; + + client.destroy_instance(instance).await?; + + let kv_store = DB_STORE.lock().unwrap(); + + let instance = kv_store.instances_bucket()?.remove(&id)?; + + match instance { + Some(_inst) => Ok(Json(json!({"description": "Deleted"}))), + None => Ok(Json(json!({"description": "Instance not found"}))), + } } pub async fn post_instance(body: String) -> anyhow::Result, ApiError> { - // Create a new Workload Request object out of the body + // Create a new Instance Request object out of the body let json_body: InstanceRequest = serde_json::from_str(&body)?; // Validate the request json_body.validate()?; - // TODO: Creates an instance based on an existing workload id - Ok(Json(json!({"description": "Instance created"}))) + + let kv_store = Arc::clone(&DB_STORE); + let workload_request = kv_store + .lock() + .unwrap() + .workloads_bucket()? + .get(&json_body.workload_id)?; + drop(kv_store); + + match workload_request { + None => Ok(Json(json!({"description": "Workload not found"}))), + Some(json_request) => { + // Create a grpc workload object + let workload = Workload::from(json_request.0.workload); + + // We spawn a thread to handle the request + let mut client = Client::new().await?; + + let instance_id = (*workload.instance_id).to_string(); + + let request = SchedulingRequest { + workload: Some(workload), + }; + + let mut stream = client.schedule_workload(request).await?; + + stream.message().await.map_err(|e| { + warn!("Error while creating the instance: {:?}", e); + ApiError::InstanceNotCreated { + message: format!("Instance not created {:?}", e), + } + })?; + + tokio::spawn(async move { + while let Ok(Some(status)) = stream.message().await { + trace!("STATUS={:?}", status); + let result = DB_BATCH.lock().unwrap().batch.set( + &status.instance_id, + &kv::Json(InstanceStatus::from(&status)), + ); + match result { + Ok(()) => {} + Err(e) => error!("{}", e), + } + } + }); + + Ok(Json(json!({ + "description": format!("Instance creation started, id: {}", instance_id) + }))) + } + } } diff --git a/controller/src/routes/mod.rs b/controller/src/routes/mod.rs index ea896eac..b57c7bca 100644 --- a/controller/src/routes/mod.rs +++ b/controller/src/routes/mod.rs @@ -1,2 +1,2 @@ pub mod instances; -pub mod workloads; \ No newline at end of file +pub mod workloads; diff --git a/controller/src/routes/workloads.rs b/controller/src/routes/workloads.rs index 0f869e80..c90b8cbd 100644 --- a/controller/src/routes/workloads.rs +++ b/controller/src/routes/workloads.rs @@ -1,74 +1,62 @@ +use crate::errors::ApiError; +use crate::store::kv_manager::*; use crate::types::workload_request::WorkloadRequest; -use crate::{ - client::{ - scheduler::{ - workload::Type, - SchedulingRequest, Workload, - }, - Client, - }, - errors::ApiError, -}; -use axum::Json; -use log::info; +use axum::{extract::Path, Json}; use serde_json::{self, json, Value}; + +use uuid::Uuid; + use validator::Validate; -use crate::client::scheduler::workload; pub async fn get_workloads(_body: String) -> anyhow::Result, ApiError> { - tokio::spawn(async move { - // TODO: Implement => retrieve list of workloads from hashmap - }); - Ok(Json(json!({"workloads": "[]"}))) + // Init the database + let kv_store = DB_STORE.lock().unwrap(); + + let workloads = kv_store.select_workloads()?; + + Ok(Json(json!({ "workloads": workloads }))) } -pub async fn get_specific_workload(_body: String) -> anyhow::Result, ApiError> { - tokio::spawn(async move { - // TODO: Implement => retrieve the workload needed from hashmap - }); - Ok(Json(json!({"description": "A workload description file"}))) +pub async fn get_specific_workload( + Path(id): Path, +) -> anyhow::Result, ApiError> { + let kv_store = DB_STORE.lock().unwrap(); + + let workload = kv_store.workloads_bucket()?.get(&id)?; + + match workload { + None => Ok(Json(json!({"description": "Workload not found"}))), + Some(workload_description) => { + Ok(Json(json!({"description": workload_description.as_ref()}))) + } + } } -pub async fn delete_workload(_body: String) -> anyhow::Result, ApiError> { - tokio::spawn(async move { - // TODO: Implement => remove workload from hashmap - }); - Ok(Json(json!({"description": "Deleted"}))) +pub async fn delete_workload(Path(id): Path) -> anyhow::Result, ApiError> { + let kv_store = DB_STORE.lock().unwrap(); + kv_store.workloads_bucket()?.remove(&id)?; + Ok(Json(json!({"description": "Workload deleted"}))) } pub async fn post_workload(body: String) -> anyhow::Result, ApiError> { - // We spawn a thread to handle the request - let mut client = Client::new().await?; + // Init the database + let kv_store = DB_STORE.lock().unwrap(); + // Create a new Workload Request object out of the body let json_body: WorkloadRequest = serde_json::from_str(&body)?; // Validate if the workload request is valid json_body.validate()?; - // Extract the env variable table - let mut environment = Vec::new(); - if !json_body.workload.environment.is_empty() { - for env in json_body.workload.environment.iter() { - environment.push(env.clone()); - } - } - - // Create a grpc workload object - let workload = Workload { - name: json_body.workload.name, - r#type: Type::Container.into(), - image: json_body.workload.image, - environment, - resource_limits: Some(workload::Resources::default()), - }; - - let request = SchedulingRequest { - workload: Some(workload), - }; + // Generate a new uuid + let id_with_prefix = format!("workload-{}-{}", json_body.workload.name, Uuid::new_v4()); - let response = client.schedule_workload(request).await.unwrap(); + // Store the workload request in the database + kv_store + .workloads_bucket()? + .set(&id_with_prefix, &kv::Json(json_body))?; - info!("RESPONSE={:?}", response); - // TODO: Handle the grpc response and if OK save data and send response to cli - Ok(Json(json!({"description": "Created"}))) + Ok(Json(json!({ + "Your workload is successfully created ": id_with_prefix + }))) } diff --git a/controller/src/store/kv_manager.rs b/controller/src/store/kv_manager.rs new file mode 100644 index 00000000..ea29571c --- /dev/null +++ b/controller/src/store/kv_manager.rs @@ -0,0 +1,94 @@ +use std::{ + ops::{Deref, DerefMut}, + sync::{Arc, Mutex}, +}; + +use crate::types::{instance_status::InstanceStatus, workload_request::WorkloadRequest}; +use kv::*; +use once_cell::sync::Lazy; + +pub static DB_BATCH: Lazy> = Lazy::new(|| Mutex::new(KeyValueBatch::new())); + +pub struct KeyValueBatch { + pub batch: Batch>, +} + +impl KeyValueBatch { + pub fn new() -> Self { + Self { + batch: Batch::new(), + } + } +} + +impl Default for KeyValueBatch { + fn default() -> Self { + Self::new() + } +} + +impl Deref for KeyValueBatch { + type Target = Batch>; + fn deref(&self) -> &Self::Target { + &self.batch + } +} + +impl DerefMut for KeyValueBatch { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.batch + } +} + +pub static DB_STORE: Lazy>> = + Lazy::new(|| Arc::new(Mutex::new(KeyValueStore::new()))); + +pub struct KeyValueStore { + store: Store, +} + +impl KeyValueStore { + pub fn new() -> Self { + // Configure the database + let cfg = Config::new("./db/controller"); + + // Open the key/value store + let store = Store::new(cfg).unwrap(); + + Self { store } + } + + pub fn workloads_bucket(&self) -> Result>, Error> { + self.store + .bucket::>(Some("workloads")) + } + + pub fn instances_bucket(&self) -> Result>, Error> { + self.store + .bucket::>(Some("instances")) + } + + // Get an array of workload ids + pub fn select_workloads(&self) -> Result, Error> { + let mut workloads: Vec = Vec::new(); + for workload in self.workloads_bucket()?.iter() { + workloads.push(workload?.key()?); + } + Ok(workloads) + } + + // Get an array of instance names + pub fn select_instances(&self) -> Result, Error> { + let mut instances: Vec = Vec::new(); + for instance in self.instances_bucket()?.iter() { + instances.push(instance?.key()?); + } + Ok(instances) + } +} + +impl Default for KeyValueStore { + fn default() -> Self { + Self::new() + } +} diff --git a/controller/src/store/mod.rs b/controller/src/store/mod.rs new file mode 100644 index 00000000..ce4b2371 --- /dev/null +++ b/controller/src/store/mod.rs @@ -0,0 +1 @@ +pub mod kv_manager; diff --git a/controller/src/types/instance_request.rs b/controller/src/types/instance_request.rs index b9553e2d..9902d576 100644 --- a/controller/src/types/instance_request.rs +++ b/controller/src/types/instance_request.rs @@ -1,7 +1,7 @@ -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use validator::Validate; -#[derive(Debug, Validate, Deserialize)] +#[derive(Debug, Validate, Deserialize, Serialize)] pub struct InstanceRequest { #[validate(length(min = 1))] pub workload_id: String, diff --git a/controller/src/types/instance_status.rs b/controller/src/types/instance_status.rs new file mode 100644 index 00000000..96628995 --- /dev/null +++ b/controller/src/types/instance_status.rs @@ -0,0 +1,94 @@ +use serde::{Deserialize, Serialize}; +use validator::Validate; + +use orka_proto::scheduler_controller::{ + workload_status::{Resources, Status}, + WorkloadStatus, +}; + +#[derive(Debug, Validate, Deserialize, Serialize, Clone)] +pub struct InstanceStatus { + #[validate(length(min = 1))] + pub name: String, + pub status_code: InstanceStatusCode, + pub resource_usage: InstanceResources, +} + +impl From<&WorkloadStatus> for InstanceStatus { + fn from(status: &WorkloadStatus) -> Self { + InstanceStatus { + name: (*status.instance_id).to_string(), + status_code: InstanceStatusCode::from(status.status.clone()), + resource_usage: InstanceResources::from(status.resource_usage.clone()), + } + } +} + +#[derive(Debug, Validate, Deserialize, Serialize, Clone)] +pub struct InstanceResources { + pub cpu: i32, + + pub memory: i32, + + pub disk: i32, +} + +impl From> for InstanceResources { + fn from(resources: Option) -> Self { + match resources { + Some(res) => InstanceResources { + cpu: res.cpu, + memory: res.memory, + disk: res.disk, + }, + None => InstanceResources { + cpu: 0, + memory: 0, + disk: 0, + }, + } + } +} + +#[derive(Debug, Validate, Deserialize, Serialize, Clone)] +pub struct InstanceStatusCode { + pub code: Code, + pub message: String, +} + +impl From> for InstanceStatusCode { + fn from(status: Option) -> Self { + match status { + Some(st) => InstanceStatusCode { + code: Code::from_i32(st.code), + message: if st.message.is_some() { + st.message.unwrap() + } else { + String::from("") + }, + }, + None => InstanceStatusCode { + code: Code::Waiting, + message: String::from("No status found"), + }, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub enum Code { + Waiting, + Running, + Terminated, +} + +impl Code { + fn from_i32(value: i32) -> Code { + match value { + 0 => Code::Waiting, + 1 => Code::Running, + 2 => Code::Terminated, + _ => panic!("Unknown value: {}", value), + } + } +} diff --git a/controller/src/types/mod.rs b/controller/src/types/mod.rs index 7c8ae142..83f3cc40 100644 --- a/controller/src/types/mod.rs +++ b/controller/src/types/mod.rs @@ -1,2 +1,3 @@ pub mod instance_request; +pub mod instance_status; pub mod workload_request; diff --git a/controller/src/types/workload_request.rs b/controller/src/types/workload_request.rs index 13ba2d2a..75d22431 100644 --- a/controller/src/types/workload_request.rs +++ b/controller/src/types/workload_request.rs @@ -1,24 +1,30 @@ use serde::{Deserialize, Serialize}; +use uuid::Uuid; use validator::{Validate, ValidationError}; -#[derive(Debug, Validate, Deserialize)] +use orka_proto::scheduler_controller::{ + self, + workload::{Resources, Type}, +}; + +#[derive(Debug, Validate, Deserialize, Serialize, Clone)] pub struct WorkloadRequest { pub version: String, pub workload: Workload, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub enum WorkloadKind { Container, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub enum WorkloadRegistry { Docker, Podman, Ghcr, } -#[derive(Debug, Validate, Deserialize)] +#[derive(Debug, Validate, Deserialize, Serialize, Clone)] pub struct Workload { #[validate(custom = "validate_workload_kind")] pub kind: WorkloadKind, @@ -51,4 +57,17 @@ fn validate_workload_registry(registry: &WorkloadRegistry) -> Result<(), Validat WorkloadRegistry::Podman => Ok(()), WorkloadRegistry::Ghcr => Ok(()), } -} \ No newline at end of file +} + +impl From for scheduler_controller::Workload { + fn from(workload: Workload) -> scheduler_controller::Workload { + // Create a grpc workload object + scheduler_controller::Workload { + instance_id: format!("instance-{}-{}", workload.name, Uuid::new_v4()), + r#type: Type::Container.into(), + image: workload.image, + environment: workload.environment, + resource_limits: Some(Resources::default()), + } + } +} diff --git a/docs/proposals/controller/api_definition.yaml b/docs/proposals/controller/api_definition.yaml index 7a56524f..1319bd68 100644 --- a/docs/proposals/controller/api_definition.yaml +++ b/docs/proposals/controller/api_definition.yaml @@ -17,6 +17,12 @@ paths: application/json: schema: $ref: '#/components/schemas/Workload' + '4XX': + description: Error response (e.g. yaml description file is invalid) + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' post: summary: Creates a workload (only container workload type for now) requestBody: @@ -27,8 +33,12 @@ paths: responses: '201': description: Created - '400': - description: Yaml description file is invalid + '4XX': + description: Error response (e.g. yaml description file is invalid) + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' /workloads/{id}: get: summary: Returns a single workload by id. @@ -39,11 +49,23 @@ paths: application/json: schema: $ref: '#/components/schemas/Workload' + '4XX': + description: Error response + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' delete: summary: Deletes a workload by id responses: '204': description: Deleted + '4XX': + description: Error response + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' parameters: - name: id in: path @@ -63,6 +85,12 @@ paths: type: array items: type: string + '4XX': + description: Error response + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' post: summary: Creates an instance based on an existing workload id requestBody: @@ -76,8 +104,12 @@ paths: responses: '201': description: Created - '400': + '4XX': description: Couldn't create an instance based on the workload file + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' /instances/{id}: get: summary: Returns a single instance status by id. @@ -88,11 +120,23 @@ paths: application/json: schema: $ref: '#/components/schemas/InstanceStatus' + '4XX': + description: Error response + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' delete: summary: Deletes an instance by id responses: '204': description: Deleted + '4XX': + description: Error response + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' parameters: - name: id in: path @@ -102,6 +146,13 @@ paths: type: string components: schemas: + ErrorResponse: + type: object + properties: + status: + type: string + message: + type: string Workload: type: object properties: diff --git a/proto/src/scheduler/controller.proto b/proto/src/scheduler/controller.proto index 8c18cfdc..98b6ba9d 100644 --- a/proto/src/scheduler/controller.proto +++ b/proto/src/scheduler/controller.proto @@ -37,7 +37,7 @@ message WorkloadStatus { TERMINATED = 2; } - uint32 code = 1; + StatusCode code = 1; optional string message = 2; }