Skip to content

Commit

Permalink
optionally load tables without validating
Browse files Browse the repository at this point in the history
  • Loading branch information
lmcmicu committed Jan 4, 2024
1 parent 8fd68ee commit 7e6bac7
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 105 deletions.
203 changes: 115 additions & 88 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{ast::Expression, valve_grammar::StartParser};
use async_recursion::async_recursion;
use chrono::Utc;
use crossbeam;
use csv::{ReaderBuilder, StringRecord, StringRecordsIter};
use enquote::unquote;
use futures::executor::block_on;
use indexmap::IndexMap;
Expand Down Expand Up @@ -1163,7 +1164,7 @@ impl Valve {
warn!("Unable to open '{}': {}", path.clone(), e);
continue;
}
Ok(table_file) => csv::ReaderBuilder::new()
Ok(table_file) => ReaderBuilder::new()
.has_headers(false)
.delimiter(b'\t')
.from_reader(table_file),
Expand Down Expand Up @@ -1202,7 +1203,7 @@ impl Valve {
// Split the data into chunks of size CHUNK_SIZE before passing them to the validation
// logic:
let chunks = records.chunks(CHUNK_SIZE);
validate_and_insert_chunks(
insert_chunks(
&self.config,
&self.pool,
&self.compiled_datatype_conditions,
Expand All @@ -1212,60 +1213,64 @@ impl Valve {
&headers,
&mut messages_stats,
self.verbose,
validate,
)
.await?;

// We need to wait until all of the rows for a table have been loaded before validating the
// "foreign" constraints on a table's trees, since this checks if the values of one column
// (the tree's parent) are all contained in another column (the tree's child):
// We also need to wait before validating a table's "under" constraints. Although the tree
// associated with such a constraint need not be defined on the same table, it can be.
let mut recs_to_update =
validate_tree_foreign_keys(&self.config, &self.pool, None, &table_name, None)
.await?;
recs_to_update.append(
&mut validate_under(&self.config, &self.pool, None, &table_name, None).await?,
);

for record in recs_to_update {
let row_number = record.get("row_number").unwrap();
let column_name = record.get("column").and_then(|s| s.as_str()).unwrap();
let value = record.get("value").and_then(|s| s.as_str()).unwrap();
let level = record.get("level").and_then(|s| s.as_str()).unwrap();
let rule = record.get("rule").and_then(|s| s.as_str()).unwrap();
let message = record.get("message").and_then(|s| s.as_str()).unwrap();
if validate {
// We need to wait until all of the rows for a table have been loaded before
// validating the "foreign" constraints on a table's trees, since this checks if the
// values of one column (the tree's parent) are all contained in another column (the
// tree's child). We also need to wait before validating a table's "under"
// constraints. Although the tree associated with such a constraint need not be
// defined on the same table, it can be.
let mut recs_to_update =
validate_tree_foreign_keys(&self.config, &self.pool, None, &table_name, None)
.await?;
recs_to_update.append(
&mut validate_under(&self.config, &self.pool, None, &table_name, None).await?,
);

let sql = local_sql_syntax(
&self.pool,
&format!(
r#"INSERT INTO "message"
for record in recs_to_update {
let row_number = record.get("row_number").unwrap();
let column_name = record.get("column").and_then(|s| s.as_str()).unwrap();
let value = record.get("value").and_then(|s| s.as_str()).unwrap();
let level = record.get("level").and_then(|s| s.as_str()).unwrap();
let rule = record.get("rule").and_then(|s| s.as_str()).unwrap();
let message = record.get("message").and_then(|s| s.as_str()).unwrap();

let sql = local_sql_syntax(
&self.pool,
&format!(
r#"INSERT INTO "message"
("table", "row", "column", "value", "level", "rule", "message")
VALUES ({}, {}, {}, {}, {}, {}, {})"#,
SQL_PARAM,
row_number,
SQL_PARAM,
SQL_PARAM,
SQL_PARAM,
SQL_PARAM,
SQL_PARAM
),
);
let mut query = sqlx_query(&sql);
query = query.bind(&table_name);
query = query.bind(&column_name);
query = query.bind(&value);
query = query.bind(&level);
query = query.bind(&rule);
query = query.bind(&message);
query.execute(&self.pool).await?;

if self.verbose {
// Add the generated message to messages_stats:
let messages = vec![json!({
"message": message,
"level": level,
})];
add_message_counts(&messages, &mut messages_stats);
SQL_PARAM,
row_number,
SQL_PARAM,
SQL_PARAM,
SQL_PARAM,
SQL_PARAM,
SQL_PARAM
),
);
let mut query = sqlx_query(&sql);
query = query.bind(&table_name);
query = query.bind(&column_name);
query = query.bind(&value);
query = query.bind(&level);
query = query.bind(&rule);
query = query.bind(&message);
query.execute(&self.pool).await?;

if self.verbose {
// Add the generated message to messages_stats:
let messages = vec![json!({
"message": message,
"level": level,
})];
add_message_counts(&messages, &mut messages_stats);
}
}
}

Expand Down Expand Up @@ -2271,7 +2276,7 @@ fn read_config_files(
// Get the actual columns from the data itself. Note that we set has_headers to
// false (even though the files have header rows) in order to explicitly read the
// header row.
let mut rdr = csv::ReaderBuilder::new()
let mut rdr = ReaderBuilder::new()
.has_headers(false)
.delimiter(b'\t')
.from_reader(File::open(path.clone()).unwrap_or_else(|err| {
Expand Down Expand Up @@ -4115,7 +4120,7 @@ async fn update_row_tx(
/// configuration tables.
fn read_tsv_into_vector(path: &str) -> Vec<ValveRow> {
let mut rdr =
csv::ReaderBuilder::new()
ReaderBuilder::new()
.delimiter(b'\t')
.from_reader(File::open(path).unwrap_or_else(|err| {
panic!("Unable to open '{}': {}", path, err);
Expand Down Expand Up @@ -5400,17 +5405,20 @@ async fn make_inserts(
/// and the chunk number corresponding to the rows, do inter-row validation on the rows and insert
/// them to the table. If the verbose flag is set to true, error/warning/info stats will be
/// collected in messages_stats and later written to stderr.
async fn validate_rows_inter_and_insert(
async fn insert_chunk(
config: &SerdeMap,
pool: &AnyPool,
table_name: &String,
rows: &mut Vec<ResultRow>,
chunk_number: usize,
messages_stats: &mut HashMap<String, usize>,
verbose: bool,
validate: bool,
) -> Result<(), sqlx::Error> {
// First, do the tree validation:
validate_rows_trees(config, pool, table_name, rows).await?;
if validate {
validate_rows_trees(config, pool, table_name, rows).await?;
}

// Try to insert the rows to the db first without validating unique and foreign constraints.
// If there are constraint violations this will cause a database error, in which case we then
Expand Down Expand Up @@ -5477,10 +5485,17 @@ async fn validate_rows_inter_and_insert(
);
}
}
Err(_) => {
validate_rows_constraints(config, pool, table_name, rows).await?;
let (main_sql, main_params, conflict_sql, conflict_params, message_sql, message_params) =
make_inserts(
Err(e) => {
if validate {
validate_rows_constraints(config, pool, table_name, rows).await?;
let (
main_sql,
main_params,
conflict_sql,
conflict_params,
message_sql,
message_params,
) = make_inserts(
config,
table_name,
rows,
Expand All @@ -5491,26 +5506,29 @@ async fn validate_rows_inter_and_insert(
)
.await?;

let main_sql = local_sql_syntax(&pool, &main_sql);
let mut main_query = sqlx_query(&main_sql);
for param in &main_params {
main_query = main_query.bind(param);
}
main_query.execute(pool).await?;
let main_sql = local_sql_syntax(&pool, &main_sql);
let mut main_query = sqlx_query(&main_sql);
for param in &main_params {
main_query = main_query.bind(param);
}
main_query.execute(pool).await?;

let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
let mut conflict_query = sqlx_query(&conflict_sql);
for param in &conflict_params {
conflict_query = conflict_query.bind(param);
}
conflict_query.execute(pool).await?;
let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
let mut conflict_query = sqlx_query(&conflict_sql);
for param in &conflict_params {
conflict_query = conflict_query.bind(param);
}
conflict_query.execute(pool).await?;

let message_sql = local_sql_syntax(&pool, &message_sql);
let mut message_query = sqlx_query(&message_sql);
for param in &message_params {
message_query = message_query.bind(param);
let message_sql = local_sql_syntax(&pool, &message_sql);
let mut message_query = sqlx_query(&message_sql);
for param in &message_params {
message_query = message_query.bind(param);
}
message_query.execute(pool).await?;
} else {
return Err(e);
}
message_query.execute(pool).await?;
}
};

Expand All @@ -5522,36 +5540,42 @@ async fn validate_rows_inter_and_insert(
/// and the headers of the rows to be inserted, validate each chunk and insert the validated rows
/// to the table. If the verbose flag is set to true, error/warning/info stats will be collected in
/// messages_stats and later written to stderr.
async fn validate_and_insert_chunks(
async fn insert_chunks(
config: &SerdeMap,
pool: &AnyPool,
compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
table_name: &String,
chunks: &IntoChunks<csv::StringRecordsIter<'_, std::fs::File>>,
headers: &csv::StringRecord,
chunks: &IntoChunks<StringRecordsIter<'_, std::fs::File>>,
headers: &StringRecord,
messages_stats: &mut HashMap<String, usize>,
verbose: bool,
validate: bool,
) -> Result<(), sqlx::Error> {
if !MULTI_THREADED {
for (chunk_number, chunk) in chunks.into_iter().enumerate() {
let mut rows: Vec<_> = chunk.collect();
let mut intra_validated_rows = validate_rows_intra(
config,
compiled_datatype_conditions,
compiled_rule_conditions,
table_name,
headers,
&mut rows,
);
validate_rows_inter_and_insert(
let mut intra_validated_rows = {
let only_nulltype = !validate;
validate_rows_intra(
config,
compiled_datatype_conditions,
compiled_rule_conditions,
table_name,
headers,
&mut rows,
only_nulltype,
)
};
insert_chunk(
config,
pool,
table_name,
&mut intra_validated_rows,
chunk_number,
messages_stats,
verbose,
validate,
)
.await?;
}
Expand All @@ -5576,13 +5600,15 @@ async fn validate_and_insert_chunks(
for chunk in batch.into_iter() {
let mut rows: Vec<_> = chunk.collect();
workers.push(scope.spawn(move |_| {
let only_nulltype = !validate;
validate_rows_intra(
config,
compiled_datatype_conditions,
compiled_rule_conditions,
table_name,
headers,
&mut rows,
only_nulltype,
)
}));
}
Expand All @@ -5596,14 +5622,15 @@ async fn validate_and_insert_chunks(
.expect("A child thread panicked");

for (chunk_number, mut intra_validated_rows) in results {
validate_rows_inter_and_insert(
insert_chunk(
config,
pool,
table_name,
&mut intra_validated_rows,
chunk_number,
messages_stats,
verbose,
validate,
)
.await?;
}
Expand Down
37 changes: 20 additions & 17 deletions src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,15 +670,16 @@ pub async fn validate_rows_constraints(
}

/// Given a config map, compiled datatype and rule conditions, a table name, the headers for the
/// table, and a number of rows to validate, validate all of the rows and return the validated
/// versions.
/// table, and a number of rows to validate, run intra-row validation on all of the rows and
/// return the validated versions.
pub fn validate_rows_intra(
config: &SerdeMap,
compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
table_name: &String,
headers: &csv::StringRecord,
rows: &Vec<Result<csv::StringRecord, csv::Error>>,
only_nulltype: bool,
) -> Vec<ResultRow> {
let mut result_rows = vec![];
for row in rows {
Expand Down Expand Up @@ -726,26 +727,28 @@ pub fn validate_rows_intra(
);
}

for column_name in &column_names {
let context = result_row.clone();
let cell = result_row.contents.get_mut(column_name).unwrap();
validate_cell_rules(
config,
compiled_rule_conditions,
table_name,
&column_name,
&context,
cell,
);

if cell.nulltype == None {
validate_cell_datatype(
if !only_nulltype {
for column_name in &column_names {
let context = result_row.clone();
let cell = result_row.contents.get_mut(column_name).unwrap();
validate_cell_rules(
config,
compiled_datatype_conditions,
compiled_rule_conditions,
table_name,
&column_name,
&context,
cell,
);

if cell.nulltype == None {
validate_cell_datatype(
config,
compiled_datatype_conditions,
table_name,
&column_name,
cell,
);
}
}
}
result_rows.push(result_row);
Expand Down

0 comments on commit 7e6bac7

Please sign in to comment.