Skip to content

Commit

Permalink
use ValveRow everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
lmcmicu committed Feb 24, 2024
1 parent eed3fbc commit fc873eb
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 152 deletions.
126 changes: 45 additions & 81 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
validate_row_tx, validate_rows_constraints, validate_rows_intra, validate_rows_trees,
},
valve::{
ValveCellMessage, ValveColumnConfig, ValveConfig, ValveConstraintConfig,
ValveCell, ValveCellMessage, ValveColumnConfig, ValveConfig, ValveConstraintConfig,
ValveDatatypeConfig, ValveError, ValveForeignConstraint, ValveRow, ValveRuleConfig,
ValveSpecialConfig, ValveTableConfig, ValveTreeConstraint, ValveUnderConstraint,
},
Expand Down Expand Up @@ -173,7 +173,7 @@ pub struct QueryAsIf {
// named 'foo' so we need to use an alias:
pub alias: String,
pub row_number: u32,
pub row: Option<SerdeMap>,
pub row: Option<ValveRow>,
}

/// The sense in which a [QueryAsIf] struct should be interpreted.
Expand Down Expand Up @@ -1339,7 +1339,7 @@ pub async fn get_affected_rows(
config: &ValveConfig,
pool: &AnyPool,
tx: &mut Transaction<'_, sqlx::Any>,
) -> Result<IndexMap<u32, SerdeMap>, ValveError> {
) -> Result<Vec<ValveRow>, ValveError> {
// Since the consequence of an update could involve currently invalid rows
// (in the conflict table) becoming valid or vice versa, we need to check rows for
// which the value of the column is the same as `value`
Expand All @@ -1359,9 +1359,9 @@ pub async fn get_affected_rows(
};

let query = sqlx_query(&sql);
let mut table_rows = IndexMap::new();
let mut valve_rows = vec![];
for row in query.fetch_all(tx.acquire().await?).await? {
let mut table_row = SerdeMap::new();
let mut contents = IndexMap::new();
let mut row_number: Option<u32> = None;
for column in row.columns() {
let cname = column.name();
Expand All @@ -1375,20 +1375,18 @@ pub async fn get_affected_rows(
} else {
value = String::from("");
}
let cell = json!({
"value": value,
"valid": true,
"messages": json!([]),
});
table_row.insert(cname.to_string(), json!(cell));
contents.insert(cname.to_string(), ValveCell::new(&json!(value)));
}
}
let row_number =
row_number.ok_or(ValveError::DataError("Row: has no row number".to_string()))?;
table_rows.insert(row_number, table_row);
valve_rows.push(ValveRow {
row_number: Some(row_number),
contents: contents,
});
}

Ok(table_rows)
Ok(valve_rows)
}

/// Given a global configuration map, a database connection pool, a database transaction, a table
Expand Down Expand Up @@ -1541,25 +1539,20 @@ pub async fn get_rows_to_update(
query_as_if: &QueryAsIf,
) -> Result<
(
IndexMap<String, IndexMap<u32, SerdeMap>>,
IndexMap<String, IndexMap<u32, SerdeMap>>,
IndexMap<String, IndexMap<u32, SerdeMap>>,
IndexMap<String, Vec<ValveRow>>,
IndexMap<String, Vec<ValveRow>>,
IndexMap<String, Vec<ValveRow>>,
),
ValveError,
> {
fn get_cell_value(row: &SerdeMap, column: &str) -> Result<String, ValveError> {
match row.get(column).and_then(|cell| cell.get("value")) {
Some(SerdeValue::String(s)) => Ok(format!("{}", s)),
Some(SerdeValue::Number(n)) => Ok(format!("{}", n)),
Some(SerdeValue::Bool(b)) => Ok(format!("{}", b)),
_ => Err(ValveError::DataError(
format!(
"Value missing or of unknown type in column {} of row to update: {:?}",
column, row
)
.into(),
)),
}
fn get_cell_value(row: &ValveRow, column: &str) -> Result<String, ValveError> {
row.contents
.get(column)
.and_then(|cell| Some(cell.strvalue()))
.ok_or(ValveError::InputError(format!(
"Value missing or of unknown type in column {} of row to update: {:?}",
column, row
)))
}

// Collect foreign key dependencies:
Expand Down Expand Up @@ -1595,7 +1588,7 @@ pub async fn get_rows_to_update(
query_as_if.kind
);
}
IndexMap::new()
vec![]
}
_ => {
let current_value = get_db_value(
Expand Down Expand Up @@ -1631,7 +1624,7 @@ pub async fn get_rows_to_update(
query_as_if.kind
);
}
IndexMap::new()
vec![]
}
Some(row) => {
// Fetch the cell corresponding to `column` from `row`, and the value of that cell,
Expand Down Expand Up @@ -1693,7 +1686,7 @@ pub async fn get_rows_to_update(
query_as_if.kind
);
}
IndexMap::new()
vec![]
}
_ => {
let current_value =
Expand Down Expand Up @@ -1737,12 +1730,15 @@ pub async fn process_updates(
compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
pool: &AnyPool,
tx: &mut Transaction<'_, sqlx::Any>,
updates: &IndexMap<String, IndexMap<u32, SerdeMap>>,
updates: &IndexMap<String, Vec<ValveRow>>,
query_as_if: &QueryAsIf,
do_not_recurse: bool,
) -> Result<(), ValveError> {
for (update_table, rows_to_update) in updates {
for (row_number, row) in rows_to_update {
for row in rows_to_update {
let row_number = row
.row_number
.ok_or(ValveError::InputError("Row has no row number".to_string()))?;
// Validate each row 'counterfactually':
let vrow = validate_row_tx(
config,
Expand All @@ -1752,7 +1748,7 @@ pub async fn process_updates(
Some(tx),
update_table,
row,
Some(*row_number),
Some(row_number),
Some(&query_as_if),
)
.await?;
Expand All @@ -1766,7 +1762,7 @@ pub async fn process_updates(
tx,
update_table,
&vrow,
row_number,
&row_number,
false,
do_not_recurse,
)
Expand Down Expand Up @@ -2110,7 +2106,7 @@ pub async fn insert_new_row_tx(
pool: &AnyPool,
tx: &mut Transaction<sqlx::Any>,
table: &str,
row: &SerdeMap,
row: &ValveRow,
new_row_number: Option<u32>,
skip_validation: bool,
) -> Result<u32, ValveError> {
Expand Down Expand Up @@ -2174,64 +2170,28 @@ pub async fn insert_new_row_tx(
let sorted_datatypes = get_sorted_datatypes(config);
let conflict_columns = get_conflict_columns(config, table);
let mut use_conflict_table = false;
for (column, cell) in row.iter() {
for (column, cell) in row.contents.iter() {
insert_columns.append(&mut vec![format!(r#""{}""#, column)]);

let cell = cell.as_object().ok_or(ValveError::InputError(
format!("Cell {:?} is not an object", cell).into(),
))?;
let valid = cell
.get("valid")
.and_then(|v| v.as_bool())
.ok_or(ValveError::InputError(
format!("No bool named 'valid' in {:?}", cell).into(),
))?;
let value = cell
.get("value")
.and_then(|v| v.as_str())
.ok_or(ValveError::InputError(
format!("No string named 'value' in {:?}", cell).into(),
))?;
let messages = sort_messages(
&sorted_datatypes,
&cell
.get("messages")
.and_then(|m| m.as_array())
.ok_or(ValveError::InputError(
format!("No array named 'messages' in {:?}", cell).into(),
))?
.iter()
.map(|m| ValveCellMessage {
level: m.get("level").and_then(|s| s.as_str()).unwrap().to_string(),
rule: m.get("rule").and_then(|s| s.as_str()).unwrap().to_string(),
message: m
.get("message")
.and_then(|s| s.as_str())
.unwrap()
.to_string(),
})
.collect::<Vec<_>>(),
);

let messages = sort_messages(&sorted_datatypes, &cell.messages);
for message in messages {
all_messages.push(json!({
"column": column,
"value": value,
"value": &cell.value,
"level": message.level,
"rule": message.rule,
"message": message.message,
}));
}

let sql_type = get_sql_type_from_global_config(config, table, column, pool);
if is_sql_type_error(&sql_type, value) {
if is_sql_type_error(&sql_type, &cell.strvalue()) {
insert_values.push(String::from("NULL"));
} else {
insert_values.push(cast_sql_param_from_text(&sql_type));
insert_params.push(String::from(value));
insert_params.push(String::from(cell.strvalue()));
}

if !use_conflict_table && !valid && conflict_columns.contains(&column) {
if !use_conflict_table && !cell.valid && conflict_columns.contains(&column) {
use_conflict_table = true;
}
}
Expand Down Expand Up @@ -2279,7 +2239,11 @@ pub async fn insert_new_row_tx(
// Next add any validation messages to the message table:
for m in all_messages {
let column = m.get("column").and_then(|c| c.as_str()).unwrap();
let value = m.get("value").and_then(|c| c.as_str()).unwrap();
let value = match m.get("value") {
Some(SerdeValue::String(s)) => s.to_string(),
Some(v) => format!("{}", v),
_ => panic!("Message '{}' has no value", m),
};
let level = m.get("level").and_then(|c| c.as_str()).unwrap();
let rule = m.get("rule").and_then(|c| c.as_str()).unwrap();
let message = m.get("message").and_then(|c| c.as_str()).unwrap();
Expand Down Expand Up @@ -2417,7 +2381,7 @@ pub async fn update_row_tx(
pool: &AnyPool,
tx: &mut Transaction<sqlx::Any>,
table: &str,
row: &SerdeMap,
row: &ValveRow,
row_number: &u32,
skip_validation: bool,
do_not_recurse: bool,
Expand Down
69 changes: 21 additions & 48 deletions src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ pub async fn validate_row_tx(
pool: &AnyPool,
tx: Option<&mut Transaction<'_, sqlx::Any>>,
table_name: &str,
row: &SerdeMap,
row: &ValveRow,
row_number: Option<u32>,
query_as_if: Option<&QueryAsIf>,
) -> Result<SerdeMap, ValveError> {
) -> Result<ValveRow, ValveError> {
// Fallback to a default transaction if it is not given. Since we do not commit before it falls
// out of scope the transaction will be rolled back at the end of this function. And since this
// function is read-only the rollback is trivial and therefore inconsequential.
Expand All @@ -57,7 +57,7 @@ pub async fn validate_row_tx(
};

// Initialize the result row with the values from the given row:
let mut valve_row = rich_json_to_valve_row(row_number, row)?;
let mut valve_row = row.clone();

// We check all the cells for nulltype first, since the rules validation requires that we
// have this information for all cells.
Expand Down Expand Up @@ -173,7 +173,7 @@ pub async fn validate_row_tx(
}
}

let valve_row = remove_duplicate_messages(&valve_row_to_rich_json(&valve_row))?;
remove_duplicate_messages(&mut valve_row)?;
Ok(valve_row)
}

Expand Down Expand Up @@ -664,41 +664,20 @@ pub fn validate_rows_intra(
valve_rows
}

/// Given a row represented as a [SerdeMap], remove any duplicate messages from the row's cells, so
/// Given a row represented as a [ValveRow], remove any duplicate messages from the row's cells, so
/// that no cell has messages with the same level, rule, and message text.
pub fn remove_duplicate_messages(row: &SerdeMap) -> Result<SerdeMap, ValveError> {
let mut deduped_row = SerdeMap::new();
for (column_name, cell) in row.iter() {
let mut messages = cell
.get("messages")
.and_then(|m| m.as_array())
.unwrap_or(&vec![])
.clone();
fn remove_duplicate_messages(row: &mut ValveRow) -> Result<(), ValveError> {
for (_column_name, cell) in row.contents.iter_mut() {
let mut messages = cell.messages.clone();
messages.sort_by(|a, b| {
let a = format!(
"{}{}{}",
a.get("level").unwrap(),
a.get("rule").unwrap(),
a.get("message").unwrap()
);
let b = format!(
"{}{}{}",
b.get("level").unwrap(),
b.get("rule").unwrap(),
b.get("message").unwrap()
);
let a = format!("{}{}{}", a.level, a.rule, a.message);
let b = format!("{}{}{}", b.level, b.rule, b.message,);
a.partial_cmp(&b).unwrap()
});
messages.dedup_by(|a, b| {
a.get("level").unwrap() == b.get("level").unwrap()
&& a.get("rule").unwrap() == b.get("rule").unwrap()
&& a.get("message").unwrap() == b.get("message").unwrap()
});
let mut cell = cell.as_object().unwrap().clone();
cell.insert("messages".to_string(), json!(messages));
deduped_row.insert(column_name.to_string(), json!(cell));
messages.dedup_by(|a, b| a.level == b.level && a.rule == b.rule && a.message == b.message);
cell.messages = messages;
}
Ok(deduped_row)
Ok(())
}

/// Given a row, represented as a JSON object in the following ('simple') format:
Expand Down Expand Up @@ -1234,29 +1213,23 @@ pub fn as_if_to_sql(
}
QueryAsIfKind::Add | QueryAsIfKind::Replace => {
let row = as_if.row.as_ref().unwrap();
let columns = row.keys().cloned().collect::<Vec<_>>();
let columns = row.contents.keys().cloned().collect::<Vec<_>>();
let values = {
let mut values = vec![];
for column in &columns {
let valid = row
.contents
.get(column)
.and_then(|c| c.get("valid"))
.and_then(|v| v.as_bool())
.and_then(|c| Some(c.valid))
.unwrap();

let value = {
if valid == true {
let value = match row.get(column).and_then(|c| c.get("value")) {
Some(SerdeValue::String(s)) => Ok(format!("{}", s)),
Some(SerdeValue::Number(n)) => Ok(format!("{}", n)),
Some(SerdeValue::Bool(b)) => Ok(format!("{}", b)),
_ => Err(format!(
"Value missing or of unknown type in column {} of row to \
update: {:?}",
column, row
)),
}
.unwrap();
let value = row
.contents
.get(column)
.and_then(|c| Some(c.strvalue()))
.unwrap();
if value == "" {
"NULL".to_string()
} else {
Expand Down
Loading

0 comments on commit fc873eb

Please sign in to comment.