Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
wip(backend): execute long task in background + check yaml config
Browse files Browse the repository at this point in the history
  • Loading branch information
evoxmusic committed Dec 26, 2023
1 parent bce3768 commit eed32d8
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 134 deletions.
3 changes: 3 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ edition = "2021"
axum = { version = "0.7.2", features = ["macros"] }
tower-http = { version = "0.5.0", features = ["cors"] }
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing = { version = "0.1.40", features = [] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1.0"
clap = { version = "4.4", features = ["derive"] }
chrono = "0.4"
which = "5.0.0"

# [dev-dependencies]
# tokio = { version = "1", features = ["rt-multi-thread", "test-util"] }
260 changes: 131 additions & 129 deletions backend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,133 @@ pub struct JobOutputResult {
pub output: serde_json::Value,
}

#[debug_handler]
pub async fn list_catalogs(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
) -> (StatusCode, Json<ResultsResponse<CatalogYamlConfig>>) {
(StatusCode::OK, Json(ResultsResponse { message: None, results: yaml_config.catalogs.clone() }))
}

#[debug_handler]
pub async fn list_catalog_services(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Path(catalog_slug): Path<String>,
) -> (StatusCode, Json<ResultsResponse<CatalogServiceYamlConfig>>) {
let catalog = match find_catalog_by_slug(&yaml_config.catalogs, catalog_slug.as_str()) {
Some(catalog) => catalog,
None => return (StatusCode::NOT_FOUND, Json(ResultsResponse {
message: Some(format!("Catalog '{}' not found", catalog_slug)),
results: vec![],
}))
};

(StatusCode::OK, Json(ResultsResponse { message: None, results: catalog.services.clone().unwrap_or(vec![]) }))
}

#[debug_handler]
pub async fn exec_catalog_service_validate_scripts(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Path((catalog_slug, service_slug)): Path<(String, String)>,
Json(req): Json<ExecValidateScriptRequest>,
) -> (StatusCode, Json<JobResponse>) {
let _ = match check_json_payload_against_yaml_config_fields(
catalog_slug.as_str(),
service_slug.as_str(),
&req.payload,
&yaml_config,
) {
Ok(x) => x,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let (_, service) = match get_catalog_and_service(&yaml_config, catalog_slug.as_str(), service_slug.as_str()) {
Ok((catalog, service)) => (catalog, service),
Err(err) => return err
};

let mut job_results = JobResults {
user_fields_input: req.payload.clone(),
results: vec![],
};

for v in service.validate.as_ref().unwrap_or(&vec![]) {
let job_output_result = match execute_command(service_slug.as_str(), v, req.payload.to_string().as_str()).await {
Ok(job_output_result) => job_output_result,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let _ = job_results.results.push(job_output_result);
}

(StatusCode::OK, Json(JobResponse { message: None, results: Some(job_results) }))
}

#[debug_handler]
pub async fn exec_catalog_service_post_validate_scripts(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Path((catalog_slug, service_slug)): Path<(String, String)>,
Json(req): Json<ExecValidateScriptRequest>,
) -> (StatusCode, Json<JobResponse>) {
let _ = match check_json_payload_against_yaml_config_fields(
catalog_slug.as_str(),
service_slug.as_str(),
&req.payload,
&yaml_config,
) {
Ok(x) => x,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let service = match get_catalog_and_service(&yaml_config, catalog_slug.as_str(), service_slug.as_str()) {
Ok((_, service)) => service,
Err(err) => return err
};

// execute validate scripts
for v in service.validate.as_ref().unwrap_or(&vec![]) {
let _ = match execute_command(service_slug.as_str(), v, req.payload.to_string().as_str()).await {
Ok(_) => (),
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};
}


let service = service.clone();
// execute post validate scripts
let _ = tokio::spawn(async move {
let mut job_results = JobResults {
user_fields_input: req.payload.clone(),
results: vec![],
};

for v in service.post_validate.as_ref().unwrap_or(&vec![]) {
let job_output_result = match execute_command(service_slug.as_str(), v, req.payload.to_string().as_str()).await {
Ok(job_output_result) => job_output_result,
Err(err) => todo!("{}", err) // TODO persist error in database
};

let _ = job_results.results.push(job_output_result);
}

// TODO persist results in database
});


(StatusCode::NO_CONTENT, Json(JobResponse { message: Some("workflow executed".to_string()), results: None }))
}

fn find_catalog_by_slug<'a>(catalogs: &'a Vec<CatalogYamlConfig>, catalog_slug: &str) -> Option<&'a CatalogYamlConfig> {
catalogs.iter().find(|catalog| catalog.slug == catalog_slug)
}
Expand All @@ -50,7 +177,7 @@ fn find_catalog_service_by_slug<'a>(catalog: &'a CatalogYamlConfig, service_slug

/// Extract the job output from the environment variable TORII_JSON_OUTPUT and reset it to an empty JSON object
fn consume_job_output_result_from_json_output_env(service_slug: &str) -> JobOutputResult {
let r = match std::env::var("TORII_JSON_OUTPUT") {
let job_output_result = match std::env::var("TORII_JSON_OUTPUT") {
Ok(json_output) => JobOutputResult {
slug: service_slug.to_string(),
output: serde_json::from_str(json_output.as_str()).unwrap_or(serde_json::json!({})),
Expand All @@ -64,7 +191,7 @@ fn consume_job_output_result_from_json_output_env(service_slug: &str) -> JobOutp
// reset the environment variable
std::env::set_var("TORII_JSON_OUTPUT", "{}");

r
job_output_result
}

fn check_json_payload_against_yaml_config_fields(
Expand Down Expand Up @@ -102,29 +229,6 @@ fn check_json_payload_against_yaml_config_fields(
Ok(())
}

#[debug_handler]
pub async fn list_catalogs(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
) -> (StatusCode, Json<ResultsResponse<CatalogYamlConfig>>) {
(StatusCode::OK, Json(ResultsResponse { message: None, results: yaml_config.catalogs.clone() }))
}

#[debug_handler]
pub async fn list_catalog_services(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Path(catalog_slug): Path<String>,
) -> (StatusCode, Json<ResultsResponse<CatalogServiceYamlConfig>>) {
let catalog = match find_catalog_by_slug(&yaml_config.catalogs, catalog_slug.as_str()) {
Some(catalog) => catalog,
None => return (StatusCode::NOT_FOUND, Json(ResultsResponse {
message: Some(format!("Catalog '{}' not found", catalog_slug)),
results: vec![],
}))
};

(StatusCode::OK, Json(ResultsResponse { message: None, results: catalog.services.clone().unwrap_or(vec![]) }))
}

async fn execute_command<'a, T>(
service_slug: &'a str,
external_command: &T,
Expand Down Expand Up @@ -179,8 +283,8 @@ async fn execute_command<'a, T>(

fn get_catalog_and_service<'a>(
yaml_config: &'a YamlConfig,
catalog_slug: &'a str,
service_slug: &'a str,
catalog_slug: &str,
service_slug: &str,
) -> Result<(&'a CatalogYamlConfig, &'a CatalogServiceYamlConfig), (StatusCode, Json<JobResponse>)> {
let catalog = match find_catalog_by_slug(&yaml_config.catalogs, catalog_slug) {
Some(catalog) => catalog,
Expand All @@ -201,108 +305,6 @@ fn get_catalog_and_service<'a>(
Ok((catalog, service))
}

#[debug_handler]
pub async fn exec_catalog_service_validate_scripts(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Path((catalog_slug, service_slug)): Path<(String, String)>,
Json(req): Json<ExecValidateScriptRequest>,
) -> (StatusCode, Json<JobResponse>) {
let _ = match check_json_payload_against_yaml_config_fields(
catalog_slug.as_str(),
service_slug.as_str(),
&req.payload,
&yaml_config,
) {
Ok(x) => x,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let (_, service) = match get_catalog_and_service(&yaml_config, catalog_slug.as_str(), service_slug.as_str()) {
Ok((catalog, service)) => (catalog, service),
Err(err) => return err
};

let mut job_results = JobResults {
user_fields_input: req.payload.clone(),
results: vec![],
};

for v in service.validate.as_ref().unwrap_or(&vec![]) {
let job_output_result = match execute_command(service_slug.as_str(), v, req.payload.to_string().as_str()).await {
Ok(job_output_result) => job_output_result,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let _ = job_results.results.push(job_output_result);
}

(StatusCode::OK, Json(JobResponse { message: None, results: Some(job_results) }))
}

#[debug_handler]
pub async fn exec_catalog_service_post_validate_scripts(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Path((catalog_slug, service_slug)): Path<(String, String)>,
Json(req): Json<ExecValidateScriptRequest>,
) -> (StatusCode, Json<JobResponse>) {
let _ = match check_json_payload_against_yaml_config_fields(
catalog_slug.as_str(),
service_slug.as_str(),
&req.payload,
&yaml_config,
) {
Ok(x) => x,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let (_, service) = match get_catalog_and_service(&yaml_config, catalog_slug.as_str(), service_slug.as_str()) {
Ok((catalog, service)) => (catalog, service),
Err(err) => return err
};

// execute validate scripts
for v in service.validate.as_ref().unwrap_or(&vec![]) {
let _ = match execute_command(service_slug.as_str(), v, req.payload.to_string().as_str()).await {
Ok(_) => (),
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};
}


let mut job_results = JobResults {
user_fields_input: req.payload.clone(),
results: vec![],
};

// execute post validate scripts
for v in service.post_validate.as_ref().unwrap_or(&vec![]) {
let job_output_result = match execute_command(service_slug.as_str(), v, req.payload.to_string().as_str()).await {
Ok(job_output_result) => job_output_result,
Err(err) => return (StatusCode::BAD_REQUEST, Json(JobResponse {
message: Some(err),
results: None,
}))
};

let _ = job_results.results.push(job_output_result);
}

// TODO output_model and store results in database

(StatusCode::OK, Json(JobResponse { message: None, results: Some(job_results) }))
}

#[cfg(test)]
mod tests {
Expand Down
4 changes: 1 addition & 3 deletions backend/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@

pub(crate) enum QError {
}
pub(crate) enum QError {}
8 changes: 7 additions & 1 deletion backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ async fn main() {
}
};

// TODO check yaml_config validity - slug, name, description, services, fields, validate
match yaml_config.validate() {
Ok(_) => {}
Err(err) => {
error!("failed to validate config file: {}", err);
std::process::exit(1);
}
};

let app = Router::new()
.fallback(unknown_route)
Expand Down
Loading

0 comments on commit eed32d8

Please sign in to comment.