Skip to content

Commit

Permalink
Revert "remove no longer needed validate_under()"
Browse files Browse the repository at this point in the history
(we will remove this function later along with all of the code related
to under validation, but for now we will leave it)

This reverts commit ddccd78.
  • Loading branch information
lmcmicu committed Jul 17, 2024
1 parent ddccd78 commit 1858972
Showing 1 changed file with 184 additions and 0 deletions.
184 changes: 184 additions & 0 deletions src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,190 @@ pub async fn validate_row_tx(
Ok(valve_row)
}

/// Given a config map, a db connection pool, a table name, and an optional extra row, validate
/// any associated under constraints for the current column. Optionally, if a transaction is
/// given, use that instead of the pool for database access.
pub async fn validate_under(
config: &ValveConfig,
pool: &AnyPool,
mut tx: Option<&mut Transaction<'_, sqlx::Any>>,
table_name: &String,
extra_row: Option<&ValveRow>,
) -> Result<Vec<SerdeValue>> {
let mut results = vec![];
let ukeys = config
.constraint
.under
.get(table_name)
.expect(&format!("Undefined table '{}'", table_name));

let table_options = get_table_options(config, table_name)?;
let query_table = {
if !table_options.contains("conflict") {
table_name.to_string()
} else {
format!("{}_view", table_name)
}
};
for ukey in ukeys {
let tree_table = &ukey.ttable;
let tree_child = &ukey.tcolumn;
let column = &ukey.column;
let sql_type = get_sql_type_from_global_config(&config, &table_name, &column, pool);
let tree = config
.constraint
.tree
.get(tree_table)
.unwrap()
.iter()
.find(|tkey| tkey.child == *tree_child)
.unwrap();
let tree_parent = &tree.parent;
let mut extra_clause;
let mut params;
if let Some(ref extra_row) = extra_row {
(extra_clause, params) =
select_with_extra_row(config, extra_row, table_name, &query_table, pool);
} else {
extra_clause = String::new();
params = vec![];
}

// For each value of the column to be checked:
// (1) Determine whether it is in the tree's child column.
// (2) Create a sub-tree of the given tree whose root is the given "under value"
// (i.e., ukey["value"]). Now on the one hand, if the value to be checked is in the
// parent column of that sub-tree, then it follows that that value is _not_ under the
// under value, but above it. On the other hand, if the value to be checked is not in
// the parent column of the sub-tree, then if condition (1) is also satisfied it follows
// that it _is_ under the under_value.
// Note that "under" is interpreted in the inclusive sense; i.e., values are trivially
// understood to be under themselves.
let effective_table;
if !extra_clause.is_empty() {
effective_table = format!("{}_ext", query_table);
} else {
effective_table = query_table.clone();
}

let effective_tree;
if tree_table == table_name {
effective_tree = effective_table.to_string();
} else {
effective_tree = tree_table.to_string();
}

let uval = ukey
.value
.as_str()
.and_then(|v| Some(v.to_string()))
.unwrap();
let (tree_sql, mut tree_params) = with_tree_sql(
config,
&tree,
&table_name,
&effective_tree,
Some(&uval),
None,
pool,
);
// Add the tree params to the beginning of the parameter list:
tree_params.append(&mut params);
params = tree_params;

// Remove the 'WITH' part of the extra clause since it is redundant given the tree sql and
// will therefore result in a syntax error:
if !extra_clause.is_empty() {
extra_clause = format!(", {}", &extra_clause[5..]);
}
let sql = local_sql_syntax(
&pool,
&format!(
r#"{tree_sql} {extra_clause}
SELECT
"row_number",
"{effective_table}"."{column}",
CASE
WHEN "{effective_table}"."{column}" IN (
SELECT "{tree_child}" FROM "{effective_tree}"
)
THEN 1 ELSE 0
END AS "is_in_tree",
CASE
WHEN "{effective_table}"."{column}" IN (
SELECT "{tree_parent}" FROM "tree"
)
THEN 0 ELSE 1
END AS "is_under"
FROM "{effective_table}""#,
tree_sql = tree_sql,
extra_clause = extra_clause,
effective_table = effective_table,
column = column,
tree_child = tree_child,
effective_tree = effective_tree,
tree_parent = tree_parent,
),
);

let mut query = sqlx_query(&sql);
for param in &params {
query = query.bind(param);
}
let rows = {
if let None = tx {
query.fetch_all(pool).await?
} else {
query
.fetch_all(tx.as_mut().unwrap().acquire().await?)
.await?
}
};

for row in rows {
let raw_row_number = row.try_get_raw("row_number").unwrap();
let row_number: i64;
if raw_row_number.is_null() {
row_number = 0;
} else {
row_number = row.get("row_number");
}

let raw_column_val = row.try_get_raw(format!(r#"{}"#, column).as_str()).unwrap();
if !raw_column_val.is_null() {
let column_val = get_column_value_as_string(&row, &column, &sql_type);
// We use i32 instead of i64 (which we use for row_number) here because, unlike
// row_number, which is a BIGINT, 0 and 1 are being interpreted as normal sized ints.
let is_in_tree: i32 = row.get("is_in_tree");
let is_under: i32 = row.get("is_under");
if is_in_tree == 0 {
results.push(json!({
"row_number": row_number as u32,
"column": column,
"value": column_val,
"level": "error",
"rule": "under:not-in-tree",
"message": format!("Value '{}' of column {} is not in {}.{}",
column_val, column, tree_table, tree_child).as_str(),
}));
} else if is_under == 0 {
results.push(json!({
"row_number": row_number as u32,
"column": column,
"value": column_val,
"level": "error",
"rule": "under:not-under",
"message": format!("Value '{}' of column {} is not under '{}'",
column_val, column, uval.clone()).as_str(),
}));
}
}
}
}

Ok(results)
}

/// Given a config map, a db connection pool, and a table name, validate whether there is a
/// 'foreign key' violation for any of the table's trees; i.e., for a given tree: tree(child) which
/// has a given parent column, validate that all of the values in the parent column are in the child
Expand Down

0 comments on commit 1858972

Please sign in to comment.