diff --git a/src/main.rs b/src/main.rs index bbee1b0..f76d383 100644 --- a/src/main.rs +++ b/src/main.rs @@ -295,7 +295,7 @@ async fn main() -> Result<()> { } => { exit_unless_tsv(source); let valve = build_valve(source, destination).unwrap(); - valve.save_all_tables(&save_dir).unwrap(); + valve.save_all_tables(&save_dir).await.unwrap(); } Commands::Save { save_dir, @@ -310,7 +310,7 @@ async fn main() -> Result<()> { .filter(|s| *s != "") .map(|s| s.as_str()) .collect::>(); - valve.save_tables(&tables, &save_dir).unwrap(); + valve.save_tables(&tables, &save_dir).await.unwrap(); } Commands::Guess { sample_size, diff --git a/src/toolkit.rs b/src/toolkit.rs index 91681d0..5565469 100644 --- a/src/toolkit.rs +++ b/src/toolkit.rs @@ -4023,7 +4023,7 @@ pub fn verify_table_deps_and_sort( } /// Given a global configuration struct and a table name, returns the options for the table. -pub fn get_table_options(config: &ValveConfig, table: &str) -> Result> { +pub fn get_table_options_from_config(config: &ValveConfig, table: &str) -> Result> { Ok(config .table .get(table) diff --git a/src/validate.rs b/src/validate.rs index acdb938..267edcf 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -4,9 +4,9 @@ use crate::{ ast::Expression, toolkit::{ cast_sql_param_from_text, get_column_value, get_column_value_as_string, - get_datatype_ancestors, get_sql_type_from_global_config, get_table_options, get_value_type, - is_sql_type_error, local_sql_syntax, ColumnRule, CompiledCondition, QueryAsIf, - QueryAsIfKind, ValueType, + get_datatype_ancestors, get_sql_type_from_global_config, get_table_options_from_config, + get_value_type, is_sql_type_error, local_sql_syntax, ColumnRule, CompiledCondition, + QueryAsIf, QueryAsIfKind, ValueType, }, valve::{ ValveCell, ValveCellMessage, ValveConfig, ValveRow, ValveRuleConfig, ValveTreeConstraint, @@ -165,7 +165,7 @@ pub async fn validate_tree_foreign_keys( .get(table_name) .expect(&format!("Undefined table '{}'", table_name)); - let table_options = get_table_options(config, table_name)?; + let table_options = get_table_options_from_config(config, table_name)?; let query_table = { if !table_options.contains("conflict") { table_name.to_string() @@ -1400,7 +1400,7 @@ pub async fn validate_cell_unique_constraints( _ => cached_value.to_string() == cell.value, })), None => { - let table_options = get_table_options(config, table_name)?; + let table_options = get_table_options_from_config(config, table_name)?; // If the table does not have a conflict table then there is no view to check, so // we check the table itself. let mut query_table = { diff --git a/src/valve.rs b/src/valve.rs index 89f7d26..ce9824b 100644 --- a/src/valve.rs +++ b/src/valve.rs @@ -708,7 +708,7 @@ impl Valve { .iter() .filter(|t| { let table_options = self - .get_table_options(t) + .get_table_options_from_config(t) .expect(&format!("Error getting options for table '{}'", t)); table_options.contains(option) }) @@ -1568,8 +1568,8 @@ impl Valve { } /// Given a table name, returns the options of the table. - pub fn get_table_options(&self, table: &str) -> Result> { - toolkit::get_table_options(&self.config, table) + pub fn get_table_options_from_config(&self, table: &str) -> Result> { + toolkit::get_table_options_from_config(&self.config, table) } /// Drop all configured tables, in reverse dependency order. @@ -1632,7 +1632,7 @@ impl Valve { }; for table in &truncate_list { - let table_options = self.get_table_options(table)?; + let table_options = self.get_table_options_from_config(table)?; if table_options.contains("truncate") { let sql = truncate_sql(&table); self.execute_sql(&sql).await?; @@ -1875,45 +1875,64 @@ impl Valve { Ok(self) } + /// Returns true if the Valve instance has options enabled, according to the currently loaded + /// configuration. + pub async fn options_enabled_in_config(&self) -> Result { + // Calling is_empty() is good enough to determine if the options column is enabled, since + // if the options are blank in the .tsv file for the table table, then it will be given + // the default option set implicitly when normalized_options() is called while reading + // the file. + Ok(!self + .config + .table + .get("table") + .expect("Could not retrieve table table configuration") + .options + .is_empty()) + } + + /// Returns true if the Valve instance has options enabled, according to the database. + pub async fn options_enabled_in_db(&self) -> Result { + if self.pool.any_kind() == AnyKind::Postgres { + let sql = r#"SELECT 1 + FROM "information_schema"."columns" + WHERE "table_name" = 'table' + AND "column_name" = 'options' + LIMIT 1"#; + Ok(!block_on(sqlx_query(&sql).fetch_all(&self.pool))?.is_empty()) + } else { + let sql = r#"PRAGMA TABLE_INFO("table")"#; + Ok(!block_on(sqlx_query(&sql).fetch_all(&self.pool))? + .iter() + .filter(|r| r.get::<&str, &str>("name") == "options") + .collect::>() + .is_empty()) + } + } + /// Save all configured editable tables to their configured paths, unless save_dir is specified, /// in which case save them there instead. - pub fn save_all_tables(&self, save_dir: &Option) -> Result<&Self> { - let options_missing = { - if self.pool.any_kind() == AnyKind::Postgres { - let sql = r#"SELECT 1 - FROM "information_schema"."columns" - WHERE "table_name" = 'table' - AND "column_name" = 'options' - LIMIT 1"#; - block_on(sqlx_query(&sql).fetch_all(&self.pool))?.is_empty() - } else { - let sql = r#"PRAGMA TABLE_INFO("table")"#; - block_on(sqlx_query(&sql).fetch_all(&self.pool))? - .iter() - .filter(|r| r.get::<&str, &str>("name") == "options") - .collect::>() - .is_empty() - } - }; + pub async fn save_all_tables(&self, save_dir: &Option) -> Result<&Self> { + let options_enabled = self.options_enabled_in_db().await?; // Collect tables from the 'table' table. let mut tables = vec![]; let sql = { - if options_missing { + if !options_enabled { r#"SELECT "table" FROM "table""# } else { r#"SELECT "table", "options" FROM "table""# } }; let mut stream = sqlx_query(&sql).fetch(&self.pool); - while let Some(row) = block_on(stream.try_next())? { + while let Some(row) = stream.try_next().await? { let table = row .try_get::<&str, &str>("table") .ok() .ok_or(ValveError::InputError( "No column \"table\" found in row".to_string(), ))?; - if !options_missing { + if options_enabled { let options = row .try_get::<&str, &str>("options")? .split(" ") @@ -1930,54 +1949,87 @@ impl Valve { } let tables = tables.iter().map(|s| s.as_str()).collect_vec(); - self.save_tables(&tables, save_dir)?; + self.save_tables(&tables, save_dir).await?; Ok(self) } /// Given a vector of table names, save those tables to their configured path's, unless /// save_dir is specified, in which case save them there instead. - pub fn save_tables(&self, tables: &Vec<&str>, save_dir: &Option) -> Result<&Self> { - // TODO: Be more careful about the options column + pub async fn save_tables( + &self, + tables: &Vec<&str>, + save_dir: &Option, + ) -> Result<&Self> { if self.verbose { println!("Saving tables: {} ...", tables.join(", ")); } + let options_enabled = self.options_enabled_in_db().await?; + let sql = { + if options_enabled { + format!( + r#"SELECT "table", "path", "options" FROM "table" WHERE "table" IN ('{}')"#, + tables.join("', '") + ) + } else { + format!( + r#"SELECT "table", "path" FROM "table" WHERE "table" IN ('{}')"#, + tables.join("', '") + ) + } + }; - let sql = format!( - r#"SELECT "table", "path", "options" FROM "table" WHERE "table" IN ('{}')"#, - tables.join("', '") - ); let mut stream = sqlx_query(&sql).fetch(&self.pool); - while let Some(row) = block_on(stream.try_next())? { - let table = row.try_get::<&str, &str>("table").ok().unwrap_or_default(); + while let Some(row) = stream.try_next().await? { + let table = row + .try_get::<&str, &str>("table") + .ok() + .ok_or(ValveError::InputError( + "No column \"table\" found in row".to_string(), + ))?; let options = row .try_get::<&str, &str>("options") + .unwrap_or_default() + .split(" ") + .collect::>(); + let (options, _) = normalize_options(&options, 0)?; + + let path = row + .try_get::<&str, &str>("path") .ok() - .unwrap_or_default(); - let row_options = options.split(" ").collect::>(); - match normalize_options(&row_options, 0) { - Err(_) => { - log::warn!("Invalid table options: {options}"); - continue; - } - Ok((row_options, _)) => { - if !row_options.contains("save") { - log::warn!("Saving '{}' is not supported", table,); - continue; + .ok_or(ValveError::InputError( + "No column \"table\" found in row".to_string(), + ))?; + let path = match save_dir { + Some(save_dir) => { + if !options.contains("save") { + let path_dir = Path::new(path) + .parent() + .and_then(|n| n.to_str()) + .unwrap_or(""); + if path_dir == save_dir { + return Err(ValveError::InputError(format!( + "Option 'save' is not set for table '{}'. Cannot overwrite '{}'", + table, path + )) + .into()); + } } + format!( + "{}/{}", + save_dir, + Path::new(path).file_name().and_then(|n| n.to_str()).ok_or( + ValveError::InputError(format!("Unable to save to '{}'", path)) + )? + ) } - }; - - let path = row.try_get::<&str, &str>("path").ok().unwrap_or_default(); - let path = match save_dir { - Some(s) => format!( - "{}/{}", - s, - Path::new(path).file_name().and_then(|n| n.to_str()).ok_or( - ValveError::InputError(format!("Unable to save to '{}'", path)) - )?, - ), None => { - if !path.ends_with(".tsv") { + if !options.contains("save") { + return Err(ValveError::InputError(format!( + "Saving '{}' is not supported", + table + )) + .into()); + } else if !path.ends_with(".tsv") { return Err(ValveError::InputError(format!( "Refusing to save to non-tsv file '{}'", path @@ -1987,14 +2039,14 @@ impl Valve { path.to_string() } }; - let _ = self.save_table(table, path.as_str()); + self.save_table(table, path.as_str()).await?; } Ok(self) } /// Save the given table with the given columns at the given path as a TSV file. - pub fn save_table(&self, table: &str, path: &str) -> Result<&Self> { + pub async fn save_table(&self, table: &str, path: &str) -> Result<&Self> { // TODO: Be more careful about the options and format columns. // Uses the given (unverified) printf-style format string and the given compiled regular // expression (which is used to verify the given format) to format the given cell. @@ -2056,7 +2108,7 @@ impl Valve { // Check that the table options allow save. // TODO: handle "Save as..." case let sql = format!("SELECT \"options\" FROM \"table\" WHERE \"table\" = '{table}'"); - match block_on(sqlx_query(&sql).fetch_one(&self.pool)) { + match sqlx_query(&sql).fetch_one(&self.pool).await { Err(_) => { return Err( ValveError::DataError(format!("No such table in database: '{table}'")).into(), @@ -2096,7 +2148,7 @@ impl Valve { r#"SELECT "column", "label", "format" FROM "column" LEFT JOIN "datatype" ON "column"."datatype" = "datatype"."datatype" WHERE "table" = '{table}'"# ); let mut stream = sqlx_query(&sql).fetch(&self.pool); - while let Some(row) = block_on(stream.try_next())? { + while let Some(row) = stream.try_next().await? { let column = row.try_get::<&str, &str>("column").ok().unwrap_or_default(); let label = row.try_get::<&str, &str>("label").ok().unwrap_or_default(); let format = row.try_get::<&str, &str>("format").ok().unwrap_or_default(); @@ -2142,7 +2194,7 @@ impl Valve { .from_path(path)?; writer.write_record(labels)?; let mut stream = sqlx_query(&sql).fetch(&self.pool); - while let Some(row) = block_on(stream.try_next())? { + while let Some(row) = stream.try_next().await? { let mut record: Vec = vec![]; for (i, column) in columns.iter().enumerate() { let cell = row.try_get::<&str, &str>(column).ok().unwrap_or_default(); @@ -2249,7 +2301,7 @@ impl Valve { /// validate and insert the row to the table and return the row number of the inserted row /// and the row itself in the form of a [ValveRow]. pub async fn insert_row(&self, table_name: &str, row: &JsonRow) -> Result<(u32, ValveRow)> { - let table_options = &self.get_table_options(table_name)?; + let table_options = &self.get_table_options_from_config(table_name)?; if !table_options.contains("edit") { return Err(ValveError::InputError(format!( "Inserting to table '{}' is not allowed", @@ -2309,7 +2361,7 @@ impl Valve { row_number: &u32, row: &JsonRow, ) -> Result { - let table_options = &self.get_table_options(table_name)?; + let table_options = &self.get_table_options_from_config(table_name)?; if !table_options.contains("edit") { return Err(ValveError::InputError(format!( "Updating table '{}' is not allowed", @@ -2369,7 +2421,7 @@ impl Valve { /// Given a table name and a row number, delete that row from the table. pub async fn delete_row(&self, table_name: &str, row_number: &u32) -> Result<()> { - let table_options = &self.get_table_options(table_name)?; + let table_options = &self.get_table_options_from_config(table_name)?; if !table_options.contains("edit") { return Err(ValveError::InputError(format!( "Deleting from table '{}' is not allowed",