From fc873eb3d93bc2a2e931d14d00988af1f6aacbff Mon Sep 17 00:00:00 2001 From: Michael Cuffaro Date: Fri, 23 Feb 2024 20:23:12 -0500 Subject: [PATCH] use ValveRow everywhere --- src/lib.rs | 126 +++++++++++++++++------------------------------- src/validate.rs | 69 ++++++++------------------ src/valve.rs | 73 +++++++++++++++++++--------- 3 files changed, 116 insertions(+), 152 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f20b8cd2..6d32599a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,7 @@ use crate::{ validate_row_tx, validate_rows_constraints, validate_rows_intra, validate_rows_trees, }, valve::{ - ValveCellMessage, ValveColumnConfig, ValveConfig, ValveConstraintConfig, + ValveCell, ValveCellMessage, ValveColumnConfig, ValveConfig, ValveConstraintConfig, ValveDatatypeConfig, ValveError, ValveForeignConstraint, ValveRow, ValveRuleConfig, ValveSpecialConfig, ValveTableConfig, ValveTreeConstraint, ValveUnderConstraint, }, @@ -173,7 +173,7 @@ pub struct QueryAsIf { // named 'foo' so we need to use an alias: pub alias: String, pub row_number: u32, - pub row: Option, + pub row: Option, } /// The sense in which a [QueryAsIf] struct should be interpreted. @@ -1339,7 +1339,7 @@ pub async fn get_affected_rows( config: &ValveConfig, pool: &AnyPool, tx: &mut Transaction<'_, sqlx::Any>, -) -> Result, ValveError> { +) -> Result, ValveError> { // Since the consequence of an update could involve currently invalid rows // (in the conflict table) becoming valid or vice versa, we need to check rows for // which the value of the column is the same as `value` @@ -1359,9 +1359,9 @@ pub async fn get_affected_rows( }; let query = sqlx_query(&sql); - let mut table_rows = IndexMap::new(); + let mut valve_rows = vec![]; for row in query.fetch_all(tx.acquire().await?).await? { - let mut table_row = SerdeMap::new(); + let mut contents = IndexMap::new(); let mut row_number: Option = None; for column in row.columns() { let cname = column.name(); @@ -1375,20 +1375,18 @@ pub async fn get_affected_rows( } else { value = String::from(""); } - let cell = json!({ - "value": value, - "valid": true, - "messages": json!([]), - }); - table_row.insert(cname.to_string(), json!(cell)); + contents.insert(cname.to_string(), ValveCell::new(&json!(value))); } } let row_number = row_number.ok_or(ValveError::DataError("Row: has no row number".to_string()))?; - table_rows.insert(row_number, table_row); + valve_rows.push(ValveRow { + row_number: Some(row_number), + contents: contents, + }); } - Ok(table_rows) + Ok(valve_rows) } /// Given a global configuration map, a database connection pool, a database transaction, a table @@ -1541,25 +1539,20 @@ pub async fn get_rows_to_update( query_as_if: &QueryAsIf, ) -> Result< ( - IndexMap>, - IndexMap>, - IndexMap>, + IndexMap>, + IndexMap>, + IndexMap>, ), ValveError, > { - fn get_cell_value(row: &SerdeMap, column: &str) -> Result { - match row.get(column).and_then(|cell| cell.get("value")) { - Some(SerdeValue::String(s)) => Ok(format!("{}", s)), - Some(SerdeValue::Number(n)) => Ok(format!("{}", n)), - Some(SerdeValue::Bool(b)) => Ok(format!("{}", b)), - _ => Err(ValveError::DataError( - format!( - "Value missing or of unknown type in column {} of row to update: {:?}", - column, row - ) - .into(), - )), - } + fn get_cell_value(row: &ValveRow, column: &str) -> Result { + row.contents + .get(column) + .and_then(|cell| Some(cell.strvalue())) + .ok_or(ValveError::InputError(format!( + "Value missing or of unknown type in column {} of row to update: {:?}", + column, row + ))) } // Collect foreign key dependencies: @@ -1595,7 +1588,7 @@ pub async fn get_rows_to_update( query_as_if.kind ); } - IndexMap::new() + vec![] } _ => { let current_value = get_db_value( @@ -1631,7 +1624,7 @@ pub async fn get_rows_to_update( query_as_if.kind ); } - IndexMap::new() + vec![] } Some(row) => { // Fetch the cell corresponding to `column` from `row`, and the value of that cell, @@ -1693,7 +1686,7 @@ pub async fn get_rows_to_update( query_as_if.kind ); } - IndexMap::new() + vec![] } _ => { let current_value = @@ -1737,12 +1730,15 @@ pub async fn process_updates( compiled_rule_conditions: &HashMap>>, pool: &AnyPool, tx: &mut Transaction<'_, sqlx::Any>, - updates: &IndexMap>, + updates: &IndexMap>, query_as_if: &QueryAsIf, do_not_recurse: bool, ) -> Result<(), ValveError> { for (update_table, rows_to_update) in updates { - for (row_number, row) in rows_to_update { + for row in rows_to_update { + let row_number = row + .row_number + .ok_or(ValveError::InputError("Row has no row number".to_string()))?; // Validate each row 'counterfactually': let vrow = validate_row_tx( config, @@ -1752,7 +1748,7 @@ pub async fn process_updates( Some(tx), update_table, row, - Some(*row_number), + Some(row_number), Some(&query_as_if), ) .await?; @@ -1766,7 +1762,7 @@ pub async fn process_updates( tx, update_table, &vrow, - row_number, + &row_number, false, do_not_recurse, ) @@ -2110,7 +2106,7 @@ pub async fn insert_new_row_tx( pool: &AnyPool, tx: &mut Transaction, table: &str, - row: &SerdeMap, + row: &ValveRow, new_row_number: Option, skip_validation: bool, ) -> Result { @@ -2174,49 +2170,13 @@ pub async fn insert_new_row_tx( let sorted_datatypes = get_sorted_datatypes(config); let conflict_columns = get_conflict_columns(config, table); let mut use_conflict_table = false; - for (column, cell) in row.iter() { + for (column, cell) in row.contents.iter() { insert_columns.append(&mut vec![format!(r#""{}""#, column)]); - - let cell = cell.as_object().ok_or(ValveError::InputError( - format!("Cell {:?} is not an object", cell).into(), - ))?; - let valid = cell - .get("valid") - .and_then(|v| v.as_bool()) - .ok_or(ValveError::InputError( - format!("No bool named 'valid' in {:?}", cell).into(), - ))?; - let value = cell - .get("value") - .and_then(|v| v.as_str()) - .ok_or(ValveError::InputError( - format!("No string named 'value' in {:?}", cell).into(), - ))?; - let messages = sort_messages( - &sorted_datatypes, - &cell - .get("messages") - .and_then(|m| m.as_array()) - .ok_or(ValveError::InputError( - format!("No array named 'messages' in {:?}", cell).into(), - ))? - .iter() - .map(|m| ValveCellMessage { - level: m.get("level").and_then(|s| s.as_str()).unwrap().to_string(), - rule: m.get("rule").and_then(|s| s.as_str()).unwrap().to_string(), - message: m - .get("message") - .and_then(|s| s.as_str()) - .unwrap() - .to_string(), - }) - .collect::>(), - ); - + let messages = sort_messages(&sorted_datatypes, &cell.messages); for message in messages { all_messages.push(json!({ "column": column, - "value": value, + "value": &cell.value, "level": message.level, "rule": message.rule, "message": message.message, @@ -2224,14 +2184,14 @@ pub async fn insert_new_row_tx( } let sql_type = get_sql_type_from_global_config(config, table, column, pool); - if is_sql_type_error(&sql_type, value) { + if is_sql_type_error(&sql_type, &cell.strvalue()) { insert_values.push(String::from("NULL")); } else { insert_values.push(cast_sql_param_from_text(&sql_type)); - insert_params.push(String::from(value)); + insert_params.push(String::from(cell.strvalue())); } - if !use_conflict_table && !valid && conflict_columns.contains(&column) { + if !use_conflict_table && !cell.valid && conflict_columns.contains(&column) { use_conflict_table = true; } } @@ -2279,7 +2239,11 @@ pub async fn insert_new_row_tx( // Next add any validation messages to the message table: for m in all_messages { let column = m.get("column").and_then(|c| c.as_str()).unwrap(); - let value = m.get("value").and_then(|c| c.as_str()).unwrap(); + let value = match m.get("value") { + Some(SerdeValue::String(s)) => s.to_string(), + Some(v) => format!("{}", v), + _ => panic!("Message '{}' has no value", m), + }; let level = m.get("level").and_then(|c| c.as_str()).unwrap(); let rule = m.get("rule").and_then(|c| c.as_str()).unwrap(); let message = m.get("message").and_then(|c| c.as_str()).unwrap(); @@ -2417,7 +2381,7 @@ pub async fn update_row_tx( pool: &AnyPool, tx: &mut Transaction, table: &str, - row: &SerdeMap, + row: &ValveRow, row_number: &u32, skip_validation: bool, do_not_recurse: bool, diff --git a/src/validate.rs b/src/validate.rs index 99ed3be2..0d8adab8 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -43,10 +43,10 @@ pub async fn validate_row_tx( pool: &AnyPool, tx: Option<&mut Transaction<'_, sqlx::Any>>, table_name: &str, - row: &SerdeMap, + row: &ValveRow, row_number: Option, query_as_if: Option<&QueryAsIf>, -) -> Result { +) -> Result { // Fallback to a default transaction if it is not given. Since we do not commit before it falls // out of scope the transaction will be rolled back at the end of this function. And since this // function is read-only the rollback is trivial and therefore inconsequential. @@ -57,7 +57,7 @@ pub async fn validate_row_tx( }; // Initialize the result row with the values from the given row: - let mut valve_row = rich_json_to_valve_row(row_number, row)?; + let mut valve_row = row.clone(); // We check all the cells for nulltype first, since the rules validation requires that we // have this information for all cells. @@ -173,7 +173,7 @@ pub async fn validate_row_tx( } } - let valve_row = remove_duplicate_messages(&valve_row_to_rich_json(&valve_row))?; + remove_duplicate_messages(&mut valve_row)?; Ok(valve_row) } @@ -664,41 +664,20 @@ pub fn validate_rows_intra( valve_rows } -/// Given a row represented as a [SerdeMap], remove any duplicate messages from the row's cells, so +/// Given a row represented as a [ValveRow], remove any duplicate messages from the row's cells, so /// that no cell has messages with the same level, rule, and message text. -pub fn remove_duplicate_messages(row: &SerdeMap) -> Result { - let mut deduped_row = SerdeMap::new(); - for (column_name, cell) in row.iter() { - let mut messages = cell - .get("messages") - .and_then(|m| m.as_array()) - .unwrap_or(&vec![]) - .clone(); +fn remove_duplicate_messages(row: &mut ValveRow) -> Result<(), ValveError> { + for (_column_name, cell) in row.contents.iter_mut() { + let mut messages = cell.messages.clone(); messages.sort_by(|a, b| { - let a = format!( - "{}{}{}", - a.get("level").unwrap(), - a.get("rule").unwrap(), - a.get("message").unwrap() - ); - let b = format!( - "{}{}{}", - b.get("level").unwrap(), - b.get("rule").unwrap(), - b.get("message").unwrap() - ); + let a = format!("{}{}{}", a.level, a.rule, a.message); + let b = format!("{}{}{}", b.level, b.rule, b.message,); a.partial_cmp(&b).unwrap() }); - messages.dedup_by(|a, b| { - a.get("level").unwrap() == b.get("level").unwrap() - && a.get("rule").unwrap() == b.get("rule").unwrap() - && a.get("message").unwrap() == b.get("message").unwrap() - }); - let mut cell = cell.as_object().unwrap().clone(); - cell.insert("messages".to_string(), json!(messages)); - deduped_row.insert(column_name.to_string(), json!(cell)); + messages.dedup_by(|a, b| a.level == b.level && a.rule == b.rule && a.message == b.message); + cell.messages = messages; } - Ok(deduped_row) + Ok(()) } /// Given a row, represented as a JSON object in the following ('simple') format: @@ -1234,29 +1213,23 @@ pub fn as_if_to_sql( } QueryAsIfKind::Add | QueryAsIfKind::Replace => { let row = as_if.row.as_ref().unwrap(); - let columns = row.keys().cloned().collect::>(); + let columns = row.contents.keys().cloned().collect::>(); let values = { let mut values = vec![]; for column in &columns { let valid = row + .contents .get(column) - .and_then(|c| c.get("valid")) - .and_then(|v| v.as_bool()) + .and_then(|c| Some(c.valid)) .unwrap(); let value = { if valid == true { - let value = match row.get(column).and_then(|c| c.get("value")) { - Some(SerdeValue::String(s)) => Ok(format!("{}", s)), - Some(SerdeValue::Number(n)) => Ok(format!("{}", n)), - Some(SerdeValue::Bool(b)) => Ok(format!("{}", b)), - _ => Err(format!( - "Value missing or of unknown type in column {} of row to \ - update: {:?}", - column, row - )), - } - .unwrap(); + let value = row + .contents + .get(column) + .and_then(|c| Some(c.strvalue())) + .unwrap(); if value == "" { "NULL".to_string() } else { diff --git a/src/valve.rs b/src/valve.rs index 7d714290..0606356b 100644 --- a/src/valve.rs +++ b/src/valve.rs @@ -10,8 +10,8 @@ use crate::{ get_sql_type_from_global_config, get_table_ddl, insert_chunks, insert_new_row_tx, local_sql_syntax, read_config_files, record_row_change, switch_undone_state, update_row_tx, validate::{ - rich_json_to_valve_row, simple_to_rich_json, validate_row_tx, validate_tree_foreign_keys, - validate_under, with_tree_sql, + rich_json_to_valve_row, validate_row_tx, validate_tree_foreign_keys, validate_under, + valve_row_to_rich_json, with_tree_sql, }, valve_grammar::StartParser, verify_table_deps_and_sort, ColumnRule, CompiledCondition, ParsedStructure, CHUNK_SIZE, @@ -45,22 +45,19 @@ pub struct ValveCell { } impl ValveCell { - pub fn new(value: &SerdeValue) -> Result { + pub fn new(value: &SerdeValue) -> Self { let value = match value { SerdeValue::String(_) | SerdeValue::Number(_) | SerdeValue::Bool(_) => value.clone(), _ => { - return Err(ValveError::InputError(format!( - "Value '{}' is not a simple JSON type", - value - ))) + panic!("Value '{}' is not a simple JSON type", value) } }; - Ok(Self { + Self { nulltype: None, value: value, valid: true, messages: vec![], - }) + } } pub fn strvalue(&self) -> String { @@ -1493,6 +1490,21 @@ impl Valve { Ok(self) } + /// TODO: Add a docstring here. + pub fn simple_json_to_valve_row( + row: &JsonRow, + row_number: Option, + ) -> Result { + let mut valve_cells = IndexMap::new(); + for (column, value) in row.iter() { + valve_cells.insert(column.to_string(), ValveCell::new(value)); + } + Ok(ValveRow { + row_number: row_number, + contents: valve_cells, + }) + } + /// Given a table name and a row, represented as a JSON object in the following ('simple') /// format: /// ``` @@ -1509,8 +1521,8 @@ impl Valve { row: &JsonRow, row_number: Option, ) -> Result { - let row = simple_to_rich_json(row)?; - let row = validate_row_tx( + let row = Self::simple_json_to_valve_row(row, row_number)?; + validate_row_tx( &self.config, &self.datatype_conditions, &self.rule_conditions, @@ -1521,8 +1533,7 @@ impl Valve { row_number, None, ) - .await?; - rich_json_to_valve_row(row_number, &row) + .await } /// Given a table name and a row, represented as a JSON object in the following ('simple') @@ -1542,7 +1553,7 @@ impl Valve { row: &JsonRow, ) -> Result<(u32, ValveRow), ValveError> { let mut tx = self.pool.begin().await?; - let row = simple_to_rich_json(row)?; + let row = Self::simple_json_to_valve_row(row, None)?; let row = validate_row_tx( &self.config, &self.datatype_conditions, @@ -1569,10 +1580,12 @@ impl Valve { ) .await?; - record_row_change(&mut tx, table_name, &rn, None, Some(&row), &self.user).await?; - tx.commit().await?; + // TODO: Instead of calling result_row_to_config_map() here, change record_row_change() + // so that it accepts a ValveRow instead of a SerdeMap. + let serde_row = valve_row_to_rich_json(&row); + record_row_change(&mut tx, table_name, &rn, None, Some(&serde_row), &self.user).await?; - let row = rich_json_to_valve_row(Some(rn), &row)?; + tx.commit().await?; Ok((rn, row)) } @@ -1599,7 +1612,7 @@ impl Valve { let old_row = get_row_from_db(&self.config, &self.pool, &mut tx, table_name, &row_number).await?; - let row = simple_to_rich_json(row)?; + let row = Self::simple_json_to_valve_row(row, Some(*row_number))?; let row = validate_row_tx( &self.config, &self.datatype_conditions, @@ -1628,19 +1641,20 @@ impl Valve { .await?; // Record the row update in the history table: + // TODO: Instead of calling result_row_to_config_map() here, change record_row_change() + // so that it accepts a ValveRow instead of a SerdeMap. + let serde_row = valve_row_to_rich_json(&row); record_row_change( &mut tx, table_name, row_number, Some(&old_row), - Some(&row), + Some(&serde_row), &self.user, ) .await?; tx.commit().await?; - - let row = rich_json_to_valve_row(Some(*row_number), &row)?; Ok(row) } @@ -1780,7 +1794,7 @@ impl Valve { } /// Undo one change and return the change record or None if there was no change to undo. - pub async fn undo(&self) -> Result, ValveError> { + pub async fn undo(&self) -> Result, ValveError> { let last_change = match get_record_to_undo(&self.pool).await? { None => { log::warn!("Nothing to undo."); @@ -1825,6 +1839,10 @@ impl Valve { // Undo a delete: let mut tx = self.pool.begin().await?; + // TODO: Instead of calling result_row_to_config_map() here, change record_row_change() + // so that it accepts a ValveRow instead of a SerdeMap. + let from = rich_json_to_valve_row(Some(row_number), &from)?; + insert_new_row_tx( &self.config, &self.datatype_conditions, @@ -1846,6 +1864,10 @@ impl Valve { // Undo an an update: let mut tx = self.pool.begin().await?; + // TODO: Instead of calling result_row_to_config_map() here, change record_row_change() + // so that it accepts a ValveRow instead of a SerdeMap. + let from = rich_json_to_valve_row(Some(row_number), &from)?; + update_row_tx( &self.config, &self.datatype_conditions, @@ -1868,7 +1890,7 @@ impl Valve { } /// Redo one change and return the change record or None if there was no change to redo. - pub async fn redo(&self) -> Result, ValveError> { + pub async fn redo(&self) -> Result, ValveError> { let last_undo = match get_record_to_redo(&self.pool).await? { None => { log::warn!("Nothing to redo."); @@ -1901,6 +1923,10 @@ impl Valve { // Redo an insert: let mut tx = self.pool.begin().await?; + // TODO: Instead of calling result_row_to_config_map() here, change record_row_change() + // so that it accepts a ValveRow instead of a SerdeMap. + let to = rich_json_to_valve_row(Some(row_number), &to)?; + insert_new_row_tx( &self.config, &self.datatype_conditions, @@ -1941,6 +1967,7 @@ impl Valve { // Redo an an update: let mut tx = self.pool.begin().await?; + let to = rich_json_to_valve_row(Some(row_number), &to)?; update_row_tx( &self.config, &self.datatype_conditions,