From 2dd98e99df3915252ce43c4fdafc5918a7aa6e20 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Mon, 12 Feb 2024 12:01:11 -0500 Subject: [PATCH] use new config structs while loading tables --- src/lib.rs | 53 +++++++++++++----------------------- src/main.rs | 35 ++++++++++++------------ src/validate.rs | 72 +++++++++++++------------------------------------ src/valve.rs | 23 +++------------- 4 files changed, 60 insertions(+), 123 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index dd85412d..063bfe0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -917,7 +917,6 @@ pub fn get_compiled_rule_conditions( let tables_config = &config.table; let rules_config = &config.rule; for (rules_table, table_rules) in rules_config.iter() { - println!("Rules for {}", rules_table); for (column_rule_key, column_rules) in table_rules.iter() { for rule in column_rules { let table_columns = tables_config @@ -3088,7 +3087,7 @@ pub fn get_table_ddl( .collect::>() }; - let (primaries, uniques, foreigns, trees, unders) = { + let (primaries, uniques, foreigns, trees, _unders) = { // Conflict tables have no database constraints: if table_name.ends_with("_conflict") { (vec![], vec![], vec![], vec![], vec![]) @@ -3304,7 +3303,7 @@ pub fn sort_messages( /// and information messages generated are added to messages_stats, the contents of which will /// later be written to stderr. pub async fn make_inserts( - config_old: &SerdeMap, + config: &ValveConfig, table_name: &String, rows: &mut Vec, chunk_number: usize, @@ -3332,7 +3331,7 @@ pub async fn make_inserts( } fn generate_sql( - config_old: &SerdeMap, + config: &ValveConfig, main_table: &String, columns: &Vec, rows: &mut Vec, @@ -3354,9 +3353,8 @@ pub async fn make_inserts( let mut conflict_params = vec![]; let mut message_lines = vec![]; let mut message_params = vec![]; - let dummy_config_new = ValveConfig::default(); - let sorted_datatypes = get_sorted_datatypes(&dummy_config_new); - let conflict_columns = get_conflict_columns(&dummy_config_new, main_table); + let sorted_datatypes = get_sorted_datatypes(config); + let conflict_columns = get_conflict_columns(config, main_table); for (i, row) in rows.iter_mut().enumerate() { // enumerate begins at 0 but we need to begin at 1: let i = i + 1; @@ -3368,12 +3366,7 @@ pub async fn make_inserts( let cell = row.contents.get(column).unwrap(); // Insert the value of the cell into the column unless inserting it will cause a db // error or it has the nulltype field set, in which case insert NULL: - let sql_type = get_sql_type_from_global_config( - &ValveConfig::default(), - &main_table, - column, - pool, - ); + let sql_type = get_sql_type_from_global_config(config, &main_table, column, pool); if cell.nulltype != None || is_sql_type_error(&sql_type, &cell.value) { row_values.push(String::from("NULL")); } else { @@ -3485,21 +3478,13 @@ pub async fn make_inserts( // Use the "column_order" field of the table config for this table to retrieve the column names // in the correct order: - let column_names = config_old - .get("table") - .and_then(|t| t.get(table_name)) - .and_then(|t| t.get("column_order")) - .and_then(|c| c.as_array()) - .unwrap() - .iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect::>(); + let column_names = &config.table.get(table_name).unwrap().column_order; let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = generate_sql( - &config_old, + config, &table_name, - &column_names, + column_names, rows, chunk_number, messages_stats, @@ -3522,7 +3507,7 @@ pub async fn make_inserts( /// them to the table. If the verbose flag is set to true, error/warning/info stats will be /// collected in messages_stats and later written to stderr. pub async fn insert_chunk( - config_old: &SerdeMap, + config: &ValveConfig, pool: &AnyPool, table_name: &String, rows: &mut Vec, @@ -3534,7 +3519,7 @@ pub async fn insert_chunk( // First, do the tree validation. TODO: I don't remember why this needs to be done first, but // it does. Add a comment here explaining why. if validate { - validate_rows_trees(config_old, pool, table_name, rows).await?; + validate_rows_trees(config, pool, table_name, rows).await?; } // Try to insert the rows to the db first without validating unique and foreign constraints. @@ -3552,7 +3537,7 @@ pub async fn insert_chunk( tmp_messages_stats.insert("info".to_string(), 0); let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = make_inserts( - config_old, + config, table_name, rows, chunk_number, @@ -3604,7 +3589,7 @@ pub async fn insert_chunk( } Err(e) => { if validate { - validate_rows_constraints(config_old, pool, table_name, rows).await?; + validate_rows_constraints(config, pool, table_name, rows).await?; let ( main_sql, main_params, @@ -3613,7 +3598,7 @@ pub async fn insert_chunk( message_sql, message_params, ) = make_inserts( - config_old, + config, table_name, rows, chunk_number, @@ -3658,7 +3643,7 @@ pub async fn insert_chunk( /// to the table. If the verbose flag is set to true, error/warning/info stats will be collected in /// messages_stats and later written to stderr. pub async fn insert_chunks( - config_old: &SerdeMap, + config: &ValveConfig, pool: &AnyPool, compiled_datatype_conditions: &HashMap, compiled_rule_conditions: &HashMap>>, @@ -3675,7 +3660,7 @@ pub async fn insert_chunks( let mut intra_validated_rows = { let only_nulltype = !validate; validate_rows_intra( - config_old, + config, compiled_datatype_conditions, compiled_rule_conditions, table_name, @@ -3685,7 +3670,7 @@ pub async fn insert_chunks( ) }; insert_chunk( - config_old, + config, pool, table_name, &mut intra_validated_rows, @@ -3719,7 +3704,7 @@ pub async fn insert_chunks( workers.push(scope.spawn(move |_| { let only_nulltype = !validate; validate_rows_intra( - config_old, + config, compiled_datatype_conditions, compiled_rule_conditions, table_name, @@ -3740,7 +3725,7 @@ pub async fn insert_chunks( for (chunk_number, mut intra_validated_rows) in results { insert_chunk( - config_old, + config, pool, table_name, &mut intra_validated_rows, diff --git a/src/main.rs b/src/main.rs index ac9939b7..c2e93c80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,15 +3,17 @@ mod api_test; use crate::api_test::run_api_tests; use argparse::{ArgumentParser, Store, StoreTrue}; use ontodev_valve::{valve::Valve, valve::ValveError}; -use serde_json::{from_str, Value as SerdeValue}; use std::{env, process}; #[async_std::main] async fn main() -> Result<(), ValveError> { // Command line parameters and their default values. See below for descriptions. Note that some // of these are mutually exclusive. This is accounted for below. + // TODO: Use a more powerful command-line parser library that can automatically take care of // things like mutually exclusive options, since argparse doesn't seem to be able to do it. + + let mut ad_hoc = false; // TODO: Remove the ad_hoc parameter before merging this PR. let mut verbose = false; let mut api_test = false; let mut dump_config = false; @@ -33,6 +35,10 @@ async fn main() -> Result<(), ValveError> { // this block limits scope of borrows by ap.refer() method let mut ap = ArgumentParser::new(); ap.set_description(r#"Valve is a lightweight validation engine written in rust."#); + + // TODO: Remove the ad_hoc parameter before merging this PR. + ap.refer(&mut ad_hoc) + .add_option(&["--ad_hoc"], StoreTrue, r#"Do something ad hoc."#); ap.refer(&mut verbose).add_option( &["--verbose"], StoreTrue, @@ -138,6 +144,8 @@ async fn main() -> Result<(), ValveError> { let advice = format!("Run `{} --help` for command line usage.", program_name); let mutually_exclusive_options = vec![ + // TODO: Remove the ad_hoc parameter before merging this PR. + ad_hoc, api_test, dump_config, dump_schema, @@ -173,7 +181,11 @@ async fn main() -> Result<(), ValveError> { process::exit(1); } - if api_test { + // TODO: Remove the ad_hoc parameter before merging this PR. + if ad_hoc { + let valve = Valve::build(&source, &destination, verbose, initial_load).await?; + valve.save_all_tables(&None)?; + } else if api_test { run_api_tests(&source, &destination).await?; } else if save_all || save != "" { let valve = Valve::build(&source, &destination, verbose, initial_load).await?; @@ -192,21 +204,10 @@ async fn main() -> Result<(), ValveError> { } } else if dump_config { let valve = Valve::build(&source, &destination, verbose, initial_load).await?; - let mut config = valve.config.clone(); - let datatype_conditions = format!("{:?}", valve.datatype_conditions).replace(r"\", r"\\"); - let datatype_conditions: SerdeValue = from_str(&datatype_conditions).unwrap(); - config.insert(String::from("datatype_conditions"), datatype_conditions); - - let structure_conditions = format!("{:?}", valve.structure_conditions).replace(r"\", r"\\"); - let structure_conditions: SerdeValue = from_str(&structure_conditions).unwrap(); - config.insert(String::from("structure_conditions"), structure_conditions); - - let rule_conditions = format!("{:?}", valve.rule_conditions).replace(r"\", r"\\"); - let rule_conditions: SerdeValue = from_str(&rule_conditions).unwrap(); - config.insert(String::from("rule_conditions"), rule_conditions); - - let config = serde_json::to_string(&config).unwrap(); - println!("{}", config); + // TODO: Somehow convert this output to JSON. We will likely have to rewrite the display() + // functions for the structs involved. Note that this is required for the + // test/generate_random_test_data.py to work. + println!("{:#?}", valve); } else if dump_schema { let valve = Valve::build(&source, &destination, verbose, initial_load).await?; valve.dump_schema().await?; diff --git a/src/validate.rs b/src/validate.rs index a2291992..8fadec4b 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -7,7 +7,7 @@ use crate::{ ValveConfig, ValveDatatypeConfig, ValveError, ValveRow, ValveRuleConfig, ValveTreeConstraint, }, - ColumnRule, CompiledCondition, SerdeMap, + ColumnRule, CompiledCondition, }; use chrono::Utc; use indexmap::IndexMap; @@ -519,20 +519,12 @@ pub async fn validate_tree_foreign_keys( /// Given a config map, a database connection pool, a table name, and a number of rows to validate, /// perform tree validation on the rows and return the validated results. pub async fn validate_rows_trees( - config_old: &SerdeMap, + config: &ValveConfig, pool: &AnyPool, table_name: &String, rows: &mut Vec, ) -> Result<(), ValveError> { - let column_names = config_old - .get("table") - .and_then(|t| t.get(table_name)) - .and_then(|t| t.get("column_order")) - .and_then(|c| c.as_array()) - .unwrap() - .iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect::>(); + let column_names = &config.table.get(table_name).unwrap().column_order; let mut result_rows = vec![]; for row in rows { @@ -540,21 +532,16 @@ pub async fn validate_rows_trees( row_number: None, contents: IndexMap::new(), }; - for column_name in &column_names { + for column_name in column_names { let context = row.clone(); let cell = row.contents.get_mut(column_name).unwrap(); // We don't do any further validation on cells that are legitimately empty, or on cells // that have SQL type violations. We exclude the latter because they can result in // database errors when, for instance, we compare a numeric with a non-numeric type. - let sql_type = get_sql_type_from_global_config( - &ValveConfig::default(), - table_name, - &column_name, - pool, - ); + let sql_type = get_sql_type_from_global_config(config, table_name, &column_name, pool); if cell.nulltype == None && !is_sql_type_error(&sql_type, &cell.value) { validate_cell_trees( - &ValveConfig::default(), + config, pool, None, table_name, @@ -583,20 +570,12 @@ pub async fn validate_rows_trees( /// validate foreign and unique constraints, where the latter include primary and "tree child" keys /// (which imply unique constraints) and return the validated results. pub async fn validate_rows_constraints( - config_old: &SerdeMap, + config: &ValveConfig, pool: &AnyPool, table_name: &String, rows: &mut Vec, ) -> Result<(), ValveError> { - let column_names = config_old - .get("table") - .and_then(|t| t.get(table_name)) - .and_then(|t| t.get("column_order")) - .and_then(|c| c.as_array()) - .unwrap() - .iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect::>(); + let column_names = &config.table.get(table_name).unwrap().column_order; let mut result_rows = vec![]; for row in rows.iter_mut() { @@ -604,20 +583,15 @@ pub async fn validate_rows_constraints( row_number: None, contents: IndexMap::new(), }; - for column_name in &column_names { + for column_name in column_names { let cell = row.contents.get_mut(column_name).unwrap(); // We don't do any further validation on cells that are legitimately empty, or on cells // that have SQL type violations. We exclude the latter because they can result in // database errors when, for instance, we compare a numeric with a non-numeric type. - let sql_type = get_sql_type_from_global_config( - &ValveConfig::default(), - table_name, - &column_name, - pool, - ); + let sql_type = get_sql_type_from_global_config(config, table_name, &column_name, pool); if cell.nulltype == None && !is_sql_type_error(&sql_type, &cell.value) { validate_cell_foreign_constraints( - &ValveConfig::default(), + config, pool, None, table_name, @@ -628,7 +602,7 @@ pub async fn validate_rows_constraints( .await?; validate_cell_unique_constraints( - &ValveConfig::default(), + config, pool, None, table_name, @@ -657,7 +631,7 @@ pub async fn validate_rows_constraints( /// table, and a number of rows to validate, run intra-row validatation on all of the rows and /// return the validated versions. pub fn validate_rows_intra( - config_old: &SerdeMap, + config: &ValveConfig, compiled_datatype_conditions: &HashMap, compiled_rule_conditions: &HashMap>>, table_name: &String, @@ -688,22 +662,14 @@ pub fn validate_rows_intra( result_row.contents.insert(column.to_string(), result_cell); } - let column_names = config_old - .get("table") - .and_then(|t| t.get(table_name)) - .and_then(|t| t.get("column_order")) - .and_then(|c| c.as_array()) - .unwrap() - .iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect::>(); + let column_names = &config.table.get(table_name).unwrap().column_order; // We begin by determining the nulltype of all of the cells, since the rules // validation step requires that all cells have this information. - for column_name in &column_names { + for column_name in column_names { let cell = result_row.contents.get_mut(column_name).unwrap(); validate_cell_nulltype( - &ValveConfig::default(), + config, compiled_datatype_conditions, table_name, &column_name, @@ -712,11 +678,11 @@ pub fn validate_rows_intra( } if !only_nulltype { - for column_name in &column_names { + for column_name in column_names { let context = result_row.clone(); let cell = result_row.contents.get_mut(column_name).unwrap(); validate_cell_rules( - &ValveConfig::default(), + config, compiled_rule_conditions, table_name, &column_name, @@ -726,7 +692,7 @@ pub fn validate_rows_intra( if cell.nulltype == None { validate_cell_datatype( - &ValveConfig::default(), + config, compiled_datatype_conditions, table_name, &column_name, diff --git a/src/valve.rs b/src/valve.rs index f9d8c483..ab8418ca 100644 --- a/src/valve.rs +++ b/src/valve.rs @@ -11,8 +11,8 @@ use crate::{ switch_undone_state, update_row_tx, validate::{validate_row_tx, validate_tree_foreign_keys, validate_under, with_tree_sql}, valve_grammar::StartParser, - verify_table_deps_and_sort, warn, ColumnRule, CompiledCondition, ParsedStructure, SerdeMap, - CHUNK_SIZE, SQL_PARAM, + verify_table_deps_and_sort, warn, ColumnRule, CompiledCondition, ParsedStructure, CHUNK_SIZE, + SQL_PARAM, }; use chrono::Utc; use csv::{QuoteStyle, ReaderBuilder, WriterBuilder}; @@ -172,7 +172,6 @@ pub struct ValveConfig { #[derive(Clone, Debug)] pub struct Valve { /// The valve configuration map. - pub config_old: SerdeMap, // TODO: Remove this field later. pub config: ValveConfig, /// The full list of tables managed by valve, in dependency order. pub sorted_table_list: Vec, @@ -276,10 +275,7 @@ impl Valve { let rule_conditions = get_compiled_rule_conditions(&config, &datatype_conditions, &parser); let structure_conditions = get_parsed_structure_conditions(&config, &parser); - let config_old = SerdeMap::new(); - Ok(Self { - config_old: config_old, config: config, sorted_table_list: sorted_table_list.clone(), table_dependencies_in: table_dependencies_in, @@ -1014,9 +1010,6 @@ impl Valve { self.load_tables(&table_list, validate).await } - ///////////////////////////////// - // TODO: - ///////////////////////////////// /// Given a vector of table names, load those tables in the given order. If `validate` is false, /// just try to insert all rows, irrespective of whether they are valid or not or will possibly /// trigger a db error. @@ -1044,15 +1037,7 @@ impl Valve { continue; } let table_name = table_name.to_string(); - let path = String::from( - self.config_old - .get("table") - .and_then(|t| t.as_object()) - .and_then(|o| o.get(&table_name)) - .and_then(|n| n.get("path")) - .and_then(|p| p.as_str()) - .unwrap(), - ); + let path = String::from(&self.config.table.get(&table_name).unwrap().path); let mut rdr = { match File::open(path.clone()) { Err(e) => { @@ -1099,7 +1084,7 @@ impl Valve { // logic: let chunks = records.chunks(CHUNK_SIZE); insert_chunks( - &self.config_old, + &self.config, &self.pool, &self.datatype_conditions, &self.rule_conditions,