diff --git a/src/validate.rs b/src/validate.rs index 5c21cc2..058c094 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -145,6 +145,190 @@ pub async fn validate_row_tx( Ok(valve_row) } +/// Given a config map, a db connection pool, a table name, and an optional extra row, validate +/// any associated under constraints for the current column. Optionally, if a transaction is +/// given, use that instead of the pool for database access. +pub async fn validate_under( + config: &ValveConfig, + pool: &AnyPool, + mut tx: Option<&mut Transaction<'_, sqlx::Any>>, + table_name: &String, + extra_row: Option<&ValveRow>, +) -> Result> { + let mut results = vec![]; + let ukeys = config + .constraint + .under + .get(table_name) + .expect(&format!("Undefined table '{}'", table_name)); + + let table_options = get_table_options(config, table_name)?; + let query_table = { + if !table_options.contains("conflict") { + table_name.to_string() + } else { + format!("{}_view", table_name) + } + }; + for ukey in ukeys { + let tree_table = &ukey.ttable; + let tree_child = &ukey.tcolumn; + let column = &ukey.column; + let sql_type = get_sql_type_from_global_config(&config, &table_name, &column, pool); + let tree = config + .constraint + .tree + .get(tree_table) + .unwrap() + .iter() + .find(|tkey| tkey.child == *tree_child) + .unwrap(); + let tree_parent = &tree.parent; + let mut extra_clause; + let mut params; + if let Some(ref extra_row) = extra_row { + (extra_clause, params) = + select_with_extra_row(config, extra_row, table_name, &query_table, pool); + } else { + extra_clause = String::new(); + params = vec![]; + } + + // For each value of the column to be checked: + // (1) Determine whether it is in the tree's child column. + // (2) Create a sub-tree of the given tree whose root is the given "under value" + // (i.e., ukey["value"]). Now on the one hand, if the value to be checked is in the + // parent column of that sub-tree, then it follows that that value is _not_ under the + // under value, but above it. On the other hand, if the value to be checked is not in + // the parent column of the sub-tree, then if condition (1) is also satisfied it follows + // that it _is_ under the under_value. + // Note that "under" is interpreted in the inclusive sense; i.e., values are trivially + // understood to be under themselves. + let effective_table; + if !extra_clause.is_empty() { + effective_table = format!("{}_ext", query_table); + } else { + effective_table = query_table.clone(); + } + + let effective_tree; + if tree_table == table_name { + effective_tree = effective_table.to_string(); + } else { + effective_tree = tree_table.to_string(); + } + + let uval = ukey + .value + .as_str() + .and_then(|v| Some(v.to_string())) + .unwrap(); + let (tree_sql, mut tree_params) = with_tree_sql( + config, + &tree, + &table_name, + &effective_tree, + Some(&uval), + None, + pool, + ); + // Add the tree params to the beginning of the parameter list: + tree_params.append(&mut params); + params = tree_params; + + // Remove the 'WITH' part of the extra clause since it is redundant given the tree sql and + // will therefore result in a syntax error: + if !extra_clause.is_empty() { + extra_clause = format!(", {}", &extra_clause[5..]); + } + let sql = local_sql_syntax( + &pool, + &format!( + r#"{tree_sql} {extra_clause} + SELECT + "row_number", + "{effective_table}"."{column}", + CASE + WHEN "{effective_table}"."{column}" IN ( + SELECT "{tree_child}" FROM "{effective_tree}" + ) + THEN 1 ELSE 0 + END AS "is_in_tree", + CASE + WHEN "{effective_table}"."{column}" IN ( + SELECT "{tree_parent}" FROM "tree" + ) + THEN 0 ELSE 1 + END AS "is_under" + FROM "{effective_table}""#, + tree_sql = tree_sql, + extra_clause = extra_clause, + effective_table = effective_table, + column = column, + tree_child = tree_child, + effective_tree = effective_tree, + tree_parent = tree_parent, + ), + ); + + let mut query = sqlx_query(&sql); + for param in ¶ms { + query = query.bind(param); + } + let rows = { + if let None = tx { + query.fetch_all(pool).await? + } else { + query + .fetch_all(tx.as_mut().unwrap().acquire().await?) + .await? + } + }; + + for row in rows { + let raw_row_number = row.try_get_raw("row_number").unwrap(); + let row_number: i64; + if raw_row_number.is_null() { + row_number = 0; + } else { + row_number = row.get("row_number"); + } + + let raw_column_val = row.try_get_raw(format!(r#"{}"#, column).as_str()).unwrap(); + if !raw_column_val.is_null() { + let column_val = get_column_value_as_string(&row, &column, &sql_type); + // We use i32 instead of i64 (which we use for row_number) here because, unlike + // row_number, which is a BIGINT, 0 and 1 are being interpreted as normal sized ints. + let is_in_tree: i32 = row.get("is_in_tree"); + let is_under: i32 = row.get("is_under"); + if is_in_tree == 0 { + results.push(json!({ + "row_number": row_number as u32, + "column": column, + "value": column_val, + "level": "error", + "rule": "under:not-in-tree", + "message": format!("Value '{}' of column {} is not in {}.{}", + column_val, column, tree_table, tree_child).as_str(), + })); + } else if is_under == 0 { + results.push(json!({ + "row_number": row_number as u32, + "column": column, + "value": column_val, + "level": "error", + "rule": "under:not-under", + "message": format!("Value '{}' of column {} is not under '{}'", + column_val, column, uval.clone()).as_str(), + })); + } + } + } + } + + Ok(results) +} + /// Given a config map, a db connection pool, and a table name, validate whether there is a /// 'foreign key' violation for any of the table's trees; i.e., for a given tree: tree(child) which /// has a given parent column, validate that all of the values in the parent column are in the child