Skip to content

Commit

Permalink
add --show_deps_in and --show_deps_out options
Browse files Browse the repository at this point in the history
  • Loading branch information
lmcmicu committed Jan 3, 2024
1 parent 7a1b17c commit 10638b9
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 96 deletions.
182 changes: 88 additions & 94 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ pub struct Valve {
/// TODO: Add docstring here.
pub parsed_structure_conditions: HashMap<String, ParsedStructure>,
/// TODO: Add docstring here.
pub table_dependencies: HashMap<String, Vec<String>>,
pub table_dependencies_in: HashMap<String, Vec<String>>,
/// TODO: Add docstring here.
pub table_dependencies_out: HashMap<String, Vec<String>>,
/// TODO: Add docstring here.
pub pool: AnyPool,
/// TODO: Add docstring here.
Expand Down Expand Up @@ -248,7 +250,8 @@ impl Valve {
rules_config,
constraints_config,
sorted_table_list,
table_dependencies,
table_dependencies_in,
table_dependencies_out,
) = read_config_files(table_path, &parser, &pool);

let mut global_config = SerdeMap::new();
Expand Down Expand Up @@ -295,7 +298,8 @@ impl Valve {
compiled_datatype_conditions: compiled_datatype_conditions,
compiled_rule_conditions: compiled_rule_conditions,
parsed_structure_conditions: parsed_structure_conditions,
table_dependencies: table_dependencies,
table_dependencies_in: table_dependencies_in,
table_dependencies_out: table_dependencies_out,
pool: pool,
user: String::from("VALVE"),
verbose: verbose,
Expand Down Expand Up @@ -538,7 +542,7 @@ impl Valve {
Ok(false)
};

let (columns_config, configured_column_order, description, table_type, path) = {
let (columns_config, configured_column_order) = {
let table_config = self
.global_config
.get("table")
Expand Down Expand Up @@ -570,32 +574,8 @@ impl Valve {
);
configured_column_order
};
let description = table_config
.get("description")
.and_then(|c| c.as_str())
.unwrap();
let table_type = {
if table != "message" && table != "history" {
table_config.get("type").and_then(|c| c.as_str())
} else {
None
}
};
let path = {
if table != "message" && table != "history" {
table_config.get("path").and_then(|c| c.as_str())
} else {
None
}
};

(
columns_config,
configured_column_order,
description,
table_type,
path,
)
(columns_config, configured_column_order)
};

let db_columns_in_order = {
Expand Down Expand Up @@ -679,47 +659,6 @@ impl Valve {
return Ok(true);
}

// Check, for tables other than "message" and "history", whether the corresponding entries
// for 'description', 'type', and 'path' in the configuration match the contents of the
// table table:
if table != "message" && table != "history" {
for table_param in vec![
("description", Some(description)),
("type", table_type),
("path", path),
] {
let column = table_param.0;
let is_clause = if self.pool.any_kind() == AnyKind::Sqlite {
"IS"
} else {
"IS NOT DISTINCT FROM"
};
let eq_value = match table_param.1 {
Some(value) => format!("= '{}'", value),
None => format!("{} NULL", is_clause),
};
let sql = format!(
r#"SELECT 1 from "table" WHERE "table" = '{}' AND "{}" {}"#,
table, column, eq_value,
);
let rows = sqlx_query(&sql).fetch_all(&self.pool).await?;
if rows.len() == 0 {
if self.verbose {
info!(
"The table '{table}' will be recreated because the entries in the \
table table for '{table}' have changed.",
table = table
);
}
return Ok(true);
} else if rows.len() > 1 {
if self.verbose {
warn!("More than one row was returned from the query '{}'", sql);
}
}
}
}

// Check, for all tables, whether their column configuration matches the contents of the
// database:
for (cname, ctype, pk) in &db_columns_in_order {
Expand Down Expand Up @@ -916,20 +855,6 @@ impl Valve {
Ok(())
}

/// TODO: Add docstring here.
fn get_dependent_tables(&self, table: &str) -> Vec<String> {
let mut dependent_tables = vec![];
if table != "message" && table != "history" {
let direct_deps = self.table_dependencies.get(table).unwrap().to_vec();
for direct_dep in direct_deps {
let mut indirect_deps = self.get_dependent_tables(&direct_dep);
dependent_tables.append(&mut indirect_deps);
dependent_tables.push(direct_dep);
}
}
dependent_tables
}

/// Create all configured database tables and views if they do not already exist as configured.
pub async fn create_all_tables(&self) -> Result<&Self, sqlx::Error> {
// DatabaseError
Expand Down Expand Up @@ -977,10 +902,31 @@ impl Valve {
return Ok(rows.len() > 0);
}

pub fn order_tables(&self, tables: &Vec<&str>, deletion_order: bool) -> Vec<String> {
/// TODO: Add docstring here.
fn get_dependencies(&self, table: &str, incoming: bool) -> Vec<String> {
let mut dependent_tables = vec![];
if table != "message" && table != "history" {
let direct_deps = {
if incoming {
self.table_dependencies_in.get(table).unwrap().to_vec()
} else {
self.table_dependencies_out.get(table).unwrap().to_vec()
}
};
for direct_dep in direct_deps {
let mut indirect_deps = self.get_dependencies(&direct_dep, incoming);
dependent_tables.append(&mut indirect_deps);
dependent_tables.push(direct_dep);
}
}
dependent_tables
}

/// TODO: Add docstring here.
fn add_dependencies(&self, tables: &Vec<&str>, deletion_order: bool) -> Vec<String> {
let mut with_dups = vec![];
for table in tables {
let dependent_tables = self.get_dependent_tables(table);
let dependent_tables = self.get_dependencies(table, true);
for dep_table in dependent_tables {
with_dups.push(dep_table.to_string());
}
Expand All @@ -1001,6 +947,40 @@ impl Valve {
tables_in_order
}

/// TODO: Add docstring here.
fn _order_tables(&self, tables: &Vec<&str>, reverse: bool) -> Vec<String> {
let constraints_config = self
.global_config
.get("constraints")
.and_then(|c| c.as_object())
.unwrap();

// Filter out message and history since they are not represented in the constraints config.
// They will be added implicitly to the list returned by verify_table_deps_and_sort.
let tables = tables
.iter()
.filter(|m| **m != "history" && **m != "message")
.map(|s| s.to_string())
.collect::<Vec<_>>();

let (mut sorted_table_list, _, _) =
verify_table_deps_and_sort(&tables, &constraints_config);
if reverse {
sorted_table_list.reverse();
}
sorted_table_list
}

/// TODO: Add docstring here.
pub fn collect_dependencies(&self, incoming: bool) -> IndexMap<String, Vec<String>> {
let tables = self.get_sorted_table_list(false);
let mut dependencies = IndexMap::new();
for table in tables {
dependencies.insert(table.to_string(), self.get_dependencies(table, incoming));
}
dependencies
}

/// Drop all configured tables, in reverse dependency order.
pub async fn drop_all_tables(&self) -> Result<&Self, sqlx::Error> {
// DatabaseError
Expand All @@ -1016,7 +996,7 @@ impl Valve {
pub async fn drop_tables(&self, tables: &Vec<&str>) -> Result<&Self, sqlx::Error> {
// DatabaseError

let drop_list = self.order_tables(tables, true);
let drop_list = self.add_dependencies(tables, true);
if self.interactive {
let auto_drops = drop_list
.iter()
Expand Down Expand Up @@ -1063,7 +1043,7 @@ impl Valve {

self.create_all_tables().await?;

let truncate_list = self.order_tables(tables, true);
let truncate_list = self.add_dependencies(tables, true);
if self.interactive {
let auto_truncates = truncate_list
.iter()
Expand Down Expand Up @@ -1962,6 +1942,7 @@ fn read_config_files(
SerdeMap,
Vec<String>,
HashMap<String, Vec<String>>,
HashMap<String, Vec<String>>,
) {
let special_table_types = json!({
"table": {"required": true},
Expand Down Expand Up @@ -2503,7 +2484,7 @@ fn read_config_files(

// Sort the tables (aside from the message and history tables) according to their foreign key
// dependencies so that tables are always loaded after the tables they depend on.
let (sorted_tables, table_dependencies) = verify_table_deps_and_sort(
let (sorted_tables, table_dependencies_in, table_dependencies_out) = verify_table_deps_and_sort(
&tables_config
.keys()
.cloned()
Expand All @@ -2524,7 +2505,8 @@ fn read_config_files(
rules_config,
constraints_config,
sorted_tables,
table_dependencies,
table_dependencies_in,
table_dependencies_out,
)
}

Expand Down Expand Up @@ -4493,7 +4475,11 @@ fn local_sql_syntax(pool: &AnyPool, sql: &String) -> String {
fn verify_table_deps_and_sort(
table_list: &Vec<String>,
constraints: &SerdeMap,
) -> (Vec<String>, HashMap<String, Vec<String>>) {
) -> (
Vec<String>,
HashMap<String, Vec<String>>,
HashMap<String, Vec<String>>,
) {
fn get_cycles(g: &DiGraphMap<&str, ()>) -> Result<Vec<String>, Vec<Vec<String>>> {
let mut cycles = vec![];
match toposort(&g, None) {
Expand Down Expand Up @@ -4622,18 +4608,26 @@ fn verify_table_deps_and_sort(

match get_cycles(&dependency_graph) {
Ok(sorted_table_list) => {
let mut table_dependencies = HashMap::new();
let mut table_dependencies_in = HashMap::new();
for node in dependency_graph.nodes() {
let neighbors = dependency_graph
.neighbors_directed(node, petgraph::Direction::Incoming)
.map(|n| n.to_string())
.collect::<Vec<_>>();
table_dependencies.insert(node.to_string(), neighbors);
table_dependencies_in.insert(node.to_string(), neighbors);
}
let mut table_dependencies_out = HashMap::new();
for node in dependency_graph.nodes() {
let neighbors = dependency_graph
.neighbors_directed(node, petgraph::Direction::Outgoing)
.map(|n| n.to_string())
.collect::<Vec<_>>();
table_dependencies_out.insert(node.to_string(), neighbors);
}
let mut sorted_table_list = sorted_table_list.clone();
let mut with_specials = vec!["message".to_string(), "history".to_string()];
with_specials.append(&mut sorted_table_list);
return (with_specials, table_dependencies);
return (with_specials, table_dependencies_in, table_dependencies_out);
}
Err(cycles) => {
let mut message = String::new();
Expand Down
37 changes: 35 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ async fn main() -> Result<(), sqlx::Error> {
let mut dump_config = false;
let mut dump_schema = false;
let mut table_order = false;
let mut show_deps_in = false;
let mut show_deps_out = false;
let mut drop_all = false;
let mut create_only = false;
let mut initial_load = false;
Expand Down Expand Up @@ -68,6 +70,16 @@ async fn main() -> Result<(), sqlx::Error> {
StoreTrue,
r#"Display the order in which tables must be created or dropped."#,
);
ap.refer(&mut show_deps_in).add_option(
&["--show_deps_in"],
StoreTrue,
r#"Display the incoming dependencies for each configured table."#,
);
ap.refer(&mut show_deps_out).add_option(
&["--show_deps_out"],
StoreTrue,
r#"Display the outgoing dependencies for each configured table."#,
);
ap.refer(&mut drop_all).add_option(
&["--drop_all"],
StoreTrue,
Expand Down Expand Up @@ -146,8 +158,29 @@ async fn main() -> Result<(), sqlx::Error> {
valve.dump_schema().await?;
} else if table_order {
let valve = Valve::build(&source, &destination, verbose, initial_load, interactive).await?;
let dependency_order = valve.get_sorted_table_list(false);
println!("{}", dependency_order.join(", "));
let sorted_table_list = valve.get_sorted_table_list(false);
println!("{}", sorted_table_list.join(", "));
} else if show_deps_in || show_deps_out {
let valve = Valve::build(&source, &destination, verbose, initial_load, interactive).await?;
let dependencies = valve.collect_dependencies(show_deps_in);
for (table, deps) in dependencies.iter() {
let deps = {
let deps = deps.iter().map(|s| format!("'{}'", s)).collect::<Vec<_>>();
if deps.is_empty() {
"None".to_string()
} else {
deps.join(", ")
}
};
let preamble = {
if show_deps_in {
format!("Tables that depend on '{}'", table)
} else {
format!("Table '{}' depends on", table)
}
};
println!("{}: {}", preamble, deps);
}
} else if drop_all {
let valve = Valve::build(&source, &destination, verbose, initial_load, interactive).await?;
valve.drop_all_tables().await?;
Expand Down

0 comments on commit 10638b9

Please sign in to comment.