From fd4e716cbdfde04cd8286ad00d6ee05986381a5a Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Fri, 15 Dec 2023 18:52:06 -0500 Subject: [PATCH] move load_db() to load_tables() in the new Valve API --- src/lib.rs | 420 ++++++++++++++++++++++++++--------------------------- 1 file changed, 206 insertions(+), 214 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e69f636b..9530458d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -187,6 +187,26 @@ impl Valve { // TODO: Error type should be ConfigError let pool = get_pool_from_connection_string(database).await?; + if pool.any_kind() == AnyKind::Sqlite { + sqlx_query("PRAGMA foreign_keys = ON") + .execute(&pool) + .await?; + if initial_load { + // These pragmas are unsafe but they are used during initial loading since data + // integrity is not a priority in this case. + sqlx_query("PRAGMA journal_mode = OFF") + .execute(&pool) + .await?; + sqlx_query("PRAGMA synchronous = 0").execute(&pool).await?; + sqlx_query("PRAGMA cache_size = 1000000") + .execute(&pool) + .await?; + sqlx_query("PRAGMA temp_store = MEMORY") + .execute(&pool) + .await?; + } + } + let parser = StartParser::new(); let ( specials_config, @@ -931,6 +951,12 @@ impl Valve { pub async fn drop_tables(&self, tables: Vec<&str>) -> Result<&Self, sqlx::Error> { // DatabaseError + // TODO: This will work fine when the table list is all of the tables in the db, + // but in the case of a partial list, then there is a risk that some of them have + // dependencies on tables not in the list. What we need to do is grab the complete + // list of tables from self.global_config.sorted_table_list and use it as a reference + // for which tables need to be dropped. 
+ for table in tables { if table != "message" && table != "history" { let sql = format!(r#"DROP VIEW IF EXISTS "{}_text_view""#, table); @@ -962,6 +988,12 @@ impl Valve { pub async fn truncate_tables(&self, tables: Vec<&str>) -> Result<&Self, sqlx::Error> { // ConfigOrDatabaseError + // TODO: This will work fine when the table list is all of the tables in the db, + // but in the case of a partial list, then there is a risk that some of them have + // dependencies on tables not in the list. What we need to do is grab the complete + // list of tables from self.global_config.sorted_table_list and use it as a reference + // for which tables need to be truncated. + + self.create_missing_tables().await?; // We must use CASCADE in the case of PostgreSQL since we cannot truncate a table, T, that @@ -995,67 +1027,194 @@ impl Valve { pub async fn load_all_tables(&self, _validate: bool) -> Result<&Self, sqlx::Error> { // DatabaseError - self.create_missing_tables().await?; - self.truncate_all_tables().await?; - - if self.pool.any_kind() == AnyKind::Sqlite { - sqlx_query("PRAGMA foreign_keys = ON") - .execute(&self.pool) - .await?; - if self.initial_load { - // These pragmas are unsafe but they are used during initial loading since data - // integrity is not a priority in this case. 
- sqlx_query("PRAGMA journal_mode = OFF") - .execute(&self.pool) - .await?; - sqlx_query("PRAGMA synchronous = 0") - .execute(&self.pool) - .await?; - sqlx_query("PRAGMA cache_size = 1000000") - .execute(&self.pool) - .await?; - sqlx_query("PRAGMA temp_store = MEMORY") - .execute(&self.pool) - .await?; - } - } - + let table_list = self.get_tables_ordered_for_creation(); if self.verbose { - valve_log!( - "Processing {} tables.", - self.global_config - .get("sorted_table_list") - .and_then(|l| l.as_array()) - .unwrap() - .len() - ); + valve_log!("Processing {} tables.", table_list.len()); } - load_db( - &self.global_config, - &self.pool, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - self.verbose, - ) - .await?; - - Ok(self) + self.load_tables(table_list, true).await } /// Given a vector of table names, /// load those tables in the given order. /// If `validate` is false, just try to insert all rows. /// Return an error on invalid table name or database problem. + /// For each of those tables (other than "message" and "history"), read in the data TSV file + /// configured for it, then validate and load all of the corresponding data rows, using the + /// configuration, connection pool, and compiled datatype and rule conditions of this Valve + /// instance. If the instance's verbose flag is set to true, output progress messages to + /// stderr during load. 
pub async fn load_tables( &self, - tables: Vec<&str>, + table_list: Vec<&str>, _validate: bool, ) -> Result<&Self, sqlx::Error> { // ConfigOrDatabaseError + self.create_missing_tables().await?; - self.truncate_tables(tables).await?; - if 1 == 1 { - todo!(); + let mut list_for_deletion = table_list.clone(); + list_for_deletion.reverse(); + self.truncate_tables(list_for_deletion).await?; + + let num_tables = table_list.len(); + let mut total_errors = 0; + let mut total_warnings = 0; + let mut total_infos = 0; + let mut table_num = 1; + for table_name in table_list { + if table_name == "message" || table_name == "history" { + continue; + } + let table_name = table_name.to_string(); + let path = String::from( + self.global_config + .get("table") + .and_then(|t| t.as_object()) + .and_then(|o| o.get(&table_name)) + .and_then(|n| n.get("path")) + .and_then(|p| p.as_str()) + .unwrap(), + ); + let mut rdr = { + match File::open(path.clone()) { + Err(e) => { + valve_log!("WARN: Unable to open '{}': {}", path.clone(), e); + continue; + } + Ok(table_file) => csv::ReaderBuilder::new() + .has_headers(false) + .delimiter(b'\t') + .from_reader(table_file), + } + }; + if self.verbose { + valve_log!("Loading table {}/{}: {}", table_num, num_tables, table_name); + } + table_num += 1; + + // Extract the headers, which we will need later: + let mut records = rdr.records(); + let headers; + if let Some(result) = records.next() { + headers = result.unwrap(); + } else { + panic!("'{}' is empty", path); + } + + for header in headers.iter() { + if header.trim().is_empty() { + panic!( + "One or more of the header fields is empty for table '{}'", + table_name + ); + } + } + + // HashMap used to report info about the number of error/warning/info messages for this + // table when the verbose flag is set to true: + let mut messages_stats = HashMap::new(); + messages_stats.insert("error".to_string(), 0); + messages_stats.insert("warning".to_string(), 0); + 
messages_stats.insert("info".to_string(), 0); + + // Split the data into chunks of size CHUNK_SIZE before passing them to the validation + // logic: + let chunks = records.chunks(CHUNK_SIZE); + validate_and_insert_chunks( + &self.global_config, + &self.pool, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &table_name, + &chunks, + &headers, + &mut messages_stats, + self.verbose, + ) + .await?; + + // We need to wait until all of the rows for a table have been loaded before validating the + // "foreign" constraints on a table's trees, since this checks if the values of one column + // (the tree's parent) are all contained in another column (the tree's child): + // We also need to wait before validating a table's "under" constraints. Although the tree + // associated with such a constraint need not be defined on the same table, it can be. + let mut recs_to_update = validate_tree_foreign_keys( + &self.global_config, + &self.pool, + None, + &table_name, + None, + ) + .await?; + recs_to_update.append( + &mut validate_under(&self.global_config, &self.pool, None, &table_name, None) + .await?, + ); + + for record in recs_to_update { + let row_number = record.get("row_number").unwrap(); + let column_name = record.get("column").and_then(|s| s.as_str()).unwrap(); + let value = record.get("value").and_then(|s| s.as_str()).unwrap(); + let level = record.get("level").and_then(|s| s.as_str()).unwrap(); + let rule = record.get("rule").and_then(|s| s.as_str()).unwrap(); + let message = record.get("message").and_then(|s| s.as_str()).unwrap(); + + let sql = local_sql_syntax( + &self.pool, + &format!( + r#"INSERT INTO "message" + ("table", "row", "column", "value", "level", "rule", "message") + VALUES ({}, {}, {}, {}, {}, {}, {})"#, + SQL_PARAM, + row_number, + SQL_PARAM, + SQL_PARAM, + SQL_PARAM, + SQL_PARAM, + SQL_PARAM + ), + ); + let mut query = sqlx_query(&sql); + query = query.bind(&table_name); + query = query.bind(&column_name); + query = 
query.bind(&value); + query = query.bind(&level); + query = query.bind(&rule); + query = query.bind(&message); + query.execute(&self.pool).await?; + + if self.verbose { + // Add the generated message to messages_stats: + let messages = vec![json!({ + "message": message, + "level": level, + })]; + add_message_counts(&messages, &mut messages_stats); + } + } + + if self.verbose { + // Output a report on the messages generated to stderr: + let errors = messages_stats.get("error").unwrap(); + let warnings = messages_stats.get("warning").unwrap(); + let infos = messages_stats.get("info").unwrap(); + let status_message = format!( + "{} errors, {} warnings, and {} information messages generated for {}", + errors, warnings, infos, table_name + ); + valve_log!("{}", status_message); + total_errors += errors; + total_warnings += warnings; + total_infos += infos; + } + } + + if self.verbose { + valve_log!( + "Loading complete with {} errors, {} warnings, and {} information messages", + total_errors, + total_warnings, + total_infos + ); } Ok(self) } @@ -5363,170 +5522,3 @@ async fn validate_and_insert_chunks( Ok(()) } } - -/// Given a configuration map, a database connection pool, a parser, HashMaps representing -/// compiled datatype and rule conditions, and a HashMap representing parsed structure conditions, -/// read in the data TSV files corresponding to each configured table, then validate and load all of -/// the corresponding data rows. If the verbose flag is set to true, output progress messages to -/// stderr during load. 
-async fn load_db( - config: &SerdeMap, - pool: &AnyPool, - compiled_datatype_conditions: &HashMap, - compiled_rule_conditions: &HashMap>>, - verbose: bool, -) -> Result<(), sqlx::Error> { - let mut table_list = vec![]; - for table in config - .get("sorted_table_list") - .and_then(|l| l.as_array()) - .unwrap() - { - table_list.push(table.as_str().and_then(|s| Some(s.to_string())).unwrap()); - } - let table_list = table_list; // Change the table_list to read only after populating it. - let num_tables = table_list.len(); - let mut total_errors = 0; - let mut total_warnings = 0; - let mut total_infos = 0; - let mut table_num = 1; - for table_name in table_list { - let path = String::from( - config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|o| o.get(&table_name)) - .and_then(|n| n.get("path")) - .and_then(|p| p.as_str()) - .unwrap(), - ); - let mut rdr = { - match File::open(path.clone()) { - Err(e) => { - valve_log!("WARN: Unable to open '{}': {}", path.clone(), e); - continue; - } - Ok(table_file) => csv::ReaderBuilder::new() - .has_headers(false) - .delimiter(b'\t') - .from_reader(table_file), - } - }; - if verbose { - valve_log!("Loading table {}/{}: {}", table_num, num_tables, table_name); - } - table_num += 1; - - // Extract the headers, which we will need later: - let mut records = rdr.records(); - let headers; - if let Some(result) = records.next() { - headers = result.unwrap(); - } else { - panic!("'{}' is empty", path); - } - - for header in headers.iter() { - if header.trim().is_empty() { - panic!( - "One or more of the header fields is empty for table '{}'", - table_name - ); - } - } - - // HashMap used to report info about the number of error/warning/info messages for this - // table when the verbose flag is set to true: - let mut messages_stats = HashMap::new(); - messages_stats.insert("error".to_string(), 0); - messages_stats.insert("warning".to_string(), 0); - messages_stats.insert("info".to_string(), 0); - - // Split the data into 
chunks of size CHUNK_SIZE before passing them to the validation - // logic: - let chunks = records.chunks(CHUNK_SIZE); - validate_and_insert_chunks( - config, - pool, - compiled_datatype_conditions, - compiled_rule_conditions, - &table_name, - &chunks, - &headers, - &mut messages_stats, - verbose, - ) - .await?; - - // We need to wait until all of the rows for a table have been loaded before validating the - // "foreign" constraints on a table's trees, since this checks if the values of one column - // (the tree's parent) are all contained in another column (the tree's child): - // We also need to wait before validating a table's "under" constraints. Although the tree - // associated with such a constraint need not be defined on the same table, it can be. - let mut recs_to_update = - validate_tree_foreign_keys(config, pool, None, &table_name, None).await?; - recs_to_update.append(&mut validate_under(config, pool, None, &table_name, None).await?); - - for record in recs_to_update { - let row_number = record.get("row_number").unwrap(); - let column_name = record.get("column").and_then(|s| s.as_str()).unwrap(); - let value = record.get("value").and_then(|s| s.as_str()).unwrap(); - let level = record.get("level").and_then(|s| s.as_str()).unwrap(); - let rule = record.get("rule").and_then(|s| s.as_str()).unwrap(); - let message = record.get("message").and_then(|s| s.as_str()).unwrap(); - - let sql = local_sql_syntax( - &pool, - &format!( - r#"INSERT INTO "message" - ("table", "row", "column", "value", "level", "rule", "message") - VALUES ({}, {}, {}, {}, {}, {}, {})"#, - SQL_PARAM, row_number, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM - ), - ); - let mut query = sqlx_query(&sql); - query = query.bind(&table_name); - query = query.bind(&column_name); - query = query.bind(&value); - query = query.bind(&level); - query = query.bind(&rule); - query = query.bind(&message); - query.execute(pool).await?; - - if verbose { - // Add the generated message to 
messages_stats: - let messages = vec![json!({ - "message": message, - "level": level, - })]; - add_message_counts(&messages, &mut messages_stats); - } - } - - if verbose { - // Output a report on the messages generated to stderr: - let errors = messages_stats.get("error").unwrap(); - let warnings = messages_stats.get("warning").unwrap(); - let infos = messages_stats.get("info").unwrap(); - let status_message = format!( - "{} errors, {} warnings, and {} information messages generated for {}", - errors, warnings, infos, table_name - ); - valve_log!("{}", status_message); - total_errors += errors; - total_warnings += warnings; - total_infos += infos; - } - } - - if verbose { - valve_log!( - "Loading complete with {} errors, {} warnings, and {} information messages", - total_errors, - total_warnings, - total_infos - ); - } - - Ok(()) -}