From efabdd47af7b44086d94af6a70cd0f49dd7ee3b1 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Thu, 21 Dec 2023 12:57:05 -0500 Subject: [PATCH] use table dependencies to determine deletion order --- src/lib.rs | 215 +++++++++++++++++++++++++----------------------- src/validate.rs | 9 +- 2 files changed, 119 insertions(+), 105 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d67e5291..6abfb11c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,10 +84,32 @@ lazy_static! { pub type SerdeMap = serde_json::Map; pub type ValveRow = serde_json::Map; +/// Write a debugging message to STDERR. #[macro_export] -macro_rules! valve_log { +macro_rules! debug { () => (eprintln!()); - ($($arg:tt)*) => (eprintln!("{} - {}", Utc::now(), format_args!($($arg)*))); + ($($arg:tt)*) => (eprintln!("{} - DEBUG {}", Utc::now(), format_args!($($arg)*))); +} + +/// Write an information message to STDERR. +#[macro_export] +macro_rules! info { + () => (eprintln!()); + ($($arg:tt)*) => (eprintln!("{} - INFO {}", Utc::now(), format_args!($($arg)*))); +} + +/// Write a warning message to STDERR. +#[macro_export] +macro_rules! warn { + () => (eprintln!()); + ($($arg:tt)*) => (eprintln!("{} - WARN {}", Utc::now(), format_args!($($arg)*))); +} + +/// Write an error message to STDERR. +#[macro_export] +macro_rules! error { + () => (eprintln!()); + ($($arg:tt)*) => (eprintln!("{} - ERROR {}", Utc::now(), format_args!($($arg)*))); } /// Represents a structure such as those found in the `structure` column of the `column` table in @@ -350,10 +372,10 @@ impl Valve { /// (1) the number of columns or any of their names differs from their configured values, or /// the order of database columns differs from the configured order; (2) The values in the /// table table differ from their configured values; (3) The SQL type of one or more columns - /// does not match the configured SQL type for that column; (3) All columns with a 'unique', - /// 'primary', or 'from(table, column)' in their column configuration are associated, in the - /// database, with a unique constraint, primary key, and foreign key, respectively, and vice - /// versa. + /// does not match the configured SQL type for that column; (3) Some column with a 'unique', + /// 'primary', or 'from(table, column)' in its column configuration fails to be associated, in + /// the database, with a unique constraint, primary key, or foreign key, respectively; or vice + /// versa; (4) The table does not exist in the database. async fn table_has_changed(&self, table: &str) -> Result { // A clojure that, given a parsed structure condition, a table and column name, and an // unsigned integer representing whether the given column, in the case of a SQLite database, @@ -596,7 +618,7 @@ impl Valve { let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; if rows.len() == 0 { if self.verbose { - valve_log!( + info!( "The table '{}' will be recreated as it does not exist in the database", table ); @@ -629,7 +651,7 @@ impl Valve { let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; if rows.len() == 0 { if self.verbose { - valve_log!( + info!( "The table '{}' will be recreated as it does not exist in the database", table ); @@ -659,12 +681,10 @@ impl Valve { .collect::>(); if db_column_order != configured_column_order { if self.verbose { - valve_log!( + info!( "The table '{}' will be recreated since the database columns: {:?} \ and/or their order does not match the configured columns: {:?}", - table, - db_column_order, - configured_column_order + table, db_column_order, configured_column_order ); } return Ok(true); @@ -696,7 +716,7 @@ impl Valve { let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; if rows.len() == 0 { if self.verbose { - valve_log!( + info!( "The table '{table}' will be recreated because the entries in the \ table table for '{table}' have changed.", table = table @@ -705,10 +725,7 @@ impl Valve { return Ok(true); } else if rows.len() > 1 { if self.verbose { - valve_log!( - "WARN more than one row was returned from the query '{}'", - sql - ); + warn!("More than one row was returned from the query '{}'", sql); } } } @@ -744,7 +761,7 @@ impl Valve { && (c.starts_with("varchar") || c.starts_with("character varying"))) { if self.verbose { - valve_log!( + info!( "The table '{}' will be recreated because the SQL type of column '{}', \ {}, does not match the configured value: {}", table, @@ -768,13 +785,11 @@ impl Valve { .unwrap(); if structure_has_changed(&parsed_structure, table, &cname, &pk)? { if self.verbose { - valve_log!( + info!( "The table '{}' will be recreated because the database \ constraints for column '{}' do not match the configured \ structure, '{}'", - table, - cname, - structure + table, cname, structure ); } return Ok(true); @@ -912,6 +927,18 @@ impl Valve { Ok(()) } + /// TODO: Add docstring here. + fn get_dependent_tables(&self, table: &str) -> Vec { + let mut dependent_tables = vec![]; + let direct_deps = self.table_dependencies.get(table).unwrap().to_vec(); + for direct_dep in direct_deps { + let mut indirect_deps = self.get_dependent_tables(&direct_dep); + dependent_tables.append(&mut indirect_deps); + dependent_tables.push(direct_dep); + } + dependent_tables + } + /// Create all configured database tables and views if they do not already exist as configured. pub async fn create_all_tables(&self) -> Result<&Self, sqlx::Error> { // DatabaseError @@ -920,24 +947,9 @@ impl Valve { let setup_statements = self.get_setup_statements().await?; let sorted_table_list = self.get_tables_ordered_for_creation(); - let mut once_dropped = false; - for (i, table) in sorted_table_list.iter().enumerate() { + for table in &sorted_table_list { if self.table_has_changed(*table).await? { - if self.interactive { - // TODO: Prompt the user to confirm whether she wants to automatically drop any - // flagged tables. - } - if !once_dropped { - // TODO: Rethink this. - let mut tables_to_drop = vec![""; sorted_table_list.len() - i]; - tables_to_drop.clone_from_slice(&sorted_table_list[i..]); - tables_to_drop.reverse(); - for table in tables_to_drop { - self.drop_tables(vec![table]).await?; - } - once_dropped = true; - } - + self.drop_tables(&vec![table]).await?; let table_statements = setup_statements.get(*table).unwrap(); for stmt in table_statements { self.execute_sql(stmt).await?; @@ -979,7 +991,7 @@ impl Valve { // DatabaseError // Drop all of the database tables in the reverse of their sorted order: - self.drop_tables(self.get_tables_ordered_for_deletion()) + self.drop_tables(&self.get_tables_ordered_for_deletion()) .await?; Ok(self) } @@ -987,21 +999,19 @@ impl Valve { /// Given a vector of table names, /// drop those tables, in the given order. /// Return an error on invalid table name or database problem. - pub async fn drop_tables(&self, tables: Vec<&str>) -> Result<&Self, sqlx::Error> { + pub async fn drop_tables(&self, tables: &Vec<&str>) -> Result<&Self, sqlx::Error> { // DatabaseError - // TODO: Re-think how this is done. let drop_list = { let mut drop_list = vec![]; - let drop_order = self.get_tables_ordered_for_deletion(); - for table in &tables { - let idx = drop_order.iter().position(|s| s == table).unwrap(); - for i in 0..idx + 1 { - let dep_table = drop_order[i]; + for table in tables { + let dependent_tables = self.get_dependent_tables(table); + for dep_table in dependent_tables { if !drop_list.contains(&dep_table) { - drop_list.push(drop_order[i]); + drop_list.push(dep_table.to_string()); } } + drop_list.push(table.to_string()); } drop_list }; @@ -1009,7 +1019,9 @@ impl Valve { if self.interactive { let auto_drops = drop_list .iter() - .filter(|t| !tables.contains(t) && !block_on(self.table_exists(t)).unwrap()) + .filter(|t| { + !tables.contains(&t.as_str()) && !block_on(self.table_exists(t)).unwrap() + }) .collect::>(); if auto_drops.len() > 0 { // TODO: prompt the user to confirm whether she wants to automatically drop @@ -1037,7 +1049,7 @@ impl Valve { pub async fn truncate_all_tables(&self) -> Result<&Self, sqlx::Error> { // DatabaseError - self.truncate_tables(self.get_tables_ordered_for_deletion()) + self.truncate_tables(&self.get_tables_ordered_for_deletion()) .await?; Ok(self) } @@ -1045,23 +1057,21 @@ impl Valve { /// Given a vector of table names, /// truncate those tables, in the given order. /// Return an error on invalid table name or database problem. - pub async fn truncate_tables(&self, tables: Vec<&str>) -> Result<&Self, sqlx::Error> { + pub async fn truncate_tables(&self, tables: &Vec<&str>) -> Result<&Self, sqlx::Error> { // ConfigOrDatabaseError self.create_all_tables().await?; - // TODO: Re-think how this is done. let truncate_list = { let mut truncate_list = vec![]; - let truncate_order = self.get_tables_ordered_for_deletion(); - for table in &tables { - let idx = truncate_order.iter().position(|s| s == table).unwrap(); - for i in 0..idx + 1 { - let dep_table = truncate_order[i]; + for table in tables { + let dependent_tables = self.get_dependent_tables(table); + for dep_table in dependent_tables { if !truncate_list.contains(&dep_table) { - truncate_list.push(truncate_order[i]); + truncate_list.push(dep_table.to_string()); } } + truncate_list.push(table.to_string()); } truncate_list }; @@ -1069,7 +1079,9 @@ impl Valve { if self.interactive { let auto_truncates = truncate_list .iter() - .filter(|t| !tables.contains(t) && !block_on(self.table_exists(t)).unwrap()) + .filter(|t| { + !tables.contains(&t.as_str()) && !block_on(self.table_exists(t)).unwrap() + }) .collect::>(); if auto_truncates.len() > 0 { // TODO: prompt the user to confirm whether she wants to automatically truncate @@ -1110,9 +1122,9 @@ impl Valve { let table_list = self.get_tables_ordered_for_creation(); if self.verbose { - valve_log!("Processing {} tables.", table_list.len()); + info!("Processing {} tables.", table_list.len()); } - self.load_tables(table_list, true).await + self.load_tables(&table_list, true).await } /// Given a vector of table names, @@ -1126,14 +1138,14 @@ impl Valve { /// progress messages to stderr during load. pub async fn load_tables( &self, - table_list: Vec<&str>, + table_list: &Vec<&str>, _validate: bool, ) -> Result<&Self, sqlx::Error> { // ConfigOrDatabaseError let mut list_for_deletion = table_list.clone(); list_for_deletion.reverse(); - self.truncate_tables(list_for_deletion).await?; + self.truncate_tables(&list_for_deletion).await?; let num_tables = table_list.len(); let mut total_errors = 0; @@ -1141,7 +1153,7 @@ impl Valve { let mut total_infos = 0; let mut table_num = 1; for table_name in table_list { - if table_name == "message" || table_name == "history" { + if *table_name == "message" || *table_name == "history" { continue; } let table_name = table_name.to_string(); @@ -1157,7 +1169,7 @@ impl Valve { let mut rdr = { match File::open(path.clone()) { Err(e) => { - valve_log!("WARN: Unable to open '{}': {}", path.clone(), e); + warn!("Unable to open '{}': {}", path.clone(), e); continue; } Ok(table_file) => csv::ReaderBuilder::new() @@ -1167,7 +1179,7 @@ impl Valve { } }; if self.verbose { - valve_log!("Loading table {}/{}: {}", table_num, num_tables, table_name); + info!("Loading table {}/{}: {}", table_num, num_tables, table_name); } table_num += 1; @@ -1281,7 +1293,7 @@ impl Valve { "{} errors, {} warnings, and {} information messages generated for {}", errors, warnings, infos, table_name ); - valve_log!("{}", status_message); + info!("{}", status_message); total_errors += errors; total_warnings += warnings; total_infos += infos; @@ -1289,11 +1301,9 @@ impl Valve { } if self.verbose { - valve_log!( + info!( "Loading complete with {} errors, {} warnings, and {} information messages", - total_errors, - total_warnings, - total_infos + total_errors, total_warnings, total_infos ); } Ok(self) @@ -1548,7 +1558,7 @@ impl Valve { // DatabaseError let last_change = match self.get_record_to_undo().await? { None => { - valve_log!("WARN: Nothing to undo."); + warn!("Nothing to undo."); return Ok(None); } Some(r) => r, @@ -1639,13 +1649,13 @@ impl Valve { // DatabaseError let last_undo = match self.get_record_to_redo().await? { None => { - valve_log!("WARN: Nothing to redo."); + warn!("Nothing to redo."); return Ok(None); } Some(last_undo) => { let undone_by = last_undo.try_get_raw("undone_by")?; if undone_by.is_null() { - valve_log!("WARN: Nothing to redo."); + warn!("Nothing to redo."); return Ok(None); } last_undo @@ -2262,10 +2272,10 @@ fn read_config_files( continue; } Some(p) if !Path::new(p).is_file() => { - valve_log!("WARN: File does not exist {}", p); + warn!("File does not exist {}", p); } Some(p) if Path::new(p).canonicalize().is_err() => { - valve_log!("WARN: File path could not be made canonical {}", p); + warn!("File path could not be made canonical {}", p); } Some(p) => path = Some(p.to_string()), }; @@ -2509,6 +2519,10 @@ fn read_config_files( &tables_config .keys() .cloned() + // TODO: Should we not remove message and history to the sorted table list here? If so, + // then we need to check if there is anywhere in the code where we assume that they are not + // in the list, and change it. One place where this is definitely assumed is in + // get_tables_ordered_for_creation() .filter(|m| m != "history" && m != "message") .collect(), &constraints_config, @@ -3232,10 +3246,9 @@ async fn get_rows_to_update( let updates_before = match query_as_if.kind { QueryAsIfKind::Add => { if let None = query_as_if.row { - valve_log!( - "WARN: No row in query_as_if: {:?} for {:?}", - query_as_if, - query_as_if.kind + warn!( + "No row in query_as_if: {:?} for {:?}", + query_as_if, query_as_if.kind ); } IndexMap::new() @@ -3268,10 +3281,9 @@ async fn get_rows_to_update( let updates_after = match &query_as_if.row { None => { if query_as_if.kind != QueryAsIfKind::Remove { - valve_log!( - "WARN: No row in query_as_if: {:?} for {:?}", - query_as_if, - query_as_if.kind + warn!( + "No row in query_as_if: {:?} for {:?}", + query_as_if, query_as_if.kind ); } IndexMap::new() @@ -3344,10 +3356,9 @@ async fn get_rows_to_update( let updates = match query_as_if.kind { QueryAsIfKind::Add => { if let None = query_as_if.row { - valve_log!( - "WARN: No row in query_as_if: {:?} for {:?}", - query_as_if, - query_as_if.kind + warn!( + "No row in query_as_if: {:?} for {:?}", + query_as_if, query_as_if.kind ); } IndexMap::new() @@ -3549,12 +3560,12 @@ fn get_json_from_row(row: &AnyRow, column: &str) -> Option { let value: &str = row.get(column); match serde_json::from_str::(value) { Err(e) => { - valve_log!("WARN: {}", e); + warn!("{}", e); None } Ok(SerdeValue::Object(value)) => Some(value), _ => { - valve_log!("WARN: {} is not an object.", value); + warn!("{} is not an object.", value); None } } @@ -4154,13 +4165,9 @@ fn read_tsv_into_vector(path: &str) -> Vec { let val = val.as_str().unwrap(); let trimmed_val = val.trim(); if trimmed_val != val { - valve_log!( - "Error: Value '{}' of column '{}' in row {} of table '{}' {}", - val, - col, - i, - path, - "has leading and/or trailing whitespace." + error!( + "Value '{}' of column '{}' in row {} of table '{}' {}", + val, col, i, path, "has leading and/or trailing whitespace." ); process::exit(1); } @@ -4627,15 +4634,19 @@ fn verify_table_deps_and_sort( match get_cycles(&dependency_graph) { Ok(sorted_table_list) => { - let mut dependencies = HashMap::new(); + let mut table_dependencies = HashMap::new(); for node in dependency_graph.nodes() { let neighbors = dependency_graph - .neighbors_directed(node, petgraph::Direction::Outgoing) + .neighbors_directed(node, petgraph::Direction::Incoming) .map(|n| n.to_string()) .collect::>(); - dependencies.insert(node.to_string(), neighbors); + table_dependencies.insert(node.to_string(), neighbors); } - return (sorted_table_list, dependencies); + // Add entries for the message and history tables: + table_dependencies.insert("message".to_string(), sorted_table_list.clone()); + table_dependencies.insert("history".to_string(), sorted_table_list.clone()); + + return (sorted_table_list, table_dependencies); } Err(cycles) => { let mut message = String::new(); @@ -5083,7 +5094,7 @@ fn add_message_counts(messages: &Vec, messages_stats: &mut HashMap valve_log!("ERROR while processing row for '{}': {}", table_name, err), + Err(err) => error!( + "While processing row for '{}', got error '{}'", + table_name, err + ), Ok(row) => { let mut result_row = ResultRow { row_number: None,