Skip to content

Commit

Permalink
use new config structs while loading tables
Browse files Browse the repository at this point in the history
  • Loading branch information
lmcmicu committed Feb 12, 2024
1 parent 174a4ea commit 2dd98e9
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 123 deletions.
53 changes: 19 additions & 34 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,6 @@ pub fn get_compiled_rule_conditions(
let tables_config = &config.table;
let rules_config = &config.rule;
for (rules_table, table_rules) in rules_config.iter() {
println!("Rules for {}", rules_table);
for (column_rule_key, column_rules) in table_rules.iter() {
for rule in column_rules {
let table_columns = tables_config
Expand Down Expand Up @@ -3088,7 +3087,7 @@ pub fn get_table_ddl(
.collect::<Vec<_>>()
};

let (primaries, uniques, foreigns, trees, unders) = {
let (primaries, uniques, foreigns, trees, _unders) = {
// Conflict tables have no database constraints:
if table_name.ends_with("_conflict") {
(vec![], vec![], vec![], vec![], vec![])
Expand Down Expand Up @@ -3304,7 +3303,7 @@ pub fn sort_messages(
/// and information messages generated are added to messages_stats, the contents of which will
/// later be written to stderr.
pub async fn make_inserts(
config_old: &SerdeMap,
config: &ValveConfig,
table_name: &String,
rows: &mut Vec<ResultRow>,
chunk_number: usize,
Expand Down Expand Up @@ -3332,7 +3331,7 @@ pub async fn make_inserts(
}

fn generate_sql(
config_old: &SerdeMap,
config: &ValveConfig,
main_table: &String,
columns: &Vec<String>,
rows: &mut Vec<ResultRow>,
Expand All @@ -3354,9 +3353,8 @@ pub async fn make_inserts(
let mut conflict_params = vec![];
let mut message_lines = vec![];
let mut message_params = vec![];
let dummy_config_new = ValveConfig::default();
let sorted_datatypes = get_sorted_datatypes(&dummy_config_new);
let conflict_columns = get_conflict_columns(&dummy_config_new, main_table);
let sorted_datatypes = get_sorted_datatypes(config);
let conflict_columns = get_conflict_columns(config, main_table);
for (i, row) in rows.iter_mut().enumerate() {
// enumerate begins at 0 but we need to begin at 1:
let i = i + 1;
Expand All @@ -3368,12 +3366,7 @@ pub async fn make_inserts(
let cell = row.contents.get(column).unwrap();
// Insert the value of the cell into the column unless inserting it will cause a db
// error or it has the nulltype field set, in which case insert NULL:
let sql_type = get_sql_type_from_global_config(
&ValveConfig::default(),
&main_table,
column,
pool,
);
let sql_type = get_sql_type_from_global_config(config, &main_table, column, pool);
if cell.nulltype != None || is_sql_type_error(&sql_type, &cell.value) {
row_values.push(String::from("NULL"));
} else {
Expand Down Expand Up @@ -3485,21 +3478,13 @@ pub async fn make_inserts(

// Use the "column_order" field of the table config for this table to retrieve the column names
// in the correct order:
let column_names = config_old
.get("table")
.and_then(|t| t.get(table_name))
.and_then(|t| t.get("column_order"))
.and_then(|c| c.as_array())
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect::<Vec<_>>();
let column_names = &config.table.get(table_name).unwrap().column_order;

let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) =
generate_sql(
&config_old,
config,
&table_name,
&column_names,
column_names,
rows,
chunk_number,
messages_stats,
Expand All @@ -3522,7 +3507,7 @@ pub async fn make_inserts(
/// them to the table. If the verbose flag is set to true, error/warning/info stats will be
/// collected in messages_stats and later written to stderr.
pub async fn insert_chunk(
config_old: &SerdeMap,
config: &ValveConfig,
pool: &AnyPool,
table_name: &String,
rows: &mut Vec<ResultRow>,
Expand All @@ -3534,7 +3519,7 @@ pub async fn insert_chunk(
// First, do the tree validation. TODO: I don't remember why this needs to be done first, but
// it does. Add a comment here explaining why.
if validate {
validate_rows_trees(config_old, pool, table_name, rows).await?;
validate_rows_trees(config, pool, table_name, rows).await?;
}

// Try to insert the rows to the db first without validating unique and foreign constraints.
Expand All @@ -3552,7 +3537,7 @@ pub async fn insert_chunk(
tmp_messages_stats.insert("info".to_string(), 0);
let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) =
make_inserts(
config_old,
config,
table_name,
rows,
chunk_number,
Expand Down Expand Up @@ -3604,7 +3589,7 @@ pub async fn insert_chunk(
}
Err(e) => {
if validate {
validate_rows_constraints(config_old, pool, table_name, rows).await?;
validate_rows_constraints(config, pool, table_name, rows).await?;
let (
main_sql,
main_params,
Expand All @@ -3613,7 +3598,7 @@ pub async fn insert_chunk(
message_sql,
message_params,
) = make_inserts(
config_old,
config,
table_name,
rows,
chunk_number,
Expand Down Expand Up @@ -3658,7 +3643,7 @@ pub async fn insert_chunk(
/// to the table. If the verbose flag is set to true, error/warning/info stats will be collected in
/// messages_stats and later written to stderr.
pub async fn insert_chunks(
config_old: &SerdeMap,
config: &ValveConfig,
pool: &AnyPool,
compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
Expand All @@ -3675,7 +3660,7 @@ pub async fn insert_chunks(
let mut intra_validated_rows = {
let only_nulltype = !validate;
validate_rows_intra(
config_old,
config,
compiled_datatype_conditions,
compiled_rule_conditions,
table_name,
Expand All @@ -3685,7 +3670,7 @@ pub async fn insert_chunks(
)
};
insert_chunk(
config_old,
config,
pool,
table_name,
&mut intra_validated_rows,
Expand Down Expand Up @@ -3719,7 +3704,7 @@ pub async fn insert_chunks(
workers.push(scope.spawn(move |_| {
let only_nulltype = !validate;
validate_rows_intra(
config_old,
config,
compiled_datatype_conditions,
compiled_rule_conditions,
table_name,
Expand All @@ -3740,7 +3725,7 @@ pub async fn insert_chunks(

for (chunk_number, mut intra_validated_rows) in results {
insert_chunk(
config_old,
config,
pool,
table_name,
&mut intra_validated_rows,
Expand Down
35 changes: 18 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ mod api_test;
use crate::api_test::run_api_tests;
use argparse::{ArgumentParser, Store, StoreTrue};
use ontodev_valve::{valve::Valve, valve::ValveError};
use serde_json::{from_str, Value as SerdeValue};
use std::{env, process};

#[async_std::main]
async fn main() -> Result<(), ValveError> {
// Command line parameters and their default values. See below for descriptions. Note that some
// of these are mutually exclusive. This is accounted for below.

// TODO: Use a more powerful command-line parser library that can automatically take care of
// things like mutually exclusive options, since argparse doesn't seem to be able to do it.

let mut ad_hoc = false; // TODO: Remove the ad_hoc parameter before merging this PR.
let mut verbose = false;
let mut api_test = false;
let mut dump_config = false;
Expand All @@ -33,6 +35,10 @@ async fn main() -> Result<(), ValveError> {
// this block limits scope of borrows by ap.refer() method
let mut ap = ArgumentParser::new();
ap.set_description(r#"Valve is a lightweight validation engine written in rust."#);

// TODO: Remove the ad_hoc parameter before merging this PR.
ap.refer(&mut ad_hoc)
.add_option(&["--ad_hoc"], StoreTrue, r#"Do something ad hoc."#);
ap.refer(&mut verbose).add_option(
&["--verbose"],
StoreTrue,
Expand Down Expand Up @@ -138,6 +144,8 @@ async fn main() -> Result<(), ValveError> {
let advice = format!("Run `{} --help` for command line usage.", program_name);

let mutually_exclusive_options = vec![
// TODO: Remove the ad_hoc parameter before merging this PR.
ad_hoc,
api_test,
dump_config,
dump_schema,
Expand Down Expand Up @@ -173,7 +181,11 @@ async fn main() -> Result<(), ValveError> {
process::exit(1);
}

if api_test {
// TODO: Remove the ad_hoc parameter before merging this PR.
if ad_hoc {
let valve = Valve::build(&source, &destination, verbose, initial_load).await?;
valve.save_all_tables(&None)?;
} else if api_test {
run_api_tests(&source, &destination).await?;
} else if save_all || save != "" {
let valve = Valve::build(&source, &destination, verbose, initial_load).await?;
Expand All @@ -192,21 +204,10 @@ async fn main() -> Result<(), ValveError> {
}
} else if dump_config {
let valve = Valve::build(&source, &destination, verbose, initial_load).await?;
let mut config = valve.config.clone();
let datatype_conditions = format!("{:?}", valve.datatype_conditions).replace(r"\", r"\\");
let datatype_conditions: SerdeValue = from_str(&datatype_conditions).unwrap();
config.insert(String::from("datatype_conditions"), datatype_conditions);

let structure_conditions = format!("{:?}", valve.structure_conditions).replace(r"\", r"\\");
let structure_conditions: SerdeValue = from_str(&structure_conditions).unwrap();
config.insert(String::from("structure_conditions"), structure_conditions);

let rule_conditions = format!("{:?}", valve.rule_conditions).replace(r"\", r"\\");
let rule_conditions: SerdeValue = from_str(&rule_conditions).unwrap();
config.insert(String::from("rule_conditions"), rule_conditions);

let config = serde_json::to_string(&config).unwrap();
println!("{}", config);
// TODO: Somehow convert this output to JSON. We will likely have to rewrite the display()
// functions for the structs involved. Note that this is required for the
// test/generate_random_test_data.py to work.
println!("{:#?}", valve);
} else if dump_schema {
let valve = Valve::build(&source, &destination, verbose, initial_load).await?;
valve.dump_schema().await?;
Expand Down
Loading

0 comments on commit 2dd98e9

Please sign in to comment.