From 1a7226e2c630ca88a02847f973d16dec53327803 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sat, 28 Oct 2023 11:59:28 -0400 Subject: [PATCH] tweak batch loading and update validation functions to handle new null/conflict logic --- src/lib.rs | 492 ++++++++++------------ src/validate.rs | 366 +++++----------- test/expected/messages.tsv | 1 + test/expected/messages_a1.tsv | 1 + test/expected/messages_after_api_test.tsv | 1 + 5 files changed, 322 insertions(+), 539 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 392af3fc..fc0cfc91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1358,7 +1358,7 @@ pub fn query_column_with_message_value(table: &str, column: &str, pool: &AnyPool LIMIT 1 ) ELSE {casted_column} - END AS "{column}_with_mval""#, + END AS "{column}""#, casted_column = if pool.any_kind() == AnyKind::Sqlite { cast_column_sql_to_text(column, "non-text") } else { @@ -1392,7 +1392,7 @@ pub fn query_with_message_values(table: &str, global_config: &SerdeMap, pool: &A let mut outer_columns = real_columns .iter() - .map(|c| format!("t.\"{}_with_mval\"", c)) + .map(|c| format!("t.\"{}\"", c)) .collect::>(); let inner_columns = { @@ -1437,7 +1437,7 @@ pub async fn get_affected_rows( // which the value of the column is the same as `value` let sql = { format!( - r#"{main_query} WHERE "{column}_with_mval" = '{value}'{except}"#, + r#"{main_query} WHERE "{column}" = '{value}'{except}"#, main_query = query_with_message_values(table, global_config, pool), column = column, value = value, @@ -1476,7 +1476,6 @@ pub async fn get_affected_rows( "valid": true, "messages": json!([]), }); - let cname = cname.strip_suffix("_with_mval").unwrap(); table_row.insert(cname.to_string(), json!(cell)); } } @@ -1542,10 +1541,6 @@ pub async fn get_row_from_db( } else { value = String::from(""); } - let cname = match cname.strip_suffix("_with_mval") { - None => cname.clone(), - Some(cname) => cname, - }; let column_messages = messages .iter() .filter(|m| m.get("column").unwrap().as_str() == Some(cname)) @@ -2402,22 +2397,7 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec bool { - vec![ - "key:primary", - "key:unique", - "key:foreign", - "tree:child-unique", - "tree:foreign", - "under:not-in-tree", - "under:not-under", - ] - .contains(&rule) -} - -/// TODO: Add a docstring here +/// Given a SQL type and a value, return true if the value does not conform to the SQL type. fn is_sql_type_error(sql_type: &str, value: &str) -> bool { let sql_type = sql_type.to_lowercase(); if sql_type == "numeric" { @@ -2566,75 +2546,54 @@ pub async fn insert_new_row_tx( let mut insert_columns = vec![]; let mut insert_values = vec![]; let mut insert_params = vec![]; - let mut messages = vec![]; + let mut all_messages = vec![]; let sorted_datatypes = get_sorted_datatypes(global_config); let conflict_columns = get_conflict_columns(global_config, table); let mut use_conflict_table = false; for (column, cell) in row.iter() { insert_columns.append(&mut vec![format!(r#""{}""#, column)]); + let cell = cell .as_object() .ok_or(SqlxCErr(format!("Cell {:?} is not an object", cell).into()))?; - let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).ok_or(SqlxCErr( + let valid = cell.get("valid").and_then(|v| v.as_bool()).ok_or(SqlxCErr( format!("No bool named 'valid' in {:?}", cell).into(), ))?; - let cell_value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( + let value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( format!("No string named 'value' in {:?}", cell).into(), ))?; + let messages = sort_messages( + &sorted_datatypes, + cell.get("messages") + .and_then(|m| m.as_array()) + .ok_or(SqlxCErr( + format!("No array named 'messages' in {:?}", cell).into(), + ))?, + ); - if !use_conflict_table && !cell_valid && conflict_columns.contains(&json!(column)) { - use_conflict_table = true; - } - let mut insert_null = false; - if !cell_valid { - let cell_messages = sort_messages( - &sorted_datatypes, - cell.get("messages") - .and_then(|m| m.as_array()) - .ok_or(SqlxCErr( - format!("No array named 'messages' in {:?}", cell).into(), - ))?, - ); - for cell_message in cell_messages { - let level = cell_message - .get("level") - .and_then(|l| l.as_str()) - .ok_or(SqlxCErr(format!("No 'level' in {:?}", cell).into()))?; - let rule = cell_message - .get("rule") - .and_then(|l| l.as_str()) - .ok_or(SqlxCErr(format!("No 'rule' in {:?}", cell).into()))?; - let is_db_key_error = is_db_key_error(&rule); - let sql_type = get_sql_type_from_global_config(global_config, table, column, pool) - .ok_or(SqlxCErr( - format!("Could not get SQL type for {}.{}", table, column).into(), - ))?; - let is_sql_type_error = is_sql_type_error(&sql_type, cell_value); - if level == "error" && (is_db_key_error || is_sql_type_error) { - if !use_conflict_table || is_sql_type_error { - insert_null = true; - } - } - messages.push(json!({ - "column": column, - "value": cell_value, - "level": cell_message.get("level").and_then(|s| s.as_str()) - .ok_or( - SqlxCErr(format!("No 'level' in {:?}", cell_message).into()) - )?, - "rule": cell_message.get("rule").and_then(|s| s.as_str()) - .ok_or( - SqlxCErr(format!("No 'rule' in {:?}", cell_message).into()) - )?, - "message": cell_message.get("message").and_then(|s| s.as_str()) - .ok_or( - SqlxCErr(format!("No 'message' in {:?}", cell_message).into()) - )?, - })); - } + for message in messages { + all_messages.push(json!({ + "column": column, + "value": value, + "level": message.get("level").and_then(|s| s.as_str()) + .ok_or( + SqlxCErr(format!("No 'level' in {:?}", message).into()) + )?, + "rule": message.get("rule").and_then(|s| s.as_str()) + .ok_or( + SqlxCErr(format!("No 'rule' in {:?}", message).into()) + )?, + "message": message.get("message").and_then(|s| s.as_str()) + .ok_or( + SqlxCErr(format!("No 'message' in {:?}", message).into()) + )?, + })); } - if insert_null { + let sql_type = get_sql_type_from_global_config(global_config, table, column, pool).ok_or( + SqlxCErr(format!("Could not get SQL type for {}.{}", table, column).into()), + )?; + if is_sql_type_error(&sql_type, value) { insert_values.push(String::from("NULL")); } else { let sql_type = get_sql_type_from_global_config(&global_config, &table, &column, pool) @@ -2642,7 +2601,11 @@ pub async fn insert_new_row_tx( format!("Unable to determine SQL type for {}.{}", table, column).into(), ))?; insert_values.push(cast_sql_param_from_text(&sql_type)); - insert_params.push(String::from(cell_value)); + insert_params.push(String::from(value)); + } + + if !use_conflict_table && !valid && conflict_columns.contains(&json!(column)) { + use_conflict_table = true; } } @@ -2689,7 +2652,7 @@ pub async fn insert_new_row_tx( query.execute(tx.acquire().await?).await?; // Next add any validation messages to the message table: - for m in messages { + for m in all_messages { let column = m.get("column").and_then(|c| c.as_str()).unwrap(); let value = m.get("value").and_then(|c| c.as_str()).unwrap(); let level = m.get("level").and_then(|c| c.as_str()).unwrap(); @@ -3305,7 +3268,7 @@ pub fn get_sql_type_from_global_config( .and_then(|c| c.get("datatype")) .and_then(|d| d.as_str()) .and_then(|d| Some(d.to_string())) - .unwrap(); + .expect(&format!("Could not get datatype for {}.{}", table, column)); get_sql_type(&dt_config, &dt, pool) } @@ -4018,137 +3981,149 @@ async fn make_inserts( pool: &AnyPool, ) -> Result< ( - (String, Vec, String, Vec), - (String, Vec, String, Vec), + String, + Vec, + String, + Vec, + String, + Vec, ), sqlx::Error, > { + fn is_conflict_row(row: &ResultRow, conflict_columns: &Vec) -> bool { + for (column, cell) in &row.contents { + let column = SerdeValue::String(column.to_string()); + if !cell.valid && conflict_columns.contains(&column) { + return true; + } + } + return false; + } + fn generate_sql( config: &SerdeMap, - table_name: &String, - column_names: &Vec, - rows: &Vec, + main_table: &String, + columns: &Vec, + rows: &mut Vec, + chunk_number: usize, messages_stats: &mut HashMap, verbose: bool, pool: &AnyPool, - ) -> (String, Vec, String, Vec) { - let normal_table_name = match table_name.strip_suffix("_conflict") { - None => table_name.clone(), - Some(base) => base.to_string(), - }; - let mut lines = vec![]; - let mut params = vec![]; + ) -> ( + String, + Vec, + String, + Vec, + String, + Vec, + ) { + let mut main_lines = vec![]; + let mut main_params = vec![]; + let mut conflict_lines = vec![]; + let mut conflict_params = vec![]; let mut message_lines = vec![]; let mut message_params = vec![]; let sorted_datatypes = get_sorted_datatypes(config); - for row in rows.iter() { - let mut values = vec![format!("{}", row.row_number.unwrap())]; - for column in column_names { + let conflict_columns = get_conflict_columns(config, main_table); + for (i, row) in rows.iter_mut().enumerate() { + // enumerate begins at 0 but we need to begin at 1: + let i = i + 1; + row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32); + let use_conflict_table = is_conflict_row(&row, &conflict_columns); + let mut row_values = vec![format!("{}", row.row_number.unwrap())]; + let mut row_params = vec![]; + for column in columns { let cell = row.contents.get(column).unwrap(); // Insert the value of the cell into the column unless inserting it will cause a db // error or it has the nulltype field set, in which case insert NULL: - let mut insert_null = false; - if cell.nulltype != None { - insert_null = true; - } else if !cell.valid { - for cell_message in &cell.messages { - let level = cell_message.get("level").and_then(|l| l.as_str()).unwrap(); - let rule = cell_message.get("rule").and_then(|l| l.as_str()).unwrap(); - let is_db_key_error = is_db_key_error(&rule); - let sql_type = get_sql_type_from_global_config( - config, - &normal_table_name, - column, - pool, - ) - .unwrap(); - let is_sql_type_error = is_sql_type_error(&sql_type, &cell.value); - if level == "error" && (is_db_key_error || is_sql_type_error) { - if !table_name.ends_with("_conflict") || is_sql_type_error { - insert_null = true; - break; - } - } - } - } - if !insert_null { - let sql_type = - get_sql_type_from_global_config(&config, &normal_table_name, &column, pool) - .unwrap(); - values.push(cast_sql_param_from_text(&sql_type)); - params.push(cell.value.clone()); + let sql_type = + get_sql_type_from_global_config(config, &main_table, column, pool).unwrap(); + if cell.nulltype != None || is_sql_type_error(&sql_type, &cell.value) { + row_values.push(String::from("NULL")); } else { - values.push(String::from("NULL")); + row_values.push(cast_sql_param_from_text(&sql_type)); + row_params.push(cell.value.clone()); } - // If the cell isn't valid, generate values and params to be used for the insert to - // the message table: - if !cell.valid { - if verbose { - add_message_counts(&cell.messages, messages_stats); - } - for message in sort_messages(&sorted_datatypes, &cell.messages) { - let row = row.row_number.unwrap().to_string(); - let message_values = vec![ - SQL_PARAM, &row, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, - ]; - - let message = message.as_object().unwrap(); - message_params.push(normal_table_name.clone()); - message_params.push(column.clone()); - message_params.push(cell.value.clone()); - message_params.push( - message - .get("level") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - ); - message_params.push( - message - .get("rule") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - ); - message_params.push( - message - .get("message") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - ); - let line = message_values.join(", "); - let line = format!("({})", line); - message_lines.push(line); - } + // Generate values and params to be used for the insert to the message table: + if verbose { + add_message_counts(&cell.messages, messages_stats); + } + + for message in sort_messages(&sorted_datatypes, &cell.messages) { + let row = row.row_number.unwrap().to_string(); + let message_values = vec![ + SQL_PARAM, &row, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, + ]; + + let message = message.as_object().unwrap(); + message_params.push(main_table.clone()); + message_params.push(column.clone()); + message_params.push(cell.value.clone()); + message_params.push( + message + .get("level") + .and_then(|s| s.as_str()) + .unwrap() + .to_string(), + ); + message_params.push( + message + .get("rule") + .and_then(|s| s.as_str()) + .unwrap() + .to_string(), + ); + message_params.push( + message + .get("message") + .and_then(|s| s.as_str()) + .unwrap() + .to_string(), + ); + let line = message_values.join(", "); + let line = format!("({})", line); + message_lines.push(line); } } - let line = values.join(", "); + let line = row_values.join(", "); let line = format!("({})", line); - lines.push(line); + if use_conflict_table { + conflict_lines.push(line); + conflict_params.append(&mut row_params); + } else { + main_lines.push(line); + main_params.append(&mut row_params); + } } // Generate the SQL output for the insert to the table: - let mut output = String::from(""); - if !lines.is_empty() { - output.push_str(&format!( - r#"INSERT INTO "{}" ("row_number", {}) VALUES"#, - table_name, - { - let mut all_columns = vec![]; - for column_name in column_names { - let quoted_column_name = format!(r#""{}""#, column_name); - all_columns.push(quoted_column_name); + fn get_table_output(lines: &Vec, table: &str, columns: &Vec) -> String { + let mut output = String::from(""); + if !lines.is_empty() { + output.push_str(&format!( + r#"INSERT INTO "{}" ("row_number", {}) VALUES"#, + table, + { + let mut quoted_columns = vec![]; + for column in columns { + let quoted_column = format!(r#""{}""#, column); + quoted_columns.push(quoted_column); + } + quoted_columns.join(", ") } - all_columns.join(", ") - } - )); - output.push_str("\n"); - output.push_str(&lines.join(",\n")); - output.push_str(";"); + )); + output.push_str("\n"); + output.push_str(&lines.join(",\n")); + output.push_str(";"); + } + output } + let main_output = get_table_output(&main_lines, &main_table, &columns); + let conflict_table = format!("{}_conflict", main_table); + let conflict_output = get_table_output(&conflict_lines, &conflict_table, &columns); + // Generate the output for the insert to the message table: let mut message_output = String::from(""); if !message_lines.is_empty() { @@ -4161,31 +4136,14 @@ async fn make_inserts( message_output.push_str(";"); } - (output, params, message_output, message_params) - } - - fn has_error_in_conflict_column(row: &ResultRow, conflict_columns: &Vec) -> bool { - for (column, cell) in &row.contents { - let column = SerdeValue::String(column.to_string()); - if !cell.valid && conflict_columns.contains(&column) { - return true; - } - } - return false; - } - - let conflict_columns = get_conflict_columns(config, table_name); - let mut main_rows = vec![]; - let mut conflict_rows = vec![]; - for (i, row) in rows.iter_mut().enumerate() { - // enumerate begins at 0 but we need to begin at 1: - let i = i + 1; - row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32); - if has_error_in_conflict_column(&row, &conflict_columns) { - conflict_rows.push(row.clone()); - } else { - main_rows.push(row.clone()); - } + ( + main_output, + main_params, + conflict_output, + conflict_params, + message_output, + message_params, + ) } // Use the "column_order" field of the table config for this table to retrieve the column names @@ -4200,34 +4158,25 @@ async fn make_inserts( .map(|v| v.as_str().unwrap().to_string()) .collect::>(); - let (main_sql, main_params, main_message_sql, main_message_params) = generate_sql( - &config, - &table_name, - &column_names, - &main_rows, - messages_stats, - verbose, - pool, - ); - let (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params) = + let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = generate_sql( &config, - &format!("{}_conflict", table_name), + &table_name, &column_names, - &conflict_rows, + rows, + chunk_number, messages_stats, verbose, pool, ); Ok(( - (main_sql, main_params, main_message_sql, main_message_params), - ( - conflict_sql, - conflict_params, - conflict_message_sql, - conflict_message_params, - ), + main_sql, + main_params, + conflict_sql, + conflict_params, + message_sql, + message_params, )) } @@ -4260,19 +4209,17 @@ async fn validate_rows_inter_and_insert( tmp_messages_stats.insert("error".to_string(), 0); tmp_messages_stats.insert("warning".to_string(), 0); tmp_messages_stats.insert("info".to_string(), 0); - let ( - (main_sql, main_params, main_message_sql, main_message_params), - (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params), - ) = make_inserts( - config, - table_name, - rows, - chunk_number, - &mut tmp_messages_stats, - verbose, - pool, - ) - .await?; + let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = + make_inserts( + config, + table_name, + rows, + chunk_number, + &mut tmp_messages_stats, + verbose, + pool, + ) + .await?; let main_sql = local_sql_syntax(&pool, &main_sql); let mut main_query = sqlx_query(&main_sql); @@ -4289,19 +4236,12 @@ async fn validate_rows_inter_and_insert( } conflict_query.execute(pool).await?; - let main_message_sql = local_sql_syntax(&pool, &main_message_sql); - let mut main_message_query = sqlx_query(&main_message_sql); - for param in &main_message_params { - main_message_query = main_message_query.bind(param); + let message_sql = local_sql_syntax(&pool, &message_sql); + let mut message_query = sqlx_query(&message_sql); + for param in &message_params { + message_query = message_query.bind(param); } - main_message_query.execute(pool).await?; - - let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql); - let mut conflict_message_query = sqlx_query(&conflict_message_sql); - for param in &conflict_message_params { - conflict_message_query = conflict_message_query.bind(param); - } - conflict_message_query.execute(pool).await?; + message_query.execute(pool).await?; if verbose { let curr_errors = messages_stats.get("error").unwrap(); @@ -4323,20 +4263,17 @@ async fn validate_rows_inter_and_insert( } Err(_) => { validate_rows_constraints(config, pool, table_name, rows).await?; - - let ( - (main_sql, main_params, main_message_sql, main_message_params), - (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params), - ) = make_inserts( - config, - table_name, - rows, - chunk_number, - messages_stats, - verbose, - pool, - ) - .await?; + let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = + make_inserts( + config, + table_name, + rows, + chunk_number, + messages_stats, + verbose, + pool, + ) + .await?; let main_sql = local_sql_syntax(&pool, &main_sql); let mut main_query = sqlx_query(&main_sql); @@ -4352,19 +4289,12 @@ async fn validate_rows_inter_and_insert( } conflict_query.execute(pool).await?; - let main_message_sql = local_sql_syntax(&pool, &main_message_sql); - let mut main_message_query = sqlx_query(&main_message_sql); - for param in &main_message_params { - main_message_query = main_message_query.bind(param); - } - main_message_query.execute(pool).await?; - - let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql); - let mut conflict_message_query = sqlx_query(&conflict_message_sql); - for param in &conflict_message_params { - conflict_message_query = conflict_message_query.bind(param); + let message_sql = local_sql_syntax(&pool, &message_sql); + let mut message_query = sqlx_query(&message_sql); + for param in &message_params { + message_query = message_query.bind(param); } - conflict_message_query.execute(pool).await?; + message_query.execute(pool).await?; } }; diff --git a/src/validate.rs b/src/validate.rs index 5d476be7..326b9eca 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -9,8 +9,8 @@ use std::collections::HashMap; use crate::{ ast::Expression, cast_column_sql_to_text, cast_sql_param_from_text, get_column_value, - get_sql_type_from_global_config, local_sql_syntax, ColumnRule, CompiledCondition, - ParsedStructure, SerdeMap, SQL_PARAM, + get_sql_type_from_global_config, is_sql_type_error, local_sql_syntax, ColumnRule, + CompiledCondition, ParsedStructure, SerdeMap, SQL_PARAM, }; /// Represents a particular cell in a particular row of data with vaildation results. @@ -157,10 +157,12 @@ pub async fn validate_row( cell, ); - // We don't do any further validation on cells that have datatype violations because + // We don't do any further validation on cells that have SQL type violations because // they can result in database errors when, for instance, we compare a numeric with a // non-numeric type. - if cell.valid || !contains_dt_violation(&cell.messages) { + let sql_type = + get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap(); + if !is_sql_type_error(&sql_type, &cell.value) { // TODO: Pass the query_as_if parameter to validate_cell_trees. validate_cell_trees( config, @@ -451,6 +453,7 @@ pub async fn validate_under( .and_then(|t| t.as_array()) .unwrap(); + let view_name = format!("{}_view", table_name); for ukey in ukeys { let ukey = ukey.as_object().unwrap(); let tree_table = ukey.get("ttable").and_then(|tt| tt.as_str()).unwrap(); @@ -479,7 +482,8 @@ pub async fn validate_under( let mut extra_clause; let mut params; if let Some(ref extra_row) = extra_row { - (extra_clause, params) = select_with_extra_row(&config, extra_row, table_name, pool); + (extra_clause, params) = + select_with_extra_row(&config, extra_row, table_name, &view_name, pool); } else { extra_clause = String::new(); params = vec![]; @@ -497,9 +501,9 @@ pub async fn validate_under( // understood to be under themselves. let effective_table; if !extra_clause.is_empty() { - effective_table = format!("{}_ext", table_name); + effective_table = format!("{}_ext", view_name); } else { - effective_table = table_name.clone(); + effective_table = view_name.clone(); } let effective_tree; @@ -535,35 +539,30 @@ pub async fn validate_under( let sql = local_sql_syntax( &pool, &format!( - r#"{} {} + r#"{tree_sql} {extra_clause} SELECT "row_number", - "{}"."{}", + "{effective_table}"."{column}", CASE - WHEN "{}"."{}" IN ( - SELECT "{}" FROM "{}" + WHEN "{effective_table}"."{column}" IN ( + SELECT "{tree_child}" FROM "{effective_tree}" ) THEN 1 ELSE 0 END AS "is_in_tree", CASE - WHEN "{}"."{}" IN ( - SELECT "{}" FROM "tree" + WHEN "{effective_table}"."{column}" IN ( + SELECT "{tree_parent}" FROM "tree" ) THEN 0 ELSE 1 END AS "is_under" - FROM "{}""#, - tree_sql, - extra_clause, - effective_table, - column, - effective_table, - column, - tree_child, - effective_tree, - effective_table, - column, - tree_parent, - effective_table, + FROM "{effective_table}""#, + tree_sql = tree_sql, + extra_clause = extra_clause, + effective_table = effective_table, + column = column, + tree_child = tree_child, + effective_tree = effective_tree, + tree_parent = tree_parent, ), ); @@ -591,87 +590,33 @@ pub async fn validate_under( } let raw_column_val = row.try_get_raw(format!(r#"{}"#, column).as_str()).unwrap(); - let mut column_val = String::from(""); - if raw_column_val.is_null() { - // If the column value already contains a different error, its value will be null - // and it will be returned by the above query regardless of whether it actually - // violates the tree's foreign constraint. So we check the value from the message - // table instead: - let message_sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT "value", "level", "rule", "message" - FROM "message" - WHERE "table" = {} - AND "row" = {} - AND "column" = {}"#, - SQL_PARAM, SQL_PARAM, SQL_PARAM - ), - ); - let mut message_query = sqlx_query(&message_sql); - message_query = message_query.bind(&table_name); - message_query = message_query.bind(&row_number); - message_query = message_query.bind(column); - let message_rows = { - if let None = tx { - message_query.fetch_all(pool).await? - } else { - message_query - .fetch_all(tx.as_mut().unwrap().acquire().await?) - .await? - } - }; - // If there are no rows in the message table then the cell is legitimately empty and - // we can skip this row: - if message_rows.is_empty() { - continue; - } - - let mut has_dt_violation = false; - for mrow in &message_rows { - let rule: &str = mrow.get_unchecked("rule"); - if rule.starts_with("datatype:") { - has_dt_violation = true; - break; - } else { - let value: &str = mrow.get_unchecked("value"); - column_val = value.to_string(); - } - } - // If the value in the column has already been deemed to be invalid because - // of a datatype error, then just skip this row. This is to avoid potential database - // errors that might arise if we compare a numeric with a non-numeric type. - if has_dt_violation { - continue; + if !raw_column_val.is_null() { + let column_val = get_column_value(&row, &column, &sql_type); + // We use i32 instead of i64 (which we use for row_number) here because, unlike + // row_number, which is a BIGINT, 0 and 1 are being interpreted as normal sized ints. + let is_in_tree: i32 = row.get("is_in_tree"); + let is_under: i32 = row.get("is_under"); + if is_in_tree == 0 { + results.push(json!({ + "row_number": row_number as u32, + "column": column, + "value": column_val, + "level": "error", + "rule": "under:not-in-tree", + "message": format!("Value '{}' of column {} is not in {}.{}", + column_val, column, tree_table, tree_child).as_str(), + })); + } else if is_under == 0 { + results.push(json!({ + "row_number": row_number as u32, + "column": column, + "value": column_val, + "level": "error", + "rule": "under:not-under", + "message": format!("Value '{}' of column {} is not under '{}'", + column_val, column, uval.clone()).as_str(), + })); } - } else { - column_val = get_column_value(&row, &column, &sql_type); - } - - // We use i32 instead of i64 (which we use for row_number) here because, unlike - // row_number, which is a BIGINT, 0 and 1 are being interpreted as normal sized ints. - let is_in_tree: i32 = row.get("is_in_tree"); - let is_under: i32 = row.get("is_under"); - if is_in_tree == 0 { - results.push(json!({ - "row_number": row_number as u32, - "column": column, - "value": column_val, - "level": "error", - "rule": "under:not-in-tree", - "message": format!("Value '{}' of column {} is not in {}.{}", - column_val, column, tree_table, tree_child).as_str(), - })); - } else if is_under == 0 { - results.push(json!({ - "row_number": row_number as u32, - "column": column, - "value": column_val, - "level": "error", - "rule": "under:not-under", - "message": format!("Value '{}' of column {} is not under '{}'", - column_val, column, uval.clone()).as_str(), - })); } } } @@ -699,6 +644,7 @@ pub async fn validate_tree_foreign_keys( .and_then(|t| t.as_array()) .unwrap(); + let view_name = format!("{}_view", table_name); let mut results = vec![]; for tkey in tkeys { let tkey = tkey.as_object().unwrap(); @@ -709,7 +655,8 @@ pub async fn validate_tree_foreign_keys( let with_clause; let params; if let Some(ref extra_row) = extra_row { - (with_clause, params) = select_with_extra_row(&config, extra_row, table_name, pool); + (with_clause, params) = + select_with_extra_row(&config, extra_row, table_name, &view_name, pool); } else { with_clause = String::new(); params = vec![]; @@ -717,29 +664,27 @@ pub async fn validate_tree_foreign_keys( let effective_table_name; if !with_clause.is_empty() { - effective_table_name = format!("{}_ext", table_name); + effective_table_name = format!("{}_ext", view_name); } else { - effective_table_name = table_name.clone(); + effective_table_name = view_name.clone(); } let sql = local_sql_syntax( &pool, &format!( - r#"{} + r#"{with_clause} SELECT - t1."row_number", t1."{}" - FROM "{}" t1 + t1."row_number", t1."{parent_col}" + FROM "{effective_table_name}" t1 WHERE NOT EXISTS ( SELECT 1 - FROM "{}" t2 - WHERE t2."{}" = t1."{}" + FROM "{effective_table_name}" t2 + WHERE t2."{child_col}" = t1."{parent_col}" )"#, - with_clause, - parent_col, - effective_table_name, - effective_table_name, - child_col, - parent_col + with_clause = with_clause, + parent_col = parent_col, + effective_table_name = effective_table_name, + child_col = child_col, ), ); @@ -767,99 +712,18 @@ pub async fn validate_tree_foreign_keys( let raw_parent_val = row .try_get_raw(format!(r#"{}"#, parent_col).as_str()) .unwrap(); - let mut parent_val = String::from(""); if !raw_parent_val.is_null() { - parent_val = get_column_value(&row, &parent_col, &parent_sql_type); - } else { - // If the parent column already contains a different error, its value will be null - // and it will be returned by the above query regardless of whether it actually - // violates the tree's foreign constraint. So we check the value from the message - // table instead: - let message_sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT "value", "level", "rule", "message" - FROM "message" - WHERE "table" = {} - AND "row" = {} - AND "column" = {}"#, - SQL_PARAM, SQL_PARAM, SQL_PARAM - ), - ); - let mut message_query = sqlx_query(&message_sql); - message_query = message_query.bind(&table_name); - message_query = message_query.bind(&row_number); - message_query = message_query.bind(parent_col); - let message_rows = { - if let None = tx { - message_query.fetch_all(pool).await? - } else { - message_query - .fetch_all(tx.as_mut().unwrap().acquire().await?) - .await? - } - }; - // If there are no rows in the message table then the cell is legitimately empty and - // we can skip this row: - if message_rows.is_empty() { - continue; - } - - let mut has_dt_violation = false; - for mrow in &message_rows { - let rule: &str = mrow.get_unchecked("rule"); - if rule.starts_with("datatype:") { - has_dt_violation = true; - break; - } else { - let value: &str = mrow.get_unchecked("value"); - parent_val = value.to_string(); - } - } - // If the value in the parent column has already been deemed to be invalid because - // of a datatype error, then just skip this row. This is to avoid potential database - // errors that might arise if we compare a numeric with a non-numeric type. - if has_dt_violation { - continue; - } - - // Otherwise check if the value from the message table is in the child column. If it - // is there then we are fine, and we can go on to the next row. - let sql_type = - get_sql_type_from_global_config(&config, &table_name, &parent_col, pool) - .unwrap(); - let sql_param = cast_sql_param_from_text(&sql_type); - let sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT 1 FROM "{}" WHERE "{}" = {} LIMIT 1"#, - table_name, child_col, sql_param - ), - ); - let query = sqlx_query(&sql).bind(parent_val.to_string()); - let rows = { - if let None = tx { - query.fetch_all(pool).await? - } else { - query - .fetch_all(tx.as_mut().unwrap().acquire().await?) - .await? - } - }; - if rows.len() > 0 { - continue; - } + let parent_val = get_column_value(&row, &parent_col, &parent_sql_type); + results.push(json!({ + "row_number": row_number as u32, + "column": parent_col, + "value": parent_val, + "level": "error", + "rule": "tree:foreign", + "message": format!("Value '{}' of column {} is not in column {}", + parent_val, parent_col, child_col).as_str(), + })); } - - results.push(json!({ - "row_number": row_number as u32, - "column": parent_col, - "value": parent_val, - "level": "error", - "rule": "tree:foreign", - "message": format!("Value '{}' of column {} is not in column {}", - parent_val, parent_col, child_col).as_str(), - })); } } @@ -894,9 +758,11 @@ pub async fn validate_rows_trees( let context = row.clone(); let cell = row.contents.get_mut(column_name).unwrap(); // We don't do any further validation on cells that are legitimately empty, or on cells - // that have datatype violations. We exclude the latter because they can result in + // that have SQL type violations. We exclude the latter because they can result in // database errors when, for instance, we compare a numeric with a non-numeric type. - if cell.nulltype == None && (cell.valid || !contains_dt_violation(&cell.messages)) { + let sql_type = + get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap(); + if cell.nulltype == None && !is_sql_type_error(&sql_type, &cell.value) { validate_cell_trees( config, pool, @@ -951,9 +817,11 @@ pub async fn validate_rows_constraints( for column_name in &column_names { let cell = row.contents.get_mut(column_name).unwrap(); // We don't do any further validation on cells that are legitimately empty, or on cells - // that have datatype violations. We exclude the latter because they can result in + // that have SQL type violations. We exclude the latter because they can result in // database errors when, for instance, we compare a numeric with a non-numeric type. - if cell.nulltype == None && (cell.valid || !contains_dt_violation(&cell.messages)) { + let sql_type = + get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap(); + if cell.nulltype == None && !is_sql_type_error(&sql_type, &cell.value) { validate_cell_foreign_constraints( config, pool, @@ -1139,28 +1007,13 @@ fn result_row_to_config_map(incoming: &ResultRow) -> SerdeMap { outgoing } -/// Given a message list, determine if it contains a message corresponding to a dataype violation -fn contains_dt_violation(messages: &Vec) -> bool { - let mut contains_dt_violation = false; - for m in messages { - if m.get("rule") - .and_then(|r| r.as_str()) - .unwrap_or_else(|| "") - .starts_with("datatype:") - { - contains_dt_violation = true; - break; - } - } - contains_dt_violation -} - /// Generate a SQL Select clause that is a union of: (a) the literal values of the given extra row, /// and (b) a Select statement over `table_name` of all the fields in the extra row. fn select_with_extra_row( config: &SerdeMap, extra_row: &ResultRow, - table_name: &str, + table: &str, + effective_table: &str, pool: &AnyPool, ) -> (String, Vec) { let extra_row_len = extra_row.contents.keys().len(); @@ -1173,7 +1026,7 @@ fn select_with_extra_row( let mut second_select = String::from(r#"SELECT "row_number", "#); for (i, (key, content)) in extra_row.contents.iter().enumerate() { - let sql_type = get_sql_type_from_global_config(&config, &table_name, &key, pool).unwrap(); + let sql_type = get_sql_type_from_global_config(&config, &table, &key, pool).unwrap(); let sql_param = cast_sql_param_from_text(&sql_type); // enumerate() begins from 0 but we need to begin at 1: let i = i + 1; @@ -1184,7 +1037,7 @@ fn select_with_extra_row( first_select.push_str(", "); second_select.push_str(", "); } else { - second_select.push_str(format!(r#" FROM "{}""#, table_name).as_str()); + second_select.push_str(format!(r#" FROM "{}""#, effective_table).as_str()); } } @@ -1195,7 +1048,7 @@ fn select_with_extra_row( ( format!( r#"WITH "{}_ext" AS ({} UNION ALL {})"#, - table_name, first_select, second_select + effective_table, first_select, second_select ), params, ) @@ -1235,25 +1088,20 @@ fn with_tree_sql( let sql = format!( r#"WITH RECURSIVE "tree" AS ( - {} - SELECT "{}", "{}" - FROM "{}" - {} + {extra_clause} + SELECT "{child_col}", "{parent_col}" + FROM "{effective_table_name}" + {under_sql} UNION ALL - SELECT "t1"."{}", "t1"."{}" - FROM "{}" AS "t1" - JOIN "tree" AS "t2" ON "t2"."{}" = "t1"."{}" + SELECT "t1"."{child_col}", "t1"."{parent_col}" + FROM "{effective_table_name}" AS "t1" + JOIN "tree" AS "t2" ON "t2"."{parent_col}" = "t1"."{child_col}" )"#, - extra_clause, - child_col, - parent_col, - effective_table_name, - under_sql, - child_col, - parent_col, - effective_table_name, - parent_col, - child_col + extra_clause = extra_clause, + child_col = child_col, + parent_col = parent_col, + effective_table_name = effective_table_name, + under_sql = under_sql, ); (sql, params) @@ -1795,6 +1643,7 @@ async fn validate_cell_trees( get_sql_type_from_global_config(&config, &table_name, &parent_col, pool).unwrap(); let parent_sql_param = cast_sql_param_from_text(&parent_sql_type); let parent_val = cell.value.clone(); + let view_name = format!("{}_view", table_name); for tkey in tkeys { let child_col = tkey.get("child").and_then(|c| c.as_str()).unwrap(); let child_sql_type = @@ -1830,10 +1679,10 @@ async fn validate_cell_trees( let table_name_ext; let extra_clause; if prev_selects.is_empty() { - table_name_ext = table_name.clone(); + table_name_ext = view_name.clone(); extra_clause = String::from(""); } else { - table_name_ext = format!("{}_ext", table_name); + table_name_ext = format!("{}_ext", view_name); extra_clause = format!( r#"WITH "{}" AS ( SELECT "{}", "{}" @@ -1841,7 +1690,7 @@ async fn validate_cell_trees( UNION ALL {} )"#, - table_name_ext, child_col, parent_col, table_name, prev_selects + table_name_ext, child_col, parent_col, view_name, prev_selects ); } @@ -1994,15 +1843,16 @@ async fn validate_cell_unique_constraints( } if is_primary || is_unique || is_tree_child { + let view_name = format!("{}_view", table_name); let mut with_sql = String::new(); - let except_table = format!("{}_exc", table_name); + let except_table = format!("{}_exc", view_name); if let Some(row_number) = row_number { with_sql = format!( r#"WITH "{}" AS ( SELECT * FROM "{}" WHERE "row_number" != {} ) "#, - except_table, table_name, row_number + except_table, view_name, row_number ); } @@ -2010,7 +1860,7 @@ async fn validate_cell_unique_constraints( if !with_sql.is_empty() { query_table = except_table; } else { - query_table = table_name.to_string(); + query_table = view_name.to_string(); } let sql_type = diff --git a/test/expected/messages.tsv b/test/expected/messages.tsv index 353d0906..1ed440af 100644 --- a/test/expected/messages.tsv +++ b/test/expected/messages.tsv @@ -30,6 +30,7 @@ table3 6 parent error tree:foreign Value 'owl:Thing' of column parent is not in table3 7 source error datatype:nonspace source should be text without whitespace CO B table3 7 source error datatype:prefix source should be a prefix for a CURIE CO B table3 7 source error datatype:word source should be a single word: letters, numbers, underscore CO B +table3 7 source error key:foreign Value 'CO B' of column source is not in table1.prefix CO B table3 8 id error key:unique Values of id must be unique COB:0000013 table3 10 id error key:unique Values of id must be unique VO:0000001 table3 10 label error key:primary Values of label must be unique vaccine diff --git a/test/expected/messages_a1.tsv b/test/expected/messages_a1.tsv index 04153339..8f94c1f4 100644 --- a/test/expected/messages_a1.tsv +++ b/test/expected/messages_a1.tsv @@ -30,6 +30,7 @@ table3 E6 error tree:foreign Value 'owl:Thing' of column parent is not in column table3 A7 error datatype:nonspace source should be text without whitespace CO B table3 A7 error datatype:prefix source should be a prefix for a CURIE CO B table3 A7 error datatype:word source should be a single word: letters, numbers, underscore CO B +table3 A7 error key:foreign Value 'CO B' of column source is not in table1.prefix CO B table3 B8 error key:unique Values of id must be unique COB:0000013 table3 B10 error key:unique Values of id must be unique VO:0000001 table3 C10 error key:primary Values of label must be unique vaccine diff --git a/test/expected/messages_after_api_test.tsv b/test/expected/messages_after_api_test.tsv index a11ccb0f..f7fe4d42 100644 --- a/test/expected/messages_after_api_test.tsv +++ b/test/expected/messages_after_api_test.tsv @@ -32,6 +32,7 @@ table3 6 parent error tree:foreign Value 'owl:Thing' of column parent is not in table3 7 source error datatype:nonspace source should be text without whitespace CO B table3 7 source error datatype:prefix source should be a prefix for a CURIE CO B table3 7 source error datatype:word source should be a single word: letters, numbers, underscore CO B +table3 7 source error key:foreign Value 'CO B' of column source is not in table1.prefix CO B table3 8 id error key:unique Values of id must be unique COB:0000013 table3 10 id error key:unique Values of id must be unique VO:0000001 table3 10 label error key:primary Values of label must be unique vaccine