From 08620dbba8d47e39cf0b58c9253c9f0c4b5d9162 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Tue, 30 Jan 2024 10:15:31 -0500 Subject: [PATCH] add new file valve.rs and refactor --- src/api_test.rs | 5 +- src/lib.rs | 1817 +--------------------------------------------- src/main.rs | 4 +- src/validate.rs | 12 +- src/valve.rs | 1828 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 1852 insertions(+), 1814 deletions(-) create mode 100644 src/valve.rs diff --git a/src/api_test.rs b/src/api_test.rs index eba2acd5..e551a1da 100644 --- a/src/api_test.rs +++ b/src/api_test.rs @@ -1,4 +1,7 @@ -use ontodev_valve::{SerdeMap, Valve, ValveError}; +use ontodev_valve::{ + valve::{Valve, ValveError}, + SerdeMap, +}; use rand::distributions::{Alphanumeric, DistString, Distribution, Uniform}; use rand::{random, thread_rng}; use serde_json::json; diff --git a/src/lib.rs b/src/lib.rs index 51983c95..e498c14a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,20 +23,25 @@ extern crate lalrpop_util; pub mod ast; pub mod validate; +pub mod valve; lalrpop_mod!(pub valve_grammar); -use crate::validate::{ - validate_row_tx, validate_rows_constraints, validate_rows_intra, validate_rows_trees, - validate_tree_foreign_keys, validate_under, with_tree_sql, QueryAsIf, QueryAsIfKind, ResultRow, +use crate::{ + ast::Expression, + validate::{ + validate_row_tx, validate_rows_constraints, validate_rows_intra, validate_rows_trees, + QueryAsIf, QueryAsIfKind, ResultRow, + }, + valve::ValveError, + valve::ValveRow, + valve_grammar::StartParser, }; -use crate::{ast::Expression, valve_grammar::StartParser}; use async_recursion::async_recursion; use chrono::Utc; use crossbeam; -use csv::{QuoteStyle, ReaderBuilder, StringRecord, StringRecordsIter, WriterBuilder}; -use enquote::unquote; -use futures::{executor::block_on, TryStreamExt}; +use csv::{ReaderBuilder, StringRecord, StringRecordsIter}; +use futures::executor::block_on; use indexmap::IndexMap; use indoc::indoc; use 
itertools::{IntoChunks, Itertools}; @@ -78,11 +83,10 @@ lazy_static! { static ref SQL_TYPES: Vec<&'static str> = vec!["text", "varchar", "numeric", "integer", "real"]; } -/// Aliases for [serde_json::Map](..//serde_json/struct.Map.html). +/// Alias for [serde_json::Map](..//serde_json/struct.Map.html). // Note: serde_json::Map is // [backed by a BTreeMap by default](https://docs.serde.rs/serde_json/map/index.html) pub type SerdeMap = serde_json::Map; -pub type ValveRow = serde_json::Map; // TODO: Possibly replace these with the tracing library (see nanobot.rs). /// Write a debugging message to STDERR. @@ -176,1801 +180,6 @@ impl std::fmt::Debug for ColumnRule { } } -/// Main entrypoint for the Valve API. -#[derive(Clone, Debug)] -pub struct Valve { - /// The valve configuration map. - pub config: SerdeMap, - /// Pre-compiled datatype conditions. - pub compiled_datatype_conditions: HashMap, - /// Pre-compiled rule conditions. - pub compiled_rule_conditions: HashMap>>, - /// Parsed structure conditions: - pub parsed_structure_conditions: HashMap, - /// Lists of tables that depend on a given table, indexed by table. - pub table_dependencies_in: HashMap>, - /// Lists of tables that a given table depends on, indexed by table. - pub table_dependencies_out: HashMap>, - /// The database connection pool. - pub pool: AnyPool, - /// The user associated with this valve instance. - pub user: String, - /// Produce more logging output. 
- pub verbose: bool, -} - -#[derive(Debug)] -pub enum ValveError { - /// An error in the Valve configuration: - ConfigError(String), - /// An error that occurred while reading or writing to a CSV/TSV: - CsvError(csv::Error), - /// An error involving the data: - DataError(String), - /// An error generated by the underlying database: - DatabaseError(sqlx::Error), - /// An error in the inputs to a function: - InputError(String), - /// An error that occurred while reading/writing to stdio: - IOError(std::io::Error), - /// An error that occurred while serialising or deserialising to/from JSON: - SerdeJsonError(serde_json::Error), -} - -impl From for ValveError { - fn from(e: csv::Error) -> Self { - Self::CsvError(e) - } -} - -impl From for ValveError { - fn from(e: sqlx::Error) -> Self { - Self::DatabaseError(e) - } -} - -impl From for ValveError { - fn from(e: serde_json::Error) -> Self { - Self::SerdeJsonError(e) - } -} - -impl From for ValveError { - fn from(e: std::io::Error) -> Self { - Self::IOError(e) - } -} - -impl Valve { - /// Given a path to a table table, a path to a database, a flag for verbose output, and a flag - /// indicating whether the database should be configured for initial loading: Set up a database - /// connection, configure VALVE, and return a new Valve struct. - pub async fn build( - table_path: &str, - database: &str, - verbose: bool, - initial_load: bool, - ) -> Result { - let pool = get_pool_from_connection_string(database).await?; - if pool.any_kind() == AnyKind::Sqlite { - sqlx_query("PRAGMA foreign_keys = ON") - .execute(&pool) - .await?; - if initial_load { - // These pragmas are unsafe but they are used during initial loading since data - // integrity is not a priority in this case. 
- sqlx_query("PRAGMA journal_mode = OFF") - .execute(&pool) - .await?; - sqlx_query("PRAGMA synchronous = 0").execute(&pool).await?; - sqlx_query("PRAGMA cache_size = 1000000") - .execute(&pool) - .await?; - sqlx_query("PRAGMA temp_store = MEMORY") - .execute(&pool) - .await?; - } - } - - let parser = StartParser::new(); - let ( - specials_config, - tables_config, - datatypes_config, - rules_config, - constraints_config, - sorted_table_list, - table_dependencies_in, - table_dependencies_out, - ) = read_config_files(table_path, &parser, &pool); - - let mut config = SerdeMap::new(); - config.insert( - String::from("special"), - SerdeValue::Object(specials_config.clone()), - ); - config.insert( - String::from("table"), - SerdeValue::Object(tables_config.clone()), - ); - config.insert( - String::from("datatype"), - SerdeValue::Object(datatypes_config.clone()), - ); - config.insert( - String::from("rule"), - SerdeValue::Object(rules_config.clone()), - ); - config.insert( - String::from("constraints"), - SerdeValue::Object(constraints_config.clone()), - ); - let mut sorted_table_serdevalue_list: Vec = vec![]; - for table in &sorted_table_list { - sorted_table_serdevalue_list.push(SerdeValue::String(table.to_string())); - } - config.insert( - String::from("sorted_table_list"), - SerdeValue::Array(sorted_table_serdevalue_list), - ); - - let compiled_datatype_conditions = get_compiled_datatype_conditions(&config, &parser); - let compiled_rule_conditions = - get_compiled_rule_conditions(&config, compiled_datatype_conditions.clone(), &parser); - let parsed_structure_conditions = get_parsed_structure_conditions(&config, &parser); - - Ok(Self { - config: config, - compiled_datatype_conditions: compiled_datatype_conditions, - compiled_rule_conditions: compiled_rule_conditions, - parsed_structure_conditions: parsed_structure_conditions, - table_dependencies_in: table_dependencies_in, - table_dependencies_out: table_dependencies_out, - pool: pool, - user: String::from("VALVE"), - 
verbose: verbose, - }) - } - - /// Convenience function to retrieve the path to Valve's "table table", the main entrypoint - /// to Valve's configuration. - pub fn get_path(&self) -> String { - self.config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|t| t.get("table")) - .and_then(|t| t.as_object()) - .and_then(|t| t.get("path")) - .and_then(|p| p.as_str()) - .unwrap() - .to_string() - } - - /// Controls the maximum length of a username. - const USERNAME_MAX_LEN: usize = 20; - - /// Sets the user name, which must be a short, trimmed, string without newlines, for this Valve - /// instance. - pub fn set_user(&mut self, user: &str) -> Result<&mut Self, ValveError> { - if user.len() > Self::USERNAME_MAX_LEN { - return Err(ValveError::ConfigError(format!( - "Username '{}' is longer than {} characters.", - user, - Self::USERNAME_MAX_LEN - ))); - } else { - let user_regex = Regex::new(r#"^\S([^\n]*\S)*$"#).unwrap(); - if !user_regex.is_match(user) { - return Err(ValveError::ConfigError(format!( - "Username '{}' is not a short, trimmed, string without newlines.", - user, - ))); - } - } - self.user = user.to_string(); - Ok(self) - } - - /// Given a SQL string, execute it using the connection pool associated with the Valve instance. - async fn execute_sql(&self, sql: &str) -> Result<(), ValveError> { - sqlx_query(&sql).execute(&self.pool).await?; - Ok(()) - } - - /// Return the list of configured tables in sorted order, or reverse sorted order if the - /// reverse flag is set. - pub fn get_sorted_table_list(&self, reverse: bool) -> Vec<&str> { - let mut sorted_tables = self - .config - .get("sorted_table_list") - .and_then(|l| l.as_array()) - .and_then(|l| Some(l.iter().map(|i| i.as_str().unwrap()))) - .and_then(|l| Some(l.collect::>())) - .unwrap(); - if reverse { - sorted_tables.reverse(); - } - sorted_tables - } - - /// Given the name of a table, determine whether its current instantiation in the database - /// differs from the way it has been configured. 
The answer to this question is yes whenever - /// (1) the number of columns or any of their names differs from their configured values, or - /// the order of database columns differs from the configured order; (2) The SQL type of one or - /// more columns does not match the configured SQL type for that column; (3) Some column with a - /// 'unique', 'primary', or 'from(table, column)' in its column configuration fails to be - /// associated, in the database, with a unique constraint, primary key, or foreign key, - /// respectively; or vice versa; (4) The table does not exist in the database. - pub async fn table_has_changed(&self, table: &str) -> Result { - // A clojure that, given a parsed structure condition, a table and column name, and an - // unsigned integer representing whether the given column, in the case of a SQLite database, - // is a primary key (in the case of PostgreSQL, the sqlite_pk parameter is ignored): - // determine whether the structure of the column is properly reflected in the db. E.g., a - // `from(table.column)` struct should be associated with a foreign key, `primary` with a - // primary key, `unique` with a unique constraint. 
- let structure_has_changed = |pstruct: &Expression, - table: &str, - column: &str, - sqlite_pk: &u32| - -> Result { - // A clojure to determine whether the given column has the given constraint type, which - // can be one of 'UNIQUE', 'PRIMARY KEY', 'FOREIGN KEY': - let column_has_constraint_type = |constraint_type: &str| -> Result { - if self.pool.any_kind() == AnyKind::Postgres { - let sql = format!( - r#"SELECT 1 - FROM information_schema.table_constraints tco - JOIN information_schema.key_column_usage kcu - ON kcu.constraint_name = tco.constraint_name - AND kcu.constraint_schema = tco.constraint_schema - AND kcu.table_name = '{}' - WHERE tco.constraint_type = '{}' - AND kcu.column_name = '{}'"#, - table, constraint_type, column - ); - let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; - if rows.len() > 1 { - unreachable!(); - } - Ok(rows.len() == 1) - } else { - if constraint_type == "PRIMARY KEY" { - return Ok(*sqlite_pk == 1); - } else if constraint_type == "UNIQUE" { - let sql = format!(r#"PRAGMA INDEX_LIST("{}")"#, table); - for row in block_on(sqlx_query(&sql).fetch_all(&self.pool))? { - let idx_name = row.get::("name"); - let unique = row.get::("unique") as u8; - if unique == 1 { - let sql = format!(r#"PRAGMA INDEX_INFO("{}")"#, idx_name); - let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; - if rows.len() == 1 { - let cname = rows[0].get::("name"); - if cname == column { - return Ok(true); - } - } - } - } - Ok(false) - } else if constraint_type == "FOREIGN KEY" { - let sql = format!(r#"PRAGMA FOREIGN_KEY_LIST("{}")"#, table); - for row in block_on(sqlx_query(&sql).fetch_all(&self.pool))? 
{ - let cname = row.get::("from"); - if cname == column { - return Ok(true); - } - } - Ok(false) - } else { - return Err(ValveError::InputError( - format!("Unrecognized constraint type: '{}'", constraint_type).into(), - )); - } - } - }; - - // Check if there is a change to whether this column is a primary/unique key: - let is_primary = match pstruct { - Expression::Label(label) if label == "primary" => true, - _ => false, - }; - if is_primary != column_has_constraint_type("PRIMARY KEY")? { - return Ok(true); - } else if !is_primary { - let is_unique = match pstruct { - Expression::Label(label) if label == "unique" => true, - _ => false, - }; - let unique_in_db = column_has_constraint_type("UNIQUE")?; - if is_unique != unique_in_db { - // A child of a tree constraint implies a unique db constraint, so if there is a - // unique constraint in the db that is not configured, that is the explanation, - // and in that case we do not count this as a change to the column. - if !unique_in_db { - return Ok(true); - } else { - let trees = - self.config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|o| o.get("tree")) - .and_then(|t| t.as_object()) - .and_then(|o| o.get(table)) - .and_then(|t| t.as_array()) - .and_then(|a| { - Some(a.iter().map(|o| { - o.as_object().and_then(|o| o.get("child")).unwrap() - })) - }) - .unwrap() - .collect::>(); - if !trees.contains(&&SerdeValue::String(column.to_string())) { - return Ok(true); - } - } - } - } - - match pstruct { - Expression::Function(name, args) if name == "from" => { - match &*args[0] { - Expression::Field(cfg_ftable, cfg_fcolumn) => { - if self.pool.any_kind() == AnyKind::Sqlite { - let sql = format!(r#"PRAGMA FOREIGN_KEY_LIST("{}")"#, table); - for row in block_on(sqlx_query(&sql).fetch_all(&self.pool))? 
{ - let from = row.get::("from"); - if from == column { - let db_ftable = row.get::("table"); - let db_fcolumn = row.get::("to"); - if *cfg_ftable != db_ftable || *cfg_fcolumn != db_fcolumn { - return Ok(true); - } - } - } - } else { - let sql = format!( - r#"SELECT - ccu.table_name AS foreign_table_name, - ccu.column_name AS foreign_column_name - FROM information_schema.table_constraints AS tc - JOIN information_schema.key_column_usage AS kcu - ON tc.constraint_name = kcu.constraint_name - AND tc.table_schema = kcu.table_schema - JOIN information_schema.constraint_column_usage AS ccu - ON ccu.constraint_name = tc.constraint_name - WHERE tc.constraint_type = 'FOREIGN KEY' - AND tc.table_name = '{}' - AND kcu.column_name = '{}'"#, - table, column - ); - let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; - if rows.len() == 0 { - // If the table doesn't even exist return true. - return Ok(true); - } else if rows.len() > 1 { - // This seems impossible given how PostgreSQL works: - unreachable!(); - } else { - let row = &rows[0]; - let db_ftable = row.get::("foreign_table_name"); - let db_fcolumn = row.get::("foreign_column_name"); - if *cfg_ftable != db_ftable || *cfg_fcolumn != db_fcolumn { - return Ok(true); - } - } - } - } - _ => { - return Err(ValveError::InputError( - format!("Unrecognized structure: {:?}", pstruct).into(), - )); - } - }; - } - _ => (), - }; - - Ok(false) - }; - - let (columns_config, configured_column_order) = { - let table_config = self - .config - .get("table") - .and_then(|tc| tc.get(table)) - .and_then(|t| t.as_object()) - .unwrap(); - let columns_config = table_config - .get("column") - .and_then(|c| c.as_object()) - .unwrap(); - let configured_column_order = { - let mut configured_column_order = { - if table == "message" { - vec!["message_id".to_string()] - } else if table == "history" { - vec!["history_id".to_string()] - } else { - vec!["row_number".to_string()] - } - }; - configured_column_order.append( - &mut table_config - 
.get("column_order") - .and_then(|c| c.as_array()) - .and_then(|a| Some(a.iter())) - .and_then(|a| Some(a.map(|c| c.as_str().unwrap().to_string()))) - .and_then(|a| Some(a.collect::>())) - .unwrap(), - ); - configured_column_order - }; - - (columns_config, configured_column_order) - }; - - let db_columns_in_order = { - if self.pool.any_kind() == AnyKind::Sqlite { - let sql = format!( - r#"SELECT 1 FROM sqlite_master WHERE "type" = 'table' AND "name" = '{}'"#, - table - ); - let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; - if rows.len() == 0 { - if self.verbose { - info!( - "The table '{}' will be recreated as it does not exist in the database", - table - ); - } - return Ok(true); - } else if rows.len() == 1 { - // Otherwise send another query to the db to get the column info: - let sql = format!(r#"PRAGMA TABLE_INFO("{}")"#, table); - let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; - rows.iter() - .map(|r| { - ( - r.get::("name"), - r.get::("type"), - r.get::("pk") as u32, - ) - }) - .collect::>() - } else { - unreachable!(); - } - } else { - let sql = format!( - r#"SELECT "column_name", "data_type" - FROM "information_schema"."columns" - WHERE "table_name" = '{}' - ORDER BY "ordinal_position""#, - table, - ); - let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; - if rows.len() == 0 { - if self.verbose { - info!( - "The table '{}' will be recreated as it does not exist in the database", - table - ); - } - return Ok(true); - } - // Otherwise we get the column name: - rows.iter() - .map(|r| { - ( - r.get::("column_name"), - r.get::("data_type"), - // The third entry is just a dummy so that the datatypes in the two - // wings of this if/else block match. 
- 0, - ) - }) - .collect::>() - } - }; - - // Check if the order of the configured columns matches the order of the columns in the - // database: - let db_column_order = db_columns_in_order - .iter() - .map(|c| c.0.clone()) - .collect::>(); - if db_column_order != configured_column_order { - if self.verbose { - info!( - "The table '{}' will be recreated since the database columns: {:?} \ - and/or their order does not match the configured columns: {:?}", - table, db_column_order, configured_column_order - ); - } - return Ok(true); - } - - // Check, for all tables, whether their column configuration matches the contents of the - // database: - for (cname, ctype, pk) in &db_columns_in_order { - // Do not consider these special columns: - if (table == "message" && cname == "message_id") - || (table == "message" && cname == "row") - || (table == "history" && cname == "history_id") - || (table == "history" && cname == "timestamp") - || (table == "history" && cname == "row") - || cname == "row_number" - { - continue; - } - let column_config = columns_config - .get(cname) - .and_then(|c| c.as_object()) - .unwrap(); - let sql_type = - get_sql_type_from_global_config(&self.config, table, &cname, &self.pool).unwrap(); - - // Check the column's SQL type: - if sql_type.to_lowercase() != ctype.to_lowercase() { - let s = sql_type.to_lowercase(); - let c = ctype.to_lowercase(); - // CHARACTER VARYING and VARCHAR are synonyms so we ignore this difference. 
- if !((s.starts_with("varchar") || s.starts_with("character varying")) - && (c.starts_with("varchar") || c.starts_with("character varying"))) - { - if self.verbose { - info!( - "The table '{}' will be recreated because the SQL type of column '{}', \ - {}, does not match the configured value: {}", - table, - cname, - ctype, - sql_type - ); - } - return Ok(true); - } - } - - // Check the column's structure: - let structure = column_config.get("structure").and_then(|d| d.as_str()); - match structure { - Some(structure) if structure != "" => { - let parsed_structure = self - .parsed_structure_conditions - .get(structure) - .and_then(|p| Some(p.parsed.clone())) - .unwrap(); - if structure_has_changed(&parsed_structure, table, &cname, &pk)? { - if self.verbose { - info!( - "The table '{}' will be recreated because the database \ - constraints for column '{}' do not match the configured \ - structure, '{}'", - table, cname, structure - ); - } - return Ok(true); - } - } - _ => (), - }; - } - - Ok(false) - } - - /// Generates and returns the DDL required to setup the database. - pub async fn get_setup_statements(&self) -> Result>, ValveError> { - let tables_config = self - .config - .get("table") - .and_then(|t| t.as_object()) - .unwrap() - .clone(); - let datatypes_config = self - .config - .get("datatype") - .and_then(|d| d.as_object()) - .unwrap() - .clone(); - - let parser = StartParser::new(); - - // Begin by reading in the TSV files corresponding to the tables defined in tables_config, - // and use that information to create the associated database tables, while saving - // constraint information to constrains_config. 
- let mut setup_statements = HashMap::new(); - for table_name in tables_config.keys().cloned().collect::>() { - // Generate the statements for creating the table and its corresponding conflict table: - let mut table_statements = vec![]; - for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] { - let mut statements = get_table_ddl( - &tables_config, - &datatypes_config, - &parser, - &table, - &self.pool, - ); - table_statements.append(&mut statements); - } - - let create_view_sql = get_sql_for_standard_view(&table_name, &self.pool); - let create_text_view_sql = - get_sql_for_text_view(&tables_config, &table_name, &self.pool); - table_statements.push(create_view_sql); - table_statements.push(create_text_view_sql); - - setup_statements.insert(table_name.to_string(), table_statements); - } - - let text_type = get_sql_type(&datatypes_config, &"text".to_string(), &self.pool).unwrap(); - - // Generate DDL for the history table: - let mut history_statements = vec![]; - history_statements.push(format!( - indoc! {r#" - CREATE TABLE "history" ( - {history_id} - "table" {text_type}, - "row" BIGINT, - "from" {text_type}, - "to" {text_type}, - "summary" {text_type}, - "user" {text_type}, - "undone_by" {text_type}, - {timestamp} - ); - "#}, - history_id = { - if self.pool.any_kind() == AnyKind::Sqlite { - "\"history_id\" INTEGER PRIMARY KEY," - } else { - "\"history_id\" SERIAL PRIMARY KEY," - } - }, - text_type = text_type, - timestamp = { - if self.pool.any_kind() == AnyKind::Sqlite { - "\"timestamp\" TIMESTAMP DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))" - } else { - "\"timestamp\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP" - } - }, - )); - history_statements - .push(r#"CREATE INDEX "history_tr_idx" ON "history"("table", "row");"#.to_string()); - setup_statements.insert("history".to_string(), history_statements); - - // Generate DDL for the message table: - let mut message_statements = vec![]; - message_statements.push(format!( - indoc! 
{r#" - CREATE TABLE "message" ( - {message_id} - "table" {text_type}, - "row" BIGINT, - "column" {text_type}, - "value" {text_type}, - "level" {text_type}, - "rule" {text_type}, - "message" {text_type} - ); - "#}, - message_id = { - if self.pool.any_kind() == AnyKind::Sqlite { - "\"message_id\" INTEGER PRIMARY KEY," - } else { - "\"message_id\" SERIAL PRIMARY KEY," - } - }, - text_type = text_type, - )); - message_statements.push( - r#"CREATE INDEX "message_trc_idx" ON "message"("table", "row", "column");"#.to_string(), - ); - setup_statements.insert("message".to_string(), message_statements); - - return Ok(setup_statements); - } - - /// Writes the database schema to stdout. - pub async fn dump_schema(&self) -> Result<(), ValveError> { - let setup_statements = self.get_setup_statements().await?; - for table in self.get_sorted_table_list(false) { - let table_statements = setup_statements.get(table).unwrap(); - let output = String::from(table_statements.join("\n")); - println!("{}\n", output); - } - Ok(()) - } - - /// Create all configured database tables and views if they do not already exist as configured. - pub async fn create_all_tables(&self) -> Result<&Self, ValveError> { - let setup_statements = self.get_setup_statements().await?; - let sorted_table_list = self.get_sorted_table_list(false); - for table in &sorted_table_list { - if self.table_has_changed(*table).await? { - self.drop_tables(&vec![table]).await?; - let table_statements = setup_statements.get(*table).unwrap(); - for stmt in table_statements { - self.execute_sql(stmt).await?; - } - } - } - - Ok(self) - } - - /// Checks whether the given table exists in the database. 
- pub async fn table_exists(&self, table: &str) -> Result { - let sql = { - if self.pool.any_kind() == AnyKind::Sqlite { - format!( - r#"SELECT 1 - FROM "sqlite_master" - WHERE "type" = 'table' AND name = '{}' - LIMIT 1"#, - table - ) - } else { - format!( - r#"SELECT 1 - FROM "information_schema"."tables" - WHERE "table_schema" = 'public' - AND "table_name" = '{}'"#, - table - ) - } - }; - let query = sqlx_query(&sql); - let rows = query.fetch_all(&self.pool).await?; - return Ok(rows.len() > 0); - } - - /// Get all the incoming (tables that depend on it) or outgoing (tables it depends on) - /// dependencies of the given table. - pub fn get_dependencies(&self, table: &str, incoming: bool) -> Vec { - let mut dependent_tables = vec![]; - if table != "message" && table != "history" { - let direct_deps = { - if incoming { - self.table_dependencies_in.get(table).unwrap().to_vec() - } else { - self.table_dependencies_out.get(table).unwrap().to_vec() - } - }; - for direct_dep in direct_deps { - let mut indirect_deps = self.get_dependencies(&direct_dep, incoming); - dependent_tables.append(&mut indirect_deps); - dependent_tables.push(direct_dep); - } - } - dependent_tables - } - - /// Given a list of tables, fill it in with any further tables that are dependent upon tables - /// in the given list. If deletion_order is true, the tables are sorted as required for - /// deleting them all sequentially, otherwise they are ordered in reverse. - pub fn add_dependencies(&self, tables: &Vec<&str>, deletion_order: bool) -> Vec { - let mut with_dups = vec![]; - for table in tables { - let dependent_tables = self.get_dependencies(table, true); - for dep_table in dependent_tables { - with_dups.push(dep_table.to_string()); - } - with_dups.push(table.to_string()); - } - // The algorithm above gives the tables in the order needed for deletion. But we want - // this function to return the creation order by default so we reverse it unless - // the deletion_order flag is set to true. 
- if !deletion_order { - with_dups.reverse(); - } - - // Remove the duplicates from the returned table list: - let mut tables_in_order = vec![]; - for table in with_dups.iter().unique() { - tables_in_order.push(table.to_string()); - } - tables_in_order - } - - /// Given a subset of the configured tables, return them in sorted dependency order, or in - /// reverse if `reverse` is set to true. - pub fn sort_tables( - &self, - table_subset: &Vec<&str>, - reverse: bool, - ) -> Result, ValveError> { - let full_table_list = self.get_sorted_table_list(false); - if !table_subset - .iter() - .all(|item| full_table_list.contains(item)) - { - return Err(ValveError::InputError(format!( - "[{}] contains tables that are not in the configured table list: [{}]", - table_subset.join(", "), - full_table_list.join(", ") - ))); - } - - let constraints_config = self - .config - .get("constraints") - .and_then(|c| c.as_object()) - .ok_or(ValveError::ConfigError( - "Unable to retrieve configured constraints.".into(), - ))?; - - // Filter out message and history since they are not represented in the constraints config. - // They will be added implicitly to the list returned by verify_table_deps_and_sort. - let filtered_subset = table_subset - .iter() - .filter(|m| **m != "history" && **m != "message") - .map(|s| s.to_string()) - .collect::>(); - - let (sorted_subset, _, _) = - verify_table_deps_and_sort(&filtered_subset, &constraints_config); - - // Since the result of verify_table_deps_and_sort() will include dependencies of the tables - // in its input list, we filter those out here: - let mut sorted_subset = sorted_subset - .iter() - .filter(|m| table_subset.contains(&m.as_str())) - .map(|s| s.to_string()) - .collect::>(); - - if reverse { - sorted_subset.reverse(); - } - Ok(sorted_subset) - } - - /// Returns an IndexMap, indexed by configured table, containing lists of their dependencies. - /// If incoming is true, the lists are incoming dependencies, else they are outgoing. 
- pub fn collect_dependencies(&self, incoming: bool) -> IndexMap> { - let tables = self.get_sorted_table_list(false); - let mut dependencies = IndexMap::new(); - for table in tables { - dependencies.insert(table.to_string(), self.get_dependencies(table, incoming)); - } - dependencies - } - - /// Drop all configured tables, in reverse dependency order. - pub async fn drop_all_tables(&self) -> Result<&Self, ValveError> { - // Drop all of the database tables in the reverse of their sorted order: - self.drop_tables(&self.get_sorted_table_list(true)).await?; - Ok(self) - } - - /// Given a vector of table names, drop those tables, in the given order. - pub async fn drop_tables(&self, tables: &Vec<&str>) -> Result<&Self, ValveError> { - let drop_list = self.add_dependencies(tables, true); - for table in &drop_list { - if *table != "message" && *table != "history" { - let sql = format!(r#"DROP VIEW IF EXISTS "{}_text_view""#, table); - self.execute_sql(&sql).await?; - let sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table); - self.execute_sql(&sql).await?; - let sql = format!(r#"DROP TABLE IF EXISTS "{}_conflict""#, table); - self.execute_sql(&sql).await?; - } - let sql = format!(r#"DROP TABLE IF EXISTS "{}""#, table); - self.execute_sql(&sql).await?; - } - - Ok(self) - } - - /// Truncate all configured tables, in reverse dependency order. - pub async fn truncate_all_tables(&self) -> Result<&Self, ValveError> { - self.truncate_tables(&self.get_sorted_table_list(true)) - .await?; - Ok(self) - } - - /// Given a vector of table names, truncate those tables, in the given order. - pub async fn truncate_tables(&self, tables: &Vec<&str>) -> Result<&Self, ValveError> { - self.create_all_tables().await?; - let truncate_list = self.add_dependencies(tables, true); - - // We must use CASCADE in the case of PostgreSQL since we cannot truncate a table, T, that - // depends on another table, T', even in the case where we have previously truncated T'. - // SQLite does not need this. 
However SQLite does require that the tables be truncated in - // deletion order (which means that it must be checking that T' is empty). - let truncate_sql = |table: &str| -> String { - if self.pool.any_kind() == AnyKind::Postgres { - format!(r#"TRUNCATE TABLE "{}" RESTART IDENTITY CASCADE"#, table) - } else { - format!(r#"DELETE FROM "{}""#, table) - } - }; - - for table in &truncate_list { - let sql = truncate_sql(&table); - self.execute_sql(&sql).await?; - if *table != "message" && *table != "history" { - let sql = truncate_sql(&format!("{}_conflict", table)); - self.execute_sql(&sql).await?; - } - } - - Ok(self) - } - - /// Load all configured tables in dependency order. If `validate` is false, just try to insert - /// all rows, irrespective of whether they are valid or not or will possibly trigger a db error. - pub async fn load_all_tables(&self, validate: bool) -> Result<&Self, ValveError> { - let table_list = self.get_sorted_table_list(false); - if self.verbose { - info!("Processing {} tables.", table_list.len()); - } - self.load_tables(&table_list, validate).await - } - - /// Given a vector of table names, load those tables in the given order. If `validate` is false, - /// just try to insert all rows, irrespective of whether they are valid or not or will possibly - /// trigger a db error. 
- pub async fn load_tables( - &self, - table_list: &Vec<&str>, - validate: bool, - ) -> Result<&Self, ValveError> { - let list_for_truncation = self.sort_tables(table_list, true)?; - self.truncate_tables( - &list_for_truncation - .iter() - .map(|i| i.as_str()) - .collect::>(), - ) - .await?; - - let num_tables = table_list.len(); - let mut total_errors = 0; - let mut total_warnings = 0; - let mut total_infos = 0; - let mut table_num = 1; - for table_name in table_list { - if *table_name == "message" || *table_name == "history" { - continue; - } - let table_name = table_name.to_string(); - let path = String::from( - self.config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|o| o.get(&table_name)) - .and_then(|n| n.get("path")) - .and_then(|p| p.as_str()) - .unwrap(), - ); - let mut rdr = { - match File::open(path.clone()) { - Err(e) => { - warn!("Unable to open '{}': {}", path.clone(), e); - continue; - } - Ok(table_file) => ReaderBuilder::new() - .has_headers(false) - .delimiter(b'\t') - .from_reader(table_file), - } - }; - if self.verbose { - info!("Loading table {}/{}: {}", table_num, num_tables, table_name); - } - table_num += 1; - - // Extract the headers, which we will need later: - let mut records = rdr.records(); - let headers; - if let Some(result) = records.next() { - headers = result.unwrap(); - } else { - panic!("'{}' is empty", path); - } - - for header in headers.iter() { - if header.trim().is_empty() { - panic!( - "One or more of the header fields is empty for table '{}'", - table_name - ); - } - } - - // HashMap used to report info about the number of error/warning/info messages for this - // table when the verbose flag is set to true: - let mut messages_stats = HashMap::new(); - messages_stats.insert("error".to_string(), 0); - messages_stats.insert("warning".to_string(), 0); - messages_stats.insert("info".to_string(), 0); - - // Split the data into chunks of size CHUNK_SIZE before passing them to the validation - // logic: - let chunks 
= records.chunks(CHUNK_SIZE); - insert_chunks( - &self.config, - &self.pool, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &table_name, - &chunks, - &headers, - &mut messages_stats, - self.verbose, - validate, - ) - .await?; - - if validate { - // We need to wait until all of the rows for a table have been loaded before - // validating the "foreign" constraints on a table's trees, since this checks if the - // values of one column (the tree's parent) are all contained in another column (the - // tree's child). We also need to wait before validating a table's "under" - // constraints. Although the tree associated with such a constraint need not be - // defined on the same table, it can be. - let mut recs_to_update = - validate_tree_foreign_keys(&self.config, &self.pool, None, &table_name, None) - .await?; - recs_to_update.append( - &mut validate_under(&self.config, &self.pool, None, &table_name, None).await?, - ); - - for record in recs_to_update { - let row_number = record.get("row_number").unwrap(); - let column_name = record.get("column").and_then(|s| s.as_str()).unwrap(); - let value = record.get("value").and_then(|s| s.as_str()).unwrap(); - let level = record.get("level").and_then(|s| s.as_str()).unwrap(); - let rule = record.get("rule").and_then(|s| s.as_str()).unwrap(); - let message = record.get("message").and_then(|s| s.as_str()).unwrap(); - - let sql = local_sql_syntax( - &self.pool, - &format!( - r#"INSERT INTO "message" - ("table", "row", "column", "value", "level", "rule", "message") - VALUES ({}, {}, {}, {}, {}, {}, {})"#, - SQL_PARAM, - row_number, - SQL_PARAM, - SQL_PARAM, - SQL_PARAM, - SQL_PARAM, - SQL_PARAM - ), - ); - let mut query = sqlx_query(&sql); - query = query.bind(&table_name); - query = query.bind(&column_name); - query = query.bind(&value); - query = query.bind(&level); - query = query.bind(&rule); - query = query.bind(&message); - query.execute(&self.pool).await?; - - if self.verbose { - // Add the generated 
message to messages_stats: - let messages = vec![json!({ - "message": message, - "level": level, - })]; - add_message_counts(&messages, &mut messages_stats); - } - } - } - - if self.verbose { - // Output a report on the messages generated to stderr: - let errors = messages_stats.get("error").unwrap(); - let warnings = messages_stats.get("warning").unwrap(); - let infos = messages_stats.get("info").unwrap(); - let status_message = format!( - "{} errors, {} warnings, and {} information messages generated for {}", - errors, warnings, infos, table_name - ); - info!("{}", status_message); - total_errors += errors; - total_warnings += warnings; - total_infos += infos; - } - } - - if self.verbose { - info!( - "Loading complete with {} errors, {} warnings, and {} information messages", - total_errors, total_warnings, total_infos - ); - } - Ok(self) - } - - /// Save all configured tables to their configured path's, unless save_dir is specified, - /// in which case save them there instead. - pub fn save_all_tables(&self, save_dir: &Option) -> Result<&Self, ValveError> { - let tables = self.get_sorted_table_list(false); - self.save_tables(&tables, save_dir)?; - Ok(self) - } - - /// Given a vector of table names, save those tables to their configured path's, unless - /// save_dir is specified, in which case save them there instead. 
- pub fn save_tables( - &self, - tables: &Vec<&str>, - save_dir: &Option, - ) -> Result<&Self, ValveError> { - let table_paths: HashMap = self - .config - .get("table") - .unwrap() - .as_object() - .unwrap() - .iter() - .filter(|(k, v)| { - !["message", "history"].contains(&k.as_str()) - && tables.contains(&k.as_str()) - && v.get("path").is_some() - }) - .map(|(k, v)| { - ( - k.clone(), - v.get("path").unwrap().as_str().unwrap().to_string(), - ) - }) - .collect(); - - info!( - "Saving tables: {} ...", - table_paths - .keys() - .map(|k| k.to_string()) - .collect::>() - .join(", ") - ); - for (table, path) in table_paths.iter() { - let columns: Vec<&str> = self - .config - .get("table") - .and_then(|v| v.as_object()) - .and_then(|o| o.get(table)) - .and_then(|v| v.as_object()) - .and_then(|o| o.get("column_order")) - .and_then(|v| v.as_array()) - .and_then(|v| Some(v.iter().map(|i| i.as_str().unwrap()).collect())) - .unwrap(); - - let path = match save_dir { - Some(s) => format!( - "{}/{}", - s, - Path::new(path) - .file_name() - .and_then(|n| n.to_str()) - .unwrap() - ), - None => path.to_string(), - }; - self.save_table(table, &columns, &path)?; - } - - Ok(self) - } - - /// Save the given table with the given columns at the given path as a TSV file. - pub fn save_table( - &self, - table: &str, - columns: &Vec<&str>, - path: &str, - ) -> Result<&Self, ValveError> { - // TODO: Do some validation on the path. 
- - let mut quoted_columns = vec!["\"row_number\"".to_string()]; - quoted_columns.append( - &mut columns - .iter() - .map(|v| enquote::enquote('"', v)) - .collect::>(), - ); - let text_view = format!("\"{}_text_view\"", table); - let sql = format!( - r#"SELECT {} from {} ORDER BY "row_number""#, - quoted_columns.join(", "), - text_view - ); - - let mut writer = WriterBuilder::new() - .delimiter(b'\t') - .quote_style(QuoteStyle::Never) - .from_path(path)?; - writer.write_record(columns)?; - let mut stream = sqlx_query(&sql).fetch(&self.pool); - while let Some(row) = block_on(stream.try_next()).unwrap() { - let mut record: Vec<&str> = vec![]; - for column in columns.iter() { - let cell = row.try_get::<&str, &str>(column).ok().unwrap_or_default(); - record.push(cell); - } - writer.write_record(record)?; - } - writer.flush()?; - - Ok(self) - } - - /// Given a table name and a row, return the validated row. - pub async fn validate_row( - &self, - table_name: &str, - row: &ValveRow, - row_number: Option, - ) -> Result { - validate_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - None, - table_name, - row, - row_number, - None, - ) - .await - } - - /// Given a table name and a row as JSON, add the row to the table in the database, and return - /// the validated row, including its new row_number. 
- pub async fn insert_row( - &self, - table_name: &str, - row: &ValveRow, - ) -> Result<(u32, ValveRow), ValveError> { - let mut tx = self.pool.begin().await?; - - let row = validate_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - Some(&mut tx), - table_name, - row, - None, - None, - ) - .await?; - - let rn = insert_new_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table_name, - &row, - None, - true, - ) - .await?; - - record_row_change(&mut tx, table_name, &rn, None, Some(&row), &self.user).await?; - tx.commit().await?; - Ok((rn, row)) - } - - /// Given a table name, a row number, and a row, update the row in the database, and return the - /// validated row. - pub async fn update_row( - &self, - table_name: &str, - row_number: &u32, - row: &ValveRow, - ) -> Result { - let mut tx = self.pool.begin().await?; - - // Get the old version of the row from the database so that we can later record it to the - // history table: - let old_row = - get_row_from_db(&self.config, &self.pool, &mut tx, table_name, &row_number).await?; - - let row = validate_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - Some(&mut tx), - table_name, - row, - Some(*row_number), - None, - ) - .await?; - - update_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table_name, - &row, - row_number, - true, - false, - ) - .await?; - - // Record the row update in the history table: - record_row_change( - &mut tx, - table_name, - row_number, - Some(&old_row), - Some(&row), - &self.user, - ) - .await?; - - tx.commit().await?; - Ok(row) - } - - /// Given a table name and a row number, delete that row from the table. 
- pub async fn delete_row(&self, table_name: &str, row_number: &u32) -> Result<(), ValveError> { - let mut tx = self.pool.begin().await?; - - let row = - get_row_from_db(&self.config, &self.pool, &mut tx, &table_name, row_number).await?; - - record_row_change( - &mut tx, - &table_name, - row_number, - Some(&row), - None, - &self.user, - ) - .await?; - - delete_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table_name, - row_number, - ) - .await?; - - tx.commit().await?; - Ok(()) - } - - /// Return the next change that can be undone, or None if there isn't any. - pub async fn get_record_to_undo(&self) -> Result, ValveError> { - // Look in the history table, get the row with the greatest ID, get the row number, - // from, and to, and determine whether the last operation was a delete, insert, or update. - let is_clause = if self.pool.any_kind() == AnyKind::Sqlite { - "IS" - } else { - "IS NOT DISTINCT FROM" - }; - let sql = format!( - r#"SELECT * FROM "history" - WHERE "undone_by" {} NULL - ORDER BY "history_id" DESC LIMIT 1"#, - is_clause - ); - let query = sqlx_query(&sql); - let result_row = query.fetch_optional(&self.pool).await?; - Ok(result_row) - } - - /// Return the next change that can be redone, or None if there isn't any. - pub async fn get_record_to_redo(&self) -> Result, ValveError> { - // Look in the history table, get the row with the greatest ID, get the row number, - // from, and to, and determine whether the last operation was a delete, insert, or update. 
- let is_not_clause = if self.pool.any_kind() == AnyKind::Sqlite { - "IS NOT" - } else { - "IS DISTINCT FROM" - }; - let sql = format!( - r#"SELECT * FROM "history" - WHERE "undone_by" {} NULL - ORDER BY "timestamp" DESC LIMIT 1"#, - is_not_clause - ); - let query = sqlx_query(&sql); - let result_row = query.fetch_optional(&self.pool).await?; - Ok(result_row) - } - - /// Undo one change and return the change record or None if there was no change to undo. - pub async fn undo(&self) -> Result, ValveError> { - let last_change = match self.get_record_to_undo().await? { - None => { - warn!("Nothing to undo."); - return Ok(None); - } - Some(r) => r, - }; - let history_id: i32 = last_change.get("history_id"); - let history_id = history_id as u16; - let table: &str = last_change.get("table"); - let row_number: i64 = last_change.get("row"); - let row_number = row_number as u32; - let from = get_json_from_row(&last_change, "from"); - let to = get_json_from_row(&last_change, "to"); - - match (from, to) { - (None, None) => { - return Err(ValveError::DataError( - "Cannot redo unknown operation from None to None".into(), - )) - } - (None, Some(_)) => { - // Undo an insert: - let mut tx = self.pool.begin().await?; - - delete_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table, - &row_number, - ) - .await?; - - switch_undone_state(&self.user, history_id, true, &mut tx, &self.pool).await?; - tx.commit().await?; - Ok(None) - } - (Some(from), None) => { - // Undo a delete: - let mut tx = self.pool.begin().await?; - - insert_new_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table, - &from, - Some(row_number), - false, - ) - .await?; - - switch_undone_state(&self.user, history_id, true, &mut tx, &self.pool).await?; - tx.commit().await?; - Ok(Some(from)) - } - (Some(from), Some(_)) => { - // Undo an an update: - let mut tx = 
self.pool.begin().await?; - - update_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table, - &from, - &row_number, - false, - false, - ) - .await?; - - switch_undone_state(&self.user, history_id, true, &mut tx, &self.pool).await?; - tx.commit().await?; - Ok(Some(from)) - } - } - } - - /// Redo one change and return the change record or None if there was no change to redo. - pub async fn redo(&self) -> Result, ValveError> { - let last_undo = match self.get_record_to_redo().await? { - None => { - warn!("Nothing to redo."); - return Ok(None); - } - Some(last_undo) => { - let undone_by = last_undo.try_get_raw("undone_by")?; - if undone_by.is_null() { - warn!("Nothing to redo."); - return Ok(None); - } - last_undo - } - }; - let history_id: i32 = last_undo.get("history_id"); - let history_id = history_id as u16; - let table: &str = last_undo.get("table"); - let row_number: i64 = last_undo.get("row"); - let row_number = row_number as u32; - let from = get_json_from_row(&last_undo, "from"); - let to = get_json_from_row(&last_undo, "to"); - - match (from, to) { - (None, None) => { - return Err(ValveError::DataError( - "Cannot redo unknown operation from None to None".into(), - )) - } - (None, Some(to)) => { - // Redo an insert: - let mut tx = self.pool.begin().await?; - - insert_new_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table, - &to, - Some(row_number), - false, - ) - .await?; - - switch_undone_state(&self.user, history_id, false, &mut tx, &self.pool).await?; - tx.commit().await?; - Ok(Some(to)) - } - (Some(_), None) => { - // Redo a delete: - let mut tx = self.pool.begin().await?; - - delete_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table, - &row_number, - ) - .await?; - - switch_undone_state(&self.user, history_id, false, &mut tx, 
&self.pool).await?; - tx.commit().await?; - Ok(None) - } - (Some(_), Some(to)) => { - // Redo an an update: - let mut tx = self.pool.begin().await?; - - update_row_tx( - &self.config, - &self.compiled_datatype_conditions, - &self.compiled_rule_conditions, - &self.pool, - &mut tx, - table, - &to, - &row_number, - false, - false, - ) - .await?; - - switch_undone_state(&self.user, history_id, false, &mut tx, &self.pool).await?; - tx.commit().await?; - Ok(Some(to)) - } - } - } - - /// Given a table name, a column name, and (optionally) a string to match, return a JSON array - /// of possible valid values for the given column which contain the matching string as a - /// substring (or all of them if no matching string is given). The JSON array returned is - /// formatted for Typeahead, i.e., it takes the form: - /// `[{"id": id, "label": label, "order": order}, ...]`. - pub async fn get_matching_values( - &self, - table_name: &str, - column_name: &str, - matching_string: Option<&str>, - ) -> Result { - let config = &self.config; - let compiled_datatype_conditions = &self.compiled_datatype_conditions; - let parsed_structure_conditions = &self.parsed_structure_conditions; - let pool = &self.pool; - let dt_name = config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table_name)) - .and_then(|t| t.as_object()) - .and_then(|t| t.get("column")) - .and_then(|c| c.as_object()) - .and_then(|c| c.get(column_name)) - .and_then(|c| c.as_object()) - .and_then(|c| c.get("datatype")) - .and_then(|d| d.as_str()) - .unwrap(); - - let dt_condition = compiled_datatype_conditions - .get(dt_name) - .and_then(|d| Some(d.parsed.clone())); - - let mut values = vec![]; - match dt_condition { - Some(Expression::Function(name, args)) if name == "in" => { - for arg in args { - if let Expression::Label(arg) = *arg { - // Remove the enclosing quotes from the values being returned: - let label = unquote(&arg).unwrap_or_else(|_| arg); - if let Some(s) = matching_string { - if 
label.contains(s) { - values.push(label); - } - } - } - } - } - _ => { - // If the datatype for the column does not correspond to an `in(...)` function, then - // we check the column's structure constraints. If they include a - // `from(foreign_table.foreign_column)` condition, then the values are taken from - // the foreign column. Otherwise if the structure includes an - // `under(tree_table.tree_column, value)` condition, then get the values from the - // tree column that are under `value`. - let structure = parsed_structure_conditions.get( - config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table_name)) - .and_then(|t| t.as_object()) - .and_then(|t| t.get("column")) - .and_then(|c| c.as_object()) - .and_then(|c| c.get(column_name)) - .and_then(|c| c.as_object()) - .and_then(|c| c.get("structure")) - .and_then(|d| d.as_str()) - .unwrap_or_else(|| ""), - ); - - let sql_type = - get_sql_type_from_global_config(&config, table_name, &column_name, &pool) - .unwrap(); - - match structure { - Some(ParsedStructure { original, parsed }) => { - let matching_string = { - match matching_string { - None => "%".to_string(), - Some(s) => format!("%{}%", s), - } - }; - - match parsed { - Expression::Function(name, args) if name == "from" => { - let foreign_key = &args[0]; - if let Expression::Field(ftable, fcolumn) = &**foreign_key { - let fcolumn_text = cast_column_sql_to_text(&fcolumn, &sql_type); - let sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT "{}" FROM "{}" WHERE {} LIKE {}"#, - fcolumn, ftable, fcolumn_text, SQL_PARAM - ), - ); - let rows = sqlx_query(&sql) - .bind(&matching_string) - .fetch_all(pool) - .await?; - for row in rows.iter() { - values.push(get_column_value(&row, &fcolumn, &sql_type)); - } - } - } - Expression::Function(name, args) - if name == "under" || name == "tree" => - { - let mut tree_col = "not set"; - let mut under_val = Some("not set".to_string()); - if name == "under" { - if let Expression::Field(_, column) = 
&**&args[0] { - tree_col = column; - } - if let Expression::Label(label) = &**&args[1] { - under_val = Some(label.to_string()); - } - } else { - let tree_key = &args[0]; - if let Expression::Label(label) = &**tree_key { - tree_col = label; - under_val = None; - } - } - - let tree = config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|c| c.get("tree")) - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table_name)) - .and_then(|t| t.as_array()) - .and_then(|t| { - t.iter().find(|o| o.get("child").unwrap() == tree_col) - }) - .expect( - format!("No tree: '{}.{}' found", table_name, tree_col) - .as_str(), - ) - .as_object() - .unwrap(); - let child_column = - tree.get("child").and_then(|c| c.as_str()).unwrap(); - - let (tree_sql, mut params) = with_tree_sql( - &config, - tree, - &table_name.to_string(), - &table_name.to_string(), - under_val.as_ref(), - None, - &pool, - ); - let child_column_text = - cast_column_sql_to_text(&child_column, &sql_type); - let sql = local_sql_syntax( - &pool, - &format!( - r#"{} SELECT "{}" FROM "tree" WHERE {} LIKE {}"#, - tree_sql, child_column, child_column_text, SQL_PARAM - ), - ); - params.push(matching_string); - - let mut query = sqlx_query(&sql); - for param in ¶ms { - query = query.bind(param); - } - - let rows = query.fetch_all(pool).await?; - for row in rows.iter() { - values.push(get_column_value(&row, &child_column, &sql_type)); - } - } - _ => panic!("Unrecognised structure: {}", original), - }; - } - None => (), - }; - } - }; - - let mut typeahead_values = vec![]; - for (i, v) in values.iter().enumerate() { - // enumerate() begins at 0 but we need to begin at 1: - let i = i + 1; - typeahead_values.push(json!({ - "id": v, - "label": v, - "order": i, - })); - } - - Ok(json!(typeahead_values)) - } -} - /// Given a string representing the location of a database, return a database connection pool. 
pub async fn get_pool_from_connection_string(database: &str) -> Result { let connection_options; diff --git a/src/main.rs b/src/main.rs index 569938bd..413b42f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,8 @@ mod api_test; use crate::api_test::run_api_tests; - use argparse::{ArgumentParser, Store, StoreTrue}; - -use ontodev_valve::{Valve, ValveError}; +use ontodev_valve::{valve::Valve, valve::ValveError}; use serde_json::{from_str, Value as SerdeValue}; use std::{env, process}; diff --git a/src/validate.rs b/src/validate.rs index a8aeae03..26c70f53 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -1,15 +1,15 @@ +use crate::{ + cast_sql_param_from_text, error, get_column_value, get_sql_type_from_global_config, + is_sql_type_error, local_sql_syntax, + valve::{ValveError, ValveRow}, + ColumnRule, CompiledCondition, SerdeMap, +}; use chrono::Utc; use indexmap::IndexMap; use serde_json::{json, Value as SerdeValue}; use sqlx::{any::AnyPool, query as sqlx_query, Acquire, Row, Transaction, ValueRef}; use std::collections::HashMap; -use crate::{ - cast_sql_param_from_text, error, get_column_value, get_sql_type_from_global_config, - is_sql_type_error, local_sql_syntax, ColumnRule, CompiledCondition, SerdeMap, ValveError, - ValveRow, -}; - /// Represents a particular cell in a particular row of data with vaildation results. 
#[derive(Clone, Debug)] pub struct ResultCell { diff --git a/src/valve.rs b/src/valve.rs new file mode 100644 index 00000000..9866390a --- /dev/null +++ b/src/valve.rs @@ -0,0 +1,1828 @@ +use crate::{ + add_message_counts, + ast::Expression, + cast_column_sql_to_text, delete_row_tx, get_column_value, get_compiled_datatype_conditions, + get_compiled_rule_conditions, get_json_from_row, get_parsed_structure_conditions, + get_pool_from_connection_string, get_row_from_db, get_sql_for_standard_view, + get_sql_for_text_view, get_sql_type, get_sql_type_from_global_config, get_table_ddl, info, + insert_chunks, insert_new_row_tx, local_sql_syntax, read_config_files, record_row_change, + switch_undone_state, update_row_tx, + validate::{validate_row_tx, validate_tree_foreign_keys, validate_under, with_tree_sql}, + valve_grammar::StartParser, + verify_table_deps_and_sort, warn, ColumnRule, CompiledCondition, ParsedStructure, SerdeMap, + CHUNK_SIZE, SQL_PARAM, +}; +use chrono::Utc; +use csv::{QuoteStyle, ReaderBuilder, WriterBuilder}; +use enquote::unquote; +use futures::{executor::block_on, TryStreamExt}; +use indexmap::IndexMap; +use indoc::indoc; +use itertools::Itertools; +use regex::Regex; +use serde_json::{json, Value as SerdeValue}; +use sqlx::{ + any::{AnyKind, AnyPool, AnyRow}, + query as sqlx_query, Row, ValueRef, +}; +use std::{collections::HashMap, fs::File, path::Path}; + +/// Alias for [serde_json::Map](..//serde_json/struct.Map.html). +// Note: serde_json::Map is +// [backed by a BTreeMap by default](https://docs.serde.rs/serde_json/map/index.html) +pub type ValveRow = serde_json::Map; + +/// Main entrypoint for the Valve API. +#[derive(Clone, Debug)] +pub struct Valve { + /// The valve configuration map. + pub config: SerdeMap, + /// Pre-compiled datatype conditions. + pub compiled_datatype_conditions: HashMap, + /// Pre-compiled rule conditions. 
+ pub compiled_rule_conditions: HashMap>>, + /// Parsed structure conditions: + pub parsed_structure_conditions: HashMap, + /// Lists of tables that depend on a given table, indexed by table. + pub table_dependencies_in: HashMap>, + /// Lists of tables that a given table depends on, indexed by table. + pub table_dependencies_out: HashMap>, + /// The database connection pool. + pub pool: AnyPool, + /// The user associated with this valve instance. + pub user: String, + /// Produce more logging output. + pub verbose: bool, +} + +#[derive(Debug)] +pub enum ValveError { + /// An error in the Valve configuration: + ConfigError(String), + /// An error that occurred while reading or writing to a CSV/TSV: + CsvError(csv::Error), + /// An error involving the data: + DataError(String), + /// An error generated by the underlying database: + DatabaseError(sqlx::Error), + /// An error in the inputs to a function: + InputError(String), + /// An error that occurred while reading/writing to stdio: + IOError(std::io::Error), + /// An error that occurred while serialising or deserialising to/from JSON: + SerdeJsonError(serde_json::Error), +} + +impl From for ValveError { + fn from(e: csv::Error) -> Self { + Self::CsvError(e) + } +} + +impl From for ValveError { + fn from(e: sqlx::Error) -> Self { + Self::DatabaseError(e) + } +} + +impl From for ValveError { + fn from(e: serde_json::Error) -> Self { + Self::SerdeJsonError(e) + } +} + +impl From for ValveError { + fn from(e: std::io::Error) -> Self { + Self::IOError(e) + } +} + +impl Valve { + /// Given a path to a table table, a path to a database, a flag for verbose output, and a flag + /// indicating whether the database should be configured for initial loading: Set up a database + /// connection, configure VALVE, and return a new Valve struct. 
+ pub async fn build( + table_path: &str, + database: &str, + verbose: bool, + initial_load: bool, + ) -> Result { + let pool = get_pool_from_connection_string(database).await?; + if pool.any_kind() == AnyKind::Sqlite { + sqlx_query("PRAGMA foreign_keys = ON") + .execute(&pool) + .await?; + if initial_load { + // These pragmas are unsafe but they are used during initial loading since data + // integrity is not a priority in this case. + sqlx_query("PRAGMA journal_mode = OFF") + .execute(&pool) + .await?; + sqlx_query("PRAGMA synchronous = 0").execute(&pool).await?; + sqlx_query("PRAGMA cache_size = 1000000") + .execute(&pool) + .await?; + sqlx_query("PRAGMA temp_store = MEMORY") + .execute(&pool) + .await?; + } + } + + let parser = StartParser::new(); + let ( + specials_config, + tables_config, + datatypes_config, + rules_config, + constraints_config, + sorted_table_list, + table_dependencies_in, + table_dependencies_out, + ) = read_config_files(table_path, &parser, &pool); + + let mut config = SerdeMap::new(); + config.insert( + String::from("special"), + SerdeValue::Object(specials_config.clone()), + ); + config.insert( + String::from("table"), + SerdeValue::Object(tables_config.clone()), + ); + config.insert( + String::from("datatype"), + SerdeValue::Object(datatypes_config.clone()), + ); + config.insert( + String::from("rule"), + SerdeValue::Object(rules_config.clone()), + ); + config.insert( + String::from("constraints"), + SerdeValue::Object(constraints_config.clone()), + ); + let mut sorted_table_serdevalue_list: Vec = vec![]; + for table in &sorted_table_list { + sorted_table_serdevalue_list.push(SerdeValue::String(table.to_string())); + } + config.insert( + String::from("sorted_table_list"), + SerdeValue::Array(sorted_table_serdevalue_list), + ); + + let compiled_datatype_conditions = get_compiled_datatype_conditions(&config, &parser); + let compiled_rule_conditions = + get_compiled_rule_conditions(&config, compiled_datatype_conditions.clone(), &parser); + 
let parsed_structure_conditions = get_parsed_structure_conditions(&config, &parser); + + Ok(Self { + config: config, + compiled_datatype_conditions: compiled_datatype_conditions, + compiled_rule_conditions: compiled_rule_conditions, + parsed_structure_conditions: parsed_structure_conditions, + table_dependencies_in: table_dependencies_in, + table_dependencies_out: table_dependencies_out, + pool: pool, + user: String::from("VALVE"), + verbose: verbose, + }) + } + + /// Convenience function to retrieve the path to Valve's "table table", the main entrypoint + /// to Valve's configuration. + pub fn get_path(&self) -> String { + self.config + .get("table") + .and_then(|t| t.as_object()) + .and_then(|t| t.get("table")) + .and_then(|t| t.as_object()) + .and_then(|t| t.get("path")) + .and_then(|p| p.as_str()) + .unwrap() + .to_string() + } + + /// Controls the maximum length of a username. + const USERNAME_MAX_LEN: usize = 20; + + /// Sets the user name, which must be a short, trimmed, string without newlines, for this Valve + /// instance. + pub fn set_user(&mut self, user: &str) -> Result<&mut Self, ValveError> { + if user.len() > Self::USERNAME_MAX_LEN { + return Err(ValveError::ConfigError(format!( + "Username '{}' is longer than {} characters.", + user, + Self::USERNAME_MAX_LEN + ))); + } else { + let user_regex = Regex::new(r#"^\S([^\n]*\S)*$"#).unwrap(); + if !user_regex.is_match(user) { + return Err(ValveError::ConfigError(format!( + "Username '{}' is not a short, trimmed, string without newlines.", + user, + ))); + } + } + self.user = user.to_string(); + Ok(self) + } + + /// Given a SQL string, execute it using the connection pool associated with the Valve instance. + async fn execute_sql(&self, sql: &str) -> Result<(), ValveError> { + sqlx_query(&sql).execute(&self.pool).await?; + Ok(()) + } + + /// Return the list of configured tables in sorted order, or reverse sorted order if the + /// reverse flag is set. 
+ pub fn get_sorted_table_list(&self, reverse: bool) -> Vec<&str> { + let mut sorted_tables = self + .config + .get("sorted_table_list") + .and_then(|l| l.as_array()) + .and_then(|l| Some(l.iter().map(|i| i.as_str().unwrap()))) + .and_then(|l| Some(l.collect::>())) + .unwrap(); + if reverse { + sorted_tables.reverse(); + } + sorted_tables + } + + /// Given the name of a table, determine whether its current instantiation in the database + /// differs from the way it has been configured. The answer to this question is yes whenever + /// (1) the number of columns or any of their names differs from their configured values, or + /// the order of database columns differs from the configured order; (2) The SQL type of one or + /// more columns does not match the configured SQL type for that column; (3) Some column with a + /// 'unique', 'primary', or 'from(table, column)' in its column configuration fails to be + /// associated, in the database, with a unique constraint, primary key, or foreign key, + /// respectively; or vice versa; (4) The table does not exist in the database. + pub async fn table_has_changed(&self, table: &str) -> Result { + // A clojure that, given a parsed structure condition, a table and column name, and an + // unsigned integer representing whether the given column, in the case of a SQLite database, + // is a primary key (in the case of PostgreSQL, the sqlite_pk parameter is ignored): + // determine whether the structure of the column is properly reflected in the db. E.g., a + // `from(table.column)` struct should be associated with a foreign key, `primary` with a + // primary key, `unique` with a unique constraint. 
+ let structure_has_changed = |pstruct: &Expression, + table: &str, + column: &str, + sqlite_pk: &u32| + -> Result { + // A clojure to determine whether the given column has the given constraint type, which + // can be one of 'UNIQUE', 'PRIMARY KEY', 'FOREIGN KEY': + let column_has_constraint_type = |constraint_type: &str| -> Result { + if self.pool.any_kind() == AnyKind::Postgres { + let sql = format!( + r#"SELECT 1 + FROM information_schema.table_constraints tco + JOIN information_schema.key_column_usage kcu + ON kcu.constraint_name = tco.constraint_name + AND kcu.constraint_schema = tco.constraint_schema + AND kcu.table_name = '{}' + WHERE tco.constraint_type = '{}' + AND kcu.column_name = '{}'"#, + table, constraint_type, column + ); + let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; + if rows.len() > 1 { + unreachable!(); + } + Ok(rows.len() == 1) + } else { + if constraint_type == "PRIMARY KEY" { + return Ok(*sqlite_pk == 1); + } else if constraint_type == "UNIQUE" { + let sql = format!(r#"PRAGMA INDEX_LIST("{}")"#, table); + for row in block_on(sqlx_query(&sql).fetch_all(&self.pool))? { + let idx_name = row.get::("name"); + let unique = row.get::("unique") as u8; + if unique == 1 { + let sql = format!(r#"PRAGMA INDEX_INFO("{}")"#, idx_name); + let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; + if rows.len() == 1 { + let cname = rows[0].get::("name"); + if cname == column { + return Ok(true); + } + } + } + } + Ok(false) + } else if constraint_type == "FOREIGN KEY" { + let sql = format!(r#"PRAGMA FOREIGN_KEY_LIST("{}")"#, table); + for row in block_on(sqlx_query(&sql).fetch_all(&self.pool))? 
{ + let cname = row.get::("from"); + if cname == column { + return Ok(true); + } + } + Ok(false) + } else { + return Err(ValveError::InputError( + format!("Unrecognized constraint type: '{}'", constraint_type).into(), + )); + } + } + }; + + // Check if there is a change to whether this column is a primary/unique key: + let is_primary = match pstruct { + Expression::Label(label) if label == "primary" => true, + _ => false, + }; + if is_primary != column_has_constraint_type("PRIMARY KEY")? { + return Ok(true); + } else if !is_primary { + let is_unique = match pstruct { + Expression::Label(label) if label == "unique" => true, + _ => false, + }; + let unique_in_db = column_has_constraint_type("UNIQUE")?; + if is_unique != unique_in_db { + // A child of a tree constraint implies a unique db constraint, so if there is a + // unique constraint in the db that is not configured, that is the explanation, + // and in that case we do not count this as a change to the column. + if !unique_in_db { + return Ok(true); + } else { + let trees = + self.config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|o| o.get("tree")) + .and_then(|t| t.as_object()) + .and_then(|o| o.get(table)) + .and_then(|t| t.as_array()) + .and_then(|a| { + Some(a.iter().map(|o| { + o.as_object().and_then(|o| o.get("child")).unwrap() + })) + }) + .unwrap() + .collect::>(); + if !trees.contains(&&SerdeValue::String(column.to_string())) { + return Ok(true); + } + } + } + } + + match pstruct { + Expression::Function(name, args) if name == "from" => { + match &*args[0] { + Expression::Field(cfg_ftable, cfg_fcolumn) => { + if self.pool.any_kind() == AnyKind::Sqlite { + let sql = format!(r#"PRAGMA FOREIGN_KEY_LIST("{}")"#, table); + for row in block_on(sqlx_query(&sql).fetch_all(&self.pool))? 
{ + let from = row.get::("from"); + if from == column { + let db_ftable = row.get::("table"); + let db_fcolumn = row.get::("to"); + if *cfg_ftable != db_ftable || *cfg_fcolumn != db_fcolumn { + return Ok(true); + } + } + } + } else { + let sql = format!( + r#"SELECT + ccu.table_name AS foreign_table_name, + ccu.column_name AS foreign_column_name + FROM information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + WHERE tc.constraint_type = 'FOREIGN KEY' + AND tc.table_name = '{}' + AND kcu.column_name = '{}'"#, + table, column + ); + let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; + if rows.len() == 0 { + // If the table doesn't even exist return true. + return Ok(true); + } else if rows.len() > 1 { + // This seems impossible given how PostgreSQL works: + unreachable!(); + } else { + let row = &rows[0]; + let db_ftable = row.get::("foreign_table_name"); + let db_fcolumn = row.get::("foreign_column_name"); + if *cfg_ftable != db_ftable || *cfg_fcolumn != db_fcolumn { + return Ok(true); + } + } + } + } + _ => { + return Err(ValveError::InputError( + format!("Unrecognized structure: {:?}", pstruct).into(), + )); + } + }; + } + _ => (), + }; + + Ok(false) + }; + + let (columns_config, configured_column_order) = { + let table_config = self + .config + .get("table") + .and_then(|tc| tc.get(table)) + .and_then(|t| t.as_object()) + .unwrap(); + let columns_config = table_config + .get("column") + .and_then(|c| c.as_object()) + .unwrap(); + let configured_column_order = { + let mut configured_column_order = { + if table == "message" { + vec!["message_id".to_string()] + } else if table == "history" { + vec!["history_id".to_string()] + } else { + vec!["row_number".to_string()] + } + }; + configured_column_order.append( + &mut table_config + 
.get("column_order") + .and_then(|c| c.as_array()) + .and_then(|a| Some(a.iter())) + .and_then(|a| Some(a.map(|c| c.as_str().unwrap().to_string()))) + .and_then(|a| Some(a.collect::>())) + .unwrap(), + ); + configured_column_order + }; + + (columns_config, configured_column_order) + }; + + let db_columns_in_order = { + if self.pool.any_kind() == AnyKind::Sqlite { + let sql = format!( + r#"SELECT 1 FROM sqlite_master WHERE "type" = 'table' AND "name" = '{}'"#, + table + ); + let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; + if rows.len() == 0 { + if self.verbose { + info!( + "The table '{}' will be recreated as it does not exist in the database", + table + ); + } + return Ok(true); + } else if rows.len() == 1 { + // Otherwise send another query to the db to get the column info: + let sql = format!(r#"PRAGMA TABLE_INFO("{}")"#, table); + let rows = block_on(sqlx_query(&sql).fetch_all(&self.pool))?; + rows.iter() + .map(|r| { + ( + r.get::("name"), + r.get::("type"), + r.get::("pk") as u32, + ) + }) + .collect::>() + } else { + unreachable!(); + } + } else { + let sql = format!( + r#"SELECT "column_name", "data_type" + FROM "information_schema"."columns" + WHERE "table_name" = '{}' + ORDER BY "ordinal_position""#, + table, + ); + let rows = sqlx_query(&sql).fetch_all(&self.pool).await?; + if rows.len() == 0 { + if self.verbose { + info!( + "The table '{}' will be recreated as it does not exist in the database", + table + ); + } + return Ok(true); + } + // Otherwise we get the column name: + rows.iter() + .map(|r| { + ( + r.get::("column_name"), + r.get::("data_type"), + // The third entry is just a dummy so that the datatypes in the two + // wings of this if/else block match. 
+ 0, + ) + }) + .collect::>() + } + }; + + // Check if the order of the configured columns matches the order of the columns in the + // database: + let db_column_order = db_columns_in_order + .iter() + .map(|c| c.0.clone()) + .collect::>(); + if db_column_order != configured_column_order { + if self.verbose { + info!( + "The table '{}' will be recreated since the database columns: {:?} \ + and/or their order does not match the configured columns: {:?}", + table, db_column_order, configured_column_order + ); + } + return Ok(true); + } + + // Check, for all tables, whether their column configuration matches the contents of the + // database: + for (cname, ctype, pk) in &db_columns_in_order { + // Do not consider these special columns: + if (table == "message" && cname == "message_id") + || (table == "message" && cname == "row") + || (table == "history" && cname == "history_id") + || (table == "history" && cname == "timestamp") + || (table == "history" && cname == "row") + || cname == "row_number" + { + continue; + } + let column_config = columns_config + .get(cname) + .and_then(|c| c.as_object()) + .unwrap(); + let sql_type = + get_sql_type_from_global_config(&self.config, table, &cname, &self.pool).unwrap(); + + // Check the column's SQL type: + if sql_type.to_lowercase() != ctype.to_lowercase() { + let s = sql_type.to_lowercase(); + let c = ctype.to_lowercase(); + // CHARACTER VARYING and VARCHAR are synonyms so we ignore this difference. 
+ if !((s.starts_with("varchar") || s.starts_with("character varying")) + && (c.starts_with("varchar") || c.starts_with("character varying"))) + { + if self.verbose { + info!( + "The table '{}' will be recreated because the SQL type of column '{}', \ + {}, does not match the configured value: {}", + table, + cname, + ctype, + sql_type + ); + } + return Ok(true); + } + } + + // Check the column's structure: + let structure = column_config.get("structure").and_then(|d| d.as_str()); + match structure { + Some(structure) if structure != "" => { + let parsed_structure = self + .parsed_structure_conditions + .get(structure) + .and_then(|p| Some(p.parsed.clone())) + .unwrap(); + if structure_has_changed(&parsed_structure, table, &cname, &pk)? { + if self.verbose { + info!( + "The table '{}' will be recreated because the database \ + constraints for column '{}' do not match the configured \ + structure, '{}'", + table, cname, structure + ); + } + return Ok(true); + } + } + _ => (), + }; + } + + Ok(false) + } + + /// Generates and returns the DDL required to setup the database. + pub async fn get_setup_statements(&self) -> Result>, ValveError> { + let tables_config = self + .config + .get("table") + .and_then(|t| t.as_object()) + .unwrap() + .clone(); + let datatypes_config = self + .config + .get("datatype") + .and_then(|d| d.as_object()) + .unwrap() + .clone(); + + let parser = StartParser::new(); + + // Begin by reading in the TSV files corresponding to the tables defined in tables_config, + // and use that information to create the associated database tables, while saving + // constraint information to constrains_config. 
+ let mut setup_statements = HashMap::new(); + for table_name in tables_config.keys().cloned().collect::>() { + // Generate the statements for creating the table and its corresponding conflict table: + let mut table_statements = vec![]; + for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] { + let mut statements = get_table_ddl( + &tables_config, + &datatypes_config, + &parser, + &table, + &self.pool, + ); + table_statements.append(&mut statements); + } + + let create_view_sql = get_sql_for_standard_view(&table_name, &self.pool); + let create_text_view_sql = + get_sql_for_text_view(&tables_config, &table_name, &self.pool); + table_statements.push(create_view_sql); + table_statements.push(create_text_view_sql); + + setup_statements.insert(table_name.to_string(), table_statements); + } + + let text_type = get_sql_type(&datatypes_config, &"text".to_string(), &self.pool).unwrap(); + + // Generate DDL for the history table: + let mut history_statements = vec![]; + history_statements.push(format!( + indoc! {r#" + CREATE TABLE "history" ( + {history_id} + "table" {text_type}, + "row" BIGINT, + "from" {text_type}, + "to" {text_type}, + "summary" {text_type}, + "user" {text_type}, + "undone_by" {text_type}, + {timestamp} + ); + "#}, + history_id = { + if self.pool.any_kind() == AnyKind::Sqlite { + "\"history_id\" INTEGER PRIMARY KEY," + } else { + "\"history_id\" SERIAL PRIMARY KEY," + } + }, + text_type = text_type, + timestamp = { + if self.pool.any_kind() == AnyKind::Sqlite { + "\"timestamp\" TIMESTAMP DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))" + } else { + "\"timestamp\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP" + } + }, + )); + history_statements + .push(r#"CREATE INDEX "history_tr_idx" ON "history"("table", "row");"#.to_string()); + setup_statements.insert("history".to_string(), history_statements); + + // Generate DDL for the message table: + let mut message_statements = vec![]; + message_statements.push(format!( + indoc! 
{r#" + CREATE TABLE "message" ( + {message_id} + "table" {text_type}, + "row" BIGINT, + "column" {text_type}, + "value" {text_type}, + "level" {text_type}, + "rule" {text_type}, + "message" {text_type} + ); + "#}, + message_id = { + if self.pool.any_kind() == AnyKind::Sqlite { + "\"message_id\" INTEGER PRIMARY KEY," + } else { + "\"message_id\" SERIAL PRIMARY KEY," + } + }, + text_type = text_type, + )); + message_statements.push( + r#"CREATE INDEX "message_trc_idx" ON "message"("table", "row", "column");"#.to_string(), + ); + setup_statements.insert("message".to_string(), message_statements); + + return Ok(setup_statements); + } + + /// Writes the database schema to stdout. + pub async fn dump_schema(&self) -> Result<(), ValveError> { + let setup_statements = self.get_setup_statements().await?; + for table in self.get_sorted_table_list(false) { + let table_statements = setup_statements.get(table).unwrap(); + let output = String::from(table_statements.join("\n")); + println!("{}\n", output); + } + Ok(()) + } + + /// Create all configured database tables and views if they do not already exist as configured. + pub async fn create_all_tables(&self) -> Result<&Self, ValveError> { + let setup_statements = self.get_setup_statements().await?; + let sorted_table_list = self.get_sorted_table_list(false); + for table in &sorted_table_list { + if self.table_has_changed(*table).await? { + self.drop_tables(&vec![table]).await?; + let table_statements = setup_statements.get(*table).unwrap(); + for stmt in table_statements { + self.execute_sql(stmt).await?; + } + } + } + + Ok(self) + } + + /// Checks whether the given table exists in the database. 
+ pub async fn table_exists(&self, table: &str) -> Result { + let sql = { + if self.pool.any_kind() == AnyKind::Sqlite { + format!( + r#"SELECT 1 + FROM "sqlite_master" + WHERE "type" = 'table' AND name = '{}' + LIMIT 1"#, + table + ) + } else { + format!( + r#"SELECT 1 + FROM "information_schema"."tables" + WHERE "table_schema" = 'public' + AND "table_name" = '{}'"#, + table + ) + } + }; + let query = sqlx_query(&sql); + let rows = query.fetch_all(&self.pool).await?; + return Ok(rows.len() > 0); + } + + /// Get all the incoming (tables that depend on it) or outgoing (tables it depends on) + /// dependencies of the given table. + pub fn get_dependencies(&self, table: &str, incoming: bool) -> Vec { + let mut dependent_tables = vec![]; + if table != "message" && table != "history" { + let direct_deps = { + if incoming { + self.table_dependencies_in.get(table).unwrap().to_vec() + } else { + self.table_dependencies_out.get(table).unwrap().to_vec() + } + }; + for direct_dep in direct_deps { + let mut indirect_deps = self.get_dependencies(&direct_dep, incoming); + dependent_tables.append(&mut indirect_deps); + dependent_tables.push(direct_dep); + } + } + dependent_tables + } + + /// Given a list of tables, fill it in with any further tables that are dependent upon tables + /// in the given list. If deletion_order is true, the tables are sorted as required for + /// deleting them all sequentially, otherwise they are ordered in reverse. + pub fn add_dependencies(&self, tables: &Vec<&str>, deletion_order: bool) -> Vec { + let mut with_dups = vec![]; + for table in tables { + let dependent_tables = self.get_dependencies(table, true); + for dep_table in dependent_tables { + with_dups.push(dep_table.to_string()); + } + with_dups.push(table.to_string()); + } + // The algorithm above gives the tables in the order needed for deletion. But we want + // this function to return the creation order by default so we reverse it unless + // the deletion_order flag is set to true. 
+ if !deletion_order { + with_dups.reverse(); + } + + // Remove the duplicates from the returned table list: + let mut tables_in_order = vec![]; + for table in with_dups.iter().unique() { + tables_in_order.push(table.to_string()); + } + tables_in_order + } + + /// Given a subset of the configured tables, return them in sorted dependency order, or in + /// reverse if `reverse` is set to true. + pub fn sort_tables( + &self, + table_subset: &Vec<&str>, + reverse: bool, + ) -> Result, ValveError> { + let full_table_list = self.get_sorted_table_list(false); + if !table_subset + .iter() + .all(|item| full_table_list.contains(item)) + { + return Err(ValveError::InputError(format!( + "[{}] contains tables that are not in the configured table list: [{}]", + table_subset.join(", "), + full_table_list.join(", ") + ))); + } + + let constraints_config = self + .config + .get("constraints") + .and_then(|c| c.as_object()) + .ok_or(ValveError::ConfigError( + "Unable to retrieve configured constraints.".into(), + ))?; + + // Filter out message and history since they are not represented in the constraints config. + // They will be added implicitly to the list returned by verify_table_deps_and_sort. + let filtered_subset = table_subset + .iter() + .filter(|m| **m != "history" && **m != "message") + .map(|s| s.to_string()) + .collect::>(); + + let (sorted_subset, _, _) = + verify_table_deps_and_sort(&filtered_subset, &constraints_config); + + // Since the result of verify_table_deps_and_sort() will include dependencies of the tables + // in its input list, we filter those out here: + let mut sorted_subset = sorted_subset + .iter() + .filter(|m| table_subset.contains(&m.as_str())) + .map(|s| s.to_string()) + .collect::>(); + + if reverse { + sorted_subset.reverse(); + } + Ok(sorted_subset) + } + + /// Returns an IndexMap, indexed by configured table, containing lists of their dependencies. + /// If incoming is true, the lists are incoming dependencies, else they are outgoing. 
+ pub fn collect_dependencies(&self, incoming: bool) -> IndexMap> { + let tables = self.get_sorted_table_list(false); + let mut dependencies = IndexMap::new(); + for table in tables { + dependencies.insert(table.to_string(), self.get_dependencies(table, incoming)); + } + dependencies + } + + /// Drop all configured tables, in reverse dependency order. + pub async fn drop_all_tables(&self) -> Result<&Self, ValveError> { + // Drop all of the database tables in the reverse of their sorted order: + self.drop_tables(&self.get_sorted_table_list(true)).await?; + Ok(self) + } + + /// Given a vector of table names, drop those tables, in the given order. + pub async fn drop_tables(&self, tables: &Vec<&str>) -> Result<&Self, ValveError> { + let drop_list = self.add_dependencies(tables, true); + for table in &drop_list { + if *table != "message" && *table != "history" { + let sql = format!(r#"DROP VIEW IF EXISTS "{}_text_view""#, table); + self.execute_sql(&sql).await?; + let sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table); + self.execute_sql(&sql).await?; + let sql = format!(r#"DROP TABLE IF EXISTS "{}_conflict""#, table); + self.execute_sql(&sql).await?; + } + let sql = format!(r#"DROP TABLE IF EXISTS "{}""#, table); + self.execute_sql(&sql).await?; + } + + Ok(self) + } + + /// Truncate all configured tables, in reverse dependency order. + pub async fn truncate_all_tables(&self) -> Result<&Self, ValveError> { + self.truncate_tables(&self.get_sorted_table_list(true)) + .await?; + Ok(self) + } + + /// Given a vector of table names, truncate those tables, in the given order. + pub async fn truncate_tables(&self, tables: &Vec<&str>) -> Result<&Self, ValveError> { + self.create_all_tables().await?; + let truncate_list = self.add_dependencies(tables, true); + + // We must use CASCADE in the case of PostgreSQL since we cannot truncate a table, T, that + // depends on another table, T', even in the case where we have previously truncated T'. + // SQLite does not need this. 
However SQLite does require that the tables be truncated in + // deletion order (which means that it must be checking that T' is empty). + let truncate_sql = |table: &str| -> String { + if self.pool.any_kind() == AnyKind::Postgres { + format!(r#"TRUNCATE TABLE "{}" RESTART IDENTITY CASCADE"#, table) + } else { + format!(r#"DELETE FROM "{}""#, table) + } + }; + + for table in &truncate_list { + let sql = truncate_sql(&table); + self.execute_sql(&sql).await?; + if *table != "message" && *table != "history" { + let sql = truncate_sql(&format!("{}_conflict", table)); + self.execute_sql(&sql).await?; + } + } + + Ok(self) + } + + /// Load all configured tables in dependency order. If `validate` is false, just try to insert + /// all rows, irrespective of whether they are valid or not or will possibly trigger a db error. + pub async fn load_all_tables(&self, validate: bool) -> Result<&Self, ValveError> { + let table_list = self.get_sorted_table_list(false); + if self.verbose { + info!("Processing {} tables.", table_list.len()); + } + self.load_tables(&table_list, validate).await + } + + /// Given a vector of table names, load those tables in the given order. If `validate` is false, + /// just try to insert all rows, irrespective of whether they are valid or not or will possibly + /// trigger a db error. 
+ pub async fn load_tables( + &self, + table_list: &Vec<&str>, + validate: bool, + ) -> Result<&Self, ValveError> { + let list_for_truncation = self.sort_tables(table_list, true)?; + self.truncate_tables( + &list_for_truncation + .iter() + .map(|i| i.as_str()) + .collect::>(), + ) + .await?; + + let num_tables = table_list.len(); + let mut total_errors = 0; + let mut total_warnings = 0; + let mut total_infos = 0; + let mut table_num = 1; + for table_name in table_list { + if *table_name == "message" || *table_name == "history" { + continue; + } + let table_name = table_name.to_string(); + let path = String::from( + self.config + .get("table") + .and_then(|t| t.as_object()) + .and_then(|o| o.get(&table_name)) + .and_then(|n| n.get("path")) + .and_then(|p| p.as_str()) + .unwrap(), + ); + let mut rdr = { + match File::open(path.clone()) { + Err(e) => { + warn!("Unable to open '{}': {}", path.clone(), e); + continue; + } + Ok(table_file) => ReaderBuilder::new() + .has_headers(false) + .delimiter(b'\t') + .from_reader(table_file), + } + }; + if self.verbose { + info!("Loading table {}/{}: {}", table_num, num_tables, table_name); + } + table_num += 1; + + // Extract the headers, which we will need later: + let mut records = rdr.records(); + let headers; + if let Some(result) = records.next() { + headers = result.unwrap(); + } else { + panic!("'{}' is empty", path); + } + + for header in headers.iter() { + if header.trim().is_empty() { + panic!( + "One or more of the header fields is empty for table '{}'", + table_name + ); + } + } + + // HashMap used to report info about the number of error/warning/info messages for this + // table when the verbose flag is set to true: + let mut messages_stats = HashMap::new(); + messages_stats.insert("error".to_string(), 0); + messages_stats.insert("warning".to_string(), 0); + messages_stats.insert("info".to_string(), 0); + + // Split the data into chunks of size CHUNK_SIZE before passing them to the validation + // logic: + let chunks 
= records.chunks(CHUNK_SIZE); + insert_chunks( + &self.config, + &self.pool, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &table_name, + &chunks, + &headers, + &mut messages_stats, + self.verbose, + validate, + ) + .await?; + + if validate { + // We need to wait until all of the rows for a table have been loaded before + // validating the "foreign" constraints on a table's trees, since this checks if the + // values of one column (the tree's parent) are all contained in another column (the + // tree's child). We also need to wait before validating a table's "under" + // constraints. Although the tree associated with such a constraint need not be + // defined on the same table, it can be. + let mut recs_to_update = + validate_tree_foreign_keys(&self.config, &self.pool, None, &table_name, None) + .await?; + recs_to_update.append( + &mut validate_under(&self.config, &self.pool, None, &table_name, None).await?, + ); + + for record in recs_to_update { + let row_number = record.get("row_number").unwrap(); + let column_name = record.get("column").and_then(|s| s.as_str()).unwrap(); + let value = record.get("value").and_then(|s| s.as_str()).unwrap(); + let level = record.get("level").and_then(|s| s.as_str()).unwrap(); + let rule = record.get("rule").and_then(|s| s.as_str()).unwrap(); + let message = record.get("message").and_then(|s| s.as_str()).unwrap(); + + let sql = local_sql_syntax( + &self.pool, + &format!( + r#"INSERT INTO "message" + ("table", "row", "column", "value", "level", "rule", "message") + VALUES ({}, {}, {}, {}, {}, {}, {})"#, + SQL_PARAM, + row_number, + SQL_PARAM, + SQL_PARAM, + SQL_PARAM, + SQL_PARAM, + SQL_PARAM + ), + ); + let mut query = sqlx_query(&sql); + query = query.bind(&table_name); + query = query.bind(&column_name); + query = query.bind(&value); + query = query.bind(&level); + query = query.bind(&rule); + query = query.bind(&message); + query.execute(&self.pool).await?; + + if self.verbose { + // Add the generated 
message to messages_stats: + let messages = vec![json!({ + "message": message, + "level": level, + })]; + add_message_counts(&messages, &mut messages_stats); + } + } + } + + if self.verbose { + // Output a report on the messages generated to stderr: + let errors = messages_stats.get("error").unwrap(); + let warnings = messages_stats.get("warning").unwrap(); + let infos = messages_stats.get("info").unwrap(); + let status_message = format!( + "{} errors, {} warnings, and {} information messages generated for {}", + errors, warnings, infos, table_name + ); + info!("{}", status_message); + total_errors += errors; + total_warnings += warnings; + total_infos += infos; + } + } + + if self.verbose { + info!( + "Loading complete with {} errors, {} warnings, and {} information messages", + total_errors, total_warnings, total_infos + ); + } + Ok(self) + } + + /// Save all configured tables to their configured path's, unless save_dir is specified, + /// in which case save them there instead. + pub fn save_all_tables(&self, save_dir: &Option) -> Result<&Self, ValveError> { + let tables = self.get_sorted_table_list(false); + self.save_tables(&tables, save_dir)?; + Ok(self) + } + + /// Given a vector of table names, save those tables to their configured path's, unless + /// save_dir is specified, in which case save them there instead. 
+ pub fn save_tables( + &self, + tables: &Vec<&str>, + save_dir: &Option, + ) -> Result<&Self, ValveError> { + let table_paths: HashMap = self + .config + .get("table") + .unwrap() + .as_object() + .unwrap() + .iter() + .filter(|(k, v)| { + !["message", "history"].contains(&k.as_str()) + && tables.contains(&k.as_str()) + && v.get("path").is_some() + }) + .map(|(k, v)| { + ( + k.clone(), + v.get("path").unwrap().as_str().unwrap().to_string(), + ) + }) + .collect(); + + info!( + "Saving tables: {} ...", + table_paths + .keys() + .map(|k| k.to_string()) + .collect::>() + .join(", ") + ); + for (table, path) in table_paths.iter() { + let columns: Vec<&str> = self + .config + .get("table") + .and_then(|v| v.as_object()) + .and_then(|o| o.get(table)) + .and_then(|v| v.as_object()) + .and_then(|o| o.get("column_order")) + .and_then(|v| v.as_array()) + .and_then(|v| Some(v.iter().map(|i| i.as_str().unwrap()).collect())) + .unwrap(); + + let path = match save_dir { + Some(s) => format!( + "{}/{}", + s, + Path::new(path) + .file_name() + .and_then(|n| n.to_str()) + .unwrap() + ), + None => path.to_string(), + }; + self.save_table(table, &columns, &path)?; + } + + Ok(self) + } + + /// Save the given table with the given columns at the given path as a TSV file. + pub fn save_table( + &self, + table: &str, + columns: &Vec<&str>, + path: &str, + ) -> Result<&Self, ValveError> { + // TODO: Do some validation on the path. 
+ + let mut quoted_columns = vec!["\"row_number\"".to_string()]; + quoted_columns.append( + &mut columns + .iter() + .map(|v| enquote::enquote('"', v)) + .collect::>(), + ); + let text_view = format!("\"{}_text_view\"", table); + let sql = format!( + r#"SELECT {} from {} ORDER BY "row_number""#, + quoted_columns.join(", "), + text_view + ); + + let mut writer = WriterBuilder::new() + .delimiter(b'\t') + .quote_style(QuoteStyle::Never) + .from_path(path)?; + writer.write_record(columns)?; + let mut stream = sqlx_query(&sql).fetch(&self.pool); + while let Some(row) = block_on(stream.try_next()).unwrap() { + let mut record: Vec<&str> = vec![]; + for column in columns.iter() { + let cell = row.try_get::<&str, &str>(column).ok().unwrap_or_default(); + record.push(cell); + } + writer.write_record(record)?; + } + writer.flush()?; + + Ok(self) + } + + /// Given a table name and a row, return the validated row. + pub async fn validate_row( + &self, + table_name: &str, + row: &ValveRow, + row_number: Option, + ) -> Result { + validate_row_tx( + &self.config, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &self.pool, + None, + table_name, + row, + row_number, + None, + ) + .await + } + + /// Given a table name and a row as JSON, add the row to the table in the database, and return + /// the validated row, including its new row_number. 
+ pub async fn insert_row( + &self, + table_name: &str, + row: &ValveRow, + ) -> Result<(u32, ValveRow), ValveError> { + let mut tx = self.pool.begin().await?; + + let row = validate_row_tx( + &self.config, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &self.pool, + Some(&mut tx), + table_name, + row, + None, + None, + ) + .await?; + + let rn = insert_new_row_tx( + &self.config, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &self.pool, + &mut tx, + table_name, + &row, + None, + true, + ) + .await?; + + record_row_change(&mut tx, table_name, &rn, None, Some(&row), &self.user).await?; + tx.commit().await?; + Ok((rn, row)) + } + + /// Given a table name, a row number, and a row, update the row in the database, and return the + /// validated row. + pub async fn update_row( + &self, + table_name: &str, + row_number: &u32, + row: &ValveRow, + ) -> Result { + let mut tx = self.pool.begin().await?; + + // Get the old version of the row from the database so that we can later record it to the + // history table: + let old_row = + get_row_from_db(&self.config, &self.pool, &mut tx, table_name, &row_number).await?; + + let row = validate_row_tx( + &self.config, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &self.pool, + Some(&mut tx), + table_name, + row, + Some(*row_number), + None, + ) + .await?; + + update_row_tx( + &self.config, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &self.pool, + &mut tx, + table_name, + &row, + row_number, + true, + false, + ) + .await?; + + // Record the row update in the history table: + record_row_change( + &mut tx, + table_name, + row_number, + Some(&old_row), + Some(&row), + &self.user, + ) + .await?; + + tx.commit().await?; + Ok(row) + } + + /// Given a table name and a row number, delete that row from the table. 
+ pub async fn delete_row(&self, table_name: &str, row_number: &u32) -> Result<(), ValveError> { + let mut tx = self.pool.begin().await?; + + let row = + get_row_from_db(&self.config, &self.pool, &mut tx, &table_name, row_number).await?; + + record_row_change( + &mut tx, + &table_name, + row_number, + Some(&row), + None, + &self.user, + ) + .await?; + + delete_row_tx( + &self.config, + &self.compiled_datatype_conditions, + &self.compiled_rule_conditions, + &self.pool, + &mut tx, + table_name, + row_number, + ) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// Return the next change that can be undone, or None if there isn't any. + pub async fn get_record_to_undo(&self) -> Result, ValveError> { + // Look in the history table, get the row with the greatest ID, get the row number, + // from, and to, and determine whether the last operation was a delete, insert, or update. + let is_clause = if self.pool.any_kind() == AnyKind::Sqlite { + "IS" + } else { + "IS NOT DISTINCT FROM" + }; + let sql = format!( + r#"SELECT * FROM "history" + WHERE "undone_by" {} NULL + ORDER BY "history_id" DESC LIMIT 1"#, + is_clause + ); + let query = sqlx_query(&sql); + let result_row = query.fetch_optional(&self.pool).await?; + Ok(result_row) + } + + /// Return the next change that can be redone, or None if there isn't any. + pub async fn get_record_to_redo(&self) -> Result, ValveError> { + // Look in the history table, get the row with the greatest ID, get the row number, + // from, and to, and determine whether the last operation was a delete, insert, or update. 
        // (Continuation of the history-lookup helper opened above this chunk.)
        // SQLite does not support "IS DISTINCT FROM", so pick the dialect-appropriate
        // NULL-safe comparison operator for the query below.
        let is_not_clause = if self.pool.any_kind() == AnyKind::Sqlite {
            "IS NOT"
        } else {
            "IS DISTINCT FROM"
        };
        // Fetch the most recent history record whose "undone_by" column is non-NULL,
        // i.e. the most recently undone change (the candidate for a redo).
        let sql = format!(
            r#"SELECT * FROM "history"
               WHERE "undone_by" {} NULL
               ORDER BY "timestamp" DESC LIMIT 1"#,
            is_not_clause
        );
        let query = sqlx_query(&sql);
        let result_row = query.fetch_optional(&self.pool).await?;
        Ok(result_row)
    }

    /// Undo one change and return the change record or None if there was no change to undo.
    pub async fn undo(&self) -> Result<Option<ValveRow>, ValveError> {
        // Nothing to do if there is no not-yet-undone record in the history table:
        let last_change = match self.get_record_to_undo().await? {
            None => {
                warn!("Nothing to undo.");
                return Ok(None);
            }
            Some(r) => r,
        };
        // NOTE(review): i32 -> u16 narrowing cast silently truncates large ids — confirm
        // history_id is guaranteed to fit in u16.
        let history_id: i32 = last_change.get("history_id");
        let history_id = history_id as u16;
        let table: &str = last_change.get("table");
        // NOTE(review): i64 -> u32 narrowing cast — confirm row numbers fit in u32.
        let row_number: i64 = last_change.get("row");
        let row_number = row_number as u32;
        // "from"/"to" are the JSON snapshots of the row before and after the change;
        // their presence/absence encodes which kind of operation is being undone.
        let from = get_json_from_row(&last_change, "from");
        let to = get_json_from_row(&last_change, "to");

        match (from, to) {
            (None, None) => {
                // NOTE(review): message says "redo" but this is the undo path — looks like
                // a copy-paste from redo(); confirm intended wording.
                return Err(ValveError::DataError(
                    "Cannot redo unknown operation from None to None".into(),
                ))
            }
            (None, Some(_)) => {
                // Undo an insert: delete the inserted row inside a transaction.
                let mut tx = self.pool.begin().await?;

                delete_row_tx(
                    &self.config,
                    &self.compiled_datatype_conditions,
                    &self.compiled_rule_conditions,
                    &self.pool,
                    &mut tx,
                    table,
                    &row_number,
                )
                .await?;

                // Mark the history record as undone before committing:
                switch_undone_state(&self.user, history_id, true, &mut tx, &self.pool).await?;
                tx.commit().await?;
                Ok(None)
            }
            (Some(from), None) => {
                // Undo a delete: re-insert the deleted row (at its original row number).
                let mut tx = self.pool.begin().await?;

                insert_new_row_tx(
                    &self.config,
                    &self.compiled_datatype_conditions,
                    &self.compiled_rule_conditions,
                    &self.pool,
                    &mut tx,
                    table,
                    &from,
                    Some(row_number),
                    false,
                )
                .await?;

                switch_undone_state(&self.user, history_id, true, &mut tx, &self.pool).await?;
                tx.commit().await?;
                Ok(Some(from))
            }
            (Some(from), Some(_)) => {
                // Undo an update: write the "from" (pre-change) snapshot back to the row.
                let mut tx = self.pool.begin().await?;

                update_row_tx(
                    &self.config,
                    &self.compiled_datatype_conditions,
                    &self.compiled_rule_conditions,
                    &self.pool,
                    &mut tx,
                    table,
                    &from,
                    &row_number,
                    false,
                    false,
                )
                .await?;

                switch_undone_state(&self.user, history_id, true, &mut tx, &self.pool).await?;
                tx.commit().await?;
                Ok(Some(from))
            }
        }
    }

    /// Redo one change and return the change record or None if there was no change to redo.
    pub async fn redo(&self) -> Result<Option<ValveRow>, ValveError> {
        // A record is only redoable if it exists AND has actually been undone
        // (undone_by non-NULL); otherwise there is nothing to redo.
        let last_undo = match self.get_record_to_redo().await? {
            None => {
                warn!("Nothing to redo.");
                return Ok(None);
            }
            Some(last_undo) => {
                let undone_by = last_undo.try_get_raw("undone_by")?;
                if undone_by.is_null() {
                    warn!("Nothing to redo.");
                    return Ok(None);
                }
                last_undo
            }
        };
        // NOTE(review): same narrowing casts as in undo() — confirm value ranges.
        let history_id: i32 = last_undo.get("history_id");
        let history_id = history_id as u16;
        let table: &str = last_undo.get("table");
        let row_number: i64 = last_undo.get("row");
        let row_number = row_number as u32;
        let from = get_json_from_row(&last_undo, "from");
        let to = get_json_from_row(&last_undo, "to");

        match (from, to) {
            (None, None) => {
                return Err(ValveError::DataError(
                    "Cannot redo unknown operation from None to None".into(),
                ))
            }
            (None, Some(to)) => {
                // Redo an insert: re-apply the "to" (post-change) snapshot as a new row.
                let mut tx = self.pool.begin().await?;

                insert_new_row_tx(
                    &self.config,
                    &self.compiled_datatype_conditions,
                    &self.compiled_rule_conditions,
                    &self.pool,
                    &mut tx,
                    table,
                    &to,
                    Some(row_number),
                    false,
                )
                .await?;

                // Clear the undone flag so the record is once again undoable:
                switch_undone_state(&self.user, history_id, false, &mut tx, &self.pool).await?;
                tx.commit().await?;
                Ok(Some(to))
            }
            (Some(_), None) => {
                // Redo a delete: remove the row again.
                let mut tx = self.pool.begin().await?;

                delete_row_tx(
                    &self.config,
                    &self.compiled_datatype_conditions,
                    &self.compiled_rule_conditions,
                    &self.pool,
                    &mut tx,
                    table,
                    &row_number,
                )
                .await?;

                switch_undone_state(&self.user, history_id, false, &mut tx, &self.pool).await?;
                tx.commit().await?;
                Ok(None)
            }
            (Some(_), Some(to)) => {
                // Redo an update: write the "to" (post-change) snapshot back to the row.
                let mut tx = self.pool.begin().await?;

                update_row_tx(
                    &self.config,
                    &self.compiled_datatype_conditions,
                    &self.compiled_rule_conditions,
                    &self.pool,
                    &mut tx,
                    table,
                    &to,
                    &row_number,
                    false,
                    false,
                )
                .await?;

                switch_undone_state(&self.user, history_id, false, &mut tx, &self.pool).await?;
                tx.commit().await?;
                Ok(Some(to))
            }
        }
    }

    /// Given a table name, a column name, and (optionally) a string to match, return a JSON array
    /// of possible valid values for the given column which contain the matching string as a
    /// substring (or all of them if no matching string is given). The JSON array returned is
    /// formatted for Typeahead, i.e., it takes the form:
    /// `[{"id": id, "label": label, "order": order}, ...]`.
    pub async fn get_matching_values(
        &self,
        table_name: &str,
        column_name: &str,
        matching_string: Option<&str>,
    ) -> Result<SerdeValue, ValveError> {
        let config = &self.config;
        let compiled_datatype_conditions = &self.compiled_datatype_conditions;
        let parsed_structure_conditions = &self.parsed_structure_conditions;
        let pool = &self.pool;
        // Look up the column's datatype name in the config map.
        // NOTE(review): this chain (and several below) unwrap()s on missing config
        // entries — a bad table/column name panics rather than returning an error.
        let dt_name = config
            .get("table")
            .and_then(|t| t.as_object())
            .and_then(|t| t.get(table_name))
            .and_then(|t| t.as_object())
            .and_then(|t| t.get("column"))
            .and_then(|c| c.as_object())
            .and_then(|c| c.get(column_name))
            .and_then(|c| c.as_object())
            .and_then(|c| c.get("datatype"))
            .and_then(|d| d.as_str())
            .unwrap();

        // NOTE(review): .and_then(|d| Some(...)) is just .map(...) — clippy would flag this.
        let dt_condition = compiled_datatype_conditions
            .get(dt_name)
            .and_then(|d| Some(d.parsed.clone()));

        let mut values = vec![];
        match dt_condition {
            // Case 1: the datatype is an in(...) enumeration — candidate values are the
            // literal arguments of the in() call, filtered by the matching string.
            Some(Expression::Function(name, args)) if name == "in" => {
                for arg in args {
                    if let Expression::Label(arg) = *arg {
                        // Remove the enclosing quotes from the values being returned:
                        let label = unquote(&arg).unwrap_or_else(|_| arg);
                        if let Some(s) = matching_string {
                            if label.contains(s) {
                                values.push(label);
                            }
                        }
                    }
                }
            }
            _ => {
                // If the datatype for the column does not correspond to an `in(...)` function, then
                // we check the column's structure constraints. If they include a
                // `from(foreign_table.foreign_column)` condition, then the values are taken from
                // the foreign column. Otherwise if the structure includes an
                // `under(tree_table.tree_column, value)` condition, then get the values from the
                // tree column that are under `value`.
                let structure = parsed_structure_conditions.get(
                    config
                        .get("table")
                        .and_then(|t| t.as_object())
                        .and_then(|t| t.get(table_name))
                        .and_then(|t| t.as_object())
                        .and_then(|t| t.get("column"))
                        .and_then(|c| c.as_object())
                        .and_then(|c| c.get(column_name))
                        .and_then(|c| c.as_object())
                        .and_then(|c| c.get("structure"))
                        .and_then(|d| d.as_str())
                        .unwrap_or_else(|| ""),
                );

                let sql_type =
                    get_sql_type_from_global_config(&config, table_name, &column_name, &pool)
                        .unwrap();

                match structure {
                    Some(ParsedStructure { original, parsed }) => {
                        // Shadow matching_string with a SQL LIKE pattern ("%" matches all
                        // when no matching string was supplied):
                        let matching_string = {
                            match matching_string {
                                None => "%".to_string(),
                                Some(s) => format!("%{}%", s),
                            }
                        };

                        match parsed {
                            // from(ftable.fcolumn): candidate values come from the foreign column.
                            Expression::Function(name, args) if name == "from" => {
                                let foreign_key = &args[0];
                                if let Expression::Field(ftable, fcolumn) = &**foreign_key {
                                    // Cast the column to text so LIKE works for non-text SQL types:
                                    let fcolumn_text = cast_column_sql_to_text(&fcolumn, &sql_type);
                                    let sql = local_sql_syntax(
                                        &pool,
                                        &format!(
                                            r#"SELECT "{}" FROM "{}" WHERE {} LIKE {}"#,
                                            fcolumn, ftable, fcolumn_text, SQL_PARAM
                                        ),
                                    );
                                    let rows = sqlx_query(&sql)
                                        .bind(&matching_string)
                                        .fetch_all(pool)
                                        .await?;
                                    for row in rows.iter() {
                                        values.push(get_column_value(&row, &fcolumn, &sql_type));
                                    }
                                }
                            }
                            // under(ttable.tcolumn, value) or tree(tcolumn): candidate values come
                            // from the tree's child column, optionally restricted to descendants
                            // of `value` (under_val is None for a bare tree() condition).
                            Expression::Function(name, args)
                                if name == "under" || name == "tree" =>
                            {
                                let mut tree_col = "not set";
                                let mut under_val = Some("not set".to_string());
                                if name == "under" {
                                    if let Expression::Field(_, column) = &**&args[0] {
                                        tree_col = column;
                                    }
                                    if let Expression::Label(label) = &**&args[1] {
                                        under_val = Some(label.to_string());
                                    }
                                } else {
                                    let tree_key = &args[0];
                                    if let Expression::Label(label) = &**tree_key {
                                        tree_col = label;
                                        under_val = None;
                                    }
                                }

                                // Find the tree constraint whose child column is tree_col.
                                // NOTE(review): .expect(format!(...).as_str()) evaluates the
                                // message eagerly (clippy::expect_fun_call) and panics on a
                                // missing tree rather than returning a ValveError.
                                let tree = config
                                    .get("constraints")
                                    .and_then(|c| c.as_object())
                                    .and_then(|c| c.get("tree"))
                                    .and_then(|t| t.as_object())
                                    .and_then(|t| t.get(table_name))
                                    .and_then(|t| t.as_array())
                                    .and_then(|t| {
                                        t.iter().find(|o| o.get("child").unwrap() == tree_col)
                                    })
                                    .expect(
                                        format!("No tree: '{}.{}' found", table_name, tree_col)
                                            .as_str(),
                                    )
                                    .as_object()
                                    .unwrap();
                                let child_column =
                                    tree.get("child").and_then(|c| c.as_str()).unwrap();

                                // Build the recursive CTE ("tree") that enumerates the rows
                                // under under_val, then filter its child column with LIKE:
                                let (tree_sql, mut params) = with_tree_sql(
                                    &config,
                                    tree,
                                    &table_name.to_string(),
                                    &table_name.to_string(),
                                    under_val.as_ref(),
                                    None,
                                    &pool,
                                );
                                let child_column_text =
                                    cast_column_sql_to_text(&child_column, &sql_type);
                                let sql = local_sql_syntax(
                                    &pool,
                                    &format!(
                                        r#"{} SELECT "{}" FROM "tree" WHERE {} LIKE {}"#,
                                        tree_sql, child_column, child_column_text, SQL_PARAM
                                    ),
                                );
                                params.push(matching_string);

                                let mut query = sqlx_query(&sql);
                                for param in &params {
                                    query = query.bind(param);
                                }

                                let rows = query.fetch_all(pool).await?;
                                for row in rows.iter() {
                                    values.push(get_column_value(&row, &child_column, &sql_type));
                                }
                            }
                            _ => panic!("Unrecognised structure: {}", original),
                        };
                    }
                    None => (),
                };
            }
        };

        // Wrap the collected values in the {"id", "label", "order"} objects that
        // Typeahead expects:
        let mut typeahead_values = vec![];
        for (i, v) in values.iter().enumerate() {
            // enumerate() begins at 0 but we need to begin at 1:
            let i = i + 1;
            typeahead_values.push(json!({
                "id": v,
                "label": v,
                "order": i,
            }));
        }

        Ok(json!(typeahead_values))
    }
}