From 5ba639cbcb0a780a43244f53ce85f579687c5287 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sun, 14 Jan 2024 14:47:59 -0500 Subject: [PATCH] add back old deleted functions for ease of nanobot.rs migration --- src/lib.rs | 2157 ++++++++++++++++++++++++++++++++++++++++++++++- src/validate.rs | 281 +++++- 2 files changed, 2434 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 785e6f99..31e4b3b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,8 +24,9 @@ pub mod validate; lalrpop_mod!(pub valve_grammar); use crate::validate::{ - validate_row_tx, validate_rows_constraints, validate_rows_intra, validate_rows_trees, - validate_tree_foreign_keys, validate_under, with_tree_sql, QueryAsIf, QueryAsIfKind, ResultRow, + validate_row_old, validate_row_tx, validate_rows_constraints, validate_rows_intra, + validate_rows_intra_old, validate_rows_trees, validate_tree_foreign_keys, validate_under, + with_tree_sql, QueryAsIf, QueryAsIfKind, ResultRow, }; use crate::{ast::Expression, valve_grammar::StartParser}; use async_recursion::async_recursion; @@ -47,7 +48,9 @@ use regex::Regex; use serde_json::{json, Value as SerdeValue}; use sqlx::{ any::{AnyConnectOptions, AnyKind, AnyPool, AnyPoolOptions, AnyRow}, - query as sqlx_query, Acquire, Column, Row, Transaction, ValueRef, + query as sqlx_query, Acquire, Column, + Error::Configuration as SqlxCErr, + Row, Transaction, ValueRef, }; use std::{ collections::{BTreeMap, HashMap}, @@ -173,6 +176,2154 @@ impl std::fmt::Debug for ColumnRule { } } +pub fn read_config_files_old( + path: &str, + config_table: &str, +) -> (SerdeMap, SerdeMap, SerdeMap, SerdeMap) { + let special_table_types = json!({ + "table": {"required": true}, + "column": {"required": true}, + "datatype": {"required": true}, + "rule": {"required": false}, + }); + let special_table_types = special_table_types.as_object().unwrap(); + + // Initialize the special table entries in the specials config map: + let mut specials_config = SerdeMap::new(); + for t in special_table_types.keys() { + specials_config.insert(t.to_string(), SerdeValue::Null); + } + + // Load the table table from the given path: + let mut tables_config = SerdeMap::new(); + let rows = { + // Read in the configuration entry point (the "table table") from either a file or a + // database table. 
+ if path.to_lowercase().ends_with(".tsv") { + read_tsv_into_vector(path) + } else { + read_db_table_into_vector(path, config_table) + } + }; + + for mut row in rows { + for column in vec!["table", "path", "type"] { + if !row.contains_key(column) || row.get(column) == None { + panic!("Missing required column '{}' reading '{}'", column, path); + } + } + + for column in vec!["table", "path"] { + if row.get(column).and_then(|c| c.as_str()).unwrap() == "" { + panic!("Missing required value for '{}' reading '{}'", column, path); + } + } + + for column in vec!["type"] { + if row.get(column).and_then(|c| c.as_str()).unwrap() == "" { + row.remove(&column.to_string()); + } + } + + if let Some(SerdeValue::String(row_type)) = row.get("type") { + if row_type == "table" { + let row_path = row.get("path").unwrap(); + if path.to_lowercase().ends_with(".tsv") && row_path != path { + panic!( + "Special 'table' path '{}' does not match this path '{}'", + row_path, path + ); + } + } + + if special_table_types.contains_key(row_type) { + match specials_config.get(row_type) { + Some(SerdeValue::Null) => (), + _ => panic!( + "Multiple tables with type '{}' declared in '{}'", + row_type, path + ), + } + let row_table = row.get("table").and_then(|t| t.as_str()).unwrap(); + specials_config.insert( + row_type.to_string(), + SerdeValue::String(row_table.to_string()), + ); + } else { + panic!("Unrecognized table type '{}' in '{}'", row_type, path); + } + } + + row.insert(String::from("column"), SerdeValue::Object(SerdeMap::new())); + let row_table = row.get("table").and_then(|t| t.as_str()).unwrap(); + tables_config.insert(row_table.to_string(), SerdeValue::Object(row)); + } + + // Check that all the required special tables are present + for (table_type, table_spec) in special_table_types.iter() { + if let Some(SerdeValue::Bool(true)) = table_spec.get("required") { + if let Some(SerdeValue::Null) = specials_config.get(table_type) { + panic!("Missing required '{}' table in '{}'", table_type, path); + } + } + } + + // Helper function for extracting special configuration (other than the main 'table' + // configuration) from either a file or a table in the database, depending on the value of + // `path`. When `path` ends in '.tsv', the path of the config table corresponding to + // `table_type` is looked up, the TSV is read, and the rows are returned. When `path` does not + // end in '.tsv', the table name corresponding to `table_type` is looked up in the database + // indicated by `path`, the table is read, and the rows are returned. 
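+    // For illustration only: given a hypothetical table table row like
+    //
+    //   table     path              type
+    //   datatype  src/datatype.tsv  datatype
+    //
+    // a TSV-based configuration resolves get_special_config("datatype", ...) to the rows of
+    // src/datatype.tsv, while a database-based configuration reads the rows from whichever
+    // table in tables_config has "datatype" as its type.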
+    fn get_special_config(
+        table_type: &str,
+        specials_config: &SerdeMap,
+        tables_config: &SerdeMap,
+        path: &str,
+    ) -> Vec<SerdeMap> {
+        if path.to_lowercase().ends_with(".tsv") {
+            let table_name = specials_config
+                .get(table_type)
+                .and_then(|d| d.as_str())
+                .unwrap();
+            let path = String::from(
+                tables_config
+                    .get(table_name)
+                    .and_then(|t| t.get("path"))
+                    .and_then(|p| p.as_str())
+                    .unwrap(),
+            );
+            return read_tsv_into_vector(&path.to_string());
+        } else {
+            let mut db_table = None;
+            for (table_name, table_config) in tables_config {
+                let this_type = table_config.get("type");
+                if let Some(this_type) = this_type {
+                    let this_type = this_type.as_str().unwrap();
+                    if this_type == table_type {
+                        db_table = Some(table_name);
+                        break;
+                    }
+                }
+            }
+            if db_table == None {
+                panic!(
+                    "Could not determine special table name for type '{}'.",
+                    table_type
+                );
+            }
+            let db_table = db_table.unwrap();
+            read_db_table_into_vector(path, db_table)
+        }
+    }
+
+    // Load datatype table
+    let mut datatypes_config = SerdeMap::new();
+    let rows = get_special_config("datatype", &specials_config, &tables_config, path);
+    for mut row in rows {
+        for column in vec![
+            "datatype",
+            "parent",
+            "condition",
+            "SQLite type",
+            "PostgreSQL type",
+        ] {
+            if !row.contains_key(column) || row.get(column) == None {
+                panic!("Missing required column '{}' reading '{}'", column, path);
+            }
+        }
+
+        for column in vec!["datatype"] {
+            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
+                panic!("Missing required value for '{}' reading '{}'", column, path);
+            }
+        }
+
+        for column in vec!["parent", "condition", "SQLite type", "PostgreSQL type"] {
+            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
+                row.remove(&column.to_string());
+            }
+        }
+
+        let dt_name = row.get("datatype").and_then(|d| d.as_str()).unwrap();
+        datatypes_config.insert(dt_name.to_string(), SerdeValue::Object(row));
+    }
+
+    for dt in vec!["text", "empty", "line", "word"] {
+        if !datatypes_config.contains_key(dt) {
+            panic!("Missing required datatype: '{}'", dt);
+        }
+    }
+
+    // Load column table
+    let rows = get_special_config("column", &specials_config, &tables_config, path);
+    for mut row in rows {
+        for column in vec!["table", "column", "label", "nulltype", "datatype"] {
+            if !row.contains_key(column) || row.get(column) == None {
+                panic!("Missing required column '{}' reading '{}'", column, path);
+            }
+        }
+
+        for column in vec!["table", "column", "datatype"] {
+            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
+                panic!("Missing required value for '{}' reading '{}'", column, path);
+            }
+        }
+
+        for column in vec!["nulltype"] {
+            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
+                row.remove(&column.to_string());
+            }
+        }
+
+        let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
+        if !tables_config.contains_key(row_table) {
+            panic!("Undefined table '{}' reading '{}'", row_table, path);
+        }
+
+        if let Some(SerdeValue::String(nulltype)) = row.get("nulltype") {
+            if !datatypes_config.contains_key(nulltype) {
+                panic!("Undefined nulltype '{}' reading '{}'", nulltype, path);
+            }
+        }
+
+        let datatype = row.get("datatype").and_then(|d| d.as_str()).unwrap();
+        if !datatypes_config.contains_key(datatype) {
+            panic!("Undefined datatype '{}' reading '{}'", datatype, path);
+        }
+
+        let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
+        let column_name = row.get("column").and_then(|c| c.as_str()).unwrap();
+
+        let columns_config = tables_config
+            .get_mut(row_table)
+            .and_then(|t| t.get_mut("column"))
t.get_mut("column")) + .and_then(|c| c.as_object_mut()) + .unwrap(); + columns_config.insert(column_name.to_string(), SerdeValue::Object(row)); + } + + // Load rule table if it exists + let mut rules_config = SerdeMap::new(); + if let Some(SerdeValue::String(table_name)) = specials_config.get("rule") { + let rows = get_special_config(table_name, &specials_config, &tables_config, path); + for row in rows { + for column in vec![ + "table", + "when column", + "when condition", + "then column", + "then condition", + "level", + "description", + ] { + if !row.contains_key(column) || row.get(column) == None { + panic!("Missing required column '{}' reading '{}'", column, path); + } + if row.get(column).and_then(|c| c.as_str()).unwrap() == "" { + panic!("Missing required value for '{}' reading '{}'", column, path); + } + } + + let row_table = row.get("table").and_then(|t| t.as_str()).unwrap(); + if !tables_config.contains_key(row_table) { + panic!("Undefined table '{}' reading '{}'", row_table, path); + } + + // Add the rule specified in the given row to the list of rules associated with the + // value of the when column: + let row_when_column = row.get("when column").and_then(|c| c.as_str()).unwrap(); + if !rules_config.contains_key(row_table) { + rules_config.insert(String::from(row_table), SerdeValue::Object(SerdeMap::new())); + } + + let table_rule_config = rules_config + .get_mut(row_table) + .and_then(|t| t.as_object_mut()) + .unwrap(); + if !table_rule_config.contains_key(row_when_column) { + table_rule_config.insert(String::from(row_when_column), SerdeValue::Array(vec![])); + } + let column_rule_config = table_rule_config + .get_mut(&row_when_column.to_string()) + .and_then(|w| w.as_array_mut()) + .unwrap(); + column_rule_config.push(SerdeValue::Object(row)); + } + } + + // Manually add the messsage table config: + tables_config.insert( + "message".to_string(), + json!({ + "table": "message", + "description": "Validation messages for all of the tables and columns", + "type": "message", + "column_order": [ + "table", + "row", + "column", + "value", + "level", + "rule", + "message", + ], + "column": { + "table": { + "table": "message", + "column": "table", + "description": "The table referred to by the message", + "datatype": "table_name", + "structure": "" + }, + "row": { + "table": "message", + "column": "row", + "description": "The row number of the table referred to by the message", + "datatype": "natural_number", + "structure": "" + }, + "column": { + "table": "message", + "column": "column", + "description": "The column of the table referred to by the message", + "datatype": "column_name", + "structure": "" + }, + "value": { + "table": "message", + "column": "value", + "description": "The value that is the reason for the message", + "datatype": "text", + "structure": "" + }, + "level": { + "table": "message", + "column": "level", + "description": "The severity of the violation", + "datatype": "word", + "structure": "" + }, + "rule": { + "table": "message", + "column": "rule", + "description": "The rule violated by the value", + "datatype": "CURIE", + "structure": "" + }, + "message": { + "table": "message", + "column": "message", + "description": "The message", + "datatype": "line", + "structure": "" + } + } + }), + ); + + // Manually add the history table config: + tables_config.insert( + "history".to_string(), + json!({ + "table": "history", + "description": "History of changes to the VALVE database", + "type": "history", + "column_order": [ + "table", + "row", + "from", + "to", + 
"user", + ], + "column": { + "table": { + "table": "history", + "column": "table", + "description": "The table referred to by the history entry", + "datatype": "table_name", + "structure": "", + }, + "row": { + "table": "history", + "column": "row", + "description": "The row number of the table referred to by the history entry", + "datatype": "natural_number", + "structure": "", + }, + "from": { + "table": "history", + "column": "from", + "description": "The initial value of the row", + "datatype": "text", + "structure": "", + }, + "to": { + "table": "history", + "column": "to", + "description": "The final value of the row", + "datatype": "text", + "structure": "", + }, + "summary": { + "table": "history", + "column": "summary", + "description": "Summarizes the changes to each column of the row", + "datatype": "text", + "structure": "", + }, + "user": { + "table": "history", + "column": "user", + "description": "User responsible for the change", + "datatype": "line", + "structure": "", + }, + "undone_by": { + "table": "history", + "column": "undone_by", + "description": "User who has undone the change. Null if it has not been undone", + "datatype": "line", + "structure": "", + }, + } + }), + ); + + // Finally, return all the configs: + ( + specials_config, + tables_config, + datatypes_config, + rules_config, + ) +} + +fn get_sql_for_standard_view_old(table: &str, pool: &AnyPool) -> (String, String) { + let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table); + let message_t; + if pool.any_kind() == AnyKind::Postgres { + drop_view_sql.push_str(" CASCADE"); + message_t = format!( + indoc! {r#" + ( + SELECT JSON_AGG(m)::TEXT FROM ( + SELECT "column", "value", "level", "rule", "message" + FROM "message" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + ORDER BY "column", "message_id" + ) m + ) + "#}, + t = table, + ); + } else { + message_t = format!( + indoc! {r#" + ( + SELECT NULLIF( + JSON_GROUP_ARRAY( + JSON_OBJECT( + 'column', "column", + 'value', "value", + 'level', "level", + 'rule', "rule", + 'message', "message" + ) + ), + '[]' + ) + FROM "message" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + ORDER BY "column", "message_id" + ) + "#}, + t = table, + ); + } + drop_view_sql.push_str(";"); + + let history_t; + if pool.any_kind() == AnyKind::Postgres { + history_t = format!( + indoc! {r#" + ( + SELECT '[' || STRING_AGG("summary", ',') || ']' + FROM ( + SELECT "summary" + FROM "history" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + AND "summary" IS DISTINCT FROM NULL + AND "undone_by" IS NOT DISTINCT FROM NULL + ORDER BY "history_id" + ) h + ) + "#}, + t = table, + ); + } else { + history_t = format!( + indoc! {r#" + ( + SELECT '[' || GROUP_CONCAT("summary") || ']' + FROM ( + SELECT "summary" + FROM "history" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + AND "summary" IS NOT NULL + AND "undone_by" IS NULL + ORDER BY "history_id" + ) h + ) + "#}, + t = table, + ); + } + + let create_view_sql = format!( + indoc! 
{r#" + CREATE VIEW "{t}_view" AS + SELECT + union_t.*, + {message_t} AS "message", + {history_t} AS "history" + FROM ( + SELECT * FROM "{t}" + UNION ALL + SELECT * FROM "{t}_conflict" + ) as union_t; + "#}, + t = table, + message_t = message_t, + history_t = history_t, + ); + + (drop_view_sql, create_view_sql) +} + +/// Given the tables configuration map, the name of a table and a database connection pool, +/// generate SQL for creating a more user-friendly version of the view than the one generated by +/// [get_sql_for_standard_view()]. Unlike the standard view generated by that function, the view +/// generated by this function (called my_table_text_view) always shows all of the values (which are +/// all rendered as text) of every column in the table, even when those values contain SQL datatype +/// errors. Like the function for generating a standard view, the SQL generated by this function is +/// returned in the form of a tuple of Strings, with the first string being a SQL statement +/// for dropping the view, and the second string being a SQL statement for creating it. +fn get_sql_for_text_view_old( + tables_config: &mut SerdeMap, + table: &str, + pool: &AnyPool, +) -> (String, String) { + let is_clause = if pool.any_kind() == AnyKind::Sqlite { + "IS" + } else { + "IS NOT DISTINCT FROM" + }; + + let real_columns = tables_config + .get(table) + .and_then(|t| t.as_object()) + .and_then(|t| t.get("column")) + .and_then(|t| t.as_object()) + .and_then(|t| Some(t.keys())) + .and_then(|k| Some(k.map(|k| k.to_string()))) + .and_then(|t| Some(t.collect::>())) + .unwrap(); + + // Add a second "text view" such that the datatypes of all values are TEXT and appear + // directly in their corresponsing columns (rather than as NULLs) even when they have + // SQL datatype errors. 
+ let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_text_view""#, table); + if pool.any_kind() == AnyKind::Postgres { + drop_view_sql.push_str(" CASCADE"); + } + + let mut inner_columns = real_columns + .iter() + .map(|c| { + format!( + r#"CASE + WHEN "{column}" {is_clause} NULL THEN ( + SELECT value + FROM "message" + WHERE "row" = "row_number" + AND "column" = '{column}' + AND "table" = '{table}' + ORDER BY "message_id" DESC + LIMIT 1 + ) + ELSE {casted_column} + END AS "{column}""#, + casted_column = if pool.any_kind() == AnyKind::Sqlite { + cast_column_sql_to_text(c, "non-text") + } else { + format!("\"{}\"::TEXT", c) + }, + column = c, + table = table, + ) + }) + .collect::>(); + + let mut outer_columns = real_columns + .iter() + .map(|c| format!("t.\"{}\"", c)) + .collect::>(); + + let inner_columns = { + let mut v = vec![ + "row_number".to_string(), + "message".to_string(), + "history".to_string(), + ]; + v.append(&mut inner_columns); + v + }; + + let outer_columns = { + let mut v = vec![ + "t.row_number".to_string(), + "t.message".to_string(), + "t.history".to_string(), + ]; + v.append(&mut outer_columns); + v + }; + + let create_view_sql = format!( + r#"CREATE VIEW "{table}_text_view" AS + SELECT {outer_columns} + FROM ( + SELECT {inner_columns} + FROM "{table}_view" + ) t"#, + outer_columns = outer_columns.join(", "), + inner_columns = inner_columns.join(", "), + table = table, + ); + + (drop_view_sql, create_view_sql) +} + +pub async fn configure_db_old( + tables_config: &mut SerdeMap, + datatypes_config: &mut SerdeMap, + pool: &AnyPool, + parser: &StartParser, + verbose: bool, + command: &ValveCommand, +) -> Result<(Vec, SerdeMap), sqlx::Error> { + // This is the SerdeMap that we will be returning: + let mut constraints_config = SerdeMap::new(); + constraints_config.insert(String::from("foreign"), SerdeValue::Object(SerdeMap::new())); + constraints_config.insert(String::from("unique"), SerdeValue::Object(SerdeMap::new())); + constraints_config.insert(String::from("primary"), SerdeValue::Object(SerdeMap::new())); + constraints_config.insert(String::from("tree"), SerdeValue::Object(SerdeMap::new())); + constraints_config.insert(String::from("under"), SerdeValue::Object(SerdeMap::new())); + + // Begin by reading in the TSV files corresponding to the tables defined in tables_config, and + // use that information to create the associated database tables, while saving constraint + // information to constrains_config. + let mut setup_statements = HashMap::new(); + for table_name in tables_config.keys().cloned().collect::>() { + let optional_path = tables_config + .get(&table_name) + .and_then(|r| r.get("path")) + .and_then(|p| p.as_str()); + + let mut path = None; + match optional_path { + None => { + // If an entry of the tables_config has no path then it is an internal table which + // need not be configured explicitly. Currently the only examples are the message + // and history tables. 
+ if table_name != "message" && table_name != "history" { + panic!("No path defined for table {}", table_name); + } + continue; + } + Some(p) if !Path::new(p).is_file() => { + eprintln!("WARN: File does not exist {}", p); + } + Some(p) if Path::new(p).canonicalize().is_err() => { + eprintln!("WARN: File path could not be made canonical {}", p); + } + Some(p) => path = Some(p.to_string()), + }; + + let defined_columns: Vec = tables_config + .get(&table_name) + .and_then(|r| r.get("column")) + .and_then(|v| v.as_object()) + .and_then(|o| Some(o.keys())) + .and_then(|k| Some(k.cloned())) + .and_then(|k| Some(k.collect())) + .unwrap(); + + // We use column_order to explicitly indicate the order in which the columns should appear + // in the table, for later reference. The default is to preserve the order from the actual + // table file. If that does not exist, we use the ordering in defined_columns. + let mut column_order = vec![]; + if let Some(path) = path { + // Get the actual columns from the data itself. Note that we set has_headers to + // false(even though the files have header rows) in order to explicitly read the + // header row. + let mut rdr = csv::ReaderBuilder::new() + .has_headers(false) + .delimiter(b'\t') + .from_reader(File::open(path.clone()).unwrap_or_else(|err| { + panic!("Unable to open '{}': {}", path.clone(), err); + })); + let mut iter = rdr.records(); + if let Some(result) = iter.next() { + let actual_columns = result + .unwrap() + .iter() + .map(|c| c.to_string()) + .collect::>(); + // Make sure that the actual columns found in the table file, and the columns + // defined in the column config, exactly match in terms of their content: + for column_name in &actual_columns { + column_order.push(json!(column_name)); + if !defined_columns.contains(&column_name.to_string()) { + panic!( + "Column '{}.{}' not in column config", + table_name, column_name + ); + } + } + for column_name in &defined_columns { + if !actual_columns.contains(&column_name.to_string()) { + panic!( + "Defined column '{}.{}' not found in table", + table_name, column_name + ); + } + } + } else { + panic!("'{}' is empty", path); + } + } + + if column_order.is_empty() { + column_order = defined_columns.iter().map(|c| json!(c)).collect::>(); + } + tables_config + .get_mut(&table_name) + .and_then(|t| t.as_object_mut()) + .and_then(|o| { + o.insert( + String::from("column_order"), + SerdeValue::Array(column_order), + ) + }); + + // Create the table and its corresponding conflict table: + let mut table_statements = vec![]; + for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] { + let (mut statements, table_constraints) = + create_table_statement_old(tables_config, datatypes_config, parser, &table, &pool); + table_statements.append(&mut statements); + if !table.ends_with("_conflict") { + for constraint_type in vec!["foreign", "unique", "primary", "tree", "under"] { + let table_constraints = table_constraints.get(constraint_type).unwrap().clone(); + constraints_config + .get_mut(constraint_type) + .and_then(|o| o.as_object_mut()) + .and_then(|o| o.insert(table_name.to_string(), table_constraints)); + } + } + } + + let (drop_view_sql, create_view_sql) = get_sql_for_standard_view_old(&table_name, pool); + let (drop_text_view_sql, create_text_view_sql) = + get_sql_for_text_view_old(tables_config, &table_name, pool); + table_statements.push(drop_text_view_sql); + table_statements.push(drop_view_sql); + table_statements.push(create_view_sql); + table_statements.push(create_text_view_sql); + 
+ setup_statements.insert(table_name.to_string(), table_statements); + } + + // Sort the tables according to their foreign key dependencies so that tables are always loaded + // after the tables they depend on. Ignore the internal message and history tables: + let sorted_tables = verify_table_deps_and_sort( + &setup_statements.keys().cloned().collect(), + &constraints_config, + ) + .0; + + if *command != ValveCommand::Config || verbose { + // Generate DDL for the history table: + let mut history_statements = vec![]; + history_statements.push({ + let mut sql = r#"DROP TABLE IF EXISTS "history""#.to_string(); + if pool.any_kind() == AnyKind::Postgres { + sql.push_str(" CASCADE"); + } + sql.push_str(";"); + sql + }); + history_statements.push(format!( + indoc! {r#" + CREATE TABLE "history" ( + {row_number} + "table" TEXT, + "row" BIGINT, + "from" TEXT, + "to" TEXT, + "summary" TEXT, + "user" TEXT, + "undone_by" TEXT, + {timestamp} + ); + "#}, + row_number = { + if pool.any_kind() == AnyKind::Sqlite { + "\"history_id\" INTEGER PRIMARY KEY," + } else { + "\"history_id\" SERIAL PRIMARY KEY," + } + }, + timestamp = { + if pool.any_kind() == AnyKind::Sqlite { + "\"timestamp\" TIMESTAMP DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))" + } else { + "\"timestamp\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP" + } + }, + )); + history_statements + .push(r#"CREATE INDEX "history_tr_idx" ON "history"("table", "row");"#.to_string()); + setup_statements.insert("history".to_string(), history_statements); + + // Generate DDL for the message table: + let mut message_statements = vec![]; + message_statements.push({ + let mut sql = r#"DROP TABLE IF EXISTS "message""#.to_string(); + if pool.any_kind() == AnyKind::Postgres { + sql.push_str(" CASCADE"); + } + sql.push_str(";"); + sql + }); + message_statements.push(format!( + indoc! {r#" + CREATE TABLE "message" ( + {} + "table" TEXT, + "row" BIGINT, + "column" TEXT, + "value" TEXT, + "level" TEXT, + "rule" TEXT, + "message" TEXT + ); + "#}, + { + if pool.any_kind() == AnyKind::Sqlite { + "\"message_id\" INTEGER PRIMARY KEY," + } else { + "\"message_id\" SERIAL PRIMARY KEY," + } + }, + )); + message_statements.push( + r#"CREATE INDEX "message_trc_idx" ON "message"("table", "row", "column");"#.to_string(), + ); + setup_statements.insert("message".to_string(), message_statements); + + // Add the message and history tables to the beginning of the list of tables to create + // (the message table in particular needs to be at the beginning since the table views all + // reference it). + let mut tables_to_create = vec!["message".to_string(), "history".to_string()]; + tables_to_create.append(&mut sorted_tables.clone()); + for table in &tables_to_create { + let table_statements = setup_statements.get(table).unwrap(); + if *command != ValveCommand::Config { + for stmt in table_statements { + sqlx_query(stmt) + .execute(pool) + .await + .expect(format!("The SQL statement: {} returned an error", stmt).as_str()); + } + } + if verbose { + let output = String::from(table_statements.join("\n")); + println!("{}\n", output); + } + } + } + + return Ok((sorted_tables, constraints_config)); +} + +/// Various VALVE commands, used with [valve()](valve). +#[derive(Debug, PartialEq, Eq)] +pub enum ValveCommand { + /// Configure but do not create or load. + Config, + /// Configure and create but do not load. + Create, + /// Configure, create, and load. 
+    Load,
+}
+
+/// Given a path to a configuration table (either a table.tsv file or a database containing a
+/// table named "table"), and a directory in which to find/create a database: configure the
+/// database using the configuration which can be looked up using the table table, and
+/// optionally create and/or load it according to the value of `command` (see [ValveCommand]).
+/// If the `verbose` flag is set to true, output status messages while loading. If `config_table`
+/// is given and `table_table` indicates a database, query the table called `config_table` for the
+/// table table information. Returns the configuration map as a String. If `initial_load` is set to
+/// true, then (SQLite only) the database settings will be tuned for initial loading. Note that
+/// these settings are unsafe and should be used for initial loading only, as data integrity will
+/// not be guaranteed in the case of an interrupted transaction.
+pub async fn valve_old(
+    table_table: &str,
+    database: &str,
+    command: &ValveCommand,
+    verbose: bool,
+    initial_load: bool,
+    config_table: &str,
+) -> Result<String, sqlx::Error> {
+    let parser = StartParser::new();
+
+    let (specials_config, mut tables_config, mut datatypes_config, rules_config) =
+        read_config_files_old(&table_table.to_string(), config_table);
+
+    // To connect to a postgresql database listening to a unix domain socket:
+    // ----------------------------------------------------------------------
+    // let connection_options =
+    //     AnyConnectOptions::from_str("postgres:///testdb?host=/var/run/postgresql")?;
+    //
+    // To query the connection type at runtime via the pool:
+    // -----------------------------------------------------
+    // let db_type = pool.any_kind();
+
+    let connection_options;
+    if database.starts_with("postgresql://") {
+        connection_options = AnyConnectOptions::from_str(database)?;
+    } else {
+        let connection_string;
+        if !database.starts_with("sqlite://") {
+            connection_string = format!("sqlite://{}?mode=rwc", database);
+        } else {
+            connection_string = database.to_string();
+        }
+        connection_options = AnyConnectOptions::from_str(connection_string.as_str()).unwrap();
+    }
+
+    let pool = AnyPoolOptions::new()
+        .max_connections(5)
+        .connect_with(connection_options)
+        .await?;
+    if *command == ValveCommand::Load && pool.any_kind() == AnyKind::Sqlite {
+        sqlx_query("PRAGMA foreign_keys = ON")
+            .execute(&pool)
+            .await?;
+        if initial_load {
+            // These pragmas are unsafe but they are used during initial loading since data
+            // integrity is not a priority in this case.
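+            // Concretely: journal_mode = OFF disables the rollback journal altogether,
+            // synchronous = 0 stops SQLite from fsync-ing after writes, cache_size = 1000000
+            // enlarges the page cache, and temp_store = MEMORY keeps temporary tables in RAM.
+            // None of these settings are crash-safe, which is why they are gated behind
+            // `initial_load`.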
+ sqlx_query("PRAGMA journal_mode = OFF") + .execute(&pool) + .await?; + sqlx_query("PRAGMA synchronous = 0").execute(&pool).await?; + sqlx_query("PRAGMA cache_size = 1000000") + .execute(&pool) + .await?; + sqlx_query("PRAGMA temp_store = MEMORY") + .execute(&pool) + .await?; + } + } + + let (sorted_table_list, constraints_config) = configure_db_old( + &mut tables_config, + &mut datatypes_config, + &pool, + &parser, + verbose, + command, + ) + .await?; + + let mut config = SerdeMap::new(); + config.insert( + String::from("special"), + SerdeValue::Object(specials_config.clone()), + ); + config.insert( + String::from("table"), + SerdeValue::Object(tables_config.clone()), + ); + config.insert( + String::from("datatype"), + SerdeValue::Object(datatypes_config.clone()), + ); + config.insert( + String::from("rule"), + SerdeValue::Object(rules_config.clone()), + ); + config.insert( + String::from("constraints"), + SerdeValue::Object(constraints_config.clone()), + ); + let mut sorted_table_serdevalue_list: Vec = vec![]; + for table in &sorted_table_list { + sorted_table_serdevalue_list.push(SerdeValue::String(table.to_string())); + } + config.insert( + String::from("sorted_table_list"), + SerdeValue::Array(sorted_table_serdevalue_list), + ); + + let compiled_datatype_conditions = get_compiled_datatype_conditions(&config, &parser); + let compiled_rule_conditions = + get_compiled_rule_conditions(&config, compiled_datatype_conditions.clone(), &parser); + + if *command == ValveCommand::Load { + if verbose { + eprintln!( + "{} - Processing {} tables.", + Utc::now(), + sorted_table_list.len() + ); + } + load_db_old( + &config, + &pool, + &compiled_datatype_conditions, + &compiled_rule_conditions, + verbose, + ) + .await?; + } + + let config = SerdeValue::Object(config); + Ok(config.to_string()) +} + +async fn validate_rows_inter_and_insert_old( + config: &SerdeMap, + pool: &AnyPool, + table_name: &String, + rows: &mut Vec, + chunk_number: usize, + messages_stats: &mut HashMap, + verbose: bool, +) -> Result<(), sqlx::Error> { + // First, do the tree validation: + validate_rows_trees(config, pool, table_name, rows) + .await + .unwrap(); + + // Try to insert the rows to the db first without validating unique and foreign constraints. + // If there are constraint violations this will cause a database error, in which case we then + // explicitly do the constraint validation and insert the resulting rows. + // Note that instead of passing messages_stats here, we are going to initialize an empty map + // and pass that instead. The reason is that if a database error gets thrown, and then we + // redo the validation later, some of the messages will be double-counted. So to avoid that + // we send an empty map here, and in the case of no database error, we will just add the + // contents of the temporary map to messages_stats (in the Ok branch of the match statement + // below). 
+    let mut tmp_messages_stats = HashMap::new();
+    tmp_messages_stats.insert("error".to_string(), 0);
+    tmp_messages_stats.insert("warning".to_string(), 0);
+    tmp_messages_stats.insert("info".to_string(), 0);
+    let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) =
+        make_inserts(
+            config,
+            table_name,
+            rows,
+            chunk_number,
+            &mut tmp_messages_stats,
+            verbose,
+            pool,
+        )
+        .await
+        .unwrap();
+
+    let main_sql = local_sql_syntax(&pool, &main_sql);
+    let mut main_query = sqlx_query(&main_sql);
+    for param in &main_params {
+        main_query = main_query.bind(param);
+    }
+    let main_result = main_query.execute(pool).await;
+    match main_result {
+        Ok(_) => {
+            let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
+            let mut conflict_query = sqlx_query(&conflict_sql);
+            for param in &conflict_params {
+                conflict_query = conflict_query.bind(param);
+            }
+            conflict_query.execute(pool).await?;
+
+            let message_sql = local_sql_syntax(&pool, &message_sql);
+            let mut message_query = sqlx_query(&message_sql);
+            for param in &message_params {
+                message_query = message_query.bind(param);
+            }
+            message_query.execute(pool).await?;
+
+            if verbose {
+                let curr_errors = messages_stats.get("error").unwrap();
+                messages_stats.insert(
+                    "error".to_string(),
+                    curr_errors + tmp_messages_stats.get("error").unwrap(),
+                );
+                let curr_warnings = messages_stats.get("warning").unwrap();
+                messages_stats.insert(
+                    "warning".to_string(),
+                    curr_warnings + tmp_messages_stats.get("warning").unwrap(),
+                );
+                let curr_infos = messages_stats.get("info").unwrap();
+                messages_stats.insert(
+                    "info".to_string(),
+                    curr_infos + tmp_messages_stats.get("info").unwrap(),
+                );
+            }
+        }
+        Err(_) => {
+            validate_rows_constraints(config, pool, table_name, rows)
+                .await
+                .unwrap();
+            let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) =
+                make_inserts(
+                    config,
+                    table_name,
+                    rows,
+                    chunk_number,
+                    messages_stats,
+                    verbose,
+                    pool,
+                )
+                .await
+                .unwrap();
+
+            let main_sql = local_sql_syntax(&pool, &main_sql);
+            let mut main_query = sqlx_query(&main_sql);
+            for param in &main_params {
+                main_query = main_query.bind(param);
+            }
+            main_query.execute(pool).await?;
+
+            let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
+            let mut conflict_query = sqlx_query(&conflict_sql);
+            for param in &conflict_params {
+                conflict_query = conflict_query.bind(param);
+            }
+            conflict_query.execute(pool).await?;
+
+            let message_sql = local_sql_syntax(&pool, &message_sql);
+            let mut message_query = sqlx_query(&message_sql);
+            for param in &message_params {
+                message_query = message_query.bind(param);
+            }
+            message_query.execute(pool).await?;
+        }
+    };
+
+    Ok(())
+}
+
+async fn validate_and_insert_chunks_old(
+    config: &SerdeMap,
+    pool: &AnyPool,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    table_name: &String,
+    chunks: &IntoChunks<csv::StringRecordsIter<'_, std::fs::File>>,
+    headers: &csv::StringRecord,
+    messages_stats: &mut HashMap<String, usize>,
+    verbose: bool,
+) -> Result<(), sqlx::Error> {
+    if !MULTI_THREADED {
+        for (chunk_number, chunk) in chunks.into_iter().enumerate() {
+            let mut rows: Vec<_> = chunk.collect();
+            let mut intra_validated_rows = validate_rows_intra_old(
+                config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                table_name,
+                headers,
+                &mut rows,
+            );
+            validate_rows_inter_and_insert_old(
+                config,
+                pool,
+                table_name,
+                &mut intra_validated_rows,
+                chunk_number,
+                messages_stats,
+                verbose,
+            )
+            .await?;
+        }
+        Ok(())
+    } else {
+        // Here is how this works. First of all note that we are given a number of chunks of
+        // rows, where the number of rows in each chunk is determined by CHUNK_SIZE (defined
+        // above). We then divide the chunks into batches, where the number of chunks in each
+        // batch is determined by the number of CPUs present on the system. We then iterate over
+        // the batches one by one, assigning each chunk in a given batch to a worker thread whose
+        // job is to perform intra-row validation on that chunk. The workers work in parallel,
+        // one per CPU, and after all the workers have completed and their results have been
+        // collected, we then perform inter-row validation on the chunks in the batch, this time
+        // serially. Once this is done, we move on to the next batch and continue in this fashion.
+        let num_cpus = num_cpus::get();
+        let batches = chunks.into_iter().chunks(num_cpus);
+        let mut chunk_number = 0;
+        for batch in batches.into_iter() {
+            let mut results = BTreeMap::new();
+            crossbeam::scope(|scope| {
+                let mut workers = vec![];
+                for chunk in batch.into_iter() {
+                    let mut rows: Vec<_> = chunk.collect();
+                    workers.push(scope.spawn(move |_| {
+                        validate_rows_intra_old(
+                            config,
+                            compiled_datatype_conditions,
+                            compiled_rule_conditions,
+                            table_name,
+                            headers,
+                            &mut rows,
+                        )
+                    }));
+                }
+
+                for worker in workers {
+                    let result = worker.join().unwrap();
+                    results.insert(chunk_number, result);
+                    chunk_number += 1;
+                }
+            })
+            .expect("A child thread panicked");
+
+            for (chunk_number, mut intra_validated_rows) in results {
+                validate_rows_inter_and_insert_old(
+                    config,
+                    pool,
+                    table_name,
+                    &mut intra_validated_rows,
+                    chunk_number,
+                    messages_stats,
+                    verbose,
+                )
+                .await?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub async fn get_record_to_undo_old(pool: &AnyPool) -> Result<Option<AnyRow>, sqlx::Error> {
+    // Look in the history table, get the row with the greatest ID, get the row number,
+    // from, and to, and determine whether the last operation was a delete, insert, or update.
+    let is_clause = if pool.any_kind() == AnyKind::Sqlite {
+        "IS"
+    } else {
+        "IS NOT DISTINCT FROM"
+    };
+    let sql = format!(
+        r#"SELECT * FROM "history"
+           WHERE "undone_by" {} NULL
+           ORDER BY "history_id" DESC LIMIT 1"#,
+        is_clause
+    );
+    let query = sqlx_query(&sql);
+    let result_row = query.fetch_optional(pool).await?;
+    Ok(result_row)
+}
+
+/// Given a database pool, fetch the row in the history table that has been most recently marked
+/// as undone.
+pub async fn get_record_to_redo_old(pool: &AnyPool) -> Result<Option<AnyRow>, sqlx::Error> {
+    // Look in the history table, get the row with the greatest ID, get the row number,
+    // from, and to, and determine whether the last operation was a delete, insert, or update.
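+    // In other words: among the changes that have been undone, pick the most recent one, but
+    // only if no not-yet-undone change was recorded after it (otherwise redoing it could
+    // clobber a later change that is still in effect).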
+    let is_not_clause = if pool.any_kind() == AnyKind::Sqlite {
+        "IS NOT"
+    } else {
+        "IS DISTINCT FROM"
+    };
+    let is_clause = if pool.any_kind() == AnyKind::Sqlite {
+        "IS"
+    } else {
+        "IS NOT DISTINCT FROM"
+    };
+    let sql = format!(
+        r#"SELECT * FROM "history" h1
+           WHERE "undone_by" {is_not} NULL
+             AND NOT EXISTS (
+                 SELECT 1 FROM "history" h2
+                 WHERE h2.history_id > h1.history_id
+                   AND "undone_by" {is} NULL
+             )
+           ORDER BY "timestamp" DESC LIMIT 1"#,
+        is_not = is_not_clause,
+        is = is_clause
+    );
+    let query = sqlx_query(&sql);
+    let result_row = query.fetch_optional(pool).await?;
+    Ok(result_row)
+}
+
+/// Given a global configuration map, maps of compiled datatype and rule conditions, a database
+/// connection pool, and the user who initiated the undo, find the last recorded change to the
+/// database and undo it, indicating in the history table that undo_user is responsible.
+#[async_recursion]
+pub async fn undo_old(
+    global_config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    pool: &AnyPool,
+    undo_user: &str,
+) -> Result<(), sqlx::Error> {
+    let last_change = match get_record_to_undo_old(pool).await? {
+        None => {
+            eprintln!("WARN: Nothing to undo.");
+            return Ok(());
+        }
+        Some(r) => r,
+    };
+    let history_id: i32 = last_change.get("history_id");
+    let history_id = history_id as u16;
+    let table: &str = last_change.get("table");
+    let row_number: i64 = last_change.get("row");
+    let row_number = row_number as u32;
+    let from = get_json_from_row(&last_change, "from");
+    let to = get_json_from_row(&last_change, "to");
+
+    match (from, to) {
+        (None, None) => {
+            return Err(SqlxCErr(
+                "Cannot undo unknown operation from None to None".into(),
+            ))
+        }
+        (None, Some(_)) => {
+            // Undo an insert:
+            let mut tx = pool.begin().await?;
+
+            delete_row_tx(
+                global_config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                pool,
+                &mut tx,
+                table,
+                &row_number,
+            )
+            .await
+            .unwrap();
+
+            switch_undone_state(undo_user, history_id, true, &mut tx, pool)
+                .await
+                .unwrap();
+            tx.commit().await?;
+        }
+        (Some(from), None) => {
+            // Undo a delete:
+            let mut tx = pool.begin().await?;
+
+            insert_new_row_tx(
+                global_config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                pool,
+                &mut tx,
+                table,
+                &from,
+                Some(row_number),
+                false,
+            )
+            .await
+            .unwrap();
+
+            switch_undone_state(undo_user, history_id, true, &mut tx, pool)
+                .await
+                .unwrap();
+            tx.commit().await?;
+        }
+        (Some(from), Some(_)) => {
+            // Undo an update:
+            let mut tx = pool.begin().await?;
+
+            update_row_tx(
+                global_config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                pool,
+                &mut tx,
+                table,
+                &from,
+                &row_number,
+                false,
+                false,
+            )
+            .await
+            .unwrap();
+
+            switch_undone_state(undo_user, history_id, true, &mut tx, pool)
+                .await
+                .unwrap();
+            tx.commit().await?;
+        }
+    }
+    Ok(())
+}
+
+/// Given a global configuration map, maps of compiled datatype and rule conditions, a database
+/// connection pool, and the user who initiated the redo, find the last recorded change to the
+/// database that was undone and redo it, indicating in the history table that redo_user is
+/// responsible for the redo.
+#[async_recursion]
+pub async fn redo_old(
+    global_config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    pool: &AnyPool,
+    redo_user: &str,
+) -> Result<(), sqlx::Error> {
+    let last_undo = match get_record_to_redo_old(pool).await? {
+        None => {
+            eprintln!("WARN: Nothing to redo.");
+            return Ok(());
+        }
+        Some(last_undo) => {
+            let undone_by = last_undo.try_get_raw("undone_by")?;
+            if undone_by.is_null() {
+                eprintln!("WARN: Nothing to redo.");
+                return Ok(());
+            }
+            last_undo
+        }
+    };
+    let history_id: i32 = last_undo.get("history_id");
+    let history_id = history_id as u16;
+    let table: &str = last_undo.get("table");
+    let row_number: i64 = last_undo.get("row");
+    let row_number = row_number as u32;
+    let from = get_json_from_row(&last_undo, "from");
+    let to = get_json_from_row(&last_undo, "to");
+
+    match (from, to) {
+        (None, None) => {
+            return Err(SqlxCErr(
+                "Cannot redo unknown operation from None to None".into(),
+            ))
+        }
+        (None, Some(to)) => {
+            // Redo an insert:
+            let mut tx = pool.begin().await?;
+
+            insert_new_row_tx(
+                global_config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                pool,
+                &mut tx,
+                table,
+                &to,
+                Some(row_number),
+                false,
+            )
+            .await
+            .unwrap();
+
+            switch_undone_state(redo_user, history_id, false, &mut tx, pool)
+                .await
+                .unwrap();
+            tx.commit().await?;
+        }
+        (Some(_), None) => {
+            // Redo a delete:
+            let mut tx = pool.begin().await?;
+
+            delete_row_tx(
+                global_config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                pool,
+                &mut tx,
+                table,
+                &row_number,
+            )
+            .await
+            .unwrap();
+
+            switch_undone_state(redo_user, history_id, false, &mut tx, pool)
+                .await
+                .unwrap();
+            tx.commit().await?;
+        }
+        (Some(_), Some(to)) => {
+            // Redo an update:
+            let mut tx = pool.begin().await?;
+
+            update_row_tx(
+                global_config,
+                compiled_datatype_conditions,
+                compiled_rule_conditions,
+                pool,
+                &mut tx,
+                table,
+                &to,
+                &row_number,
+                false,
+                false,
+            )
+            .await
+            .unwrap();
+
+            switch_undone_state(redo_user, history_id, false, &mut tx, pool)
+                .await
+                .unwrap();
+            tx.commit().await?;
+        }
+    }
+    Ok(())
+}
+
+/// A wrapper around [insert_new_row_tx()] in which the following steps are also performed:
+/// - A database transaction is created and then committed once the given new row has been inserted.
+/// - The row is validated before insertion and the update to the database is recorded to the
+///   history table indicating that the given user is responsible for the change.
+#[async_recursion]
+pub async fn insert_new_row_old(
+    global_config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    pool: &AnyPool,
+    table: &str,
+    row: &SerdeMap,
+    new_row_number: Option<u32>,
+    user: &str,
+) -> Result<u32, sqlx::Error> {
+    let mut tx = pool.begin().await?;
+
+    let row = validate_row_old(
+        global_config,
+        compiled_datatype_conditions,
+        compiled_rule_conditions,
+        pool,
+        Some(&mut tx),
+        table,
+        row,
+        new_row_number,
+        None,
+    )
+    .await?;
+
+    let rn = insert_new_row_tx(
+        global_config,
+        compiled_datatype_conditions,
+        compiled_rule_conditions,
+        pool,
+        &mut tx,
+        table,
+        &row,
+        new_row_number,
+        true,
+    )
+    .await
+    .unwrap();
+
+    record_row_change(&mut tx, table, &rn, None, Some(&row), user)
+        .await
+        .unwrap();
+    tx.commit().await?;
+    Ok(rn)
+}
+
+/// A wrapper around [delete_row_tx()] in which the database transaction is implicitly created
+/// and then committed once the given row has been deleted, and the change to the database is
+/// recorded in the history table indicating that the given user is responsible for the change.
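+///
+/// # Example
+///
+/// A minimal sketch (`config`, `dt_conds`, `rule_conds`, and `pool` are assumed to come from a
+/// prior call to [valve_old()] and the condition compilers; the table name and row number are
+/// hypothetical):
+///
+/// ```ignore
+/// delete_row_old(&config, &dt_conds, &rule_conds, &pool, "my_table", &42, "some_user").await?;
+/// ```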
+#[async_recursion]
+pub async fn delete_row_old(
+    global_config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    pool: &AnyPool,
+    table: &str,
+    row_number: &u32,
+    user: &str,
+) -> Result<(), sqlx::Error> {
+    let mut tx = pool.begin().await?;
+
+    let row = get_row_from_db(global_config, pool, &mut tx, &table, row_number)
+        .await
+        .unwrap();
+    record_row_change(&mut tx, &table, row_number, Some(&row), None, user)
+        .await
+        .unwrap();
+
+    delete_row_tx(
+        global_config,
+        compiled_datatype_conditions,
+        compiled_rule_conditions,
+        pool,
+        &mut tx,
+        table,
+        row_number,
+    )
+    .await
+    .unwrap();
+
+    tx.commit().await?;
+    Ok(())
+}
+
+/// A wrapper around [update_row_tx()] in which the database transaction is implicitly created
+/// and then committed once the given row has been updated, the given row is validated before
+/// the update, and the update is recorded to the history table indicating that the given user
+/// is responsible for the change.
+#[async_recursion]
+pub async fn update_row_old(
+    global_config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    pool: &AnyPool,
+    table_name: &str,
+    row: &SerdeMap,
+    row_number: &u32,
+    user: &str,
+) -> Result<(), sqlx::Error> {
+    let mut tx = pool.begin().await?;
+
+    // Get the old version of the row from the database so that we can later record it to the
+    // history table:
+    let old_row = get_row_from_db(global_config, pool, &mut tx, table_name, row_number)
+        .await
+        .unwrap();
+
+    let row = validate_row_old(
+        global_config,
+        compiled_datatype_conditions,
+        compiled_rule_conditions,
+        pool,
+        Some(&mut tx),
+        table_name,
+        row,
+        Some(*row_number),
+        None,
+    )
+    .await?;
+
+    update_row_tx(
+        global_config,
+        compiled_datatype_conditions,
+        compiled_rule_conditions,
+        pool,
+        &mut tx,
+        table_name,
+        &row,
+        row_number,
+        true,
+        false,
+    )
+    .await
+    .unwrap();
+
+    // Record the row update in the history table:
+    record_row_change(
+        &mut tx,
+        table_name,
+        row_number,
+        Some(&old_row),
+        Some(&row),
+        user,
+    )
+    .await
+    .unwrap();
+
+    tx.commit().await?;
+    Ok(())
+}
+
+fn create_table_statement_old(
+    tables_config: &mut SerdeMap,
+    datatypes_config: &mut SerdeMap,
+    parser: &StartParser,
+    table_name: &String,
+    pool: &AnyPool,
+) -> (Vec<String>, SerdeValue) {
+    let mut drop_table_sql = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
+    if pool.any_kind() == AnyKind::Postgres {
+        drop_table_sql.push_str(" CASCADE");
+    }
+    drop_table_sql.push_str(";");
+    let mut statements = vec![drop_table_sql];
+    let mut create_lines = vec![
+        format!(r#"CREATE TABLE "{}" ("#, table_name),
+        String::from(r#"  "row_number" BIGINT,"#),
+    ];
+
+    let normal_table_name;
+    if let Some(s) = table_name.strip_suffix("_conflict") {
+        normal_table_name = String::from(s);
+    } else {
+        normal_table_name = table_name.to_string();
+    }
+
+    let column_names = tables_config
+        .get(&normal_table_name)
+        .and_then(|t| t.get("column_order"))
+        .and_then(|c| c.as_array())
+        .unwrap()
+        .iter()
+        .map(|v| v.as_str().unwrap().to_string())
+        .collect::<Vec<_>>();
+
+    let columns = tables_config
+        .get(normal_table_name.as_str())
+        .and_then(|c| c.as_object())
+        .and_then(|o| o.get("column"))
+        .and_then(|c| c.as_object())
+        .unwrap();
+
+    let mut table_constraints = json!({
+        "foreign": [],
+        "unique": [],
+        "primary": [],
+        "tree": [],
+        "under": [],
+    });
+
+    let mut colvals: Vec<SerdeMap> = vec![];
+    for column_name in &column_names {
+        let column = columns
+            .get(column_name)
+            .and_then(|c| c.as_object())
+            .unwrap();
+        colvals.push(column.clone());
+    }
+
+    let c = colvals.len();
+    let mut r = 0;
+    for row in colvals {
+        r += 1;
+        let sql_type = get_sql_type(
+            datatypes_config,
+            &row.get("datatype")
+                .and_then(|d| d.as_str())
+                .and_then(|s| Some(s.to_string()))
+                .unwrap(),
+            pool,
+        );
+
+        if let None = sql_type {
+            panic!("Missing SQL type for {}", row.get("datatype").unwrap());
+        }
+        let sql_type = sql_type.unwrap();
+
+        let short_sql_type = {
+            if sql_type.to_lowercase().as_str().starts_with("varchar(") {
+                "VARCHAR"
+            } else {
+                &sql_type
+            }
+        };
+
+        if pool.any_kind() == AnyKind::Postgres {
+            if !SQL_TYPES.contains(&short_sql_type.to_lowercase().as_str()) {
+                panic!(
+                    "Unrecognized PostgreSQL SQL type '{}' for datatype: '{}'. \
+                     Accepted SQL types for PostgreSQL are: {}",
+                    sql_type,
+                    row.get("datatype").and_then(|d| d.as_str()).unwrap(),
+                    SQL_TYPES.join(", ")
+                );
+            }
+        } else {
+            if !SQL_TYPES.contains(&short_sql_type.to_lowercase().as_str()) {
+                panic!(
+                    "Unrecognized SQLite SQL type '{}' for datatype '{}'. \
+                     Accepted SQL datatypes for SQLite are: {}",
+                    sql_type,
+                    row.get("datatype").and_then(|d| d.as_str()).unwrap(),
+                    SQL_TYPES.join(", ")
+                );
+            }
+        }
+
+        let column_name = row.get("column").and_then(|s| s.as_str()).unwrap();
+        let mut line = format!(r#"  "{}" {}"#, column_name, sql_type);
+        let structure = row.get("structure").and_then(|s| s.as_str());
+        if let Some(structure) = structure {
+            if structure != "" && !table_name.ends_with("_conflict") {
+                let parsed_structure = parser.parse(structure).unwrap();
+                for expression in parsed_structure {
+                    match *expression {
+                        Expression::Label(value) if value == "primary" => {
+                            line.push_str(" PRIMARY KEY");
+                            let primary_keys = table_constraints
+                                .get_mut("primary")
+                                .and_then(|v| v.as_array_mut())
+                                .unwrap();
+                            primary_keys.push(SerdeValue::String(column_name.to_string()));
+                        }
+                        Expression::Label(value) if value == "unique" => {
+                            line.push_str(" UNIQUE");
+                            let unique_constraints = table_constraints
+                                .get_mut("unique")
+                                .and_then(|v| v.as_array_mut())
+                                .unwrap();
+                            unique_constraints.push(SerdeValue::String(column_name.to_string()));
+                        }
+                        Expression::Function(name, args) if name == "from" => {
+                            if args.len() != 1 {
+                                panic!("Invalid foreign key: {} for: {}", structure, table_name);
+                            }
+                            match &*args[0] {
+                                Expression::Field(ftable, fcolumn) => {
+                                    let foreign_keys = table_constraints
+                                        .get_mut("foreign")
+                                        .and_then(|v| v.as_array_mut())
+                                        .unwrap();
+                                    let foreign_key = json!({
+                                        "column": column_name,
+                                        "ftable": ftable,
+                                        "fcolumn": fcolumn,
+                                    });
+                                    foreign_keys.push(foreign_key);
+                                }
+                                _ => {
+                                    panic!("Invalid foreign key: {} for: {}", structure, table_name)
+                                }
+                            };
+                        }
+                        Expression::Function(name, args) if name == "tree" => {
+                            if args.len() != 1 {
+                                panic!(
+                                    "Invalid 'tree' constraint: {} for: {}",
+                                    structure, table_name
+                                );
+                            }
+                            match &*args[0] {
+                                Expression::Label(child) => {
+                                    let child_datatype = columns
+                                        .get(child)
+                                        .and_then(|c| c.get("datatype"))
+                                        .and_then(|d| d.as_str());
+                                    if let None = child_datatype {
+                                        panic!(
+                                            "Could not determine SQL datatype for {} of tree({})",
+                                            child, child
+                                        );
+                                    }
+                                    let child_datatype = child_datatype.unwrap();
+                                    let parent = column_name;
+                                    let child_sql_type = get_sql_type(
+                                        datatypes_config,
+                                        &child_datatype.to_string(),
+                                        pool,
+                                    )
+                                    .unwrap();
+                                    if sql_type != child_sql_type {
+                                        panic!(
+                                            "SQL type '{}' of '{}' in 'tree({})' for table \
+                                             '{}' does not match SQL type: '{}' of parent: '{}'.",
+                                            child_sql_type,
+                                            child,
+
child, + table_name, + sql_type, + parent + ); + } + let tree_constraints = table_constraints + .get_mut("tree") + .and_then(|t| t.as_array_mut()) + .unwrap(); + let entry = json!({"parent": column_name, + "child": child}); + tree_constraints.push(entry); + } + _ => { + panic!( + "Invalid 'tree' constraint: {} for: {}", + structure, table_name + ); + } + }; + } + Expression::Function(name, args) if name == "under" => { + let generic_error = format!( + "Invalid 'under' constraint: {} for: {}", + structure, table_name + ); + if args.len() != 2 { + panic!("{}", generic_error); + } + match (&*args[0], &*args[1]) { + (Expression::Field(ttable, tcolumn), Expression::Label(value)) => { + let under_constraints = table_constraints + .get_mut("under") + .and_then(|u| u.as_array_mut()) + .unwrap(); + let entry = json!({"column": column_name, + "ttable": ttable, + "tcolumn": tcolumn, + "value": value}); + under_constraints.push(entry); + } + (_, _) => panic!("{}", generic_error), + }; + } + _ => panic!( + "Unrecognized structure: {} for {}.{}", + structure, table_name, column_name + ), + }; + } + } + } + if r >= c + && table_constraints + .get("foreign") + .and_then(|v| v.as_array()) + .and_then(|v| Some(v.is_empty())) + .unwrap() + { + line.push_str(""); + } else { + line.push_str(","); + } + create_lines.push(line); + } + + let foreign_keys = table_constraints + .get("foreign") + .and_then(|v| v.as_array()) + .unwrap(); + let num_fkeys = foreign_keys.len(); + for (i, fkey) in foreign_keys.iter().enumerate() { + create_lines.push(format!( + r#" FOREIGN KEY ("{}") REFERENCES "{}"("{}"){}"#, + fkey.get("column").and_then(|s| s.as_str()).unwrap(), + fkey.get("ftable").and_then(|s| s.as_str()).unwrap(), + fkey.get("fcolumn").and_then(|s| s.as_str()).unwrap(), + if i < (num_fkeys - 1) { "," } else { "" } + )); + } + create_lines.push(String::from(");")); + // We are done generating the lines for the 'create table' statement. Join them and add the + // result to the statements to return: + statements.push(String::from(create_lines.join("\n"))); + + // Loop through the tree constraints and if any of their associated child columns do not already + // have an associated unique or primary index, create one implicitly here: + let tree_constraints = table_constraints + .get("tree") + .and_then(|v| v.as_array()) + .unwrap(); + for tree in tree_constraints { + let unique_keys = table_constraints + .get("unique") + .and_then(|v| v.as_array()) + .unwrap(); + let primary_keys = table_constraints + .get("primary") + .and_then(|v| v.as_array()) + .unwrap(); + let tree_child = tree.get("child").and_then(|c| c.as_str()).unwrap(); + if !unique_keys.contains(&SerdeValue::String(tree_child.to_string())) + && !primary_keys.contains(&SerdeValue::String(tree_child.to_string())) + { + statements.push(format!( + r#"CREATE UNIQUE INDEX "{}_{}_idx" ON "{}"("{}");"#, + table_name, tree_child, table_name, tree_child + )); + } + } + + // Finally, create a further unique index on row_number: + statements.push(format!( + r#"CREATE UNIQUE INDEX "{}_row_number_idx" ON "{}"("row_number");"#, + table_name, table_name + )); + + return (statements, table_constraints); +} + +/// Given a configuration map, a database connection pool, a parser, HashMaps representing +/// compiled datatype and rule conditions, and a HashMap representing parsed structure conditions, +/// read in the data TSV files corresponding to each configured table, then validate and load all of +/// the corresponding data rows. 
If the verbose flag is set to true, output progress messages to
+/// stderr during load.
+async fn load_db_old(
+    config: &SerdeMap,
+    pool: &AnyPool,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    verbose: bool,
+) -> Result<(), sqlx::Error> {
+    let mut table_list = vec![];
+    for table in config
+        .get("sorted_table_list")
+        .and_then(|l| l.as_array())
+        .unwrap()
+    {
+        table_list.push(table.as_str().and_then(|s| Some(s.to_string())).unwrap());
+    }
+    let table_list = table_list; // Change the table_list to read-only after populating it.
+    let num_tables = table_list.len();
+    let mut total_errors = 0;
+    let mut total_warnings = 0;
+    let mut total_infos = 0;
+    let mut table_num = 1;
+    for table_name in table_list {
+        if verbose {
+            eprintln!(
+                "{} - Loading table {}/{}: {}",
+                Utc::now(),
+                table_num,
+                num_tables,
+                table_name
+            );
+        }
+        table_num += 1;
+        let path = String::from(
+            config
+                .get("table")
+                .and_then(|t| t.as_object())
+                .and_then(|o| o.get(&table_name))
+                .and_then(|n| n.get("path"))
+                .and_then(|p| p.as_str())
+                .unwrap(),
+        );
+        let mut rdr = csv::ReaderBuilder::new()
+            .has_headers(false)
+            .delimiter(b'\t')
+            .from_reader(File::open(path.clone()).unwrap_or_else(|err| {
+                panic!("Unable to open '{}': {}", path.clone(), err);
+            }));
+
+        // Extract the headers, which we will need later:
+        let mut records = rdr.records();
+        let headers;
+        if let Some(result) = records.next() {
+            headers = result.unwrap();
+        } else {
+            panic!("'{}' is empty", path);
+        }
+
+        for header in headers.iter() {
+            if header.trim().is_empty() {
+                panic!(
+                    "One or more of the header fields is empty for table '{}'",
+                    table_name
+                );
+            }
+        }
+
+        // HashMap used to report info about the number of error/warning/info messages for this
+        // table when the verbose flag is set to true:
+        let mut messages_stats = HashMap::new();
+        messages_stats.insert("error".to_string(), 0);
+        messages_stats.insert("warning".to_string(), 0);
+        messages_stats.insert("info".to_string(), 0);
+
+        // Split the data into chunks of size CHUNK_SIZE before passing them to the validation
+        // logic:
+        let chunks = records.chunks(CHUNK_SIZE);
+        validate_and_insert_chunks_old(
+            config,
+            pool,
+            compiled_datatype_conditions,
+            compiled_rule_conditions,
+            &table_name,
+            &chunks,
+            &headers,
+            &mut messages_stats,
+            verbose,
+        )
+        .await?;
+
+        // We need to wait until all of the rows for a table have been loaded before validating the
+        // "foreign" constraints on a table's trees, since this checks if the values of one column
+        // (the tree's parent) are all contained in another column (the tree's child).
+        // We also need to wait before validating a table's "under" constraints. Although the tree
+        // associated with such a constraint need not be defined on the same table, it can be.
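+        // For example, a tree(child) constraint can only be checked after the whole table is
+        // loaded, since every value of the parent column must appear somewhere in the child
+        // column, possibly in a row that comes later in the file.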
+        let mut recs_to_update = validate_tree_foreign_keys(config, pool, None, &table_name, None)
+            .await
+            .unwrap();
+        recs_to_update.append(
+            &mut validate_under(config, pool, None, &table_name, None)
+                .await
+                .unwrap(),
+        );
+
+        for record in recs_to_update {
+            let row_number = record.get("row_number").unwrap();
+            let column_name = record.get("column").and_then(|s| s.as_str()).unwrap();
+            let value = record.get("value").and_then(|s| s.as_str()).unwrap();
+            let level = record.get("level").and_then(|s| s.as_str()).unwrap();
+            let rule = record.get("rule").and_then(|s| s.as_str()).unwrap();
+            let message = record.get("message").and_then(|s| s.as_str()).unwrap();
+
+            let sql = local_sql_syntax(
+                &pool,
+                &format!(
+                    r#"INSERT INTO "message"
+                       ("table", "row", "column", "value", "level", "rule", "message")
+                       VALUES ({}, {}, {}, {}, {}, {}, {})"#,
+                    SQL_PARAM, row_number, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM
+                ),
+            );
+            let mut query = sqlx_query(&sql);
+            query = query.bind(&table_name);
+            query = query.bind(&column_name);
+            query = query.bind(&value);
+            query = query.bind(&level);
+            query = query.bind(&rule);
+            query = query.bind(&message);
+            query.execute(pool).await?;
+
+            if verbose {
+                // Add the generated message to messages_stats:
+                let messages = vec![json!({
+                    "message": message,
+                    "level": level,
+                })];
+                add_message_counts(&messages, &mut messages_stats);
+            }
+        }
+
+        if verbose {
+            // Output a report on the messages generated to stderr:
+            let errors = messages_stats.get("error").unwrap();
+            let warnings = messages_stats.get("warning").unwrap();
+            let infos = messages_stats.get("info").unwrap();
+            let status_message = format!(
+                "{} errors, {} warnings, and {} information messages generated for {}",
+                errors, warnings, infos, table_name
+            );
+            eprintln!("{} - {}", Utc::now(), status_message);
+            total_errors += errors;
+            total_warnings += warnings;
+            total_infos += infos;
+        }
+    }
+
+    if verbose {
+        eprintln!(
+            "{} - Loading complete with {} errors, {} warnings, and {} information messages",
+            Utc::now(),
+            total_errors,
+            total_warnings,
+            total_infos
+        );
+    }
+
+    Ok(())
+}
+
 /// Main entrypoint for the Valve API.
 #[derive(Debug)]
 pub struct Valve {
diff --git a/src/validate.rs b/src/validate.rs
index b58d639a..f4c2b35d 100644
--- a/src/validate.rs
+++ b/src/validate.rs
@@ -1,7 +1,10 @@
 use chrono::Utc;
 use indexmap::IndexMap;
 use serde_json::{json, Value as SerdeValue};
-use sqlx::{any::AnyPool, query as sqlx_query, Acquire, Row, Transaction, ValueRef};
+use sqlx::{
+    any::AnyPool, query as sqlx_query, Acquire, Error::Configuration as SqlxCErr, Row, Transaction,
+    ValueRef,
+};
 use std::collections::HashMap;
 
 use crate::{
@@ -46,6 +49,200 @@ pub struct QueryAsIf {
     pub row: Option<SerdeMap>,
 }
 
+/// Old-style variant of `validate_row`, kept for the nanobot.rs migration. Given a config map,
+/// maps of compiled datatype and rule conditions, a database connection pool, an optional
+/// transaction, a table name, a row to validate, and the row's number if it already exists,
+/// perform both intra- and inter-row validation and return the validated row.
+pub async fn validate_row_old(
+    config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    pool: &AnyPool,
+    tx: Option<&mut Transaction<'_, sqlx::Any>>,
+    table_name: &str,
+    row: &SerdeMap,
+    row_number: Option<u32>,
+    query_as_if: Option<&QueryAsIf>,
+) -> Result<SerdeMap, sqlx::Error> {
+    // Fall back to a default transaction if none is given. Since we never commit it before it
+    // falls out of scope, the transaction will be rolled back at the end of this function; and
+    // since this function is read-only, the rollback is trivial and therefore inconsequential.
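+    // sqlx's `Transaction` rolls back automatically when it is dropped without an explicit
+    // commit, so simply letting `default_tx` fall out of scope is enough to discard it.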
+    let default_tx = &mut pool.begin().await?;
+    let tx = match tx {
+        Some(tx) => tx,
+        None => default_tx,
+    };
+
+    // Initialize the result row with the values from the given row:
+    let mut result_row = ResultRow {
+        row_number,
+        contents: IndexMap::new(),
+    };
+
+    for (column, cell) in row.iter() {
+        let nulltype = match cell.get("nulltype") {
+            None => None,
+            Some(SerdeValue::String(s)) => Some(s.to_string()),
+            _ => {
+                return Err(SqlxCErr(
+                    format!("No string 'nulltype' in cell: {:?}.", cell).into(),
+                ))
+            }
+        };
+        let value = match cell.get("value") {
+            Some(SerdeValue::String(s)) => s.to_string(),
+            Some(SerdeValue::Number(n)) => format!("{}", n),
+            _ => {
+                return Err(SqlxCErr(
+                    format!("No string/number 'value' in cell: {:#?}.", cell).into(),
+                ))
+            }
+        };
+        let valid = match cell.get("valid").and_then(|v| v.as_bool()) {
+            Some(b) => b,
+            None => {
+                return Err(SqlxCErr(
+                    format!("No bool 'valid' in cell: {:?}.", cell).into(),
+                ))
+            }
+        };
+        let messages = match cell.get("messages").and_then(|m| m.as_array()) {
+            Some(a) => a.to_vec(),
+            None => {
+                return Err(SqlxCErr(
+                    format!("No array 'messages' in cell: {:?}.", cell).into(),
+                ))
+            }
+        };
+        let result_cell = ResultCell {
+            nulltype,
+            value,
+            valid,
+            messages,
+        };
+        result_row.contents.insert(column.to_string(), result_cell);
+    }
+
+    // We check all the cells for nulltype first, since the rules validation requires that we
+    // have this information for all cells.
+    for (column_name, cell) in result_row.contents.iter_mut() {
+        validate_cell_nulltype(
+            config,
+            compiled_datatype_conditions,
+            &table_name.to_string(),
+            column_name,
+            cell,
+        );
+    }
+
+    let context = result_row.clone();
+    for (column_name, cell) in result_row.contents.iter_mut() {
+        validate_cell_rules(
+            config,
+            compiled_rule_conditions,
+            &table_name.to_string(),
+            column_name,
+            &context,
+            cell,
+        );
+
+        if cell.nulltype.is_none() {
+            validate_cell_datatype(
+                config,
+                compiled_datatype_conditions,
+                &table_name.to_string(),
+                column_name,
+                cell,
+            );
+
+            // We don't do any further validation on cells that have SQL type violations because
+            // they can result in database errors when, for instance, we compare a numeric with a
+            // non-numeric type.
+            let sql_type =
+                get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap();
+            if !is_sql_type_error(&sql_type, &cell.value) {
+                // TODO: Pass the query_as_if parameter to validate_cell_trees.
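+                // The three checks below are the ones that need to query the database (and
+                // hence the transaction): tree-child lookups, foreign-key lookups, and
+                // unique/primary-key lookups for this cell.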
+                validate_cell_trees(
+                    config,
+                    pool,
+                    Some(tx),
+                    &table_name.to_string(),
+                    column_name,
+                    cell,
+                    &context,
+                    &vec![],
+                )
+                .await
+                .unwrap();
+                validate_cell_foreign_constraints(
+                    config,
+                    pool,
+                    Some(tx),
+                    &table_name.to_string(),
+                    column_name,
+                    cell,
+                    query_as_if,
+                )
+                .await
+                .unwrap();
+                validate_cell_unique_constraints(
+                    config,
+                    pool,
+                    Some(tx),
+                    &table_name.to_string(),
+                    column_name,
+                    cell,
+                    &vec![],
+                    row_number,
+                )
+                .await
+                .unwrap();
+            }
+        }
+    }
+
+    // TODO: Possibly propagate `query_as_if` down into this function:
+    let mut violations = validate_tree_foreign_keys(
+        config,
+        pool,
+        Some(tx),
+        &table_name.to_string(),
+        Some(&context.clone()),
+    )
+    .await
+    .unwrap();
+    violations.append(
+        // TODO: Possibly propagate `query_as_if` down into this function:
+        &mut validate_under(
+            config,
+            pool,
+            Some(tx),
+            &table_name.to_string(),
+            Some(&context.clone()),
+        )
+        .await
+        .unwrap(),
+    );
+
+    for violation in violations.iter_mut() {
+        let vrow_number = violation.get("row_number").unwrap().as_i64().unwrap() as u32;
+        if Some(vrow_number) == row_number || (row_number.is_none() && vrow_number == 0) {
+            let column = violation.get("column").and_then(|s| s.as_str()).unwrap();
+            let level = violation.get("level").and_then(|s| s.as_str()).unwrap();
+            let rule = violation.get("rule").and_then(|s| s.as_str()).unwrap();
+            let message = violation.get("message").and_then(|s| s.as_str()).unwrap();
+            let result_cell = result_row.contents.get_mut(column).unwrap();
+            result_cell.messages.push(json!({
+                "level": level,
+                "rule": rule,
+                "message": message,
+            }));
+            result_cell.valid = false;
+        }
+    }
+
+    let result_row = remove_duplicate_messages(&result_row_to_config_map(&result_row)).unwrap();
+    Ok(result_row)
+}
+
 /// Given a config map, maps of compiled datatype and rule conditions, a database connection
 /// pool, a table name, a row to validate and a row number in the case where the row already
 /// exists, perform both intra- and inter-row validation and return the validated row.
@@ -1591,6 +1788,88 @@ async fn validate_cell_trees(
     Ok(())
 }
 
+/// Old-style variant of `validate_rows_intra`, kept for the nanobot.rs migration. Given a config
+/// map, maps of compiled datatype and rule conditions, a table name, the headers of the table's
+/// TSV file, and a chunk of rows read from that file, perform intra-row validation on the rows
+/// and return the validated result rows.
+pub fn validate_rows_intra_old(
+    config: &SerdeMap,
+    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
+    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
+    table_name: &String,
+    headers: &csv::StringRecord,
+    rows: &Vec<Result<csv::StringRecord, csv::Error>>,
+) -> Vec<ResultRow> {
+    let mut result_rows = vec![];
+    for row in rows {
+        match row {
+            Err(err) => eprintln!("Error while processing row for '{}': {}", table_name, err),
+            Ok(row) => {
+                let mut result_row = ResultRow {
+                    row_number: None,
+                    contents: IndexMap::new(),
+                };
+                for (i, value) in row.iter().enumerate() {
+                    let result_cell = ResultCell {
+                        nulltype: None,
+                        value: String::from(value),
+                        valid: true,
+                        messages: vec![],
+                    };
+                    let column = headers.get(i).unwrap();
+                    result_row.contents.insert(column.to_string(), result_cell);
+                }
+
+                let column_names = config
+                    .get("table")
+                    .and_then(|t| t.get(table_name))
+                    .and_then(|t| t.get("column_order"))
+                    .and_then(|c| c.as_array())
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.as_str().unwrap().to_string())
+                    .collect::<Vec<_>>();
+
+                // We begin by determining the nulltype of all of the cells, since the rules
+                // validation step requires that all cells have this information.
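+                // A cell whose value matches its column's nulltype is treated as empty, which
+                // is why the datatype check in the second loop below is skipped whenever
+                // `cell.nulltype` is set; this mirrors validate_row_old above.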
+                for column_name in &column_names {
+                    let cell = result_row.contents.get_mut(column_name).unwrap();
+                    validate_cell_nulltype(
+                        config,
+                        compiled_datatype_conditions,
+                        table_name,
+                        &column_name,
+                        cell,
+                    );
+                }
+
+                for column_name in &column_names {
+                    let context = result_row.clone();
+                    let cell = result_row.contents.get_mut(column_name).unwrap();
+                    validate_cell_rules(
+                        config,
+                        compiled_rule_conditions,
+                        table_name,
+                        &column_name,
+                        &context,
+                        cell,
+                    );
+
+                    if cell.nulltype.is_none() {
+                        validate_cell_datatype(
+                            config,
+                            compiled_datatype_conditions,
+                            table_name,
+                            &column_name,
+                            cell,
+                        );
+                    }
+                }
+                result_rows.push(result_row);
+            }
+        };
+    }
+
+    // Finally return the result rows:
+    result_rows
+}
+
 /// Given a config map, a db connection pool, a table name, a column name, a cell to validate,
 /// the row, `context`, to which the cell belongs, and a list of previously validated rows,
 /// check the cell value against any unique-type keys that have been defined for the column.