From a1c0745870614abdb80874b31608c6907a8546b0 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 17:05:28 +0530 Subject: [PATCH 01/10] refactor: separate options module --- supabase-wrappers/Cargo.toml | 1 + supabase-wrappers/src/instance.rs | 8 +- supabase-wrappers/src/lib.rs | 2 + supabase-wrappers/src/modify.rs | 25 ++-- supabase-wrappers/src/options.rs | 114 ++++++++++++++++++ supabase-wrappers/src/scan.rs | 17 +-- supabase-wrappers/src/utils.rs | 94 +++------------ wrappers/src/fdw/logflare_fdw/logflare_fdw.rs | 14 +-- 8 files changed, 160 insertions(+), 115 deletions(-) create mode 100644 supabase-wrappers/src/options.rs diff --git a/supabase-wrappers/Cargo.toml b/supabase-wrappers/Cargo.toml index c063d5bd..a59dc1ba 100644 --- a/supabase-wrappers/Cargo.toml +++ b/supabase-wrappers/Cargo.toml @@ -25,6 +25,7 @@ pgrx = {version = "=0.9.8", default-features = false } tokio = { version = "1.24", features = ["rt"] } uuid = { version = "1.2.2" } supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" } +thiserror = "1.0.48" [dev-dependencies] pgrx-tests = "=0.9.8" diff --git a/supabase-wrappers/src/instance.rs b/supabase-wrappers/src/instance.rs index 6ee800f2..c1e41133 100644 --- a/supabase-wrappers/src/instance.rs +++ b/supabase-wrappers/src/instance.rs @@ -1,16 +1,14 @@ use crate::prelude::*; -use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::*; -use super::utils; - // create a fdw instance pub(super) unsafe fn create_fdw_instance, W: ForeignDataWrapper>( ftable_id: pg_sys::Oid, ) -> W { let ftable = pg_sys::GetForeignTable(ftable_id); let fserver = pg_sys::GetForeignServer((*ftable).serverid); - let fserver_opts = utils::options_to_hashmap((*fserver).options); + let fserver_opts = options_to_hashmap((*fserver).options); let wrapper = W::new(&fserver_opts); - wrapper.map_err(|e| e.into()).report() + wrapper.report_unwrap() } diff --git a/supabase-wrappers/src/lib.rs b/supabase-wrappers/src/lib.rs index aef3d24c..c85a03fe 100644 --- a/supabase-wrappers/src/lib.rs +++ b/supabase-wrappers/src/lib.rs @@ -288,11 +288,13 @@ //! - [Logflare](https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw): A FDW for [Logflare](https://logflare.app/) which supports data read only. pub mod interface; +pub mod options; pub mod utils; /// The prelude includes all necessary imports to make Wrappers work pub mod prelude { pub use crate::interface::*; + pub use crate::options::*; pub use crate::utils::*; pub use crate::wrappers_fdw; pub use ::tokio::runtime::Runtime; diff --git a/supabase-wrappers/src/modify.rs b/supabase-wrappers/src/modify.rs index fd90406b..f4c6773f 100644 --- a/supabase-wrappers/src/modify.rs +++ b/supabase-wrappers/src/modify.rs @@ -1,4 +1,4 @@ -use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::{ debug2, memcxt::PgMemoryContexts, pg_sys::Oid, prelude::*, rel::PgRelation, tupdesc::PgTupleDesc, FromDatum, PgSqlErrorCode, @@ -81,12 +81,8 @@ pub(super) extern "C" fn add_foreign_update_targets( unsafe { // get rowid column name from table options let ftable = pg_sys::GetForeignTable((*target_relation).rd_id); - let opts = utils::options_to_hashmap((*ftable).options); - let rowid_name = if let Some(name) = require_option("rowid_column", &opts) { - name - } else { - return; - }; + let opts = options_to_hashmap((*ftable).options); + let rowid_name = require_option("rowid_column", &opts).report_unwrap(); // find rowid attribute let tup_desc = PgTupleDesc::from_pg_copy((*target_relation).rd_att); @@ -139,7 +135,7 @@ pub(super) extern "C" fn plan_foreign_modify, W: ForeignDat // get rowid column name from table options let ftable = pg_sys::GetForeignTable(rel.oid()); - let opts = utils::options_to_hashmap((*ftable).options); + let opts = options_to_hashmap((*ftable).options); let rowid_name = opts.get("rowid_column"); if rowid_name.is_none() { report_error( @@ -208,7 +204,7 @@ pub(super) extern "C" fn begin_foreign_modify, W: ForeignDa state.rowid_attno = pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c); - state.begin_modify().map_err(|e| e.into()).report(); + state.begin_modify().report_unwrap(); (*rinfo).ri_FdwState = state.into_pg() as _; } @@ -228,7 +224,7 @@ pub(super) extern "C" fn exec_foreign_insert, W: ForeignDat ); let row = utils::tuple_table_slot_to_row(slot); - state.insert(&row).map_err(|e| e.into()).report(); + state.insert(&row).report_unwrap(); } slot @@ -258,7 +254,7 @@ pub(super) extern "C" fn exec_foreign_delete, W: ForeignDat let cell = get_rowid_cell(&state, plan_slot); if let Some(rowid) = cell { - state.delete(&rowid).map_err(|e| e.into()).report(); + state.delete(&rowid).report_unwrap(); } } @@ -292,10 +288,7 @@ pub(super) extern "C" fn exec_foreign_update, W: ForeignDat }) && state.rowid_name != col.as_str() }); - state - .update(&rowid, &new_row) - .map_err(|e| e.into()) - .report(); + state.update(&rowid, &new_row).report_unwrap(); } } @@ -312,7 +305,7 @@ pub(super) extern "C" fn end_foreign_modify, W: ForeignData let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState; if !fdw_state.is_null() { let mut state = PgBox::>::from_pg(fdw_state); - state.end_modify().map_err(|e| e.into()).report(); + state.end_modify().report_unwrap(); } } } diff --git a/supabase-wrappers/src/options.rs b/supabase-wrappers/src/options.rs new file mode 100644 index 00000000..ec486c23 --- /dev/null +++ b/supabase-wrappers/src/options.rs @@ -0,0 +1,114 @@ +use crate::utils::report_error; +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::{pg_sys, PgList, PgSqlErrorCode}; +use std::collections::HashMap; +use std::ffi::CStr; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum OptionsError { + #[error("required option `{0}` is not specified")] + OptionNameNotFound(String), + #[error("an option name is not a valid UTF-8 string")] + OptionNameIsInvalidUtf8, + #[error("an option value is not a valid UTF-8 string")] + OptionValueIsInvalidUtf8, +} + +impl From for ErrorReport { + fn from(value: OptionsError) -> Self { + let error_message = format!("{value}"); + match value { + OptionsError::OptionNameNotFound(_) => ErrorReport::new( + PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, + error_message, + "", + ), + OptionsError::OptionNameIsInvalidUtf8 | OptionsError::OptionValueIsInvalidUtf8 => { + ErrorReport::new( + PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT, + error_message, + "", + ) + } + } + } +} + +/// Get required option value from the `options` map +/// +/// Get the required option's value from `options` map, return None and report +/// error and stop current transaction if it does not exist. +/// +/// For example, +/// +/// ```rust,no_run +/// # use supabase_wrappers::prelude::require_option; +/// # use std::collections::HashMap; +/// # let options = &HashMap::new(); +/// require_option("my_option", options); +/// ``` +pub fn require_option<'map>( + opt_name: &str, + options: &'map HashMap, +) -> Result<&'map str, OptionsError> { + options + .get(opt_name) + .map(|t| t.as_ref()) + .ok_or(OptionsError::OptionNameNotFound(opt_name.to_string())) +} + +/// Get required option value from the `options` map or a provided default +/// +/// Get the required option's value from `options` map, return default if it does not exist. +/// +/// For example, +/// +/// ```rust,no_run +/// # use supabase_wrappers::prelude::require_option_or; +/// # use std::collections::HashMap; +/// # let options = &HashMap::new(); +/// require_option_or("my_option", options, "default value".to_string()); +/// ``` +pub fn require_option_or( + opt_name: &str, + options: &HashMap, + default: String, +) -> String { + options + .get(opt_name) + .map(|t| t.to_owned()) + .unwrap_or(default) +} + +/// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator) +pub fn check_options_contain(opt_list: &[Option], tgt: &str) { + let search_key = tgt.to_owned() + "="; + if !opt_list.iter().any(|opt| { + if let Some(s) = opt { + s.starts_with(&search_key) + } else { + false + } + }) { + report_error( + PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, + &format!("required option \"{}\" is not specified", tgt), + ); + } +} + +// convert options definition to hashmap +pub(super) unsafe fn options_to_hashmap(options: *mut pg_sys::List) -> HashMap { + let mut ret = HashMap::new(); + let options: PgList = PgList::from_pg(options); + for option in options.iter_ptr() { + let name = CStr::from_ptr((*option).defname); + let value = CStr::from_ptr(pg_sys::defGetString(option)); + ret.insert( + name.to_str().unwrap().to_owned(), + value.to_str().unwrap().to_owned(), + ); + } + ret +} diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index 431f6859..a880077f 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -6,7 +6,7 @@ use pgrx::{ use std::collections::HashMap; use std::marker::PhantomData; -use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; +use pgrx::pg_sys::panic::ErrorReport; use std::os::raw::c_int; use std::ptr; @@ -14,11 +14,12 @@ use crate::instance; use crate::interface::{Cell, Column, Limit, Qual, Row, Sort, Value}; use crate::limit::*; use crate::memctx; +use crate::options::options_to_hashmap; use crate::polyfill; use crate::prelude::ForeignDataWrapper; use crate::qual::*; use crate::sort::*; -use crate::utils::{self, report_error, SerdeList}; +use crate::utils::{self, report_error, ReportableError, SerdeList}; // Fdw private state for scan struct FdwState, W: ForeignDataWrapper> { @@ -137,10 +138,10 @@ pub(super) extern "C" fn get_foreign_rel_size, W: ForeignDa // get foreign table options let ftable = pg_sys::GetForeignTable(foreigntableid); - state.opts = utils::options_to_hashmap((*ftable).options); + state.opts = options_to_hashmap((*ftable).options); // get estimate row count and mean row width - let (rows, width) = state.get_rel_size().map_err(|e| e.into()).report(); + let (rows, width) = state.get_rel_size().report_unwrap(); (*baserel).rows = rows as f64; (*(*baserel).reltarget).width = width; @@ -305,7 +306,7 @@ pub(super) extern "C" fn begin_foreign_scan, W: ForeignData // begin scan if it is not EXPLAIN statement if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 { - state.begin_scan().map_err(|e| e.into()).report(); + state.begin_scan().report_unwrap(); let rel = scan_state.ss_currentRelation; let tup_desc = (*rel).rd_att; @@ -336,7 +337,7 @@ pub(super) extern "C" fn iterate_foreign_scan, W: ForeignDa polyfill::exec_clear_tuple(slot); state.row.clear(); - if state.iter_scan().map_err(|e| e.into()).report().is_some() { + if state.iter_scan().report_unwrap().is_some() { if state.row.cols.len() != state.tgts.len() { report_error( PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER, @@ -375,7 +376,7 @@ pub(super) extern "C" fn re_scan_foreign_scan, W: ForeignDa let fdw_state = (*node).fdw_state as *mut FdwState; if !fdw_state.is_null() { let mut state = PgBox::>::from_pg(fdw_state); - state.re_scan().map_err(|e| e.into()).report(); + state.re_scan().report_unwrap(); } } } @@ -392,6 +393,6 @@ pub(super) extern "C" fn end_foreign_scan, W: ForeignDataWr } let mut state = PgBox::>::from_pg(fdw_state); - state.end_scan().map_err(|e| e.into()).report(); + state.end_scan().report_unwrap(); } } diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index 3653704f..63c06ade 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -2,11 +2,11 @@ //! use crate::interface::{Cell, Column, Row}; +use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; use pgrx::prelude::PgBuiltInOids; use pgrx::spi::Spi; use pgrx::IntoDatum; use pgrx::*; -use std::collections::HashMap; use std::ffi::CStr; use std::num::NonZeroUsize; use std::ptr; @@ -136,52 +136,6 @@ pub fn create_async_runtime() -> Runtime { Builder::new_current_thread().enable_all().build().unwrap() } -/// Get required option value from the `options` map -/// -/// Get the required option's value from `options` map, return None and report -/// error and stop current transaction if it does not exist. -/// -/// For example, -/// -/// ```rust,no_run -/// # use supabase_wrappers::prelude::require_option; -/// # use std::collections::HashMap; -/// # let options = &HashMap::new(); -/// require_option("my_option", options); -/// ``` -pub fn require_option(opt_name: &str, options: &HashMap) -> Option { - options.get(opt_name).map(|t| t.to_owned()).or_else(|| { - report_error( - PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, - &format!("required option \"{}\" is not specified", opt_name), - ); - None - }) -} - -/// Get required option value from the `options` map or a provided default -/// -/// Get the required option's value from `options` map, return default if it does not exist. -/// -/// For example, -/// -/// ```rust,no_run -/// # use supabase_wrappers::prelude::require_option_or; -/// # use std::collections::HashMap; -/// # let options = &HashMap::new(); -/// require_option_or("my_option", options, "default value".to_string()); -/// ``` -pub fn require_option_or( - opt_name: &str, - options: &HashMap, - default: String, -) -> String { - options - .get(opt_name) - .map(|t| t.to_owned()) - .unwrap_or(default) -} - /// Get decrypted secret from Vault /// /// Get decrypted secret as string from Vault. Vault is an extension for storing @@ -217,21 +171,6 @@ pub fn get_vault_secret(secret_id: &str) -> Option { } } -// convert options definition to hashmap -pub(super) unsafe fn options_to_hashmap(options: *mut pg_sys::List) -> HashMap { - let mut ret = HashMap::new(); - let options: PgList = PgList::from_pg(options); - for option in options.iter_ptr() { - let name = CStr::from_ptr((*option).defname); - let value = CStr::from_ptr(pg_sys::defGetString(option)); - ret.insert( - name.to_str().unwrap().to_owned(), - value.to_str().unwrap().to_owned(), - ); - } - ret -} - pub(super) unsafe fn tuple_table_slot_to_row(slot: *mut pg_sys::TupleTableSlot) -> Row { let tup_desc = PgTupleDesc::from_pg_copy((*slot).tts_tupleDescriptor); @@ -308,23 +247,6 @@ pub(super) unsafe fn extract_target_columns( ret } -/// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator) -pub fn check_options_contain(opt_list: &[Option], tgt: &str) { - let search_key = tgt.to_owned() + "="; - if !opt_list.iter().any(|opt| { - if let Some(s) = opt { - s.starts_with(&search_key) - } else { - false - } - }) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, - &format!("required option \"{}\" is not specified", tgt), - ); - } -} - // trait for "serialize" and "deserialize" state from specified memory context, // so that it is safe to be carried between the planning and the execution pub(super) trait SerdeList { @@ -367,3 +289,17 @@ pub(super) trait SerdeList { PgBox::::from_pg(ptr as _) } } + +pub(crate) trait ReportableError { + type Output; + + fn report_unwrap(self) -> Self::Output; +} + +impl> ReportableError for Result { + type Output = T; + + fn report_unwrap(self) -> Self::Output { + self.map_err(|e| e.into()).report() + } +} diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 8868b0f5..1c33b26f 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -215,7 +215,11 @@ impl LogflareFdw { } } -enum LogflareFdwError {} +#[derive(Error, Debug)] +enum LogflareFdwError { + #[error("{0}")] + Options(#[from] OptionsError), +} impl From for ErrorReport { fn from(_value: LogflareFdwError) -> Self { @@ -238,7 +242,7 @@ impl ForeignDataWrapper for LogflareFdw { .unwrap_or_else(|| LogflareFdw::BASE_URL.to_string()); let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options) + None => require_option("api_key_id", options)? .and_then(|key_id| get_vault_secret(&key_id)) .map(|api_key| create_client(&api_key)), }; @@ -262,11 +266,7 @@ impl ForeignDataWrapper for LogflareFdw { _limit: &Option, options: &HashMap, ) -> Result<(), LogflareFdwError> { - let endpoint = if let Some(name) = require_option("endpoint", options) { - name - } else { - return Ok(()); - }; + let endpoint = require_option("endpoint", options)?; // extract params self.params = if let Some(params) = extract_params(quals) { From ba084a9872e108fc043d57f239a7c9d374c01c21 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 13:55:51 +0530 Subject: [PATCH 02/10] fix: remove unwrap in create_async_runtime --- Cargo.lock | 27 ++++++++++--------- supabase-wrappers/Cargo.toml | 1 + supabase-wrappers/src/utils.rs | 21 +++++++++++++-- wrappers/Cargo.lock | 10 ++++--- wrappers/Cargo.toml | 14 +++++----- wrappers/src/fdw/airtable_fdw/airtable_fdw.rs | 15 ++++++++--- wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs | 15 ++++++++--- .../src/fdw/clickhouse_fdw/clickhouse_fdw.rs | 15 ++++++++--- wrappers/src/fdw/firebase_fdw/firebase_fdw.rs | 15 ++++++++--- wrappers/src/fdw/logflare_fdw/logflare_fdw.rs | 11 +++++--- wrappers/src/fdw/stripe_fdw/stripe_fdw.rs | 15 ++++++++--- 11 files changed, 111 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 89dc0486..4e8821cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -82,7 +82,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -207,7 +207,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -360,7 +360,7 @@ checksum = "ccb14d927583dd5c2eac0f2cf264fc4762aefe1ae14c47a8a20fc1939d3a5fc0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -430,7 +430,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -1221,7 +1221,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -1346,6 +1346,7 @@ dependencies = [ "pgrx", "pgrx-tests", "supabase-wrappers-macros", + "thiserror", "tokio", "uuid", ] @@ -1372,9 +1373,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.16" +version = "2.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" +checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" dependencies = [ "proc-macro2", "quote", @@ -1404,22 +1405,22 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] diff --git a/supabase-wrappers/Cargo.toml b/supabase-wrappers/Cargo.toml index a59dc1ba..18d876dc 100644 --- a/supabase-wrappers/Cargo.toml +++ b/supabase-wrappers/Cargo.toml @@ -22,6 +22,7 @@ pg_test = [] [dependencies] pgrx = {version = "=0.9.8", default-features = false } +thiserror = "1.0.48" tokio = { version = "1.24", features = ["rt"] } uuid = { version = "1.2.2" } supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" } diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index 63c06ade..dc589601 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -10,6 +10,7 @@ use pgrx::*; use std::ffi::CStr; use std::num::NonZeroUsize; use std::ptr; +use thiserror::Error; use tokio::runtime::{Builder, Runtime}; use uuid::Uuid; @@ -106,6 +107,19 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) { ereport!(PgLogLevel::ERROR, code, msg, "Wrappers"); } +#[derive(Error, Debug)] +pub enum CreateRuntimeError { + #[error("failed to create async runtime")] + FailedToCreateAsyncRuntime, +} + +impl From for ErrorReport { + fn from(value: CreateRuntimeError) -> Self { + let error_message = format!("{value}"); + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "") + } +} + /// Create a Tokio async runtime /// /// Use this runtime to run async code in `block` mode. Run blocked code is @@ -132,8 +146,11 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) { /// } /// ``` #[inline] -pub fn create_async_runtime() -> Runtime { - Builder::new_current_thread().enable_all().build().unwrap() +pub fn create_async_runtime() -> Result { + Builder::new_current_thread() + .enable_all() + .build() + .map_err(|_| CreateRuntimeError::FailedToCreateAsyncRuntime) } /// Get decrypted secret from Vault diff --git a/wrappers/Cargo.lock b/wrappers/Cargo.lock index a29e1ec9..5f8dc845 100644 --- a/wrappers/Cargo.lock +++ b/wrappers/Cargo.lock @@ -3450,6 +3450,7 @@ version = "0.1.15" dependencies = [ "pgrx", "supabase-wrappers-macros", + "thiserror", "tokio", "uuid 1.4.1", ] @@ -3530,18 +3531,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", @@ -4259,6 +4260,7 @@ dependencies = [ "serde", "serde_json", "supabase-wrappers", + "thiserror", "tokio", "tokio-util", "url", diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index ae0eaf33..8e7cf8ba 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -17,17 +17,17 @@ pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15" ] pg_test = [] helloworld_fdw = [] -bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2"] -clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex"] -stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"] -firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex"] +bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2", "thiserror"] +clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"] +stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] +firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "thiserror"] s3_fdw = [ "reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", "tokio", "tokio-util", "csv", "async-compression", "serde_json", "http", "parquet", "futures", "arrow-array", "chrono" ] -airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url"] -logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"] +airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"] +logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] # Does not include helloworld_fdw because of its general uselessness all_fdws = ["airtable_fdw", "bigquery_fdw", "clickhouse_fdw", "stripe_fdw", "firebase_fdw", "s3_fdw", "logflare_fdw"] @@ -72,6 +72,8 @@ http = { version = "0.2", optional = true } parquet = { version = "41.0.0", features = ["async"], optional = true } arrow-array = { version = "41.0.0", optional = true } +thiserror = { version = "1.0.48", optional = true } + [dev-dependencies] pgrx-tests = "=0.9.8" diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index ec196182..a14d5fd1 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use url::Url; use supabase_wrappers::prelude::*; +use thiserror::Error; use super::result::AirtableResponse; @@ -95,11 +96,17 @@ macro_rules! report_fetch_error { }; } -enum AirtableFdwError {} +#[derive(Error, Debug)] +enum AirtableFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: AirtableFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: AirtableFdwError) -> Self { + match value { + AirtableFdwError::CreateRuntimeError(e) => e.into(), + } } } @@ -121,7 +128,7 @@ impl ForeignDataWrapper for AirtableFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); Ok(Self { - rt: create_async_runtime(), + rt: create_async_runtime()?, client, base_url, scan_result: None, diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 685dba5b..6987fa70 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::str::FromStr; use supabase_wrappers::prelude::*; +use thiserror::Error; macro_rules! field_type_error { ($field:ident, $err:ident) => {{ @@ -160,18 +161,24 @@ impl BigQueryFdw { } } -enum BigQueryFdwError {} +#[derive(Error, Debug)] +enum BigQueryFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: BigQueryFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: BigQueryFdwError) -> Self { + match value { + BigQueryFdwError::CreateRuntimeError(e) => e.into(), + } } } impl ForeignDataWrapper for BigQueryFdw { fn new(options: &HashMap) -> Result { let mut ret = BigQueryFdw { - rt: create_async_runtime(), + rt: create_async_runtime()?, client: None, project_id: "".to_string(), dataset_id: "".to_string(), diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 1d33ef0e..9d23f0ac 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -8,6 +8,7 @@ use regex::{Captures, Regex}; use std::collections::HashMap; use supabase_wrappers::prelude::*; +use thiserror::Error; fn field_to_cell(row: &types::Row, i: usize) -> Option { let sql_type = row.sql_type(i).unwrap(); @@ -198,17 +199,23 @@ impl ClickHouseFdw { } } -enum ClickHouseFdwError {} +#[derive(Error, Debug)] +enum ClickHouseFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: ClickHouseFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: ClickHouseFdwError) -> Self { + match value { + ClickHouseFdwError::CreateRuntimeError(e) => e.into(), + } } } impl ForeignDataWrapper for ClickHouseFdw { fn new(options: &HashMap) -> Result { - let rt = create_async_runtime(); + let rt = create_async_runtime()?; let conn_str = match options.get("conn_string") { Some(conn_str) => conn_str.to_owned(), None => require_option("conn_string_id", options) diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index 4d879803..bbe007d1 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -12,6 +12,7 @@ use yup_oauth2::AccessToken; use yup_oauth2::ServiceAccountAuthenticator; use supabase_wrappers::prelude::*; +use thiserror::Error; macro_rules! report_request_error { ($url:ident, $err:ident) => { @@ -247,18 +248,24 @@ impl FirebaseFdw { } } -enum FirebaseFdwError {} +#[derive(Error, Debug)] +enum FirebaseFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: FirebaseFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: FirebaseFdwError) -> Self { + match value { + FirebaseFdwError::CreateRuntimeError(e) => e.into(), + } } } impl ForeignDataWrapper for FirebaseFdw { fn new(options: &HashMap) -> Result { let mut ret = Self { - rt: create_async_runtime(), + rt: create_async_runtime()?, project_id: "".to_string(), client: None, scan_result: None, diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 1c33b26f..3f2fe088 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::str::FromStr; use supabase_wrappers::prelude::*; +use thiserror::Error; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = HeaderMap::new(); @@ -219,11 +220,15 @@ impl LogflareFdw { enum LogflareFdwError { #[error("{0}")] Options(#[from] OptionsError), + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), } impl From for ErrorReport { - fn from(_value: LogflareFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: LogflareFdwError) -> Self { + match value { + LogflareFdwError::CreateRuntimeError(e) => e.into(), + } } } @@ -250,7 +255,7 @@ impl ForeignDataWrapper for LogflareFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); Ok(LogflareFdw { - rt: create_async_runtime(), + rt: create_async_runtime()?, base_url: Url::parse(&base_url).unwrap(), client, scan_result: None, diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index d496889f..59972834 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -8,6 +8,7 @@ use serde_json::{json, Map as JsonMap, Number, Value as JsonValue}; use std::collections::HashMap; use supabase_wrappers::prelude::*; +use thiserror::Error; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = header::HeaderMap::new(); @@ -626,11 +627,17 @@ impl StripeFdw { } } -enum StripeFdwError {} +#[derive(Error, Debug)] +enum StripeFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: StripeFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: StripeFdwError) -> Self { + match value { + StripeFdwError::CreateRuntimeError(e) => e.into(), + } } } @@ -659,7 +666,7 @@ impl ForeignDataWrapper for StripeFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); Ok(StripeFdw { - rt: create_async_runtime(), + rt: create_async_runtime()?, base_url: Url::parse(&base_url).unwrap(), client, scan_result: None, From 2fd43a39a5814d7f1238b203d85e12506e970b94 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 14:00:52 +0530 Subject: [PATCH 03/10] fix: don't throw away the underlying error --- supabase-wrappers/src/utils.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index dc589601..aff9fe2e 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -109,8 +109,8 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) { #[derive(Error, Debug)] pub enum CreateRuntimeError { - #[error("failed to create async runtime")] - FailedToCreateAsyncRuntime, + #[error("failed to create async runtime: {0}")] + FailedToCreateAsyncRuntime(#[from] std::io::Error), } impl From for ErrorReport { @@ -147,10 +147,7 @@ impl From for ErrorReport { /// ``` #[inline] pub fn create_async_runtime() -> Result { - Builder::new_current_thread() - .enable_all() - .build() - .map_err(|_| CreateRuntimeError::FailedToCreateAsyncRuntime) + Ok(Builder::new_current_thread().enable_all().build()?) } /// Get decrypted secret from Vault From b62f83726e891b3ff3e75b1478f4bb33ac3d7be2 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 14:20:39 +0530 Subject: [PATCH 04/10] fix: failing doc tests --- supabase-wrappers/src/utils.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index aff9fe2e..d5a6abf0 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -129,6 +129,8 @@ impl From for ErrorReport { /// For example, /// /// ```rust,no_run +/// # use supabase_wrappers::utils::CreateRuntimeError; +/// # fn main() -> Result<(), CreateRuntimeError> { /// # use supabase_wrappers::prelude::create_async_runtime; /// # struct Client { /// # } @@ -137,13 +139,15 @@ impl From for ErrorReport { /// # } /// # let client = Client {}; /// # let sql = ""; -/// let rt = create_async_runtime(); +/// let rt = create_async_runtime()?; /// /// // client.query() is an async function returning a Result /// match rt.block_on(client.query(&sql)) { /// Ok(result) => { } /// Err(err) => { } /// } +/// # Ok(()) +/// # } /// ``` #[inline] pub fn create_async_runtime() -> Result { From e1aa8ff0044203ae4b5d9d6d8b48d9c049fa7e1f Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 14:25:26 +0530 Subject: [PATCH 05/10] fix: failing clickhouse tests --- wrappers/src/fdw/clickhouse_fdw/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrappers/src/fdw/clickhouse_fdw/tests.rs b/wrappers/src/fdw/clickhouse_fdw/tests.rs index 44b6234b..88134299 100644 --- a/wrappers/src/fdw/clickhouse_fdw/tests.rs +++ b/wrappers/src/fdw/clickhouse_fdw/tests.rs @@ -11,7 +11,7 @@ mod tests { Spi::connect(|mut c| { let clickhouse_pool = ch::Pool::new("tcp://default:@localhost:9000/supa"); - let rt = create_async_runtime(); + let rt = create_async_runtime().expect("failed to create runtime"); let mut handle = rt .block_on(async { clickhouse_pool.get_handle().await }) .expect("handle"); From 6913cf6d67d1020e530ed32a529399fa2328fd29 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 18:07:44 +0530 Subject: [PATCH 06/10] fix: require_option returns result --- supabase-wrappers/Cargo.toml | 1 - wrappers/Cargo.toml | 2 +- wrappers/src/fdw/airtable_fdw/airtable_fdw.rs | 22 +++--- wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs | 36 +++------- .../src/fdw/clickhouse_fdw/clickhouse_fdw.rs | 25 +++---- wrappers/src/fdw/firebase_fdw/firebase_fdw.rs | 20 ++---- wrappers/src/fdw/logflare_fdw/logflare_fdw.rs | 12 ++-- wrappers/src/fdw/s3_fdw/s3_fdw.rs | 72 ++++++++++--------- wrappers/src/fdw/stripe_fdw/stripe_fdw.rs | 20 +++--- 9 files changed, 91 insertions(+), 119 deletions(-) diff --git a/supabase-wrappers/Cargo.toml b/supabase-wrappers/Cargo.toml index 18d876dc..18b13ceb 100644 --- a/supabase-wrappers/Cargo.toml +++ b/supabase-wrappers/Cargo.toml @@ -26,7 +26,6 @@ thiserror = "1.0.48" tokio = { version = "1.24", features = ["rt"] } uuid = { version = "1.2.2" } supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" } -thiserror = "1.0.48" [dev-dependencies] pgrx-tests = "=0.9.8" diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index 8e7cf8ba..90de9c8d 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -24,7 +24,7 @@ firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", s3_fdw = [ "reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", "tokio", "tokio-util", "csv", "async-compression", "serde_json", - "http", "parquet", "futures", "arrow-array", "chrono" + "http", "parquet", "futures", "arrow-array", "chrono", "thiserror" ] airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"] logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index a14d5fd1..5ac9bb85 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -100,12 +100,15 @@ macro_rules! report_fetch_error { enum AirtableFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + Options(#[from] OptionsError), } impl From for ErrorReport { fn from(value: AirtableFdwError) -> Self { match value { AirtableFdwError::CreateRuntimeError(e) => e.into(), + AirtableFdwError::Options(e) => e.into(), } } } @@ -120,9 +123,10 @@ impl ForeignDataWrapper for AirtableFdw { let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options) - .and_then(|key_id| get_vault_secret(&key_id)) - .map(|api_key| create_client(&api_key)), + None => { + let key_id = require_option("api_key_id", options)?; + get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -143,14 +147,10 @@ impl ForeignDataWrapper for AirtableFdw { _limit: &Option, // TODO: maxRecords options: &HashMap, ) -> Result<(), AirtableFdwError> { - let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| { - require_option("table_id", options) - .map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id"))) - }) { - url - } else { - return Ok(()); - }; + let base_id = require_option("base_id", options)?; + let table_id = require_option("table_id", options)?; + let view_id = options.get("view_id"); + let url = self.build_url(&base_id, &table_id, view_id); let mut rows = Vec::new(); if let Some(client) = &self.client { diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 6987fa70..6fb7ef32 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -165,12 +165,15 @@ impl BigQueryFdw { enum BigQueryFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + Options(#[from] OptionsError), } impl From for ErrorReport { fn from(value: BigQueryFdwError) -> Self { match value { BigQueryFdwError::CreateRuntimeError(e) => e.into(), + BigQueryFdwError::Options(e) => e.into(), } } } @@ -180,8 +183,8 @@ impl ForeignDataWrapper for BigQueryFdw { let mut ret = BigQueryFdw { rt: create_async_runtime()?, client: None, - project_id: "".to_string(), - dataset_id: "".to_string(), + project_id: require_option("project_id", options)?.to_string(), + dataset_id: require_option("dataset_id", options)?.to_string(), table: "".to_string(), rowid_col: "".to_string(), tgt_cols: Vec::new(), @@ -189,15 +192,6 @@ impl ForeignDataWrapper for BigQueryFdw { auth_mock: None, }; - let project_id = require_option("project_id", options); - let dataset_id = require_option("dataset_id", options); - - if project_id.is_none() || dataset_id.is_none() { - return Ok(ret); - } - ret.project_id = project_id.unwrap(); - ret.dataset_id = dataset_id.unwrap(); - // Is authentication mocked let mock_auth: bool = options .get("mock_auth") @@ -223,10 +217,7 @@ impl ForeignDataWrapper for BigQueryFdw { false => match options.get("sa_key") { Some(sa_key) => sa_key.to_owned(), None => { - let sa_key_id = match require_option("sa_key_id", options) { - Some(sa_key_id) => sa_key_id, - None => return Ok(ret), - }; + let sa_key_id = require_option("sa_key_id", options)?; match get_vault_secret(&sa_key_id) { Some(sa_key) => sa_key, None => return Ok(ret), @@ -285,11 +276,7 @@ impl ForeignDataWrapper for BigQueryFdw { limit: &Option, options: &HashMap, ) -> Result<(), BigQueryFdwError> { - let table = require_option("table", options); - if table.is_none() { - return Ok(()); - } - self.table = table.unwrap(); + self.table = require_option("table", options)?.to_string(); self.tgt_cols = columns.to_vec(); let location = options @@ -432,13 +419,8 @@ impl ForeignDataWrapper for BigQueryFdw { } fn begin_modify(&mut self, options: &HashMap) -> Result<(), BigQueryFdwError> { - let table = require_option("table", options); - let rowid_col = require_option("rowid_column", options); - if table.is_none() || rowid_col.is_none() { - return Ok(()); - } - self.table = table.unwrap(); - self.rowid_col = rowid_col.unwrap(); + self.table = require_option("table", options)?.to_string(); + self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 9d23f0ac..6100a43c 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -203,12 +203,15 @@ impl ClickHouseFdw { enum ClickHouseFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + Options(#[from] OptionsError), } impl From for ErrorReport { fn from(value: ClickHouseFdwError) -> Self { match value { ClickHouseFdwError::CreateRuntimeError(e) => e.into(), + ClickHouseFdwError::Options(e) => e.into(), } } } @@ -218,9 +221,10 @@ impl ForeignDataWrapper for ClickHouseFdw { let rt = create_async_runtime()?; let conn_str = match options.get("conn_string") { Some(conn_str) => conn_str.to_owned(), - None => require_option("conn_string_id", options) - .and_then(|conn_str_id| get_vault_secret(&conn_str_id)) - .unwrap_or_default(), + None => { + let conn_str_id = require_option("conn_string_id", options)?; + get_vault_secret(&conn_str_id).unwrap_or_default() + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -248,11 +252,7 @@ impl ForeignDataWrapper for ClickHouseFdw { ) -> Result<(), ClickHouseFdwError> { self.create_client(); - let table = require_option("table", options); - if table.is_none() { - return Ok(()); - } - self.table = table.unwrap(); + self.table = require_option("table", options)?.to_string(); self.tgt_cols = columns.to_vec(); self.row_idx = 0; @@ -329,13 +329,8 @@ impl ForeignDataWrapper for ClickHouseFdw { ) -> Result<(), ClickHouseFdwError> { self.create_client(); - let table = require_option("table", options); - let rowid_col = require_option("rowid_column", options); - if table.is_none() || rowid_col.is_none() { - return Ok(()); - } - self.table = table.unwrap(); - self.rowid_col = rowid_col.unwrap(); + self.table = require_option("table", options)?.to_string(); + self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index bbe007d1..77a6456f 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -252,12 +252,15 @@ impl FirebaseFdw { enum FirebaseFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + Options(#[from] OptionsError), } impl From for ErrorReport { fn from(value: FirebaseFdwError) -> Self { match value { FirebaseFdwError::CreateRuntimeError(e) => e.into(), + FirebaseFdwError::Options(e) => e.into(), } } } @@ -266,16 +269,11 @@ impl ForeignDataWrapper for FirebaseFdw { fn new(options: &HashMap) -> Result { let mut ret = Self { rt: create_async_runtime()?, - project_id: "".to_string(), + project_id: require_option("project_id", options)?.to_string(), client: None, scan_result: None, }; - ret.project_id = match require_option("project_id", options) { - Some(project_id) => project_id, - None => return Ok(ret), - }; - // get oauth2 access token if it is directly defined in options let token = if let Some(access_token) = options.get("access_token") { access_token.to_owned() @@ -284,10 +282,7 @@ impl ForeignDataWrapper for FirebaseFdw { let sa_key = match options.get("sa_key") { Some(sa_key) => sa_key.to_owned(), None => { - let sa_key_id = match require_option("sa_key_id", options) { - Some(sa_key_id) => sa_key_id, - None => return Ok(ret), - }; + let sa_key_id = require_option("sa_key_id", options)?; match get_vault_secret(&sa_key_id) { Some(sa_key) => sa_key, None => return Ok(ret), @@ -330,10 +325,7 @@ impl ForeignDataWrapper for FirebaseFdw { _limit: &Option, options: &HashMap, ) -> Result<(), FirebaseFdwError> { - let obj = match require_option("object", options) { - Some(obj) => obj, - None => return Ok(()), - }; + let obj = require_option("object", options)?; let row_cnt_limit = options .get("limit") .map(|n| n.parse::().unwrap()) diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 3f2fe088..527f99b6 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -218,16 +218,17 @@ impl LogflareFdw { #[derive(Error, Debug)] enum LogflareFdwError { - #[error("{0}")] - Options(#[from] OptionsError), #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + Options(#[from] OptionsError), } impl From for ErrorReport { fn from(value: LogflareFdwError) -> Self { match value { LogflareFdwError::CreateRuntimeError(e) => e.into(), + LogflareFdwError::Options(e) => e.into(), } } } @@ -247,9 +248,10 @@ impl ForeignDataWrapper for LogflareFdw { .unwrap_or_else(|| LogflareFdw::BASE_URL.to_string()); let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options)? - .and_then(|key_id| get_vault_secret(&key_id)) - .map(|api_key| create_client(&api_key)), + None => { + let key_id = require_option("api_key_id", options)?; + get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); diff --git a/wrappers/src/fdw/s3_fdw/s3_fdw.rs b/wrappers/src/fdw/s3_fdw/s3_fdw.rs index dc88313d..e4706a18 100644 --- a/wrappers/src/fdw/s3_fdw/s3_fdw.rs +++ b/wrappers/src/fdw/s3_fdw/s3_fdw.rs @@ -10,6 +10,7 @@ use std::collections::{HashMap, VecDeque}; use std::env; use std::io::Cursor; use std::pin::Pin; +use thiserror::Error; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; use super::parquet::*; @@ -127,11 +128,17 @@ impl S3Fdw { } } -enum S3FdwError {} +#[derive(Error, Debug)] +enum S3FdwError { + #[error("{0}")] + Options(#[from] OptionsError), +} impl From for ErrorReport { - fn from(_value: S3FdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: S3FdwError) -> Self { + match value { + S3FdwError::Options(e) => e.into(), + } } } @@ -159,17 +166,18 @@ impl ForeignDataWrapper for S3Fdw { match options.get("vault_access_key_id") { Some(vault_access_key_id) => { // if using credentials stored in Vault - require_option("vault_secret_access_key", options).and_then( - |vault_secret_access_key| { - get_vault_secret(vault_access_key_id) - .zip(get_vault_secret(&vault_secret_access_key)) - }, - ) + let vault_secret_access_key = + require_option("vault_secret_access_key", options)?; + get_vault_secret(vault_access_key_id) + .zip(get_vault_secret(&vault_secret_access_key)) } None => { // if using credentials directly specified - require_option("aws_access_key_id", options) - .zip(require_option("aws_secret_access_key", options)) + let aws_access_key_id = + require_option("aws_access_key_id", options)?.to_string(); + let aws_secret_access_key = + require_option("aws_secret_access_key", options)?.to_string(); + Some((aws_access_key_id, aws_secret_access_key)) } } }; @@ -221,7 +229,8 @@ impl ForeignDataWrapper for S3Fdw { options: &HashMap, ) -> Result<(), S3FdwError> { // extract s3 bucket and object path from uri option - let (bucket, object) = if let Some(uri) = require_option("uri", options) { + let (bucket, object) = { + let uri = require_option("uri", options)?; match uri.parse::() { Ok(uri) => { if uri.scheme_str() != Option::Some("s3") @@ -245,8 +254,6 @@ impl ForeignDataWrapper for S3Fdw { return Ok(()); } } - } else { - return Ok(()); }; let has_header: bool = options.get("has_header") == Some(&"true".to_string()); @@ -255,28 +262,23 @@ impl ForeignDataWrapper for S3Fdw { if let Some(client) = &self.client { // initialise parser according to format option - if let Some(format) = require_option("format", options) { - // create dummy parser - match format.as_str() { - "csv" => { - self.parser = Parser::Csv(csv::Reader::from_reader(Cursor::new(vec![0]))) - } - "jsonl" => self.parser = Parser::JsonLine(VecDeque::new()), - "parquet" => self.parser = Parser::Parquet(S3Parquet::default()), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!( - "invalid format option: {}, it can only be 'csv', 'jsonl' or 'parquet'", - format - ), - ); - return Ok(()); - } + let format = require_option("format", options)?; + // create dummy parser + match format { + "csv" => self.parser = Parser::Csv(csv::Reader::from_reader(Cursor::new(vec![0]))), + "jsonl" => self.parser = Parser::JsonLine(VecDeque::new()), + "parquet" => self.parser = Parser::Parquet(S3Parquet::default()), + _ => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!( + "invalid format option: {}, it can only be 'csv', 'jsonl' or 'parquet'", + format + ), + ); + return Ok(()); } - } else { - return Ok(()); - }; + } let stream = match self .rt diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index 59972834..af3ac182 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -631,12 +631,15 @@ impl StripeFdw { enum StripeFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + Options(#[from] OptionsError), } impl From for ErrorReport { fn from(value: StripeFdwError) -> Self { match value { StripeFdwError::CreateRuntimeError(e) => e.into(), + StripeFdwError::Options(e) => e.into(), } } } @@ -658,9 +661,10 @@ impl ForeignDataWrapper for StripeFdw { .unwrap_or_else(|| "https://api.stripe.com/v1/".to_string()); let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options) - .and_then(|key_id| get_vault_secret(&key_id)) - .map(|api_key| create_client(&api_key)), + None => { + let key_id = require_option("api_key_id", options)?; + get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -683,11 +687,7 @@ impl ForeignDataWrapper for StripeFdw { limit: &Option, options: &HashMap, ) -> Result<(), StripeFdwError> { - let obj = if let Some(name) = require_option("object", options) { - name - } else { - return Ok(()); - }; + let obj = require_option("object", options)?; if let Some(client) = &self.client { let page_size = 100; // maximum page size limit for Stripe API @@ -792,8 +792,8 @@ impl ForeignDataWrapper for StripeFdw { } fn begin_modify(&mut self, options: &HashMap) -> Result<(), StripeFdwError> { - self.obj = require_option("object", options).unwrap_or_default(); - self.rowid_col = require_option("rowid_column", options).unwrap_or_default(); + self.obj = require_option("object", options)?.to_string(); + self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } From cd1a47b90fc1c9f82b34cd1727c17a5143a25034 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 14 Sep 2023 09:16:45 +0530 Subject: [PATCH 07/10] wip --- supabase-wrappers/src/options.rs | 37 ++++--------------- wrappers/src/fdw/firebase_fdw/firebase_fdw.rs | 9 ++--- 2 files changed, 12 insertions(+), 34 deletions(-) diff --git a/supabase-wrappers/src/options.rs b/supabase-wrappers/src/options.rs index ec486c23..5dd03705 100644 --- a/supabase-wrappers/src/options.rs +++ b/supabase-wrappers/src/options.rs @@ -45,8 +45,11 @@ impl From for ErrorReport { /// ```rust,no_run /// # use supabase_wrappers::prelude::require_option; /// # use std::collections::HashMap; +/// # fn main() -> Result<(), OptionsError> { /// # let options = &HashMap::new(); -/// require_option("my_option", options); +/// require_option("my_option", options)?; +/// # Ok(()) +/// # } /// ``` pub fn require_option<'map>( opt_name: &str, @@ -58,31 +61,8 @@ pub fn require_option<'map>( .ok_or(OptionsError::OptionNameNotFound(opt_name.to_string())) } -/// Get required option value from the `options` map or a provided default -/// -/// Get the required option's value from `options` map, return default if it does not exist. -/// -/// For example, -/// -/// ```rust,no_run -/// # use supabase_wrappers::prelude::require_option_or; -/// # use std::collections::HashMap; -/// # let options = &HashMap::new(); -/// require_option_or("my_option", options, "default value".to_string()); -/// ``` -pub fn require_option_or( - opt_name: &str, - options: &HashMap, - default: String, -) -> String { - options - .get(opt_name) - .map(|t| t.to_owned()) - .unwrap_or(default) -} - /// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator) -pub fn check_options_contain(opt_list: &[Option], tgt: &str) { +pub fn check_options_contain(opt_list: &[Option], tgt: &str) -> Result { let search_key = tgt.to_owned() + "="; if !opt_list.iter().any(|opt| { if let Some(s) = opt { @@ -91,11 +71,10 @@ pub fn check_options_contain(opt_list: &[Option], tgt: &str) { false } }) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, - &format!("required option \"{}\" is not specified", tgt), - ); + Err(OptionsError::OptionNameNotFound(tgt.to_string())) } + + Ok(true) } // convert options definition to hashmap diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index 77a6456f..59429a26 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -223,11 +223,10 @@ impl FirebaseFdw { // ref: https://firebase.google.com/docs/firestore/reference/rest/v1beta1/projects.databases.documents/listDocuments let re = Regex::new(r"^firestore/(?P[^/]+)").unwrap(); if let Some(caps) = re.captures(obj) { - let base_url = require_option_or( - "base_url", - options, - Self::DEFAULT_FIRESTORE_BASE_URL.to_owned(), - ); + let base_url = options + .get("base_url") + .map(|v| v.as_ref()) + .unwrap_or(Self::DEFAULT_FIRESTORE_BASE_URL); let collection = caps.name("collection").unwrap().as_str(); let mut ret = format!( "{}/{}/databases/(default)/documents/{}?pageSize={}", From 85f0e65697e36a56ee40ee2aefc61e131c655a7e Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 14 Sep 2023 11:00:05 +0530 Subject: [PATCH 08/10] fix: options_to_hashmap returns result --- supabase-wrappers/src/instance.rs | 2 +- supabase-wrappers/src/modify.rs | 4 +- supabase-wrappers/src/options.rs | 70 +++++++++++++------ supabase-wrappers/src/scan.rs | 2 +- wrappers/src/fdw/airtable_fdw/airtable_fdw.rs | 4 +- wrappers/src/fdw/firebase_fdw/firebase_fdw.rs | 8 +-- wrappers/src/fdw/logflare_fdw/logflare_fdw.rs | 2 +- wrappers/src/fdw/s3_fdw/s3_fdw.rs | 4 +- wrappers/src/fdw/stripe_fdw/stripe_fdw.rs | 2 +- 9 files changed, 62 insertions(+), 36 deletions(-) diff --git a/supabase-wrappers/src/instance.rs b/supabase-wrappers/src/instance.rs index c1e41133..61542fbf 100644 --- a/supabase-wrappers/src/instance.rs +++ b/supabase-wrappers/src/instance.rs @@ -8,7 +8,7 @@ pub(super) unsafe fn create_fdw_instance, W: ForeignDataWra ) -> W { let ftable = pg_sys::GetForeignTable(ftable_id); let fserver = pg_sys::GetForeignServer((*ftable).serverid); - let fserver_opts = options_to_hashmap((*fserver).options); + let fserver_opts = options_to_hashmap((*fserver).options).report_unwrap(); let wrapper = W::new(&fserver_opts); wrapper.report_unwrap() } diff --git a/supabase-wrappers/src/modify.rs b/supabase-wrappers/src/modify.rs index f4c6773f..130b5f4a 100644 --- a/supabase-wrappers/src/modify.rs +++ b/supabase-wrappers/src/modify.rs @@ -81,7 +81,7 @@ pub(super) extern "C" fn add_foreign_update_targets( unsafe { // get rowid column name from table options let ftable = pg_sys::GetForeignTable((*target_relation).rd_id); - let opts = options_to_hashmap((*ftable).options); + let opts = options_to_hashmap((*ftable).options).report_unwrap(); let rowid_name = require_option("rowid_column", &opts).report_unwrap(); // find rowid attribute @@ -135,7 +135,7 @@ pub(super) extern "C" fn plan_foreign_modify, W: ForeignDat // get rowid column name from table options let ftable = pg_sys::GetForeignTable(rel.oid()); - let opts = options_to_hashmap((*ftable).options); + let opts = options_to_hashmap((*ftable).options).report_unwrap(); let rowid_name = opts.get("rowid_column"); if rowid_name.is_none() { report_error( diff --git a/supabase-wrappers/src/options.rs b/supabase-wrappers/src/options.rs index 5dd03705..a4297177 100644 --- a/supabase-wrappers/src/options.rs +++ b/supabase-wrappers/src/options.rs @@ -1,4 +1,3 @@ -use crate::utils::report_error; use pgrx::pg_sys::panic::ErrorReport; use pgrx::{pg_sys, PgList, PgSqlErrorCode}; use std::collections::HashMap; @@ -9,10 +8,10 @@ use thiserror::Error; pub enum OptionsError { #[error("required option `{0}` is not specified")] OptionNameNotFound(String), - #[error("an option name is not a valid UTF-8 string")] - OptionNameIsInvalidUtf8, - #[error("an option value is not a valid UTF-8 string")] - OptionValueIsInvalidUtf8, + #[error("option name `{0}` is not a valid UTF-8 string")] + OptionNameIsInvalidUtf8(String), + #[error("option value `{0}` is not a valid UTF-8 string")] + OptionValueIsInvalidUtf8(String), } impl From for ErrorReport { @@ -24,13 +23,12 @@ impl From for ErrorReport { error_message, "", ), - OptionsError::OptionNameIsInvalidUtf8 | OptionsError::OptionValueIsInvalidUtf8 => { - ErrorReport::new( - PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT, - error_message, - "", - ) - } + OptionsError::OptionNameIsInvalidUtf8(_) + | OptionsError::OptionValueIsInvalidUtf8(_) => ErrorReport::new( + PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT, + error_message, + "", + ), } } } @@ -45,6 +43,7 @@ impl From for ErrorReport { /// ```rust,no_run /// # use supabase_wrappers::prelude::require_option; /// # use std::collections::HashMap; +/// # use supabase_wrappers::options::OptionsError; /// # fn main() -> Result<(), OptionsError> { /// # let options = &HashMap::new(); /// require_option("my_option", options)?; @@ -61,8 +60,28 @@ pub fn require_option<'map>( .ok_or(OptionsError::OptionNameNotFound(opt_name.to_string())) } +/// Get required option value from the `options` map or a provided default +/// +/// Get the required option's value from `options` map, return default if it does not exist. +/// +/// For example, +/// +/// ```rust,no_run +/// # use supabase_wrappers::prelude::require_option_or; +/// # use std::collections::HashMap; +/// # let options = &HashMap::new(); +/// require_option_or("my_option", options, "default value"); +/// ``` +pub fn require_option_or<'a>( + opt_name: &str, + options: &'a HashMap, + default: &'a str, +) -> &'a str { + options.get(opt_name).map(|t| t.as_ref()).unwrap_or(default) +} + /// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator) -pub fn check_options_contain(opt_list: &[Option], tgt: &str) -> Result { +pub fn check_options_contain(opt_list: &[Option], tgt: &str) -> Result<(), OptionsError> { let search_key = tgt.to_owned() + "="; if !opt_list.iter().any(|opt| { if let Some(s) = opt { @@ -72,22 +91,31 @@ pub fn check_options_contain(opt_list: &[Option], tgt: &str) -> Result HashMap { +pub(super) unsafe fn options_to_hashmap( + options: *mut pg_sys::List, +) -> Result, OptionsError> { let mut ret = HashMap::new(); let options: PgList = PgList::from_pg(options); for option in options.iter_ptr() { let name = CStr::from_ptr((*option).defname); let value = CStr::from_ptr(pg_sys::defGetString(option)); - ret.insert( - name.to_str().unwrap().to_owned(), - value.to_str().unwrap().to_owned(), - ); + let name = name.to_str().map_err(|_| { + OptionsError::OptionNameIsInvalidUtf8( + String::from_utf8_lossy(name.to_bytes()).to_string(), + ) + })?; + let value = value.to_str().map_err(|_| { + OptionsError::OptionNameIsInvalidUtf8( + String::from_utf8_lossy(value.to_bytes()).to_string(), + ) + })?; + ret.insert(name.to_string(), value.to_string()); } - ret + Ok(ret) } diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index a880077f..82a7fd8f 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -138,7 +138,7 @@ pub(super) extern "C" fn get_foreign_rel_size, W: ForeignDa // get foreign table options let ftable = pg_sys::GetForeignTable(foreigntableid); - state.opts = options_to_hashmap((*ftable).options); + state.opts = options_to_hashmap((*ftable).options).report_unwrap(); // get estimate row count and mean row width let (rows, width) = state.get_rel_size().report_unwrap(); diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index 5ac9bb85..a2fceb46 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -226,8 +226,8 @@ impl ForeignDataWrapper for AirtableFdw { ) -> Result<(), AirtableFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "base_id"); - check_options_contain(&options, "table_id"); + check_options_contain(&options, "base_id")?; + check_options_contain(&options, "table_id")?; } } diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index 59429a26..5fce82d6 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -223,10 +223,8 @@ impl FirebaseFdw { // ref: https://firebase.google.com/docs/firestore/reference/rest/v1beta1/projects.databases.documents/listDocuments let re = Regex::new(r"^firestore/(?P[^/]+)").unwrap(); if let Some(caps) = re.captures(obj) { - let base_url = options - .get("base_url") - .map(|v| v.as_ref()) - .unwrap_or(Self::DEFAULT_FIRESTORE_BASE_URL); + let base_url = + require_option_or("base_url", options, Self::DEFAULT_FIRESTORE_BASE_URL); let collection = caps.name("collection").unwrap().as_str(); let mut ret = format!( "{}/{}/databases/(default)/documents/{}?pageSize={}", @@ -409,7 +407,7 @@ impl ForeignDataWrapper for FirebaseFdw { ) -> Result<(), FirebaseFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "object"); + check_options_contain(&options, "object")?; } } diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 527f99b6..0bc50cf7 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -356,7 +356,7 @@ impl ForeignDataWrapper for LogflareFdw { ) -> Result<(), LogflareFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "endpoint"); + check_options_contain(&options, "endpoint")?; } } diff --git a/wrappers/src/fdw/s3_fdw/s3_fdw.rs b/wrappers/src/fdw/s3_fdw/s3_fdw.rs index e4706a18..7d81072d 100644 --- a/wrappers/src/fdw/s3_fdw/s3_fdw.rs +++ b/wrappers/src/fdw/s3_fdw/s3_fdw.rs @@ -465,8 +465,8 @@ impl ForeignDataWrapper for S3Fdw { ) -> Result<(), S3FdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "uri"); - check_options_contain(&options, "format"); + check_options_contain(&options, "uri")?; + check_options_contain(&options, "format")?; } } diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index af3ac182..5b258d6e 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -945,7 +945,7 @@ impl ForeignDataWrapper for StripeFdw { ) -> Result<(), StripeFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "object"); + check_options_contain(&options, "object")?; } } From 63774bb0a2cc6124e8b86eea304b93276bd61ad2 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 14 Sep 2023 11:13:20 +0530 Subject: [PATCH 09/10] fix: use correct enum variant --- supabase-wrappers/src/options.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supabase-wrappers/src/options.rs b/supabase-wrappers/src/options.rs index a4297177..5ef417cf 100644 --- a/supabase-wrappers/src/options.rs +++ b/supabase-wrappers/src/options.rs @@ -111,7 +111,7 @@ pub(super) unsafe fn options_to_hashmap( ) })?; let value = value.to_str().map_err(|_| { - OptionsError::OptionNameIsInvalidUtf8( + OptionsError::OptionValueIsInvalidUtf8( String::from_utf8_lossy(value.to_bytes()).to_string(), ) })?; From 841485447138cdd28c32cec68d14826f2a3cbbac Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 14 Sep 2023 12:01:48 +0530 Subject: [PATCH 10/10] chore: rename Options variants to OptionsError --- wrappers/src/fdw/airtable_fdw/mod.rs | 4 ++-- wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs | 4 ++-- wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs | 4 ++-- wrappers/src/fdw/firebase_fdw/firebase_fdw.rs | 4 ++-- wrappers/src/fdw/logflare_fdw/logflare_fdw.rs | 4 ++-- wrappers/src/fdw/s3_fdw/s3_fdw.rs | 4 ++-- wrappers/src/fdw/stripe_fdw/stripe_fdw.rs | 4 ++-- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/wrappers/src/fdw/airtable_fdw/mod.rs b/wrappers/src/fdw/airtable_fdw/mod.rs index bc7895f1..366a8ad6 100644 --- a/wrappers/src/fdw/airtable_fdw/mod.rs +++ b/wrappers/src/fdw/airtable_fdw/mod.rs @@ -26,14 +26,14 @@ enum AirtableFdwError { RequestError(#[from] reqwest_middleware::Error), #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: AirtableFdwError) -> Self { match value { AirtableFdwError::CreateRuntimeError(e) => e.into(), - AirtableFdwError::Options(e) => e.into(), + AirtableFdwError::OptionsError(e) => e.into(), _ => ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), ""), } } diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 6fb7ef32..56dea395 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -166,14 +166,14 @@ enum BigQueryFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: BigQueryFdwError) -> Self { match value { BigQueryFdwError::CreateRuntimeError(e) => e.into(), - BigQueryFdwError::Options(e) => e.into(), + BigQueryFdwError::OptionsError(e) => e.into(), } } } diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 6100a43c..3417c478 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -204,14 +204,14 @@ enum ClickHouseFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: ClickHouseFdwError) -> Self { match value { ClickHouseFdwError::CreateRuntimeError(e) => e.into(), - ClickHouseFdwError::Options(e) => e.into(), + ClickHouseFdwError::OptionsError(e) => e.into(), } } } diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index 5fce82d6..b563a108 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -250,14 +250,14 @@ enum FirebaseFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: FirebaseFdwError) -> Self { match value { FirebaseFdwError::CreateRuntimeError(e) => e.into(), - FirebaseFdwError::Options(e) => e.into(), + FirebaseFdwError::OptionsError(e) => e.into(), } } } diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 0bc50cf7..5e6647e3 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -221,14 +221,14 @@ enum LogflareFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: LogflareFdwError) -> Self { match value { LogflareFdwError::CreateRuntimeError(e) => e.into(), - LogflareFdwError::Options(e) => e.into(), + LogflareFdwError::OptionsError(e) => e.into(), } } } diff --git a/wrappers/src/fdw/s3_fdw/s3_fdw.rs b/wrappers/src/fdw/s3_fdw/s3_fdw.rs index 7d81072d..5cdcf4c4 100644 --- a/wrappers/src/fdw/s3_fdw/s3_fdw.rs +++ b/wrappers/src/fdw/s3_fdw/s3_fdw.rs @@ -131,13 +131,13 @@ impl S3Fdw { #[derive(Error, Debug)] enum S3FdwError { #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: S3FdwError) -> Self { match value { - S3FdwError::Options(e) => e.into(), + S3FdwError::OptionsError(e) => e.into(), } } } diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index 5b258d6e..0785622f 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -632,14 +632,14 @@ enum StripeFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), #[error("{0}")] - Options(#[from] OptionsError), + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: StripeFdwError) -> Self { match value { StripeFdwError::CreateRuntimeError(e) => e.into(), - StripeFdwError::Options(e) => e.into(), + StripeFdwError::OptionsError(e) => e.into(), } } }