diff --git a/src/lib.rs b/src/lib.rs index 892c86ad..4081db99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -256,7 +256,6 @@ impl Valve { table: &str, column: &str, sqlite_pk: &u32, - sqlite_ctype: &str, ) -> Result { let pool = self.pool.as_ref().unwrap(); // A clojure to determine whether the given column has the given constraint type, which @@ -316,22 +315,47 @@ impl Valve { } }; - // Check if there is a change to whether this column is a primary key: + // Check if there is a change to whether this column is a primary/unique key: let is_primary = match pstruct { Expression::Label(label) if label == "primary" => true, _ => false, }; if is_primary != column_has_constraint_type("PRIMARY KEY")? { return Ok(true); - } - - // Check if there is a change to whether this column is a unique constraint: - let is_unique = match pstruct { - Expression::Label(label) if label == "unique" => true, - _ => false, - }; - if is_unique != column_has_constraint_type("UNIQUE")? { - return Ok(true); + } else if !is_primary { + let is_unique = match pstruct { + Expression::Label(label) if label == "unique" => true, + _ => false, + }; + let unique_in_db = column_has_constraint_type("UNIQUE")?; + if is_unique != unique_in_db { + // A child of a tree constraint implies a unique db constraint, so if there is a + // unique constraint in the db that is not configured, that is the explanation, + // and in that case we do not count this as a change to the column. 
+ if !unique_in_db { + return Ok(true); + } else { + let trees = self + .global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|o| o.get("tree")) + .and_then(|t| t.as_object()) + .and_then(|o| o.get(table)) + .and_then(|t| t.as_array()) + .and_then(|a| { + Some( + a.iter() + .map(|o| o.as_object().and_then(|o| o.get("child")).unwrap()), + ) + }) + .unwrap() + .collect::>(); + if !trees.contains(&&SerdeValue::String(column.to_string())) { + return Ok(true); + } + } + } } match pstruct { @@ -368,8 +392,10 @@ impl Valve { ); let rows = sqlx_query(&sql).fetch_all(pool).await?; if rows.len() == 0 { + // If the table doesn't even exist return true. return Ok(true); } else if rows.len() > 1 { + // This seems impossible given how PostgreSQL works: unreachable!(); } else { let row = &rows[0]; @@ -394,7 +420,15 @@ impl Valve { Ok(false) } - /// TODO: Add docstring here + /// Given the name of a table, determine whether its current instantiation in the database + /// differs from the way it has been configured. The answer to this question is yes whenever + /// (1) the number of columns or any of their names differs from their configured values, or + /// the order of database columns differs from the configured order; (2) the values in the + /// table table differ from their configured values; (3) the SQL type of one or more columns + /// does not match the configured SQL type for that column; or (4) a column configured with + /// 'unique', 'primary', or 'from(table, column)' lacks the corresponding unique constraint, + /// primary key, or foreign key in the database, or vice versa (a unique constraint implied + /// by a tree child does not count as a change). 
pub async fn table_has_changed(&self, table: &str) -> Result { let pool = self.pool.as_ref().unwrap(); let (columns_config, configured_column_order, description, table_type, path) = { @@ -575,7 +609,7 @@ impl Valve { ); } return Ok(true); - } else if rows.len() > 0 { + } else if rows.len() > 1 { if self.verbose { valve_log!( "WARN more than one row was returned from the query '{}'", @@ -589,9 +623,12 @@ impl Valve { // Check, for all tables, whether their column configuration matches the contents of the // database: for (cname, ctype, pk) in &db_columns_in_order { - // Do not consider special row/message/history identifier columns: + // Do not consider these special columns: if (table == "message" && cname == "message_id") + || (table == "message" && cname == "row") || (table == "history" && cname == "history_id") + || (table == "history" && cname == "timestamp") + || (table == "history" && cname == "row") || cname == "row_number" { continue; @@ -613,8 +650,8 @@ impl Valve { let s = sql_type.to_lowercase(); let c = ctype.to_lowercase(); // CHARACTER VARYING and VARCHAR are synonyms so we ignore this difference. - if !(s.starts_with("varchar") || s.starts_with("character varying")) - || !(c.starts_with("varchar") || c.starts_with("character varying")) + if !((s.starts_with("varchar") || s.starts_with("character varying")) + && (c.starts_with("varchar") || c.starts_with("character varying"))) { if self.verbose { valve_log!( @@ -640,7 +677,7 @@ impl Valve { .and_then(|p| Some(p.parsed.clone())) .unwrap(); if self - .structure_has_changed(&parsed_structure, table, &cname, &pk, &ctype) + .structure_has_changed(&parsed_structure, table, &cname, &pk) .await? { if self.verbose { @@ -663,51 +700,150 @@ impl Valve { Ok(false) } - /// Create all configured database tables and views if they do not already exist as configured. - pub async fn create_missing_tables(&mut self) -> Result<&mut Self, sqlx::Error> { - // DatabaseError - - // TODO: Add logging statements here. 
- - let mut tables_config = self + /// Return the SQL statements that create each table, keyed by table name. + pub async fn get_setup_statements(&self) -> Result>, sqlx::Error> { + let tables_config = self .global_config - .get_mut("table") - .and_then(|t| t.as_object_mut()) + .get("table") + .and_then(|t| t.as_object()) .unwrap() .clone(); - let mut datatypes_config = self + let datatypes_config = self .global_config - .get_mut("datatype") - .and_then(|d| d.as_object_mut()) + .get("datatype") + .and_then(|d| d.as_object()) .unwrap() .clone(); let pool = self.pool.as_ref().unwrap(); let parser = StartParser::new(); - let setup_statements = - get_setup_statements(&mut tables_config, &mut datatypes_config, &pool, &parser).await?; + // For every configured table, collect the statements that create the table, its conflict + // table, its standard view, and its text view, and store them in setup_statements under + // the table's name. + let mut setup_statements = HashMap::new(); + for table_name in tables_config.keys().cloned().collect::>() { + // Generate the statements for creating the table and its corresponding conflict table: + let mut table_statements = vec![]; + for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] { + let mut statements = + get_table_ddl(&tables_config, &datatypes_config, &parser, &table, &pool); + table_statements.append(&mut statements); + } - // Add the message and history tables to the beginning of the list of tables to create - // (the message table in particular needs to be at the beginning since the table views all - // reference it). 
+ let (_, create_view_sql) = get_sql_for_standard_view(&table_name, pool); + let (_, create_text_view_sql) = + get_sql_for_text_view(&tables_config, &table_name, pool); + table_statements.push(create_view_sql); + table_statements.push(create_text_view_sql); + + setup_statements.insert(table_name.to_string(), table_statements); + } + + let text_type = get_sql_type(&datatypes_config, &"text".to_string(), pool).unwrap(); + + // Generate DDL for the history table: + let mut history_statements = vec![]; + history_statements.push(format!( + indoc! {r#" + CREATE TABLE "history" ( + {history_id} + "table" {text_type}, + "row" BIGINT, + "from" {text_type}, + "to" {text_type}, + "summary" {text_type}, + "user" {text_type}, + "undone_by" {text_type}, + {timestamp} + ); + "#}, + history_id = { + if pool.any_kind() == AnyKind::Sqlite { + "\"history_id\" INTEGER PRIMARY KEY," + } else { + "\"history_id\" SERIAL PRIMARY KEY," + } + }, + text_type = text_type, + timestamp = { + if pool.any_kind() == AnyKind::Sqlite { + "\"timestamp\" TIMESTAMP DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))" + } else { + "\"timestamp\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP" + } + }, + )); + history_statements + .push(r#"CREATE INDEX "history_tr_idx" ON "history"("table", "row");"#.to_string()); + setup_statements.insert("history".to_string(), history_statements); + + // Generate DDL for the message table: + let mut message_statements = vec![]; + message_statements.push(format!( + indoc! 
{r#" + CREATE TABLE "message" ( + {message_id} + "table" {text_type}, + "row" BIGINT, + "column" {text_type}, + "value" {text_type}, + "level" {text_type}, + "rule" {text_type}, + "message" {text_type} + ); + "#}, + message_id = { + if pool.any_kind() == AnyKind::Sqlite { + "\"message_id\" INTEGER PRIMARY KEY," + } else { + "\"message_id\" SERIAL PRIMARY KEY," + } + }, + text_type = text_type, + )); + message_statements.push( + r#"CREATE INDEX "message_trc_idx" ON "message"("table", "row", "column");"#.to_string(), + ); + setup_statements.insert("message".to_string(), message_statements); + + return Ok(setup_statements); + } + + /// Print the setup statements for every table, in creation order, to standard output. + pub async fn dump_schema(&self) -> Result<(), sqlx::Error> { + let setup_statements = self.get_setup_statements().await?; for table in self.get_tables_ordered_for_creation() { - // TODO: Use table_has_changed() to control whether a table is created or not, and once - // this is done, remove the "IF NOT EXISTS" qualifiers from all of the CREATE TABLE, - // CREATE VIEW, and CREATE INDEX statements. - if self.table_has_changed(table).await? { - // ... - } else { - // ... - } let table_statements = setup_statements.get(table).unwrap(); - for stmt in table_statements { - self.execute_sql(stmt).await?; + let output = String::from(table_statements.join("\n")); + println!("{}\n", output); + } + Ok(()) + } + + /// Create all configured database tables and views if they do not already exist as configured. + pub async fn create_missing_tables(&self) -> Result<&Self, sqlx::Error> { + // DatabaseError + + // TODO: Add logging statements here. + + let setup_statements = self.get_setup_statements().await?; + let sorted_table_list = self.get_tables_ordered_for_creation(); + for (i, table) in sorted_table_list.iter().enumerate() { + if self.table_has_changed(*table).await? 
{ + let mut tables_to_drop = vec![""; sorted_table_list.len() - i]; + tables_to_drop.clone_from_slice(&sorted_table_list[i..]); + tables_to_drop.reverse(); + for table in tables_to_drop { + valve_log!("Dropping table {}", table); + self.drop_tables(vec![table]).await?; + } + + let table_statements = setup_statements.get(*table).unwrap(); + for stmt in table_statements { + self.execute_sql(stmt).await?; + } } - //if self.verbose { - // let output = String::from(table_statements.join("\n")); - // println!("{}\n", output); - //} } Ok(self) @@ -718,7 +854,18 @@ impl Valve { // DatabaseError // Drop all of the database tables in the reverse of their sorted order: - for table in self.get_tables_ordered_for_deletion() { + self.drop_tables(self.get_tables_ordered_for_deletion()) + .await?; + Ok(self) + } + + /// Given a vector of table names, + /// drop those tables, in the given order. + /// Return an error on invalid table name or database problem. + pub async fn drop_tables(&self, tables: Vec<&str>) -> Result<&Self, sqlx::Error> { + // DatabaseError + + for table in tables { if table != "message" && table != "history" { let sql = format!(r#"DROP VIEW IF EXISTS "{}_text_view""#, table); self.execute_sql(&sql).await?; @@ -734,15 +881,6 @@ impl Valve { Ok(self) } - /// Given a vector of table names, - /// drop those tables, in the given order. - /// Return an error on invalid table name or database problem. - pub fn drop_tables(&self, _tables: Vec<&str>) -> Result<&Self, sqlx::Error> { - // DatabaseError - // TODO - Ok(self) - } - /// Truncate all configured tables, in reverse dependency order. pub async fn truncate_all_tables(&self) -> Result<&Self, sqlx::Error> { // DatabaseError @@ -786,7 +924,7 @@ impl Valve { /// If `validate` is false, just try to insert all rows. /// Return an error on database problem, /// including database conflicts that prevent rows being inserted. 
- pub async fn load_all_tables(&mut self, _validate: bool) -> Result<&mut Self, sqlx::Error> { + pub async fn load_all_tables(&self, _validate: bool) -> Result<&Self, sqlx::Error> { // DatabaseError self.create_missing_tables().await?; @@ -1514,7 +1652,10 @@ pub fn read_config_files( "row", "from", "to", + "summary", "user", + "undone_by", + "timestamp", ], "column": { "table": { @@ -1566,6 +1707,14 @@ pub fn read_config_files( "datatype": "line", "structure": "", }, + "timestamp": { + "table": "history", + "column": "timestamp", + "description": "The time of the change, or of the undo.", + "datatype": "line", + "structure": "", + }, + } }), ); @@ -1840,16 +1989,9 @@ fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { ); } - let create_or_replace_view = { - if pool.any_kind() == AnyKind::Postgres { - "CREATE OR REPLACE VIEW" - } else { - "CREATE VIEW IF NOT EXISTS" - } - }; let create_view_sql = format!( indoc! {r#" - {create_or_replace_view} "{t}_view" AS + CREATE VIEW "{t}_view" AS SELECT union_t.*, {message_t} AS "message", @@ -1860,7 +2002,6 @@ fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { SELECT * FROM "{t}_conflict" ) as union_t; "#}, - create_or_replace_view = create_or_replace_view, t = table, message_t = message_t, history_t = history_t, @@ -1880,7 +2021,7 @@ fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { /// returned in the form of a tuple of Strings, with the first string being a SQL statement /// for dropping the view, and the second string being a SQL statement for creating it. 
fn get_sql_for_text_view( - tables_config: &mut SerdeMap, + tables_config: &SerdeMap, table: &str, pool: &AnyPool, ) -> (String, String) { @@ -1960,21 +2101,13 @@ fn get_sql_for_text_view( v }; - let create_or_replace_view = { - if pool.any_kind() == AnyKind::Postgres { - "CREATE OR REPLACE VIEW" - } else { - "CREATE VIEW IF NOT EXISTS" - } - }; let create_view_sql = format!( - r#"{create_or_replace_view} "{table}_text_view" AS + r#"CREATE VIEW "{table}_text_view" AS SELECT {outer_columns} FROM ( SELECT {inner_columns} FROM "{table}_view" ) t"#, - create_or_replace_view = create_or_replace_view, outer_columns = outer_columns.join(", "), inner_columns = inner_columns.join(", "), table = table, @@ -1983,102 +2116,6 @@ fn get_sql_for_text_view( (drop_view_sql, create_view_sql) } -/// TODO: Add docstring here -pub async fn get_setup_statements( - tables_config: &mut SerdeMap, - datatypes_config: &mut SerdeMap, - pool: &AnyPool, - parser: &StartParser, -) -> Result>, sqlx::Error> { - // Begin by reading in the TSV files corresponding to the tables defined in tables_config, and - // use that information to create the associated database tables, while saving constraint - // information to constrains_config. 
- let mut setup_statements = HashMap::new(); - for table_name in tables_config.keys().cloned().collect::>() { - // Generate the statements for creating the table and its corresponding conflict table: - let mut table_statements = vec![]; - for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] { - let mut statements = - get_table_ddl(tables_config, datatypes_config, parser, &table, &pool); - table_statements.append(&mut statements); - } - - let (_, create_view_sql) = get_sql_for_standard_view(&table_name, pool); - let (_, create_text_view_sql) = get_sql_for_text_view(tables_config, &table_name, pool); - table_statements.push(create_view_sql); - table_statements.push(create_text_view_sql); - - setup_statements.insert(table_name.to_string(), table_statements); - } - - // Generate DDL for the history table: - let mut history_statements = vec![]; - history_statements.push(format!( - indoc! {r#" - CREATE TABLE IF NOT EXISTS "history" ( - {row_number} - "table" TEXT, - "row" BIGINT, - "from" TEXT, - "to" TEXT, - "summary" TEXT, - "user" TEXT, - "undone_by" TEXT, - {timestamp} - ); - "#}, - row_number = { - if pool.any_kind() == AnyKind::Sqlite { - "\"history_id\" INTEGER PRIMARY KEY," - } else { - "\"history_id\" SERIAL PRIMARY KEY," - } - }, - timestamp = { - if pool.any_kind() == AnyKind::Sqlite { - "\"timestamp\" TIMESTAMP DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))" - } else { - "\"timestamp\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP" - } - }, - )); - history_statements.push( - r#"CREATE INDEX IF NOT EXISTS "history_tr_idx" ON "history"("table", "row");"#.to_string(), - ); - setup_statements.insert("history".to_string(), history_statements); - - // Generate DDL for the message table: - let mut message_statements = vec![]; - message_statements.push(format!( - indoc! 
{r#" - CREATE TABLE IF NOT EXISTS "message" ( - {} - "table" TEXT, - "row" BIGINT, - "column" TEXT, - "value" TEXT, - "level" TEXT, - "rule" TEXT, - "message" TEXT - ); - "#}, - { - if pool.any_kind() == AnyKind::Sqlite { - "\"message_id\" INTEGER PRIMARY KEY," - } else { - "\"message_id\" SERIAL PRIMARY KEY," - } - }, - )); - message_statements.push( - r#"CREATE INDEX IF NOT EXISTS "message_trc_idx" ON "message"("table", "row", "column");"# - .to_string(), - ); - setup_statements.insert("message".to_string(), message_statements); - - return Ok(setup_statements); -} - // TODO: Remove this function once it has been factored. /// Given config maps for tables and datatypes, a database connection pool, and a StartParser, /// read in the TSV files corresponding to the tables defined in the tables config, and use that @@ -5157,15 +5194,15 @@ fn get_table_constraints( // TODO: Add docstring here fn get_table_ddl( - tables_config: &mut SerdeMap, - datatypes_config: &mut SerdeMap, + tables_config: &SerdeMap, + datatypes_config: &SerdeMap, parser: &StartParser, table_name: &String, pool: &AnyPool, ) -> Vec { let mut statements = vec![]; let mut create_lines = vec![ - format!(r#"CREATE TABLE IF NOT EXISTS "{}" ("#, table_name), + format!(r#"CREATE TABLE "{}" ("#, table_name), String::from(r#" "row_number" BIGINT,"#), ]; @@ -5330,7 +5367,7 @@ fn get_table_ddl( && !primary_keys.contains(&SerdeValue::String(tree_child.to_string())) { statements.push(format!( - r#"CREATE UNIQUE INDEX IF NOT EXISTS "{}_{}_idx" ON "{}"("{}");"#, + r#"CREATE UNIQUE INDEX "{}_{}_idx" ON "{}"("{}");"#, table_name, tree_child, table_name, tree_child )); } @@ -5338,7 +5375,7 @@ fn get_table_ddl( // Finally, create a further unique index on row_number: statements.push(format!( - r#"CREATE UNIQUE INDEX IF NOT EXISTS "{}_row_number_idx" ON "{}"("row_number");"#, + r#"CREATE UNIQUE INDEX "{}_row_number_idx" ON "{}"("row_number");"#, table_name, table_name )); diff --git a/src/main.rs b/src/main.rs index 
408e29bd..7c367c36 100644 --- a/src/main.rs +++ b/src/main.rs @@ -166,7 +166,7 @@ async fn main() -> Result<(), sqlx::Error> { ) .await?; } else { - let mut valve = + let valve = Valve::build(&source, &config_table, &destination, verbose, initial_load).await?; valve.load_all_tables(true).await?; diff --git a/test/src/datatype.tsv b/test/src/datatype.tsv index c162ca56..b2079e3b 100644 --- a/test/src/datatype.tsv +++ b/test/src/datatype.tsv @@ -18,7 +18,7 @@ real nonspace match(/-?\d+(\.\d+)?/) a positive or negative real number REAL R suffix word exclude(/\W/) a suffix for a CURIE table_name word exclude(/\W/) a table name table_type word lowercase in('table', 'column', 'datatype') a table type -text any text TEXT VARCHAR(100) xsd:string textarea +text any text TEXT TEXT xsd:string textarea trimmed_line line match(/\S([^\n]*\S)*/) a line of text that does not begin or end with whitespace trimmed_text text exclude(/^\s+|\s+$/) text that does not begin or end with whitespace word nonspace exclude(/\W/) a single word: letters, numbers, underscore