From f1f2d6142ba02679fada3f96aea4672b59d17071 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sat, 16 Sep 2023 19:20:37 +0200 Subject: [PATCH 01/14] add new user view --- src/lib.rs | 320 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 218 insertions(+), 102 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fa9e9990..a7231e3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -725,6 +725,219 @@ pub fn get_parsed_structure_conditions( parsed_structure_conditions } +/// TODO: Add docstring here. +fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { + let table = match table.strip_suffix("_conflict") { + None => table.clone(), + Some(base) => base, + }; + let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table); + let message_t; + if pool.any_kind() == AnyKind::Postgres { + drop_view_sql.push_str(" CASCADE"); + message_t = format!( + indoc! {r#" + ( + SELECT JSON_AGG(m)::TEXT FROM ( + SELECT "column", "value", "level", "rule", "message" + FROM "message" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + ORDER BY "column", "message_id" + ) m + ) + "#}, + t = table, + ); + } else { + message_t = format!( + indoc! {r#" + ( + SELECT NULLIF( + JSON_GROUP_ARRAY( + JSON_OBJECT( + 'column', "column", + 'value', "value", + 'level', "level", + 'rule', "rule", + 'message', "message" + ) + ), + '[]' + ) + FROM "message" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + ORDER BY "column", "message_id" + ) + "#}, + t = table, + ); + } + drop_view_sql.push_str(";"); + + let history_t; + if pool.any_kind() == AnyKind::Postgres { + history_t = format!( + indoc! {r#" + ( + SELECT '[' || STRING_AGG("summary", ',') || ']' + FROM ( + SELECT "summary" + FROM "history" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + AND "summary" IS DISTINCT FROM NULL + AND "undone_by" IS NOT DISTINCT FROM NULL + ORDER BY "history_id" + ) h + ) + "#}, + t = table, + ); + } else { + history_t = format!( + indoc! {r#" + ( + SELECT '[' || GROUP_CONCAT("summary") || ']' + FROM ( + SELECT "summary" + FROM "history" + WHERE "table" = '{t}' + AND "row" = union_t."row_number" + AND "summary" IS NOT NULL + AND "undone_by" IS NULL + ORDER BY "history_id" + ) h + ) + "#}, + t = table, + ); + } + + let create_view_sql = format!( + indoc! {r#" + CREATE VIEW "{t}_view" AS + SELECT + union_t.*, + {message_t} AS "message", + {history_t} AS "history" + FROM ( + SELECT * FROM "{t}" + UNION ALL + SELECT * FROM "{t}_conflict" + ) as union_t; + "#}, + t = table, + message_t = message_t, + history_t = history_t, + ); + + (drop_view_sql, create_view_sql) +} + +/// TODO: Add docstring here. +fn get_sql_for_user_view( + tables_config: &mut SerdeMap, + table: &str, + pool: &AnyPool, +) -> (String, String) { + let table = match table.strip_suffix("_conflict") { + None => table.clone(), + Some(base) => base, + }; + let is_clause = if pool.any_kind() == AnyKind::Sqlite { + "IS" + } else { + "IS NOT DISTINCT FROM" + }; + + // The config map can be the global map, or just the tables configuration map: + let real_columns = tables_config + .get(table) + .and_then(|t| t.as_object()) + .and_then(|t| t.get("column")) + .and_then(|t| t.as_object()) + .and_then(|t| Some(t.keys())) + .and_then(|k| Some(k.map(|k| k.to_string()))) + .and_then(|t| Some(t.collect::>())) + .unwrap(); + + // Add a second "user view" such that the datatypes of all values are TEXT and appear + // directly in their corresponsing columns (rather than as NULLs) even when they have + // errors. + let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_user_view""#, table); + if pool.any_kind() == AnyKind::Postgres { + drop_view_sql.push_str(" CASCADE"); + } + + let mut inner_columns = real_columns + .iter() + .map(|c| { + format!( + r#"CASE + WHEN "{column}" {is_clause} NULL THEN ( + SELECT value + FROM "message" + WHERE "row" = "row_number" + AND "column" = '{column}' + AND "table" = '{table}' + ORDER BY "message_id" DESC + LIMIT 1 + ) + ELSE {casted_column} + END AS "{column}""#, + casted_column = if pool.any_kind() == AnyKind::Sqlite { + cast_column_sql_to_text(c, "non-text") + } else { + format!("\"{}\"::TEXT", c) + }, + column = c, + table = table, + ) + }) + .collect::>(); + + let mut outer_columns = real_columns + .iter() + .map(|c| format!("t.\"{}\"", c)) + .collect::>(); + + let inner_columns = { + let mut v = vec![ + "row_number".to_string(), + "message".to_string(), + "history".to_string(), + ]; + v.append(&mut inner_columns); + v + }; + + let outer_columns = { + let mut v = vec![ + "t.row_number".to_string(), + "t.message".to_string(), + "t.history".to_string(), + ]; + v.append(&mut outer_columns); + v + }; + + let create_view_sql = format!( + r#"CREATE VIEW "{table}_user_view" AS + SELECT {outer_columns} + FROM ( + SELECT {inner_columns} + FROM "{table}_view" + ) t"#, + outer_columns = outer_columns.join(", "), + inner_columns = inner_columns.join(", "), + table = table, + ); + + (drop_view_sql, create_view_sql) +} + /// Given config maps for tables and datatypes, a database connection pool, and a StartParser, /// read in the TSV files corresponding to the tables defined in the tables config, and use that /// information to fill in constraints information into a new config map that is then returned along @@ -869,110 +1082,13 @@ pub async fn configure_db( } } - // Create a view as the union of the regular and conflict versions of the table: - let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table_name); - let message_t; - if pool.any_kind() == AnyKind::Postgres { - drop_view_sql.push_str(" CASCADE"); - message_t = format!( - indoc! {r#" - ( - SELECT JSON_AGG(m)::TEXT FROM ( - SELECT "column", "value", "level", "rule", "message" - FROM "message" - WHERE "table" = '{t}' - AND "row" = union_t."row_number" - ORDER BY "column", "message_id" - ) m - ) - "#}, - t = table_name, - ); - } else { - message_t = format!( - indoc! {r#" - ( - SELECT NULLIF( - JSON_GROUP_ARRAY( - JSON_OBJECT( - 'column', "column", - 'value', "value", - 'level', "level", - 'rule', "rule", - 'message', "message" - ) - ), - '[]' - ) - FROM "message" - WHERE "table" = '{t}' - AND "row" = union_t."row_number" - ORDER BY "column", "message_id" - ) - "#}, - t = table_name, - ); - } - drop_view_sql.push_str(";"); - - let history_t; - if pool.any_kind() == AnyKind::Postgres { - history_t = format!( - indoc! {r#" - ( - SELECT '[' || STRING_AGG("summary", ',') || ']' - FROM ( - SELECT "summary" - FROM "history" - WHERE "table" = '{t}' - AND "row" = union_t."row_number" - AND "summary" IS DISTINCT FROM NULL - AND "undone_by" IS NOT DISTINCT FROM NULL - ORDER BY "history_id" - ) h - ) - "#}, - t = table_name, - ); - } else { - history_t = format!( - indoc! {r#" - ( - SELECT '[' || GROUP_CONCAT("summary") || ']' - FROM ( - SELECT "summary" - FROM "history" - WHERE "table" = '{t}' - AND "row" = union_t."row_number" - AND "summary" IS NOT NULL - AND "undone_by" IS NULL - ORDER BY "history_id" - ) h - ) - "#}, - t = table_name, - ); - } - - let create_view_sql = format!( - indoc! {r#" - CREATE VIEW "{t}_view" AS - SELECT - union_t.*, - {message_t} AS "message", - {history_t} AS "history" - FROM ( - SELECT * FROM "{t}" - UNION ALL - SELECT * FROM "{t}_conflict" - ) as union_t; - "#}, - t = table_name, - message_t = message_t, - history_t = history_t, - ); + let (drop_view_sql, create_view_sql) = get_sql_for_standard_view(&table_name, pool); + let (drop_user_view_sql, create_user_view_sql) = + get_sql_for_user_view(tables_config, &table_name, pool); + table_statements.push(drop_user_view_sql); table_statements.push(drop_view_sql); table_statements.push(create_view_sql); + table_statements.push(create_user_view_sql); setup_statements.insert(table_name.to_string(), table_statements); } From 4871931f4b87674955a00c4c73781e79f9d9c816 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Fri, 29 Sep 2023 10:42:56 -0400 Subject: [PATCH 02/14] keep values whenever possible in API functions for updating rows and inserting new rows, even when there are errors. --- src/lib.rs | 330 +++++++++----------------------------- test/expected/history.tsv | 2 +- 2 files changed, 79 insertions(+), 253 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a7231e3e..464ac7b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,6 +75,9 @@ lazy_static! { static ref PG_SQL_TYPES: Vec<&'static str> = vec!["text", "varchar", "numeric", "integer", "real"]; static ref SL_SQL_TYPES: Vec<&'static str> = vec!["text", "numeric", "integer", "real"]; + // TODO: CHECK THAT NO CONFLICT RULES ARE MISSING FROM THIS LIST. + static ref CONFLICT_RULES: Vec<&'static str> = + vec!["key:primary", "key:unique", "key:foreign", "tree:child-unique", "tree:foreign"]; } /// An alias for [serde_json::Map](..//serde_json/struct.Map.html). @@ -725,7 +728,12 @@ pub fn get_parsed_structure_conditions( parsed_structure_conditions } -/// TODO: Add docstring here. +/// Given the name of a table and a database connection pool, generate SQL for creating a view +/// based on the table that provides a unified representation of the normal and conflict versions +/// of the table, plus columns summarising the information associated with the given table that is +/// contained in the message and history tables. The SQL generated is in the form of a tuple of +/// Strings, with the first string being a SQL statement for dropping the view, and the second +/// string being a SQL statement for creating it. fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { let table = match table.strip_suffix("_conflict") { None => table.clone(), @@ -836,7 +844,15 @@ fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { (drop_view_sql, create_view_sql) } -/// TODO: Add docstring here. +/// Given the name of a table and a database connection pool, generate SQL for creating a more +/// user-friendly version of the view that is generated by [get_sql_for_standard_view()]. +/// Unlike the standard view generated by that function, the view generated by this function +/// (called my_table_user_view) always shows all of the values of every column in the table, even +/// when those values contain errors. Also unlike the standard view, the datatypes of all columns +/// in the user view are TEXT (this is what makes it possible to always display error values). +/// Like the function for generating a standard view, the SQL generated by this function is in the +/// form of a tuple of Strings, with the first string being a SQL statement for dropping the view, +/// and the second string being a SQL statement for creating it. fn get_sql_for_user_view( tables_config: &mut SerdeMap, table: &str, @@ -852,7 +868,6 @@ fn get_sql_for_user_view( "IS NOT DISTINCT FROM" }; - // The config map can be the global map, or just the tables configuration map: let real_columns = tables_config .get(table) .and_then(|t| t.as_object()) @@ -2406,6 +2421,7 @@ pub async fn insert_new_row_tx( let mut insert_params = vec![]; let mut messages = vec![]; let sorted_datatypes = get_sorted_datatypes(global_config); + let mut use_conflict_table = false; for (column, cell) in row.iter() { insert_columns.append(&mut vec![format!(r#""{}""#, column)]); let cell = cell @@ -2417,17 +2433,8 @@ pub async fn insert_new_row_tx( let cell_value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( format!("No string named 'value' in {:?}", cell).into(), ))?; - let mut cell_for_insert = cell.clone(); - if cell_valid { - cell_for_insert.remove("value"); - let sql_type = get_sql_type_from_global_config(&global_config, &table, &column, pool) - .ok_or(SqlxCErr( - format!("Unable to determine SQL type for {}.{}", table, column).into(), - ))?; - insert_values.push(cast_sql_param_from_text(&sql_type)); - insert_params.push(String::from(cell_value)); - } else { - insert_values.push(String::from("NULL")); + let mut insert_null = false; + if !cell_valid { let cell_messages = sort_messages( &sorted_datatypes, cell.get("messages") @@ -2437,6 +2444,23 @@ pub async fn insert_new_row_tx( ))?, ); for cell_message in cell_messages { + let level = cell_message + .get("level") + .and_then(|l| l.as_str()) + .ok_or(SqlxCErr(format!("No 'level' in {:?}", cell).into()))?; + let rule = cell_message + .get("rule") + .and_then(|l| l.as_str()) + .ok_or(SqlxCErr(format!("No 'rule' in {:?}", cell).into()))?; + if level == "error" { + if CONFLICT_RULES.contains(&rule) { + use_conflict_table = true; + } + if rule.starts_with("datatype:") { + use_conflict_table = true; + insert_null = true; + } + } messages.push(json!({ "column": column, "value": cell_value, @@ -2455,6 +2479,17 @@ pub async fn insert_new_row_tx( })); } } + + if insert_null { + insert_values.push(String::from("NULL")); + } else { + let sql_type = get_sql_type_from_global_config(&global_config, &table, &column, pool) + .ok_or(SqlxCErr( + format!("Unable to determine SQL type for {}.{}", table, column).into(), + ))?; + insert_values.push(cast_sql_param_from_text(&sql_type)); + insert_params.push(String::from(cell_value)); + } } // Used to validate the given row, counterfactually, "as if" the version of the row in the @@ -2474,47 +2509,13 @@ pub async fn insert_new_row_tx( .map_err(|e| SqlxCErr(e.into()))?; // Check it to see if the row should be redirected to the conflict table: - let mut table_to_write = String::from(table); - let mut outer_break = false; - for (column, cell) in row.iter() { - if outer_break { - break; - } - let valid = cell.get("valid").ok_or(SqlxCErr( - format!("No flag named 'valid' in {:?}", cell).into(), - ))?; - if valid == false { - let structure = global_config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table)) - .and_then(|t| t.as_object()) - .and_then(|t| t.get("column")) - .and_then(|c| c.as_object()) - .and_then(|c| c.get(column)) - .and_then(|c| c.as_object()) - .and_then(|c| c.get("structure")) - .and_then(|s| s.as_str()) - .unwrap_or_else(|| ""); - if vec!["primary", "unique"].contains(&structure) || structure.starts_with("tree(") { - let messages = cell - .get("messages") - .and_then(|m| m.as_array()) - .ok_or(SqlxCErr(format!("No 'messages' in {:?}", cell).into()))?; - for msg in messages { - let level = msg - .get("level") - .and_then(|l| l.as_str()) - .ok_or(SqlxCErr(format!("No 'level' in {:?}", cell).into()))?; - if level == "error" { - table_to_write.push_str("_conflict"); - outer_break = true; - break; - } - } - } + let table_to_write = { + if use_conflict_table { + format!("{}_conflict", table) + } else { + String::from(table) } - } + }; // Add the new row to the table: let insert_stmt = local_sql_syntax( @@ -2768,124 +2769,6 @@ pub async fn update_row_tx( skip_validation: bool, do_not_recurse: bool, ) -> Result<(), sqlx::Error> { - async fn direct_update( - // Function to update the database table directly using an UPDATE statement, as opposed to - // a DELETE followed by an INSERT which must be used when the row needs to move from the - // normal to the conflict table or vice versa. - global_config: &SerdeMap, - pool: &AnyPool, - tx: &mut Transaction<'_, sqlx::Any>, - table_to_write: &str, - row_number: &u32, - row: &SerdeMap, - ) -> Result<(), sqlx::Error> { - let base_table = match table_to_write.strip_suffix("_conflict") { - None => table_to_write.clone(), - Some(base) => base, - }; - // Prepare the row and messages for the database update: - let mut assignments = vec![]; - let mut params = vec![]; - let mut messages = vec![]; - let sorted_datatypes = get_sorted_datatypes(global_config); - for (column, cell) in row.iter() { - let cell = cell - .as_object() - .ok_or(SqlxCErr(format!("Cell {:?} is not an object", cell).into()))?; - let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).ok_or(SqlxCErr( - format!("No flag named 'valid' in {:?}", cell).into(), - ))?; - let cell_value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( - format!("No str named 'value' in {:?}", cell).into(), - ))?; - - // Generate the assignment statements and messages for each column: - let mut cell_for_insert = cell.clone(); - if cell_valid { - cell_for_insert.remove("value"); - let sql_type = get_sql_type_from_global_config( - &global_config, - &base_table.to_string(), - &column, - pool, - ) - .ok_or(SqlxCErr( - format!("Unable to determine SQL type for {}.{}", base_table, column).into(), - ))?; - assignments.push(format!( - r#""{}" = {}"#, - column, - cast_sql_param_from_text(&sql_type) - )); - params.push(String::from(cell_value)); - } else { - assignments.push(format!(r#""{}" = NULL"#, column)); - let cell_messages = sort_messages( - &sorted_datatypes, - cell.get("messages") - .and_then(|m| m.as_array()) - .ok_or(SqlxCErr( - format!("No array named 'messages' in {:?}", cell).into(), - ))?, - ); - for cell_message in cell_messages { - messages.push(json!({ - "column": String::from(column), - "value": String::from(cell_value), - "level": cell_message.get("level").and_then(|s| s.as_str()).ok_or( - SqlxCErr(format!("No 'level' in {:?}", cell_message).into()) - )?, - "rule": cell_message.get("rule").and_then(|s| s.as_str()).ok_or( - SqlxCErr(format!("No 'rule' in {:?}", cell_message).into()) - )?, - "message": cell_message.get("message").and_then(|s| s.as_str()).ok_or( - SqlxCErr(format!("No 'message' in {:?}", cell_message).into()) - )?, - })); - } - } - } - - let mut update_stmt = format!(r#"UPDATE "{}" SET "#, table_to_write); - update_stmt.push_str(&assignments.join(", ")); - update_stmt.push_str(&format!(r#" WHERE "row_number" = {}"#, row_number)); - let update_stmt = local_sql_syntax(&pool, &update_stmt); - let mut query = sqlx_query(&update_stmt); - for param in ¶ms { - query = query.bind(param); - } - query.execute(tx.acquire().await?).await?; - - // Now delete any messages that had been previously inserted to the message table for the - // old version of this row: - let delete_sql = format!( - r#"DELETE FROM "message" WHERE "table" = '{}' AND "row" = {}"#, - base_table, row_number - ); - let query = sqlx_query(&delete_sql); - query.execute(tx.acquire().await?).await?; - - // Now add the messages to the message table for the new version of this row: - for m in messages { - let column = m.get("column").and_then(|c| c.as_str()).unwrap(); - let value = m.get("value").and_then(|c| c.as_str()).unwrap(); - let level = m.get("level").and_then(|c| c.as_str()).unwrap(); - let rule = m.get("rule").and_then(|c| c.as_str()).unwrap(); - let message = m.get("message").and_then(|c| c.as_str()).unwrap(); - let message = message.replace("'", "''"); - let insert_sql = format!( - r#"INSERT INTO "message" - ("table", "row", "column", "value", "level", "rule", "message") - VALUES ('{}', {}, '{}', '{}', '{}', '{}', '{}')"#, - base_table, row_number, column, value, level, rule, message - ); - let query = sqlx_query(&insert_sql); - query.execute(tx.acquire().await?).await?; - } - - Ok(()) - } - // Remove any _conflict suffix from the table name: let table = match table.strip_suffix("_conflict") { None => table.clone(), @@ -2945,86 +2828,29 @@ pub async fn update_row_tx( row.clone() }; - // Now figure out whether the row is currently in the base table or the conflict table: - let sql = format!( - "SELECT 1 FROM \"{}\" WHERE row_number = {}", - table, row_number - ); - let query = sqlx_query(&sql); - let rows = query.fetch_all(tx.acquire().await?).await?; - let mut current_table = String::from(table); - if rows.len() == 0 { - current_table.push_str("_conflict"); - } - - // Next, figure out where to put the new version of the row: - let mut table_to_write = String::from(table); - for (column, cell) in row.iter() { - let valid = cell.get("valid").ok_or(SqlxCErr( - format!("No flag named 'valid' in {:?}", cell).into(), - ))?; - if valid == false { - let structure = global_config - .get("table") - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table)) - .and_then(|t| t.as_object()) - .and_then(|t| t.get("column")) - .and_then(|c| c.as_object()) - .and_then(|c| c.get(column)) - .and_then(|c| c.as_object()) - .and_then(|c| c.get("structure")) - .and_then(|s| s.as_str()) - .unwrap_or_else(|| ""); - if vec!["primary", "unique"].contains(&structure) || structure.starts_with("tree(") { - let messages = cell - .get("messages") - .and_then(|m| m.as_array()) - .ok_or(SqlxCErr( - format!("No array named 'messages' in {:?}", cell).into(), - ))?; - for msg in messages { - let level = msg - .get("level") - .and_then(|l| l.as_str()) - .ok_or(SqlxCErr(format!("No 'level' in {:?}", msg).into()))?; - if level == "error" { - table_to_write.push_str("_conflict"); - break; - } - } - } - } - } - - // If table_to_write and current_table are the same, update it. Otherwise delete the current - // version of the row from the database and insert the new version to table_to_write: - if table_to_write == current_table { - direct_update(global_config, pool, tx, &table_to_write, row_number, &row).await?; - } else { - delete_row_tx( - global_config, - compiled_datatype_conditions, - compiled_rule_conditions, - pool, - tx, - table, - row_number, - ) - .await?; - insert_new_row_tx( - global_config, - compiled_datatype_conditions, - compiled_rule_conditions, - pool, - tx, - table, - &row, - Some(*row_number), - false, - ) - .await?; - } + // Perform the update in two steps: + delete_row_tx( + global_config, + compiled_datatype_conditions, + compiled_rule_conditions, + pool, + tx, + table, + row_number, + ) + .await?; + insert_new_row_tx( + global_config, + compiled_datatype_conditions, + compiled_rule_conditions, + pool, + tx, + table, + &row, + Some(*row_number), + false, + ) + .await?; // Now process the rows from the same table as the target table that need to be re-validated // because of unique or primary constraints: diff --git a/test/expected/history.tsv b/test/expected/history.tsv index 42c70bcc..23f15d77 100644 --- a/test/expected/history.tsv +++ b/test/expected/history.tsv @@ -5,7 +5,7 @@ history_id table row from to summary user undone_by 4 table6 10 {"bar":{"messages":[{"level":"error","message":"An unrelated error","rule":"custom:unrelated"}],"valid":false,"value":"2"},"child":{"messages":[{"level":"error","message":"Values of child must be unique","rule":"tree:child-unique"}],"valid":false,"value":"2"},"foo":{"messages":[],"valid":true,"value":"a"},"parent":{"messages":[],"valid":true,"value":"6"},"xyzzy":{"messages":[{"level":"error","message":"Value '23' of column xyzzy is not in table6.child","rule":"under:not-in-tree"}],"valid":false,"value":"23"}} VALVE 5 table10 1 {"foreign_column":{"messages":[],"valid":true,"value":"a"},"numeric_foreign_column":{"messages":[],"valid":true,"value":"1"},"other_foreign_column":{"messages":[],"valid":true,"value":"a"}} {"foreign_column":{"messages":[],"valid":true,"value":"w"},"numeric_foreign_column":{"messages":[{"level":"error","message":"numeric_foreign_column should be a positive or negative integer","rule":"datatype:integer"},{"level":"error","message":"numeric_foreign_column should be a line of text that does not begin or end with whitespace","rule":"datatype:trimmed_line"}],"valid":false,"value":""},"other_foreign_column":{"messages":[],"valid":true,"value":"z"}} [{"column":"foreign_column","level":"update","message":"Value changed from 'a' to 'w'","old_value":"a","value":"w"},{"column":"numeric_foreign_column","level":"update","message":"Value changed from 1 to ''","old_value":"1","value":""},{"column":"other_foreign_column","level":"update","message":"Value changed from 'a' to 'z'","old_value":"a","value":"z"}] VALVE 6 table11 2 {"bar":{"messages":[],"valid":true,"value":"f"},"child":{"messages":[],"valid":true,"value":"b"},"foo":{"messages":[],"valid":true,"value":"e"},"parent":{"messages":[],"valid":true,"value":"c"},"xyzzy":{"messages":[],"valid":true,"value":"d"}} {"bar":{"messages":[],"valid":true,"value":"f"},"child":{"messages":[],"valid":true,"value":"b"},"foo":{"messages":[{"level":"error","message":"Values of foo must be unique","rule":"key:primary"}],"valid":false,"value":"d"},"parent":{"messages":[],"valid":true,"value":"c"},"xyzzy":{"messages":[],"valid":true,"value":"d"}} [{"column":"foo","level":"update","message":"Value changed from 'e' to 'd'","old_value":"e","value":"d"}] VALVE -7 table11 4 {"bar":{"messages":[],"valid":true,"value":"z"},"child":{"messages":[],"valid":true,"value":"f"},"foo":{"messages":[{"column":"foo","level":"error","message":"Values of foo must be unique","rule":"key:primary","value":"e"}],"valid":false,"value":"e"},"parent":{"messages":[],"valid":true,"value":"g"},"xyzzy":{"messages":[],"valid":true,"value":"x"}} VALVE +7 table11 4 {"bar":{"messages":[],"valid":true,"value":"z"},"child":{"messages":[],"valid":true,"value":"f"},"foo":{"messages":[],"valid":true,"value":"e"},"parent":{"messages":[],"valid":true,"value":"g"},"xyzzy":{"messages":[],"valid":true,"value":"x"}} VALVE 8 table10 9 {"foreign_column":{"messages":[],"valid":true,"value":"i"},"numeric_foreign_column":{"messages":[],"valid":true,"value":"9"},"other_foreign_column":{"messages":[],"valid":true,"value":"i"}} VALVE 9 table10 10 {"foreign_column":{"messages":[],"valid":true,"value":"j"},"numeric_foreign_column":{"messages":[],"valid":true,"value":"10"},"other_foreign_column":{"messages":[],"valid":true,"value":"j"}} VALVE VALVE 10 table10 8 {"foreign_column":{"messages":[],"valid":true,"value":"h"},"numeric_foreign_column":{"messages":[],"valid":true,"value":"8"},"other_foreign_column":{"messages":[],"valid":true,"value":"h"}} {"foreign_column":{"messages":[],"valid":true,"value":"k"},"numeric_foreign_column":{"messages":[],"valid":true,"value":"11"},"other_foreign_column":{"messages":[],"valid":true,"value":"k"}} [{"column":"foreign_column","level":"update","message":"Value changed from 'h' to 'k'","old_value":"h","value":"k"},{"column":"numeric_foreign_column","level":"update","message":"Value changed from 8 to 11","old_value":"8","value":"11"},{"column":"other_foreign_column","level":"update","message":"Value changed from 'h' to 'k'","old_value":"h","value":"k"}] VALVE VALVE From a2950a1811d2e288546f8f6e28d37cd51db6e8d9 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sat, 30 Sep 2023 14:31:39 -0400 Subject: [PATCH 03/14] keep values whenever possible during batch loading --- src/lib.rs | 151 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 89 insertions(+), 62 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 464ac7b4..b06fd7b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,9 +75,6 @@ lazy_static! { static ref PG_SQL_TYPES: Vec<&'static str> = vec!["text", "varchar", "numeric", "integer", "real"]; static ref SL_SQL_TYPES: Vec<&'static str> = vec!["text", "numeric", "integer", "real"]; - // TODO: CHECK THAT NO CONFLICT RULES ARE MISSING FROM THIS LIST. - static ref CONFLICT_RULES: Vec<&'static str> = - vec!["key:primary", "key:unique", "key:foreign", "tree:child-unique", "tree:foreign"]; } /// An alias for [serde_json::Map](..//serde_json/struct.Map.html). @@ -2293,6 +2290,68 @@ pub async fn redo( Ok(()) } +/// Given a global config map and a table name, return a list of the columns from the table +/// that may potentially result in database conflicts. +fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec { + let mut conflict_columns = vec![]; + let primaries = global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|c| c.get("primary")) + .and_then(|t| t.as_object()) + .and_then(|t| t.get(table_name)) + .and_then(|t| t.as_array()) + .unwrap(); + + let uniques = global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|c| c.get("unique")) + .and_then(|t| t.as_object()) + .and_then(|t| t.get(table_name)) + .and_then(|t| t.as_array()) + .unwrap(); + + let trees = global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|o| o.get("tree")) + .and_then(|t| t.as_object()) + .and_then(|o| o.get(table_name)) + .and_then(|t| t.as_array()) + .unwrap() + .iter() + .map(|v| v.as_object().unwrap()) + .map(|v| v.get("child").unwrap().clone()) + .collect::>(); + + for key_columns in vec![primaries, uniques, &trees] { + for column in key_columns { + if !conflict_columns.contains(column) { + conflict_columns.push(column.clone()); + } + } + } + + conflict_columns +} + +/// Given a string describing a rule violation, return true if it is the kind of violation that +/// will trigger a database error or false otherwise. +fn causes_db_error(rule: &str) -> bool { + vec![ + "key:primary", + "key:unique", + "key:foreign", + "tree:child-unique", + "tree:foreign", + "under:not-in-tree", + "under:not-under", + ] + .contains(&rule) + || rule.starts_with("datatype:") +} + /// A wrapper around [insert_new_row_tx()] in which the following steps are also performed: /// - A database transaction is created and then committed once the given new row has been inserted. /// - The row is validated before insertion and the update to the database is recorded to the @@ -2421,6 +2480,7 @@ pub async fn insert_new_row_tx( let mut insert_params = vec![]; let mut messages = vec![]; let sorted_datatypes = get_sorted_datatypes(global_config); + let conflict_columns = get_conflict_columns(global_config, table); let mut use_conflict_table = false; for (column, cell) in row.iter() { insert_columns.append(&mut vec![format!(r#""{}""#, column)]); @@ -2433,6 +2493,10 @@ pub async fn insert_new_row_tx( let cell_value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( format!("No string named 'value' in {:?}", cell).into(), ))?; + + if !use_conflict_table && !cell_valid && conflict_columns.contains(&json!(column)) { + use_conflict_table = true; + } let mut insert_null = false; if !cell_valid { let cell_messages = sort_messages( @@ -2452,14 +2516,8 @@ pub async fn insert_new_row_tx( .get("rule") .and_then(|l| l.as_str()) .ok_or(SqlxCErr(format!("No 'rule' in {:?}", cell).into()))?; - if level == "error" { - if CONFLICT_RULES.contains(&rule) { - use_conflict_table = true; - } - if rule.starts_with("datatype:") { - use_conflict_table = true; - insert_null = true; - } + if level == "error" && causes_db_error(&rule) { + insert_null = true; } messages.push(json!({ "column": column, @@ -3887,50 +3945,6 @@ async fn make_inserts( ), sqlx::Error, > { - let conflict_columns = { - let mut conflict_columns = vec![]; - let primaries = config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|c| c.get("primary")) - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table_name)) - .and_then(|t| t.as_array()) - .unwrap(); - - let uniques = config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|c| c.get("unique")) - .and_then(|t| t.as_object()) - .and_then(|t| t.get(table_name)) - .and_then(|t| t.as_array()) - .unwrap(); - - let trees = config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|o| o.get("tree")) - .and_then(|t| t.as_object()) - .and_then(|o| o.get(table_name)) - .and_then(|t| t.as_array()) - .unwrap() - .iter() - .map(|v| v.as_object().unwrap()) - .map(|v| v.get("child").unwrap().clone()) - .collect::>(); - - for key_columns in vec![primaries, uniques, &trees] { - for column in key_columns { - if !conflict_columns.contains(column) { - conflict_columns.push(column.clone()); - } - } - } - - conflict_columns - }; - fn generate_sql( config: &SerdeMap, table_name: &String, @@ -3949,10 +3963,22 @@ async fn make_inserts( let mut values = vec![format!("{}", row.row_number.unwrap())]; for column in column_names { let cell = row.contents.get(column).unwrap(); - - // Insert the value of the cell into the column unless it is invalid or has the - // nulltype field set, in which case insert NULL: - if cell.nulltype == None && cell.valid { + // Insert the value of the cell into the column unless inserting it will cause a db + // error or it has the nulltype field set, in which case insert NULL: + let mut insert_null = false; + if cell.nulltype != None { + insert_null = true; + } else if !cell.valid { + for cell_message in &cell.messages { + let level = cell_message.get("level").and_then(|l| l.as_str()).unwrap(); + let rule = cell_message.get("rule").and_then(|l| l.as_str()).unwrap(); + if level == "error" && causes_db_error(&rule) { + insert_null = true; + break; + } + } + } + if !insert_null { let sql_type = get_sql_type_from_global_config(&config, &table_name, &column, pool) .unwrap(); @@ -4053,23 +4079,24 @@ async fn make_inserts( (output, params, message_output, message_params) } - fn has_conflict(row: &ResultRow, conflict_columns: &Vec) -> bool { + fn has_error_in_conflict_column(row: &ResultRow, conflict_columns: &Vec) -> bool { for (column, cell) in &row.contents { let column = SerdeValue::String(column.to_string()); - if conflict_columns.contains(&column) && !cell.valid { + if !cell.valid && conflict_columns.contains(&column) { return true; } } return false; } + let conflict_columns = get_conflict_columns(config, table_name); let mut main_rows = vec![]; let mut conflict_rows = vec![]; for (i, row) in rows.iter_mut().enumerate() { // enumerate begins at 0 but we need to begin at 1: let i = i + 1; row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32); - if has_conflict(&row, &conflict_columns) { + if has_error_in_conflict_column(&row, &conflict_columns) { conflict_rows.push(row.clone()); } else { main_rows.push(row.clone()); From b5cd0012c37aef0755955bb130d81d7ec7c7c81f Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sat, 30 Sep 2023 16:07:04 -0400 Subject: [PATCH 04/14] avoid more nulls --- src/lib.rs | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b06fd7b0..8b2ba95c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2312,6 +2312,19 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); + let trees = global_config .get("constraints") .and_then(|c| c.as_object()) @@ -2325,7 +2338,20 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - for key_columns in vec![primaries, uniques, &trees] { + let unders = global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|o| o.get("under")) + .and_then(|t| t.as_object()) + .and_then(|o| o.get(table_name)) + .and_then(|t| t.as_array()) + .unwrap() + .iter() + .map(|v| v.as_object().unwrap()) + .map(|v| v.get("tcolumn").unwrap().clone()) + .collect::>(); + + for key_columns in vec![primaries, uniques, &foreigns, &trees, &unders] { for column in key_columns { if !conflict_columns.contains(column) { conflict_columns.push(column.clone()); @@ -2517,7 +2543,9 @@ pub async fn insert_new_row_tx( .and_then(|l| l.as_str()) .ok_or(SqlxCErr(format!("No 'rule' in {:?}", cell).into()))?; if level == "error" && causes_db_error(&rule) { - insert_null = true; + if !use_conflict_table || rule.starts_with("datatype:") { + insert_null = true; + } } messages.push(json!({ "column": column, @@ -3973,8 +4001,10 @@ async fn make_inserts( let level = cell_message.get("level").and_then(|l| l.as_str()).unwrap(); let rule = cell_message.get("rule").and_then(|l| l.as_str()).unwrap(); if level == "error" && causes_db_error(&rule) { - insert_null = true; - break; + if !table_name.ends_with("_conflict") || rule.starts_with("datatype:") { + insert_null = true; + break; + } } } } From 348a75e5c0ff10c2826f223fc4d0f37a4abc24af Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Tue, 17 Oct 2023 09:53:59 +0200 Subject: [PATCH 05/14] fix bug when determining conflict columns --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 8b2ba95c..85899c50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2322,6 +2322,7 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); @@ -2348,6 +2349,7 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); From 830bec5c0a8b7e3458e0ab8a2f2ab5d2c4060d17 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Tue, 17 Oct 2023 11:11:09 +0200 Subject: [PATCH 06/14] fix bug in determining conflict columns better, and avoid nulls unless they result in db errors --- src/lib.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 10 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 85899c50..74b0f514 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2312,7 +2312,20 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); + + let foreign_targets = global_config .get("constraints") .and_then(|c| c.as_object()) .and_then(|o| o.get("foreign")) @@ -2326,7 +2339,20 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - let trees = global_config + let tree_parents = global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|o| o.get("tree")) + .and_then(|t| t.as_object()) + .and_then(|o| o.get(table_name)) + .and_then(|t| t.as_array()) + .unwrap() + .iter() + .map(|v| v.as_object().unwrap()) + .map(|v| v.get("parent").unwrap().clone()) + .collect::>(); + + let tree_children = global_config .get("constraints") .and_then(|c| c.as_object()) .and_then(|o| o.get("tree")) @@ -2339,7 +2365,20 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - let unders = global_config + let under_sources = global_config + .get("constraints") + .and_then(|c| c.as_object()) + .and_then(|o| o.get("under")) + .and_then(|t| t.as_object()) + .and_then(|o| o.get(table_name)) + .and_then(|t| t.as_array()) + .unwrap() + .iter() + .map(|v| v.as_object().unwrap()) + .map(|v| v.get("column").unwrap().clone()) + .collect::>(); + + let under_targets = global_config .get("constraints") .and_then(|c| c.as_object()) .and_then(|o| o.get("under")) @@ -2353,7 +2392,16 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - for key_columns in vec![primaries, uniques, &foreigns, &trees, &unders] { + for key_columns in vec![ + primaries, + uniques, + &foreign_sources, + &foreign_targets, + &tree_parents, + &tree_children, + &under_sources, + &under_targets, + ] { for column in key_columns { if !conflict_columns.contains(column) { conflict_columns.push(column.clone()); @@ -2366,7 +2414,7 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec bool { +fn is_db_key_error(rule: &str) -> bool { vec![ "key:primary", "key:unique", @@ -2377,7 +2425,35 @@ fn causes_db_error(rule: &str) -> bool { "under:not-under", ] .contains(&rule) - || rule.starts_with("datatype:") +} + +/// TODO: Add a docstring here +fn is_sql_type_error(sql_type: &str, value: &str) -> bool { + let sql_type = sql_type.to_lowercase(); + if sql_type == "numeric" { + // f64 + let numeric_value: Result = value.parse(); + match numeric_value { + Ok(_) => false, + Err(_) => true, + } + } else if sql_type == "integer" { + // i32 + let integer_value: Result = value.parse(); + match integer_value { + Ok(_) => false, + Err(_) => true, + } + } else if sql_type == "real" { + // f64 (actually f32) + let float_value: Result = value.parse(); + match float_value { + Ok(_) => false, + Err(_) => true, + } + } else { + false + } } /// A wrapper around [insert_new_row_tx()] in which the following steps are also performed: @@ -2544,8 +2620,14 @@ pub async fn insert_new_row_tx( .get("rule") .and_then(|l| l.as_str()) .ok_or(SqlxCErr(format!("No 'rule' in {:?}", cell).into()))?; - if level == "error" && causes_db_error(&rule) { - if !use_conflict_table || rule.starts_with("datatype:") { + let is_db_key_error = is_db_key_error(&rule); + let sql_type = get_sql_type_from_global_config(global_config, table, column, pool) + .ok_or(SqlxCErr( + format!("Could not get SQL type for {}.{}", table, column).into(), + ))?; + let is_sql_type_error = is_sql_type_error(&sql_type, cell_value); + if level == "error" && (is_db_key_error || is_sql_type_error) { + if !use_conflict_table || is_sql_type_error { insert_null = true; } } @@ -3984,6 +4066,10 @@ async fn make_inserts( verbose: bool, pool: &AnyPool, ) -> (String, Vec, String, Vec) { + let normal_table_name = match table_name.strip_suffix("_conflict") { + None => table_name.clone(), + Some(base) => base.to_string(), + }; let mut lines = vec![]; let mut params = vec![]; let mut message_lines = vec![]; @@ -4002,8 +4088,17 @@ async fn make_inserts( for cell_message in &cell.messages { let level = cell_message.get("level").and_then(|l| l.as_str()).unwrap(); let rule = cell_message.get("rule").and_then(|l| l.as_str()).unwrap(); - if level == "error" && causes_db_error(&rule) { - if !table_name.ends_with("_conflict") || rule.starts_with("datatype:") { + let is_db_key_error = is_db_key_error(&rule); + let sql_type = get_sql_type_from_global_config( + config, + &normal_table_name, + column, + pool, + ) + .unwrap(); + let is_sql_type_error = is_sql_type_error(&sql_type, &cell.value); + if level == "error" && (is_db_key_error || is_sql_type_error) { + if !table_name.ends_with("_conflict") || is_sql_type_error { insert_null = true; break; } From 474297105547f883b65d06821bf8464ff97beca2 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Fri, 20 Oct 2023 10:22:32 +0200 Subject: [PATCH 07/14] refactor query_with_message_values() --- src/lib.rs | 84 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 74b0f514..60965045 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1343,20 +1343,54 @@ pub async fn valve( Ok(config.to_string()) } -/// Given a table name, a global configuration map, and a database connection pool, construct an -/// SQL query that one can use to get the logical contents of the row (whether or not the row is -/// valid) including any messages. -pub fn query_with_message_values(table: &str, global_config: &SerdeMap, pool: &AnyPool) -> String { +/// Given a table name, a column name, and a database pool, construct an SQL string to extract the +/// value of the column, such that when the value of a given column is null, the query attempts to +/// extract it from the message table. Returns a String representing the SQL to retrieve the value +/// of the column. +pub fn query_column_with_message_value(table: &str, column: &str, pool: &AnyPool) -> String { let table = match table.strip_suffix("_conflict") { None => table.clone(), Some(base) => base, }; + let is_clause = if pool.any_kind() == AnyKind::Sqlite { "IS" } else { "IS NOT DISTINCT FROM" }; + format!( + r#"CASE + WHEN "{column}" {is_clause} NULL THEN ( + SELECT value + FROM "message" + WHERE "row" = "row_number" + AND "column" = '{column}' + AND "table" = '{table}' + ORDER BY "message_id" DESC + LIMIT 1 + ) + ELSE {casted_column} + END AS "{column}_with_mval""#, + casted_column = if pool.any_kind() == AnyKind::Sqlite { + cast_column_sql_to_text(column, "non-text") + } else { + format!("\"{}\"::TEXT", column) + }, + column = column, + table = table, + ) +} + +/// Given a table name, a global configuration map, and a database connection pool, construct an +/// SQL query that one can use to get the logical contents of the table, such that when the value +/// of a given column is null, the query attempts to extract it from the message table. Returns a +/// String representing the query. +pub fn query_with_message_values(table: &str, global_config: &SerdeMap, pool: &AnyPool) -> String { + let table = match table.strip_suffix("_conflict") { + None => table.clone(), + Some(base) => base, + }; let real_columns = global_config .get("table") .and_then(|t| t.get(table)) @@ -1370,34 +1404,12 @@ pub fn query_with_message_values(table: &str, global_config: &SerdeMap, pool: &A let mut inner_columns = real_columns .iter() - .map(|c| { - format!( - r#"CASE - WHEN "{column}" {is_clause} NULL THEN ( - SELECT value - FROM "message" - WHERE "row" = "row_number" - AND "column" = '{column}' - AND "table" = '{table}' - ORDER BY "message_id" DESC - LIMIT 1 - ) - ELSE {casted_column} - END AS "{column}_extended""#, - casted_column = if pool.any_kind() == AnyKind::Sqlite { - cast_column_sql_to_text(c, "non-text") - } else { - format!("\"{}\"::TEXT", c) - }, - column = c, - table = table, - ) - }) + .map(|column| query_column_with_message_value(table, column, pool)) .collect::>(); let mut outer_columns = real_columns .iter() - .map(|c| format!("t.\"{}_extended\"", c)) + .map(|c| format!("t.\"{}_with_mval\"", c)) .collect::>(); let inner_columns = { @@ -1442,7 +1454,7 @@ pub async fn get_affected_rows( // which the value of the column is the same as `value` let sql = { format!( - r#"{main_query} WHERE "{column}_extended" = '{value}'{except}"#, + r#"{main_query} WHERE "{column}_with_mval" = '{value}'{except}"#, main_query = query_with_message_values(table, global_config, pool), column = column, value = value, @@ -1481,7 +1493,7 @@ pub async fn get_affected_rows( "valid": true, "messages": json!([]), }); - let cname = cname.strip_suffix("_extended").unwrap(); + let cname = cname.strip_suffix("_with_mval").unwrap(); table_row.insert(cname.to_string(), json!(cell)); } } @@ -1536,19 +1548,19 @@ pub async fn get_row_from_db( let mut row = SerdeMap::new(); for column in sql_row.columns() { - let cname_extended = column.name(); - if !vec!["row_number", "message"].contains(&cname_extended) { - let raw_value = sql_row.try_get_raw(format!(r#"{}"#, cname_extended).as_str())?; + let cname = column.name(); + if !vec!["row_number", "message"].contains(&cname) { + let raw_value = sql_row.try_get_raw(format!(r#"{}"#, cname).as_str())?; let value; if !raw_value.is_null() { // The extended query returned by query_with_message_values() casts all column // values to text, so we pass "text" to get_column_value() for every column: - value = get_column_value(&sql_row, &cname_extended, "text"); + value = get_column_value(&sql_row, &cname, "text"); } else { value = String::from(""); } - let cname = match cname_extended.strip_suffix("_extended") { - None => cname_extended.clone(), + let cname = match cname.strip_suffix("_with_mval") { + None => cname.clone(), Some(cname) => cname, }; let column_messages = messages From a9bb252ddfd1e5f5be23ca87e84639916d7333d7 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Fri, 20 Oct 2023 11:03:13 +0200 Subject: [PATCH 08/14] do not unnecessarily strip _conflict from table names. This is not necessary in most cases and makes the code confusing. There was a need, previously, because we used to call the update/insert/delete single row functions directly on the conflict table in some cases - the reason being that the update_row function had two possible paths, one direct and one indirect. Now that it is always indirect we can clean this up. The only time where it is still needed is when we are actually generating insert statements. --- src/lib.rs | 60 +++---------------------------------------------- src/validate.rs | 6 ----- 2 files changed, 3 insertions(+), 63 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 60965045..392af3fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -732,10 +732,6 @@ pub fn get_parsed_structure_conditions( /// Strings, with the first string being a SQL statement for dropping the view, and the second /// string being a SQL statement for creating it. fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table); let message_t; if pool.any_kind() == AnyKind::Postgres { @@ -855,10 +851,6 @@ fn get_sql_for_user_view( table: &str, pool: &AnyPool, ) -> (String, String) { - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; let is_clause = if pool.any_kind() == AnyKind::Sqlite { "IS" } else { @@ -1348,11 +1340,6 @@ pub async fn valve( /// extract it from the message table. Returns a String representing the SQL to retrieve the value /// of the column. pub fn query_column_with_message_value(table: &str, column: &str, pool: &AnyPool) -> String { - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; - let is_clause = if pool.any_kind() == AnyKind::Sqlite { "IS" } else { @@ -1387,10 +1374,6 @@ pub fn query_column_with_message_value(table: &str, column: &str, pool: &AnyPool /// of a given column is null, the query attempts to extract it from the message table. Returns a /// String representing the query. pub fn query_with_message_values(table: &str, global_config: &SerdeMap, pool: &AnyPool) -> String { - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; let real_columns = global_config .get("table") .and_then(|t| t.get(table)) @@ -1998,11 +1981,6 @@ pub async fn record_row_change( } } - // Always ignore the table suffix when recording a row change: - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; let summary = summarize(from, to).map_err(|e| SqlxCErr(e.into()))?; let (from, to) = (to_text(from, true), to_text(to, true)); let sql = format!( @@ -2532,12 +2510,6 @@ pub async fn insert_new_row_tx( new_row_number: Option, skip_validation: bool, ) -> Result { - // Remove any _conflict suffix from the table name: - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; - // Send the row through the row validator to determine if any fields are problematic and // to mark them with appropriate messages: let row = if !skip_validation { @@ -2795,12 +2767,6 @@ pub async fn delete_row_tx( table: &str, row_number: &u32, ) -> Result<(), sqlx::Error> { - // Remove any _conflict suffix from the table name: - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; - // Used to validate the given row, counterfactually, "as if" the row did not exist in the // database: let query_as_if = QueryAsIf { @@ -2951,12 +2917,6 @@ pub async fn update_row_tx( skip_validation: bool, do_not_recurse: bool, ) -> Result<(), sqlx::Error> { - // Remove any _conflict suffix from the table name: - let table = match table.strip_suffix("_conflict") { - None => table.clone(), - Some(base) => base, - }; - // First, look through the valve config to see which tables are dependent on this table and find // the rows that need to be updated. The variable query_as_if is used to validate the given row, // counterfactually, "as if" the version of the row in the database currently were replaced with @@ -3337,15 +3297,9 @@ pub fn get_sql_type_from_global_config( .get("datatype") .and_then(|d| d.as_object()) .unwrap(); - let normal_table_name; - if let Some(s) = table.strip_suffix("_conflict") { - normal_table_name = String::from(s); - } else { - normal_table_name = table.to_string(); - } let dt = global_config .get("table") - .and_then(|t| t.get(normal_table_name)) + .and_then(|t| t.get(table)) .and_then(|t| t.get("column")) .and_then(|c| c.get(column)) .and_then(|c| c.get("datatype")) @@ -4119,7 +4073,7 @@ async fn make_inserts( } if !insert_null { let sql_type = - get_sql_type_from_global_config(&config, &table_name, &column, pool) + get_sql_type_from_global_config(&config, &normal_table_name, &column, pool) .unwrap(); values.push(cast_sql_param_from_text(&sql_type)); params.push(cell.value.clone()); @@ -4140,15 +4094,7 @@ async fn make_inserts( ]; let message = message.as_object().unwrap(); - message_params.push({ - let normal_table_name; - if let Some(s) = table_name.strip_suffix("_conflict") { - normal_table_name = String::from(s); - } else { - normal_table_name = table_name.to_string(); - } - normal_table_name - }); + message_params.push(normal_table_name.clone()); message_params.push(column.clone()); message_params.push(cell.value.clone()); message_params.push( diff --git a/src/validate.rs b/src/validate.rs index a028594a..5d476be7 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -75,12 +75,6 @@ pub async fn validate_row( None => default_tx, }; - // Remove the _conflict suffix from the table_name if it has one: - let table_name = match table_name.strip_suffix("_conflict") { - None => table_name.clone(), - Some(base) => base, - }; - // Initialize the result row with the values from the given row: let mut result_row = ResultRow { row_number: row_number, From 5fb8a6435d042805039ccbc102984fcf5d1c15d3 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sat, 28 Oct 2023 11:59:28 -0400 Subject: [PATCH 09/14] tweak batch loading and update validation functions to handle new null/conflict logic --- src/lib.rs | 492 ++++++++++------------ src/validate.rs | 366 +++++----------- test/expected/messages.tsv | 1 + test/expected/messages_a1.tsv | 1 + test/expected/messages_after_api_test.tsv | 1 + 5 files changed, 322 insertions(+), 539 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 392af3fc..fc0cfc91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1358,7 +1358,7 @@ pub fn query_column_with_message_value(table: &str, column: &str, pool: &AnyPool LIMIT 1 ) ELSE {casted_column} - END AS "{column}_with_mval""#, + END AS "{column}""#, casted_column = if pool.any_kind() == AnyKind::Sqlite { cast_column_sql_to_text(column, "non-text") } else { @@ -1392,7 +1392,7 @@ pub fn query_with_message_values(table: &str, global_config: &SerdeMap, pool: &A let mut outer_columns = real_columns .iter() - .map(|c| format!("t.\"{}_with_mval\"", c)) + .map(|c| format!("t.\"{}\"", c)) .collect::>(); let inner_columns = { @@ -1437,7 +1437,7 @@ pub async fn get_affected_rows( // which the value of the column is the same as `value` let sql = { format!( - r#"{main_query} WHERE "{column}_with_mval" = '{value}'{except}"#, + r#"{main_query} WHERE "{column}" = '{value}'{except}"#, main_query = query_with_message_values(table, global_config, pool), column = column, value = value, @@ -1476,7 +1476,6 @@ pub async fn get_affected_rows( "valid": true, "messages": json!([]), }); - let cname = cname.strip_suffix("_with_mval").unwrap(); table_row.insert(cname.to_string(), json!(cell)); } } @@ -1542,10 +1541,6 @@ pub async fn get_row_from_db( } else { value = String::from(""); } - let cname = match cname.strip_suffix("_with_mval") { - None => cname.clone(), - Some(cname) => cname, - }; let column_messages = messages .iter() .filter(|m| m.get("column").unwrap().as_str() == Some(cname)) @@ -2402,22 +2397,7 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec bool { - vec![ - "key:primary", - "key:unique", - "key:foreign", - "tree:child-unique", - "tree:foreign", - "under:not-in-tree", - "under:not-under", - ] - .contains(&rule) -} - -/// TODO: Add a docstring here +/// Given a SQL type and a value, return true if the value does not conform to the SQL type. fn is_sql_type_error(sql_type: &str, value: &str) -> bool { let sql_type = sql_type.to_lowercase(); if sql_type == "numeric" { @@ -2566,75 +2546,54 @@ pub async fn insert_new_row_tx( let mut insert_columns = vec![]; let mut insert_values = vec![]; let mut insert_params = vec![]; - let mut messages = vec![]; + let mut all_messages = vec![]; let sorted_datatypes = get_sorted_datatypes(global_config); let conflict_columns = get_conflict_columns(global_config, table); let mut use_conflict_table = false; for (column, cell) in row.iter() { insert_columns.append(&mut vec![format!(r#""{}""#, column)]); + let cell = cell .as_object() .ok_or(SqlxCErr(format!("Cell {:?} is not an object", cell).into()))?; - let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).ok_or(SqlxCErr( + let valid = cell.get("valid").and_then(|v| v.as_bool()).ok_or(SqlxCErr( format!("No bool named 'valid' in {:?}", cell).into(), ))?; - let cell_value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( + let value = cell.get("value").and_then(|v| v.as_str()).ok_or(SqlxCErr( format!("No string named 'value' in {:?}", cell).into(), ))?; + let messages = sort_messages( + &sorted_datatypes, + cell.get("messages") + .and_then(|m| m.as_array()) + .ok_or(SqlxCErr( + format!("No array named 'messages' in {:?}", cell).into(), + ))?, + ); - if !use_conflict_table && !cell_valid && conflict_columns.contains(&json!(column)) { - use_conflict_table = true; - } - let mut insert_null = false; - if !cell_valid { - let cell_messages = sort_messages( - &sorted_datatypes, - cell.get("messages") - .and_then(|m| m.as_array()) - .ok_or(SqlxCErr( - format!("No array named 'messages' in {:?}", cell).into(), - ))?, - ); - for cell_message in cell_messages { - let level = cell_message - .get("level") - .and_then(|l| l.as_str()) - .ok_or(SqlxCErr(format!("No 'level' in {:?}", cell).into()))?; - let rule = cell_message - .get("rule") - .and_then(|l| l.as_str()) - .ok_or(SqlxCErr(format!("No 'rule' in {:?}", cell).into()))?; - let is_db_key_error = is_db_key_error(&rule); - let sql_type = get_sql_type_from_global_config(global_config, table, column, pool) - .ok_or(SqlxCErr( - format!("Could not get SQL type for {}.{}", table, column).into(), - ))?; - let is_sql_type_error = is_sql_type_error(&sql_type, cell_value); - if level == "error" && (is_db_key_error || is_sql_type_error) { - if !use_conflict_table || is_sql_type_error { - insert_null = true; - } - } - messages.push(json!({ - "column": column, - "value": cell_value, - "level": cell_message.get("level").and_then(|s| s.as_str()) - .ok_or( - SqlxCErr(format!("No 'level' in {:?}", cell_message).into()) - )?, - "rule": cell_message.get("rule").and_then(|s| s.as_str()) - .ok_or( - SqlxCErr(format!("No 'rule' in {:?}", cell_message).into()) - )?, - "message": cell_message.get("message").and_then(|s| s.as_str()) - .ok_or( - SqlxCErr(format!("No 'message' in {:?}", cell_message).into()) - )?, - })); - } + for message in messages { + all_messages.push(json!({ + "column": column, + "value": value, + "level": message.get("level").and_then(|s| s.as_str()) + .ok_or( + SqlxCErr(format!("No 'level' in {:?}", message).into()) + )?, + "rule": message.get("rule").and_then(|s| s.as_str()) + .ok_or( + SqlxCErr(format!("No 'rule' in {:?}", message).into()) + )?, + "message": message.get("message").and_then(|s| s.as_str()) + .ok_or( + SqlxCErr(format!("No 'message' in {:?}", message).into()) + )?, + })); } - if insert_null { + let sql_type = get_sql_type_from_global_config(global_config, table, column, pool).ok_or( + SqlxCErr(format!("Could not get SQL type for {}.{}", table, column).into()), + )?; + if is_sql_type_error(&sql_type, value) { insert_values.push(String::from("NULL")); } else { let sql_type = get_sql_type_from_global_config(&global_config, &table, &column, pool) @@ -2642,7 +2601,11 @@ pub async fn insert_new_row_tx( format!("Unable to determine SQL type for {}.{}", table, column).into(), ))?; insert_values.push(cast_sql_param_from_text(&sql_type)); - insert_params.push(String::from(cell_value)); + insert_params.push(String::from(value)); + } + + if !use_conflict_table && !valid && conflict_columns.contains(&json!(column)) { + use_conflict_table = true; } } @@ -2689,7 +2652,7 @@ pub async fn insert_new_row_tx( query.execute(tx.acquire().await?).await?; // Next add any validation messages to the message table: - for m in messages { + for m in all_messages { let column = m.get("column").and_then(|c| c.as_str()).unwrap(); let value = m.get("value").and_then(|c| c.as_str()).unwrap(); let level = m.get("level").and_then(|c| c.as_str()).unwrap(); @@ -3305,7 +3268,7 @@ pub fn get_sql_type_from_global_config( .and_then(|c| c.get("datatype")) .and_then(|d| d.as_str()) .and_then(|d| Some(d.to_string())) - .unwrap(); + .expect(&format!("Could not get datatype for {}.{}", table, column)); get_sql_type(&dt_config, &dt, pool) } @@ -4018,137 +3981,149 @@ async fn make_inserts( pool: &AnyPool, ) -> Result< ( - (String, Vec, String, Vec), - (String, Vec, String, Vec), + String, + Vec, + String, + Vec, + String, + Vec, ), sqlx::Error, > { + fn is_conflict_row(row: &ResultRow, conflict_columns: &Vec) -> bool { + for (column, cell) in &row.contents { + let column = SerdeValue::String(column.to_string()); + if !cell.valid && conflict_columns.contains(&column) { + return true; + } + } + return false; + } + fn generate_sql( config: &SerdeMap, - table_name: &String, - column_names: &Vec, - rows: &Vec, + main_table: &String, + columns: &Vec, + rows: &mut Vec, + chunk_number: usize, messages_stats: &mut HashMap, verbose: bool, pool: &AnyPool, - ) -> (String, Vec, String, Vec) { - let normal_table_name = match table_name.strip_suffix("_conflict") { - None => table_name.clone(), - Some(base) => base.to_string(), - }; - let mut lines = vec![]; - let mut params = vec![]; + ) -> ( + String, + Vec, + String, + Vec, + String, + Vec, + ) { + let mut main_lines = vec![]; + let mut main_params = vec![]; + let mut conflict_lines = vec![]; + let mut conflict_params = vec![]; let mut message_lines = vec![]; let mut message_params = vec![]; let sorted_datatypes = get_sorted_datatypes(config); - for row in rows.iter() { - let mut values = vec![format!("{}", row.row_number.unwrap())]; - for column in column_names { + let conflict_columns = get_conflict_columns(config, main_table); + for (i, row) in rows.iter_mut().enumerate() { + // enumerate begins at 0 but we need to begin at 1: + let i = i + 1; + row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32); + let use_conflict_table = is_conflict_row(&row, &conflict_columns); + let mut row_values = vec![format!("{}", row.row_number.unwrap())]; + let mut row_params = vec![]; + for column in columns { let cell = row.contents.get(column).unwrap(); // Insert the value of the cell into the column unless inserting it will cause a db // error or it has the nulltype field set, in which case insert NULL: - let mut insert_null = false; - if cell.nulltype != None { - insert_null = true; - } else if !cell.valid { - for cell_message in &cell.messages { - let level = cell_message.get("level").and_then(|l| l.as_str()).unwrap(); - let rule = cell_message.get("rule").and_then(|l| l.as_str()).unwrap(); - let is_db_key_error = is_db_key_error(&rule); - let sql_type = get_sql_type_from_global_config( - config, - &normal_table_name, - column, - pool, - ) - .unwrap(); - let is_sql_type_error = is_sql_type_error(&sql_type, &cell.value); - if level == "error" && (is_db_key_error || is_sql_type_error) { - if !table_name.ends_with("_conflict") || is_sql_type_error { - insert_null = true; - break; - } - } - } - } - if !insert_null { - let sql_type = - get_sql_type_from_global_config(&config, &normal_table_name, &column, pool) - .unwrap(); - values.push(cast_sql_param_from_text(&sql_type)); - params.push(cell.value.clone()); + let sql_type = + get_sql_type_from_global_config(config, &main_table, column, pool).unwrap(); + if cell.nulltype != None || is_sql_type_error(&sql_type, &cell.value) { + row_values.push(String::from("NULL")); } else { - values.push(String::from("NULL")); + row_values.push(cast_sql_param_from_text(&sql_type)); + row_params.push(cell.value.clone()); } - // If the cell isn't valid, generate values and params to be used for the insert to - // the message table: - if !cell.valid { - if verbose { - add_message_counts(&cell.messages, messages_stats); - } - for message in sort_messages(&sorted_datatypes, &cell.messages) { - let row = row.row_number.unwrap().to_string(); - let message_values = vec![ - SQL_PARAM, &row, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, - ]; - - let message = message.as_object().unwrap(); - message_params.push(normal_table_name.clone()); - message_params.push(column.clone()); - message_params.push(cell.value.clone()); - message_params.push( - message - .get("level") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - ); - message_params.push( - message - .get("rule") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - ); - message_params.push( - message - .get("message") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - ); - let line = message_values.join(", "); - let line = format!("({})", line); - message_lines.push(line); - } + // Generate values and params to be used for the insert to the message table: + if verbose { + add_message_counts(&cell.messages, messages_stats); + } + + for message in sort_messages(&sorted_datatypes, &cell.messages) { + let row = row.row_number.unwrap().to_string(); + let message_values = vec![ + SQL_PARAM, &row, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, + ]; + + let message = message.as_object().unwrap(); + message_params.push(main_table.clone()); + message_params.push(column.clone()); + message_params.push(cell.value.clone()); + message_params.push( + message + .get("level") + .and_then(|s| s.as_str()) + .unwrap() + .to_string(), + ); + message_params.push( + message + .get("rule") + .and_then(|s| s.as_str()) + .unwrap() + .to_string(), + ); + message_params.push( + message + .get("message") + .and_then(|s| s.as_str()) + .unwrap() + .to_string(), + ); + let line = message_values.join(", "); + let line = format!("({})", line); + message_lines.push(line); } } - let line = values.join(", "); + let line = row_values.join(", "); let line = format!("({})", line); - lines.push(line); + if use_conflict_table { + conflict_lines.push(line); + conflict_params.append(&mut row_params); + } else { + main_lines.push(line); + main_params.append(&mut row_params); + } } // Generate the SQL output for the insert to the table: - let mut output = String::from(""); - if !lines.is_empty() { - output.push_str(&format!( - r#"INSERT INTO "{}" ("row_number", {}) VALUES"#, - table_name, - { - let mut all_columns = vec![]; - for column_name in column_names { - let quoted_column_name = format!(r#""{}""#, column_name); - all_columns.push(quoted_column_name); + fn get_table_output(lines: &Vec, table: &str, columns: &Vec) -> String { + let mut output = String::from(""); + if !lines.is_empty() { + output.push_str(&format!( + r#"INSERT INTO "{}" ("row_number", {}) VALUES"#, + table, + { + let mut quoted_columns = vec![]; + for column in columns { + let quoted_column = format!(r#""{}""#, column); + quoted_columns.push(quoted_column); + } + quoted_columns.join(", ") } - all_columns.join(", ") - } - )); - output.push_str("\n"); - output.push_str(&lines.join(",\n")); - output.push_str(";"); + )); + output.push_str("\n"); + output.push_str(&lines.join(",\n")); + output.push_str(";"); + } + output } + let main_output = get_table_output(&main_lines, &main_table, &columns); + let conflict_table = format!("{}_conflict", main_table); + let conflict_output = get_table_output(&conflict_lines, &conflict_table, &columns); + // Generate the output for the insert to the message table: let mut message_output = String::from(""); if !message_lines.is_empty() { @@ -4161,31 +4136,14 @@ async fn make_inserts( message_output.push_str(";"); } - (output, params, message_output, message_params) - } - - fn has_error_in_conflict_column(row: &ResultRow, conflict_columns: &Vec) -> bool { - for (column, cell) in &row.contents { - let column = SerdeValue::String(column.to_string()); - if !cell.valid && conflict_columns.contains(&column) { - return true; - } - } - return false; - } - - let conflict_columns = get_conflict_columns(config, table_name); - let mut main_rows = vec![]; - let mut conflict_rows = vec![]; - for (i, row) in rows.iter_mut().enumerate() { - // enumerate begins at 0 but we need to begin at 1: - let i = i + 1; - row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32); - if has_error_in_conflict_column(&row, &conflict_columns) { - conflict_rows.push(row.clone()); - } else { - main_rows.push(row.clone()); - } + ( + main_output, + main_params, + conflict_output, + conflict_params, + message_output, + message_params, + ) } // Use the "column_order" field of the table config for this table to retrieve the column names @@ -4200,34 +4158,25 @@ async fn make_inserts( .map(|v| v.as_str().unwrap().to_string()) .collect::>(); - let (main_sql, main_params, main_message_sql, main_message_params) = generate_sql( - &config, - &table_name, - &column_names, - &main_rows, - messages_stats, - verbose, - pool, - ); - let (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params) = + let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = generate_sql( &config, - &format!("{}_conflict", table_name), + &table_name, &column_names, - &conflict_rows, + rows, + chunk_number, messages_stats, verbose, pool, ); Ok(( - (main_sql, main_params, main_message_sql, main_message_params), - ( - conflict_sql, - conflict_params, - conflict_message_sql, - conflict_message_params, - ), + main_sql, + main_params, + conflict_sql, + conflict_params, + message_sql, + message_params, )) } @@ -4260,19 +4209,17 @@ async fn validate_rows_inter_and_insert( tmp_messages_stats.insert("error".to_string(), 0); tmp_messages_stats.insert("warning".to_string(), 0); tmp_messages_stats.insert("info".to_string(), 0); - let ( - (main_sql, main_params, main_message_sql, main_message_params), - (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params), - ) = make_inserts( - config, - table_name, - rows, - chunk_number, - &mut tmp_messages_stats, - verbose, - pool, - ) - .await?; + let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = + make_inserts( + config, + table_name, + rows, + chunk_number, + &mut tmp_messages_stats, + verbose, + pool, + ) + .await?; let main_sql = local_sql_syntax(&pool, &main_sql); let mut main_query = sqlx_query(&main_sql); @@ -4289,19 +4236,12 @@ async fn validate_rows_inter_and_insert( } conflict_query.execute(pool).await?; - let main_message_sql = local_sql_syntax(&pool, &main_message_sql); - let mut main_message_query = sqlx_query(&main_message_sql); - for param in &main_message_params { - main_message_query = main_message_query.bind(param); + let message_sql = local_sql_syntax(&pool, &message_sql); + let mut message_query = sqlx_query(&message_sql); + for param in &message_params { + message_query = message_query.bind(param); } - main_message_query.execute(pool).await?; - - let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql); - let mut conflict_message_query = sqlx_query(&conflict_message_sql); - for param in &conflict_message_params { - conflict_message_query = conflict_message_query.bind(param); - } - conflict_message_query.execute(pool).await?; + message_query.execute(pool).await?; if verbose { let curr_errors = messages_stats.get("error").unwrap(); @@ -4323,20 +4263,17 @@ async fn validate_rows_inter_and_insert( } Err(_) => { validate_rows_constraints(config, pool, table_name, rows).await?; - - let ( - (main_sql, main_params, main_message_sql, main_message_params), - (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params), - ) = make_inserts( - config, - table_name, - rows, - chunk_number, - messages_stats, - verbose, - pool, - ) - .await?; + let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) = + make_inserts( + config, + table_name, + rows, + chunk_number, + messages_stats, + verbose, + pool, + ) + .await?; let main_sql = local_sql_syntax(&pool, &main_sql); let mut main_query = sqlx_query(&main_sql); @@ -4352,19 +4289,12 @@ async fn validate_rows_inter_and_insert( } conflict_query.execute(pool).await?; - let main_message_sql = local_sql_syntax(&pool, &main_message_sql); - let mut main_message_query = sqlx_query(&main_message_sql); - for param in &main_message_params { - main_message_query = main_message_query.bind(param); - } - main_message_query.execute(pool).await?; - - let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql); - let mut conflict_message_query = sqlx_query(&conflict_message_sql); - for param in &conflict_message_params { - conflict_message_query = conflict_message_query.bind(param); + let message_sql = local_sql_syntax(&pool, &message_sql); + let mut message_query = sqlx_query(&message_sql); + for param in &message_params { + message_query = message_query.bind(param); } - conflict_message_query.execute(pool).await?; + message_query.execute(pool).await?; } }; diff --git a/src/validate.rs b/src/validate.rs index 5d476be7..326b9eca 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -9,8 +9,8 @@ use std::collections::HashMap; use crate::{ ast::Expression, cast_column_sql_to_text, cast_sql_param_from_text, get_column_value, - get_sql_type_from_global_config, local_sql_syntax, ColumnRule, CompiledCondition, - ParsedStructure, SerdeMap, SQL_PARAM, + get_sql_type_from_global_config, is_sql_type_error, local_sql_syntax, ColumnRule, + CompiledCondition, ParsedStructure, SerdeMap, SQL_PARAM, }; /// Represents a particular cell in a particular row of data with vaildation results. @@ -157,10 +157,12 @@ pub async fn validate_row( cell, ); - // We don't do any further validation on cells that have datatype violations because + // We don't do any further validation on cells that have SQL type violations because // they can result in database errors when, for instance, we compare a numeric with a // non-numeric type. - if cell.valid || !contains_dt_violation(&cell.messages) { + let sql_type = + get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap(); + if !is_sql_type_error(&sql_type, &cell.value) { // TODO: Pass the query_as_if parameter to validate_cell_trees. validate_cell_trees( config, @@ -451,6 +453,7 @@ pub async fn validate_under( .and_then(|t| t.as_array()) .unwrap(); + let view_name = format!("{}_view", table_name); for ukey in ukeys { let ukey = ukey.as_object().unwrap(); let tree_table = ukey.get("ttable").and_then(|tt| tt.as_str()).unwrap(); @@ -479,7 +482,8 @@ pub async fn validate_under( let mut extra_clause; let mut params; if let Some(ref extra_row) = extra_row { - (extra_clause, params) = select_with_extra_row(&config, extra_row, table_name, pool); + (extra_clause, params) = + select_with_extra_row(&config, extra_row, table_name, &view_name, pool); } else { extra_clause = String::new(); params = vec![]; @@ -497,9 +501,9 @@ pub async fn validate_under( // understood to be under themselves. let effective_table; if !extra_clause.is_empty() { - effective_table = format!("{}_ext", table_name); + effective_table = format!("{}_ext", view_name); } else { - effective_table = table_name.clone(); + effective_table = view_name.clone(); } let effective_tree; @@ -535,35 +539,30 @@ pub async fn validate_under( let sql = local_sql_syntax( &pool, &format!( - r#"{} {} + r#"{tree_sql} {extra_clause} SELECT "row_number", - "{}"."{}", + "{effective_table}"."{column}", CASE - WHEN "{}"."{}" IN ( - SELECT "{}" FROM "{}" + WHEN "{effective_table}"."{column}" IN ( + SELECT "{tree_child}" FROM "{effective_tree}" ) THEN 1 ELSE 0 END AS "is_in_tree", CASE - WHEN "{}"."{}" IN ( - SELECT "{}" FROM "tree" + WHEN "{effective_table}"."{column}" IN ( + SELECT "{tree_parent}" FROM "tree" ) THEN 0 ELSE 1 END AS "is_under" - FROM "{}""#, - tree_sql, - extra_clause, - effective_table, - column, - effective_table, - column, - tree_child, - effective_tree, - effective_table, - column, - tree_parent, - effective_table, + FROM "{effective_table}""#, + tree_sql = tree_sql, + extra_clause = extra_clause, + effective_table = effective_table, + column = column, + tree_child = tree_child, + effective_tree = effective_tree, + tree_parent = tree_parent, ), ); @@ -591,87 +590,33 @@ pub async fn validate_under( } let raw_column_val = row.try_get_raw(format!(r#"{}"#, column).as_str()).unwrap(); - let mut column_val = String::from(""); - if raw_column_val.is_null() { - // If the column value already contains a different error, its value will be null - // and it will be returned by the above query regardless of whether it actually - // violates the tree's foreign constraint. So we check the value from the message - // table instead: - let message_sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT "value", "level", "rule", "message" - FROM "message" - WHERE "table" = {} - AND "row" = {} - AND "column" = {}"#, - SQL_PARAM, SQL_PARAM, SQL_PARAM - ), - ); - let mut message_query = sqlx_query(&message_sql); - message_query = message_query.bind(&table_name); - message_query = message_query.bind(&row_number); - message_query = message_query.bind(column); - let message_rows = { - if let None = tx { - message_query.fetch_all(pool).await? - } else { - message_query - .fetch_all(tx.as_mut().unwrap().acquire().await?) - .await? - } - }; - // If there are no rows in the message table then the cell is legitimately empty and - // we can skip this row: - if message_rows.is_empty() { - continue; - } - - let mut has_dt_violation = false; - for mrow in &message_rows { - let rule: &str = mrow.get_unchecked("rule"); - if rule.starts_with("datatype:") { - has_dt_violation = true; - break; - } else { - let value: &str = mrow.get_unchecked("value"); - column_val = value.to_string(); - } - } - // If the value in the column has already been deemed to be invalid because - // of a datatype error, then just skip this row. This is to avoid potential database - // errors that might arise if we compare a numeric with a non-numeric type. - if has_dt_violation { - continue; + if !raw_column_val.is_null() { + let column_val = get_column_value(&row, &column, &sql_type); + // We use i32 instead of i64 (which we use for row_number) here because, unlike + // row_number, which is a BIGINT, 0 and 1 are being interpreted as normal sized ints. + let is_in_tree: i32 = row.get("is_in_tree"); + let is_under: i32 = row.get("is_under"); + if is_in_tree == 0 { + results.push(json!({ + "row_number": row_number as u32, + "column": column, + "value": column_val, + "level": "error", + "rule": "under:not-in-tree", + "message": format!("Value '{}' of column {} is not in {}.{}", + column_val, column, tree_table, tree_child).as_str(), + })); + } else if is_under == 0 { + results.push(json!({ + "row_number": row_number as u32, + "column": column, + "value": column_val, + "level": "error", + "rule": "under:not-under", + "message": format!("Value '{}' of column {} is not under '{}'", + column_val, column, uval.clone()).as_str(), + })); } - } else { - column_val = get_column_value(&row, &column, &sql_type); - } - - // We use i32 instead of i64 (which we use for row_number) here because, unlike - // row_number, which is a BIGINT, 0 and 1 are being interpreted as normal sized ints. - let is_in_tree: i32 = row.get("is_in_tree"); - let is_under: i32 = row.get("is_under"); - if is_in_tree == 0 { - results.push(json!({ - "row_number": row_number as u32, - "column": column, - "value": column_val, - "level": "error", - "rule": "under:not-in-tree", - "message": format!("Value '{}' of column {} is not in {}.{}", - column_val, column, tree_table, tree_child).as_str(), - })); - } else if is_under == 0 { - results.push(json!({ - "row_number": row_number as u32, - "column": column, - "value": column_val, - "level": "error", - "rule": "under:not-under", - "message": format!("Value '{}' of column {} is not under '{}'", - column_val, column, uval.clone()).as_str(), - })); } } } @@ -699,6 +644,7 @@ pub async fn validate_tree_foreign_keys( .and_then(|t| t.as_array()) .unwrap(); + let view_name = format!("{}_view", table_name); let mut results = vec![]; for tkey in tkeys { let tkey = tkey.as_object().unwrap(); @@ -709,7 +655,8 @@ pub async fn validate_tree_foreign_keys( let with_clause; let params; if let Some(ref extra_row) = extra_row { - (with_clause, params) = select_with_extra_row(&config, extra_row, table_name, pool); + (with_clause, params) = + select_with_extra_row(&config, extra_row, table_name, &view_name, pool); } else { with_clause = String::new(); params = vec![]; @@ -717,29 +664,27 @@ pub async fn validate_tree_foreign_keys( let effective_table_name; if !with_clause.is_empty() { - effective_table_name = format!("{}_ext", table_name); + effective_table_name = format!("{}_ext", view_name); } else { - effective_table_name = table_name.clone(); + effective_table_name = view_name.clone(); } let sql = local_sql_syntax( &pool, &format!( - r#"{} + r#"{with_clause} SELECT - t1."row_number", t1."{}" - FROM "{}" t1 + t1."row_number", t1."{parent_col}" + FROM "{effective_table_name}" t1 WHERE NOT EXISTS ( SELECT 1 - FROM "{}" t2 - WHERE t2."{}" = t1."{}" + FROM "{effective_table_name}" t2 + WHERE t2."{child_col}" = t1."{parent_col}" )"#, - with_clause, - parent_col, - effective_table_name, - effective_table_name, - child_col, - parent_col + with_clause = with_clause, + parent_col = parent_col, + effective_table_name = effective_table_name, + child_col = child_col, ), ); @@ -767,99 +712,18 @@ pub async fn validate_tree_foreign_keys( let raw_parent_val = row .try_get_raw(format!(r#"{}"#, parent_col).as_str()) .unwrap(); - let mut parent_val = String::from(""); if !raw_parent_val.is_null() { - parent_val = get_column_value(&row, &parent_col, &parent_sql_type); - } else { - // If the parent column already contains a different error, its value will be null - // and it will be returned by the above query regardless of whether it actually - // violates the tree's foreign constraint. So we check the value from the message - // table instead: - let message_sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT "value", "level", "rule", "message" - FROM "message" - WHERE "table" = {} - AND "row" = {} - AND "column" = {}"#, - SQL_PARAM, SQL_PARAM, SQL_PARAM - ), - ); - let mut message_query = sqlx_query(&message_sql); - message_query = message_query.bind(&table_name); - message_query = message_query.bind(&row_number); - message_query = message_query.bind(parent_col); - let message_rows = { - if let None = tx { - message_query.fetch_all(pool).await? - } else { - message_query - .fetch_all(tx.as_mut().unwrap().acquire().await?) - .await? - } - }; - // If there are no rows in the message table then the cell is legitimately empty and - // we can skip this row: - if message_rows.is_empty() { - continue; - } - - let mut has_dt_violation = false; - for mrow in &message_rows { - let rule: &str = mrow.get_unchecked("rule"); - if rule.starts_with("datatype:") { - has_dt_violation = true; - break; - } else { - let value: &str = mrow.get_unchecked("value"); - parent_val = value.to_string(); - } - } - // If the value in the parent column has already been deemed to be invalid because - // of a datatype error, then just skip this row. This is to avoid potential database - // errors that might arise if we compare a numeric with a non-numeric type. - if has_dt_violation { - continue; - } - - // Otherwise check if the value from the message table is in the child column. If it - // is there then we are fine, and we can go on to the next row. - let sql_type = - get_sql_type_from_global_config(&config, &table_name, &parent_col, pool) - .unwrap(); - let sql_param = cast_sql_param_from_text(&sql_type); - let sql = local_sql_syntax( - &pool, - &format!( - r#"SELECT 1 FROM "{}" WHERE "{}" = {} LIMIT 1"#, - table_name, child_col, sql_param - ), - ); - let query = sqlx_query(&sql).bind(parent_val.to_string()); - let rows = { - if let None = tx { - query.fetch_all(pool).await? - } else { - query - .fetch_all(tx.as_mut().unwrap().acquire().await?) - .await? - } - }; - if rows.len() > 0 { - continue; - } + let parent_val = get_column_value(&row, &parent_col, &parent_sql_type); + results.push(json!({ + "row_number": row_number as u32, + "column": parent_col, + "value": parent_val, + "level": "error", + "rule": "tree:foreign", + "message": format!("Value '{}' of column {} is not in column {}", + parent_val, parent_col, child_col).as_str(), + })); } - - results.push(json!({ - "row_number": row_number as u32, - "column": parent_col, - "value": parent_val, - "level": "error", - "rule": "tree:foreign", - "message": format!("Value '{}' of column {} is not in column {}", - parent_val, parent_col, child_col).as_str(), - })); } } @@ -894,9 +758,11 @@ pub async fn validate_rows_trees( let context = row.clone(); let cell = row.contents.get_mut(column_name).unwrap(); // We don't do any further validation on cells that are legitimately empty, or on cells - // that have datatype violations. We exclude the latter because they can result in + // that have SQL type violations. We exclude the latter because they can result in // database errors when, for instance, we compare a numeric with a non-numeric type. - if cell.nulltype == None && (cell.valid || !contains_dt_violation(&cell.messages)) { + let sql_type = + get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap(); + if cell.nulltype == None && !is_sql_type_error(&sql_type, &cell.value) { validate_cell_trees( config, pool, @@ -951,9 +817,11 @@ pub async fn validate_rows_constraints( for column_name in &column_names { let cell = row.contents.get_mut(column_name).unwrap(); // We don't do any further validation on cells that are legitimately empty, or on cells - // that have datatype violations. We exclude the latter because they can result in + // that have SQL type violations. We exclude the latter because they can result in // database errors when, for instance, we compare a numeric with a non-numeric type. - if cell.nulltype == None && (cell.valid || !contains_dt_violation(&cell.messages)) { + let sql_type = + get_sql_type_from_global_config(&config, table_name, &column_name, pool).unwrap(); + if cell.nulltype == None && !is_sql_type_error(&sql_type, &cell.value) { validate_cell_foreign_constraints( config, pool, @@ -1139,28 +1007,13 @@ fn result_row_to_config_map(incoming: &ResultRow) -> SerdeMap { outgoing } -/// Given a message list, determine if it contains a message corresponding to a dataype violation -fn contains_dt_violation(messages: &Vec) -> bool { - let mut contains_dt_violation = false; - for m in messages { - if m.get("rule") - .and_then(|r| r.as_str()) - .unwrap_or_else(|| "") - .starts_with("datatype:") - { - contains_dt_violation = true; - break; - } - } - contains_dt_violation -} - /// Generate a SQL Select clause that is a union of: (a) the literal values of the given extra row, /// and (b) a Select statement over `table_name` of all the fields in the extra row. fn select_with_extra_row( config: &SerdeMap, extra_row: &ResultRow, - table_name: &str, + table: &str, + effective_table: &str, pool: &AnyPool, ) -> (String, Vec) { let extra_row_len = extra_row.contents.keys().len(); @@ -1173,7 +1026,7 @@ fn select_with_extra_row( let mut second_select = String::from(r#"SELECT "row_number", "#); for (i, (key, content)) in extra_row.contents.iter().enumerate() { - let sql_type = get_sql_type_from_global_config(&config, &table_name, &key, pool).unwrap(); + let sql_type = get_sql_type_from_global_config(&config, &table, &key, pool).unwrap(); let sql_param = cast_sql_param_from_text(&sql_type); // enumerate() begins from 0 but we need to begin at 1: let i = i + 1; @@ -1184,7 +1037,7 @@ fn select_with_extra_row( first_select.push_str(", "); second_select.push_str(", "); } else { - second_select.push_str(format!(r#" FROM "{}""#, table_name).as_str()); + second_select.push_str(format!(r#" FROM "{}""#, effective_table).as_str()); } } @@ -1195,7 +1048,7 @@ fn select_with_extra_row( ( format!( r#"WITH "{}_ext" AS ({} UNION ALL {})"#, - table_name, first_select, second_select + effective_table, first_select, second_select ), params, ) @@ -1235,25 +1088,20 @@ fn with_tree_sql( let sql = format!( r#"WITH RECURSIVE "tree" AS ( - {} - SELECT "{}", "{}" - FROM "{}" - {} + {extra_clause} + SELECT "{child_col}", "{parent_col}" + FROM "{effective_table_name}" + {under_sql} UNION ALL - SELECT "t1"."{}", "t1"."{}" - FROM "{}" AS "t1" - JOIN "tree" AS "t2" ON "t2"."{}" = "t1"."{}" + SELECT "t1"."{child_col}", "t1"."{parent_col}" + FROM "{effective_table_name}" AS "t1" + JOIN "tree" AS "t2" ON "t2"."{parent_col}" = "t1"."{child_col}" )"#, - extra_clause, - child_col, - parent_col, - effective_table_name, - under_sql, - child_col, - parent_col, - effective_table_name, - parent_col, - child_col + extra_clause = extra_clause, + child_col = child_col, + parent_col = parent_col, + effective_table_name = effective_table_name, + under_sql = under_sql, ); (sql, params) @@ -1795,6 +1643,7 @@ async fn validate_cell_trees( get_sql_type_from_global_config(&config, &table_name, &parent_col, pool).unwrap(); let parent_sql_param = cast_sql_param_from_text(&parent_sql_type); let parent_val = cell.value.clone(); + let view_name = format!("{}_view", table_name); for tkey in tkeys { let child_col = tkey.get("child").and_then(|c| c.as_str()).unwrap(); let child_sql_type = @@ -1830,10 +1679,10 @@ async fn validate_cell_trees( let table_name_ext; let extra_clause; if prev_selects.is_empty() { - table_name_ext = table_name.clone(); + table_name_ext = view_name.clone(); extra_clause = String::from(""); } else { - table_name_ext = format!("{}_ext", table_name); + table_name_ext = format!("{}_ext", view_name); extra_clause = format!( r#"WITH "{}" AS ( SELECT "{}", "{}" @@ -1841,7 +1690,7 @@ async fn validate_cell_trees( UNION ALL {} )"#, - table_name_ext, child_col, parent_col, table_name, prev_selects + table_name_ext, child_col, parent_col, view_name, prev_selects ); } @@ -1994,15 +1843,16 @@ async fn validate_cell_unique_constraints( } if is_primary || is_unique || is_tree_child { + let view_name = format!("{}_view", table_name); let mut with_sql = String::new(); - let except_table = format!("{}_exc", table_name); + let except_table = format!("{}_exc", view_name); if let Some(row_number) = row_number { with_sql = format!( r#"WITH "{}" AS ( SELECT * FROM "{}" WHERE "row_number" != {} ) "#, - except_table, table_name, row_number + except_table, view_name, row_number ); } @@ -2010,7 +1860,7 @@ async fn validate_cell_unique_constraints( if !with_sql.is_empty() { query_table = except_table; } else { - query_table = table_name.to_string(); + query_table = view_name.to_string(); } let sql_type = diff --git a/test/expected/messages.tsv b/test/expected/messages.tsv index 353d0906..1ed440af 100644 --- a/test/expected/messages.tsv +++ b/test/expected/messages.tsv @@ -30,6 +30,7 @@ table3 6 parent error tree:foreign Value 'owl:Thing' of column parent is not in table3 7 source error datatype:nonspace source should be text without whitespace CO B table3 7 source error datatype:prefix source should be a prefix for a CURIE CO B table3 7 source error datatype:word source should be a single word: letters, numbers, underscore CO B +table3 7 source error key:foreign Value 'CO B' of column source is not in table1.prefix CO B table3 8 id error key:unique Values of id must be unique COB:0000013 table3 10 id error key:unique Values of id must be unique VO:0000001 table3 10 label error key:primary Values of label must be unique vaccine diff --git a/test/expected/messages_a1.tsv b/test/expected/messages_a1.tsv index 04153339..8f94c1f4 100644 --- a/test/expected/messages_a1.tsv +++ b/test/expected/messages_a1.tsv @@ -30,6 +30,7 @@ table3 E6 error tree:foreign Value 'owl:Thing' of column parent is not in column table3 A7 error datatype:nonspace source should be text without whitespace CO B table3 A7 error datatype:prefix source should be a prefix for a CURIE CO B table3 A7 error datatype:word source should be a single word: letters, numbers, underscore CO B +table3 A7 error key:foreign Value 'CO B' of column source is not in table1.prefix CO B table3 B8 error key:unique Values of id must be unique COB:0000013 table3 B10 error key:unique Values of id must be unique VO:0000001 table3 C10 error key:primary Values of label must be unique vaccine diff --git a/test/expected/messages_after_api_test.tsv b/test/expected/messages_after_api_test.tsv index a11ccb0f..f7fe4d42 100644 --- a/test/expected/messages_after_api_test.tsv +++ b/test/expected/messages_after_api_test.tsv @@ -32,6 +32,7 @@ table3 6 parent error tree:foreign Value 'owl:Thing' of column parent is not in table3 7 source error datatype:nonspace source should be text without whitespace CO B table3 7 source error datatype:prefix source should be a prefix for a CURIE CO B table3 7 source error datatype:word source should be a single word: letters, numbers, underscore CO B +table3 7 source error key:foreign Value 'CO B' of column source is not in table1.prefix CO B table3 8 id error key:unique Values of id must be unique COB:0000013 table3 10 id error key:unique Values of id must be unique VO:0000001 table3 10 label error key:primary Values of label must be unique vaccine From 3c0016dfd4536e37e9ce4ae531fbc58de92792e9 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Sat, 28 Oct 2023 15:14:09 -0400 Subject: [PATCH 10/14] use fewer conflict columns; do not set value to null when validating tree-foreigns and under constraints --- src/lib.rs | 66 ++++++++---------------------------------------------- 1 file changed, 9 insertions(+), 57 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fc0cfc91..86fe95a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2297,46 +2297,8 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - - let foreign_targets = global_config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|o| o.get("foreign")) - .and_then(|t| t.as_object()) - .and_then(|o| o.get(table_name)) - .and_then(|t| t.as_array()) - .unwrap() - .iter() - .map(|v| v.as_object().unwrap()) - .filter(|o| o.get("ftable").unwrap().as_str() == Some(table_name)) - .map(|v| v.get("fcolumn").unwrap().clone()) - .collect::>(); - - let tree_parents = global_config - .get("constraints") - .and_then(|c| c.as_object()) - .and_then(|o| o.get("tree")) - .and_then(|t| t.as_object()) - .and_then(|o| o.get(table_name)) - .and_then(|t| t.as_array()) - .unwrap() - .iter() - .map(|v| v.as_object().unwrap()) - .map(|v| v.get("parent").unwrap().clone()) - .collect::>(); - + // We take tree-children because these imply a unique database constraint on the corresponding + // column. let tree_children = global_config .get("constraints") .and_then(|c| c.as_object()) @@ -2350,10 +2312,10 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - let under_sources = global_config + let foreign_sources = global_config .get("constraints") .and_then(|c| c.as_object()) - .and_then(|o| o.get("under")) + .and_then(|o| o.get("foreign")) .and_then(|t| t.as_object()) .and_then(|o| o.get(table_name)) .and_then(|t| t.as_array()) @@ -2363,29 +2325,26 @@ fn get_conflict_columns(global_config: &SerdeMap, table_name: &str) -> Vec>(); - let under_targets = global_config + let foreign_targets = global_config .get("constraints") .and_then(|c| c.as_object()) - .and_then(|o| o.get("under")) + .and_then(|o| o.get("foreign")) .and_then(|t| t.as_object()) .and_then(|o| o.get(table_name)) .and_then(|t| t.as_array()) .unwrap() .iter() .map(|v| v.as_object().unwrap()) - .filter(|o| o.get("ttable").unwrap().as_str() == Some(table_name)) - .map(|v| v.get("tcolumn").unwrap().clone()) + .filter(|o| o.get("ftable").unwrap().as_str() == Some(table_name)) + .map(|v| v.get("fcolumn").unwrap().clone()) .collect::>(); for key_columns in vec![ primaries, uniques, + &tree_children, &foreign_sources, &foreign_targets, - &tree_parents, - &tree_children, - &under_sources, - &under_targets, ] { for column in key_columns { if !conflict_columns.contains(column) { @@ -4508,13 +4467,6 @@ async fn load_db( let rule = record.get("rule").and_then(|s| s.as_str()).unwrap(); let message = record.get("message").and_then(|s| s.as_str()).unwrap(); - let sql = format!( - r#"UPDATE "{}" SET "{}" = NULL WHERE "row_number" = {}"#, - table_name, column_name, row_number - ); - let query = sqlx_query(&sql); - query.execute(pool).await?; - let sql = local_sql_syntax( &pool, &format!( From 2d4ef6dfa8d49b7e3c1e04410f891f28073029d4 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Tue, 31 Oct 2023 09:39:27 -0400 Subject: [PATCH 11/14] small tweaks --- src/lib.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 86fe95a9..d53916cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -837,15 +837,15 @@ fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { (drop_view_sql, create_view_sql) } -/// Given the name of a table and a database connection pool, generate SQL for creating a more -/// user-friendly version of the view that is generated by [get_sql_for_standard_view()]. -/// Unlike the standard view generated by that function, the view generated by this function -/// (called my_table_user_view) always shows all of the values of every column in the table, even -/// when those values contain errors. Also unlike the standard view, the datatypes of all columns -/// in the user view are TEXT (this is what makes it possible to always display error values). -/// Like the function for generating a standard view, the SQL generated by this function is in the -/// form of a tuple of Strings, with the first string being a SQL statement for dropping the view, -/// and the second string being a SQL statement for creating it. +/// Given the tables configuration map, the name of a table and a database connection pool, +/// generate SQL for creating a more user-friendly version of the view that is generated by +/// [get_sql_for_standard_view()]. Unlike the standard view generated by that function, the view +/// generated by this function (called my_table_user_view) always shows all of the values of every +/// column in the table, even when those values contain errors. Also unlike the standard view, the +/// datatypes of all columns in the user view are TEXT (this is what makes it possible to always +/// display error values). Like the function for generating a standard view, the SQL generated by +/// this function is in the form of a tuple of Strings, with the first string being a SQL statement +/// for dropping the view, and the second string being a SQL statement for creating it. fn get_sql_for_user_view( tables_config: &mut SerdeMap, table: &str, @@ -2555,10 +2555,6 @@ pub async fn insert_new_row_tx( if is_sql_type_error(&sql_type, value) { insert_values.push(String::from("NULL")); } else { - let sql_type = get_sql_type_from_global_config(&global_config, &table, &column, pool) - .ok_or(SqlxCErr( - format!("Unable to determine SQL type for {}.{}", table, column).into(), - ))?; insert_values.push(cast_sql_param_from_text(&sql_type)); insert_params.push(String::from(value)); } From c8e25a98686e5623f16f7c0d4c1490a284386458 Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Tue, 31 Oct 2023 09:57:47 -0400 Subject: [PATCH 12/14] use new user view in export.py --- scripts/export.py | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/scripts/export.py b/scripts/export.py index 7b300a31..4a5b0b91 100755 --- a/scripts/export.py +++ b/scripts/export.py @@ -132,34 +132,12 @@ def export_data(cursor, is_sqlite, args): columns_info = get_column_order_and_info_for_postgres(cursor, table) unsorted_columns = columns_info["unsorted_columns"] - select = [] - for column in unsorted_columns: - if column == "row_number": - select.append(f'"{column}"') - else: - cast = "" if is_sqlite else "::TEXT" - is_clause = "IS" if is_sqlite else "IS NOT DISTINCT FROM" - select.append( - f""" - CASE - WHEN "{column}" {is_clause} NULL THEN ( - SELECT value - FROM "message" - WHERE "row" = "row_number" - AND "column" = '{column}' - AND "table" = '{table}' - ORDER BY "message_id" DESC - LIMIT 1 - ) - ELSE "{column}"{cast} - END AS "{column}" - """ - ) + select = [f'"{column}"' for column in unsorted_columns] select = ", ".join(select) # Fetch the rows from the table and write them to a corresponding TSV file in the # output directory: - cursor.execute(f'SELECT {select} FROM "{table}_view" ORDER BY "row_number"') + cursor.execute(f'SELECT {select} FROM "{table}_user_view" ORDER BY "row_number"') colnames = [d[0] for d in cursor.description] rows = map(lambda r: dict(zip(colnames, r)), cursor) fieldnames = [c for c in colnames if c != "row_number"] From 86f4b350889471a8470e293c294f1e2839719d5a Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Wed, 1 Nov 2023 10:47:22 -0400 Subject: [PATCH 13/14] create public functions for getting the last (undone) record from the history table --- src/lib.rs | 94 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 41 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d53916cd..6c666c88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2047,17 +2047,9 @@ async fn switch_undone_state( Ok(()) } -/// Given a global configuration map, maps of compiled datatype and ruled conditions, a database -/// connection pool, and the user who initiated the undo, find the last recorded change to the -/// database and undo it, indicating in the history table that undo_user is responsible. -#[async_recursion] -pub async fn undo( - global_config: &SerdeMap, - compiled_datatype_conditions: &HashMap, - compiled_rule_conditions: &HashMap>>, - pool: &AnyPool, - undo_user: &str, -) -> Result<(), sqlx::Error> { +/// Given a database pool fetch the last database change represented by the last row inserted to the +/// history table that has not been undone. +pub async fn get_last_change(pool: &AnyPool) -> Result, sqlx::Error> { // Look in the history table, get the row with the greatest ID, get the row number, // from, and to, and determine whether the last operation was a delete, insert, or update. let is_clause = if pool.any_kind() == AnyKind::Sqlite { @@ -2073,20 +2065,55 @@ pub async fn undo( ); let query = sqlx_query(&sql); let result_row = query.fetch_optional(pool).await?; - let result_row = match result_row { + Ok(result_row) +} + +/// Given a database pool fetch the row in the history table that has been most recently marked as +/// undone. +pub async fn get_last_undo(pool: &AnyPool) -> Result, sqlx::Error> { + // Look in the history table, get the row with the greatest ID, get the row number, + // from, and to, and determine whether the last operation was a delete, insert, or update. + let is_not_clause = if pool.any_kind() == AnyKind::Sqlite { + "IS NOT" + } else { + "IS DISTINCT FROM" + }; + let sql = format!( + r#"SELECT * FROM "history" + WHERE "undone_by" {} NULL + ORDER BY "timestamp" DESC LIMIT 1"#, + is_not_clause + ); + let query = sqlx_query(&sql); + let result_row = query.fetch_optional(pool).await?; + Ok(result_row) +} + +/// Given a global configuration map, maps of compiled datatype and ruled conditions, a database +/// connection pool, and the user who initiated the undo, find the last recorded change to the +/// database and undo it, indicating in the history table that undo_user is responsible. +#[async_recursion] +pub async fn undo( + global_config: &SerdeMap, + compiled_datatype_conditions: &HashMap, + compiled_rule_conditions: &HashMap>>, + pool: &AnyPool, + undo_user: &str, +) -> Result<(), sqlx::Error> { + let last_change = match get_last_change(pool).await? { None => { eprintln!("WARN: Nothing to undo."); return Ok(()); } Some(r) => r, }; - let history_id: i32 = result_row.get("history_id"); + let history_id: i32 = last_change.get("history_id"); let history_id = history_id as u16; - let table: &str = result_row.get("table"); - let row_number: i64 = result_row.get("row"); + let table: &str = last_change.get("table"); + let row_number: i64 = last_change.get("row"); let row_number = row_number as u32; - let from = get_json_from_row(&result_row, "from"); - let to = get_json_from_row(&result_row, "to"); + let from = get_json_from_row(&last_change, "from"); + let to = get_json_from_row(&last_change, "to"); match (from, to) { (None, None) => { @@ -2169,42 +2196,27 @@ pub async fn redo( pool: &AnyPool, redo_user: &str, ) -> Result<(), sqlx::Error> { - // Look in the history table, get the row with the greatest ID, get the row number, - // from, and to, and determine whether the last operation was a delete, insert, or update. - let is_not_clause = if pool.any_kind() == AnyKind::Sqlite { - "IS NOT" - } else { - "IS DISTINCT FROM" - }; - let sql = format!( - r#"SELECT * FROM "history" - WHERE "undone_by" {} NULL - ORDER BY "timestamp" DESC LIMIT 1"#, - is_not_clause - ); - let query = sqlx_query(&sql); - let result_row = query.fetch_optional(pool).await?; - let result_row = match result_row { + let last_undo = match get_last_undo(pool).await? { None => { eprintln!("WARN: Nothing to redo."); return Ok(()); } - Some(result_row) => { - let undone_by = result_row.try_get_raw("undone_by")?; + Some(last_undo) => { + let undone_by = last_undo.try_get_raw("undone_by")?; if undone_by.is_null() { eprintln!("WARN: Nothing to redo."); return Ok(()); } - result_row + last_undo } }; - let history_id: i32 = result_row.get("history_id"); + let history_id: i32 = last_undo.get("history_id"); let history_id = history_id as u16; - let table: &str = result_row.get("table"); - let row_number: i64 = result_row.get("row"); + let table: &str = last_undo.get("table"); + let row_number: i64 = last_undo.get("row"); let row_number = row_number as u32; - let from = get_json_from_row(&result_row, "from"); - let to = get_json_from_row(&result_row, "to"); + let from = get_json_from_row(&last_undo, "from"); + let to = get_json_from_row(&last_undo, "to"); match (from, to) { (None, None) => { From 8e17891fa47bae15467381babe55e2368fa0ee0b Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Fri, 3 Nov 2023 10:34:00 -0400 Subject: [PATCH 14/14] change 'user_view' to 'text_view'; change get_last_undo() to get_record_to_redo() and get_last_change() to get_record_to_undo() --- scripts/export.py | 2 +- src/lib.rs | 40 +++++++++++++++++++--------------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/scripts/export.py b/scripts/export.py index 4a5b0b91..bc61c259 100755 --- a/scripts/export.py +++ b/scripts/export.py @@ -137,7 +137,7 @@ def export_data(cursor, is_sqlite, args): # Fetch the rows from the table and write them to a corresponding TSV file in the # output directory: - cursor.execute(f'SELECT {select} FROM "{table}_user_view" ORDER BY "row_number"') + cursor.execute(f'SELECT {select} FROM "{table}_text_view" ORDER BY "row_number"') colnames = [d[0] for d in cursor.description] rows = map(lambda r: dict(zip(colnames, r)), cursor) fieldnames = [c for c in colnames if c != "row_number"] diff --git a/src/lib.rs b/src/lib.rs index 6c666c88..981bdf83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -838,15 +838,14 @@ fn get_sql_for_standard_view(table: &str, pool: &AnyPool) -> (String, String) { } /// Given the tables configuration map, the name of a table and a database connection pool, -/// generate SQL for creating a more user-friendly version of the view that is generated by +/// generate SQL for creating a more user-friendly version of the view than the one generated by /// [get_sql_for_standard_view()]. Unlike the standard view generated by that function, the view -/// generated by this function (called my_table_user_view) always shows all of the values of every -/// column in the table, even when those values contain errors. Also unlike the standard view, the -/// datatypes of all columns in the user view are TEXT (this is what makes it possible to always -/// display error values). Like the function for generating a standard view, the SQL generated by -/// this function is in the form of a tuple of Strings, with the first string being a SQL statement +/// generated by this function (called my_table_text_view) always shows all of the values (which are +/// all rendered as text) of every column in the table, even when those values contain SQL datatype +/// errors. Like the function for generating a standard view, the SQL generated by this function is +/// returned in the form of a tuple of Strings, with the first string being a SQL statement /// for dropping the view, and the second string being a SQL statement for creating it. -fn get_sql_for_user_view( +fn get_sql_for_text_view( tables_config: &mut SerdeMap, table: &str, pool: &AnyPool, @@ -867,10 +866,10 @@ fn get_sql_for_user_view( .and_then(|t| Some(t.collect::>())) .unwrap(); - // Add a second "user view" such that the datatypes of all values are TEXT and appear + // Add a second "text view" such that the datatypes of all values are TEXT and appear // directly in their corresponsing columns (rather than as NULLs) even when they have - // errors. - let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_user_view""#, table); + // SQL datatype errors. + let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_text_view""#, table); if pool.any_kind() == AnyKind::Postgres { drop_view_sql.push_str(" CASCADE"); } @@ -928,7 +927,7 @@ fn get_sql_for_user_view( }; let create_view_sql = format!( - r#"CREATE VIEW "{table}_user_view" AS + r#"CREATE VIEW "{table}_text_view" AS SELECT {outer_columns} FROM ( SELECT {inner_columns} @@ -1087,12 +1086,12 @@ pub async fn configure_db( } let (drop_view_sql, create_view_sql) = get_sql_for_standard_view(&table_name, pool); - let (drop_user_view_sql, create_user_view_sql) = - get_sql_for_user_view(tables_config, &table_name, pool); - table_statements.push(drop_user_view_sql); + let (drop_text_view_sql, create_text_view_sql) = + get_sql_for_text_view(tables_config, &table_name, pool); + table_statements.push(drop_text_view_sql); table_statements.push(drop_view_sql); table_statements.push(create_view_sql); - table_statements.push(create_user_view_sql); + table_statements.push(create_text_view_sql); setup_statements.insert(table_name.to_string(), table_statements); } @@ -2047,9 +2046,8 @@ async fn switch_undone_state( Ok(()) } -/// Given a database pool fetch the last database change represented by the last row inserted to the -/// history table that has not been undone. -pub async fn get_last_change(pool: &AnyPool) -> Result, sqlx::Error> { +/// Given a database pool fetch the last row inserted to the history table that has not been undone. +pub async fn get_record_to_undo(pool: &AnyPool) -> Result, sqlx::Error> { // Look in the history table, get the row with the greatest ID, get the row number, // from, and to, and determine whether the last operation was a delete, insert, or update. let is_clause = if pool.any_kind() == AnyKind::Sqlite { @@ -2070,7 +2068,7 @@ pub async fn get_last_change(pool: &AnyPool) -> Result, sqlx::Err /// Given a database pool fetch the row in the history table that has been most recently marked as /// undone. -pub async fn get_last_undo(pool: &AnyPool) -> Result, sqlx::Error> { +pub async fn get_record_to_redo(pool: &AnyPool) -> Result, sqlx::Error> { // Look in the history table, get the row with the greatest ID, get the row number, // from, and to, and determine whether the last operation was a delete, insert, or update. let is_not_clause = if pool.any_kind() == AnyKind::Sqlite { @@ -2100,7 +2098,7 @@ pub async fn undo( pool: &AnyPool, undo_user: &str, ) -> Result<(), sqlx::Error> { - let last_change = match get_last_change(pool).await? { + let last_change = match get_record_to_undo(pool).await? { None => { eprintln!("WARN: Nothing to undo."); return Ok(()); @@ -2196,7 +2194,7 @@ pub async fn redo( pool: &AnyPool, redo_user: &str, ) -> Result<(), sqlx::Error> { - let last_undo = match get_last_undo(pool).await? { + let last_undo = match get_record_to_redo(pool).await? { None => { eprintln!("WARN: Nothing to redo."); return Ok(());