From 7df8f4ba59f407e80bb763ae18676ea595258ae9 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 13:55:51 +0530 Subject: [PATCH 1/6] fix: remove unwrap in create_async_runtime --- Cargo.lock | 27 ++++++++++--------- supabase-wrappers/Cargo.toml | 1 + supabase-wrappers/src/utils.rs | 22 +++++++++++++-- wrappers/Cargo.lock | 10 ++++--- wrappers/Cargo.toml | 14 +++++----- wrappers/src/fdw/airtable_fdw/airtable_fdw.rs | 15 ++++++++--- wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs | 15 ++++++++--- .../src/fdw/clickhouse_fdw/clickhouse_fdw.rs | 15 ++++++++--- wrappers/src/fdw/firebase_fdw/firebase_fdw.rs | 15 ++++++++--- wrappers/src/fdw/logflare_fdw/logflare_fdw.rs | 15 ++++++++--- wrappers/src/fdw/stripe_fdw/stripe_fdw.rs | 15 ++++++++--- 11 files changed, 115 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 89dc0486..4e8821cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -82,7 +82,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -207,7 +207,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -360,7 +360,7 @@ checksum = "ccb14d927583dd5c2eac0f2cf264fc4762aefe1ae14c47a8a20fc1939d3a5fc0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -430,7 +430,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -1221,7 +1221,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] @@ -1346,6 +1346,7 @@ dependencies = [ "pgrx", "pgrx-tests", "supabase-wrappers-macros", + "thiserror", "tokio", "uuid", ] @@ -1372,9 +1373,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.16" +version = "2.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" +checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" dependencies = [ "proc-macro2", "quote", @@ -1404,22 +1405,22 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.32", ] [[package]] diff --git a/supabase-wrappers/Cargo.toml b/supabase-wrappers/Cargo.toml index c063d5bd..18b13ceb 100644 --- a/supabase-wrappers/Cargo.toml +++ b/supabase-wrappers/Cargo.toml @@ -22,6 +22,7 @@ pg_test = [] [dependencies] pgrx = {version = "=0.9.8", default-features = false } +thiserror = "1.0.48" tokio = { version = "1.24", features = ["rt"] } uuid = { version = "1.2.2" } supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" } diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index 3653704f..8623b16f 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -2,6 +2,7 @@ //! use crate::interface::{Cell, Column, Row}; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgBuiltInOids; use pgrx::spi::Spi; use pgrx::IntoDatum; @@ -10,6 +11,7 @@ use std::collections::HashMap; use std::ffi::CStr; use std::num::NonZeroUsize; use std::ptr; +use thiserror::Error; use tokio::runtime::{Builder, Runtime}; use uuid::Uuid; @@ -106,6 +108,19 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) { ereport!(PgLogLevel::ERROR, code, msg, "Wrappers"); } +#[derive(Error, Debug)] +pub enum CreateRuntimeError { + #[error("failed to create async runtime")] + FailedToCreateAsyncRuntime, +} + +impl From for ErrorReport { + fn from(value: CreateRuntimeError) -> Self { + let error_message = format!("{value}"); + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "") + } +} + /// Create a Tokio async runtime /// /// Use this runtime to run async code in `block` mode. Run blocked code is @@ -132,8 +147,11 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) { /// } /// ``` #[inline] -pub fn create_async_runtime() -> Runtime { - Builder::new_current_thread().enable_all().build().unwrap() +pub fn create_async_runtime() -> Result { + Builder::new_current_thread() + .enable_all() + .build() + .map_err(|_| CreateRuntimeError::FailedToCreateAsyncRuntime) } /// Get required option value from the `options` map diff --git a/wrappers/Cargo.lock b/wrappers/Cargo.lock index a29e1ec9..5f8dc845 100644 --- a/wrappers/Cargo.lock +++ b/wrappers/Cargo.lock @@ -3450,6 +3450,7 @@ version = "0.1.15" dependencies = [ "pgrx", "supabase-wrappers-macros", + "thiserror", "tokio", "uuid 1.4.1", ] @@ -3530,18 +3531,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", @@ -4259,6 +4260,7 @@ dependencies = [ "serde", "serde_json", "supabase-wrappers", + "thiserror", "tokio", "tokio-util", "url", diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index ae0eaf33..8e7cf8ba 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -17,17 +17,17 @@ pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15" ] pg_test = [] helloworld_fdw = [] -bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2"] -clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex"] -stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"] -firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex"] +bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2", "thiserror"] +clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"] +stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] +firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "thiserror"] s3_fdw = [ "reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", "tokio", "tokio-util", "csv", "async-compression", "serde_json", "http", "parquet", "futures", "arrow-array", "chrono" ] -airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url"] -logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"] +airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"] +logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] # Does not include helloworld_fdw because of its general uselessness all_fdws = ["airtable_fdw", "bigquery_fdw", "clickhouse_fdw", "stripe_fdw", "firebase_fdw", "s3_fdw", "logflare_fdw"] @@ -72,6 +72,8 @@ http = { version = "0.2", optional = true } parquet = { version = "41.0.0", features = ["async"], optional = true } arrow-array = { version = "41.0.0", optional = true } +thiserror = { version = "1.0.48", optional = true } + [dev-dependencies] pgrx-tests = "=0.9.8" diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index ec196182..a14d5fd1 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use url::Url; use supabase_wrappers::prelude::*; +use thiserror::Error; use super::result::AirtableResponse; @@ -95,11 +96,17 @@ macro_rules! report_fetch_error { }; } -enum AirtableFdwError {} +#[derive(Error, Debug)] +enum AirtableFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: AirtableFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: AirtableFdwError) -> Self { + match value { + AirtableFdwError::CreateRuntimeError(e) => e.into(), + } } } @@ -121,7 +128,7 @@ impl ForeignDataWrapper for AirtableFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); Ok(Self { - rt: create_async_runtime(), + rt: create_async_runtime()?, client, base_url, scan_result: None, diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 685dba5b..6987fa70 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::str::FromStr; use supabase_wrappers::prelude::*; +use thiserror::Error; macro_rules! field_type_error { ($field:ident, $err:ident) => {{ @@ -160,18 +161,24 @@ impl BigQueryFdw { } } -enum BigQueryFdwError {} +#[derive(Error, Debug)] +enum BigQueryFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: BigQueryFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: BigQueryFdwError) -> Self { + match value { + BigQueryFdwError::CreateRuntimeError(e) => e.into(), + } } } impl ForeignDataWrapper for BigQueryFdw { fn new(options: &HashMap) -> Result { let mut ret = BigQueryFdw { - rt: create_async_runtime(), + rt: create_async_runtime()?, client: None, project_id: "".to_string(), dataset_id: "".to_string(), diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 1d33ef0e..9d23f0ac 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -8,6 +8,7 @@ use regex::{Captures, Regex}; use std::collections::HashMap; use supabase_wrappers::prelude::*; +use thiserror::Error; fn field_to_cell(row: &types::Row, i: usize) -> Option { let sql_type = row.sql_type(i).unwrap(); @@ -198,17 +199,23 @@ impl ClickHouseFdw { } } -enum ClickHouseFdwError {} +#[derive(Error, Debug)] +enum ClickHouseFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: ClickHouseFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: ClickHouseFdwError) -> Self { + match value { + ClickHouseFdwError::CreateRuntimeError(e) => e.into(), + } } } impl ForeignDataWrapper for ClickHouseFdw { fn new(options: &HashMap) -> Result { - let rt = create_async_runtime(); + let rt = create_async_runtime()?; let conn_str = match options.get("conn_string") { Some(conn_str) => conn_str.to_owned(), None => require_option("conn_string_id", options) diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index 4d879803..bbe007d1 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -12,6 +12,7 @@ use yup_oauth2::AccessToken; use yup_oauth2::ServiceAccountAuthenticator; use supabase_wrappers::prelude::*; +use thiserror::Error; macro_rules! report_request_error { ($url:ident, $err:ident) => { @@ -247,18 +248,24 @@ impl FirebaseFdw { } } -enum FirebaseFdwError {} +#[derive(Error, Debug)] +enum FirebaseFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: FirebaseFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: FirebaseFdwError) -> Self { + match value { + FirebaseFdwError::CreateRuntimeError(e) => e.into(), + } } } impl ForeignDataWrapper for FirebaseFdw { fn new(options: &HashMap) -> Result { let mut ret = Self { - rt: create_async_runtime(), + rt: create_async_runtime()?, project_id: "".to_string(), client: None, scan_result: None, diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 8868b0f5..38b8e4e8 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::str::FromStr; use supabase_wrappers::prelude::*; +use thiserror::Error; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = HeaderMap::new(); @@ -215,11 +216,17 @@ impl LogflareFdw { } } -enum LogflareFdwError {} +#[derive(Error, Debug)] +enum LogflareFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: LogflareFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: LogflareFdwError) -> Self { + match value { + LogflareFdwError::CreateRuntimeError(e) => e.into(), + } } } @@ -246,7 +253,7 @@ impl ForeignDataWrapper for LogflareFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); Ok(LogflareFdw { - rt: create_async_runtime(), + rt: create_async_runtime()?, base_url: Url::parse(&base_url).unwrap(), client, scan_result: None, diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index d496889f..59972834 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -8,6 +8,7 @@ use serde_json::{json, Map as JsonMap, Number, Value as JsonValue}; use std::collections::HashMap; use supabase_wrappers::prelude::*; +use thiserror::Error; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = header::HeaderMap::new(); @@ -626,11 +627,17 @@ impl StripeFdw { } } -enum StripeFdwError {} +#[derive(Error, Debug)] +enum StripeFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), +} impl From for ErrorReport { - fn from(_value: StripeFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: StripeFdwError) -> Self { + match value { + StripeFdwError::CreateRuntimeError(e) => e.into(), + } } } @@ -659,7 +666,7 @@ impl ForeignDataWrapper for StripeFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); Ok(StripeFdw { - rt: create_async_runtime(), + rt: create_async_runtime()?, base_url: Url::parse(&base_url).unwrap(), client, scan_result: None, From 1dffd1d6e3d317dfec8dd40dc52bc21fce99e60b Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 14:00:52 +0530 Subject: [PATCH 2/6] fix: don't throw away the underlying error --- supabase-wrappers/src/utils.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index 8623b16f..50ede305 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -110,8 +110,8 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) { #[derive(Error, Debug)] pub enum CreateRuntimeError { - #[error("failed to create async runtime")] - FailedToCreateAsyncRuntime, + #[error("failed to create async runtime: {0}")] + FailedToCreateAsyncRuntime(#[from] std::io::Error), } impl From for ErrorReport { @@ -148,10 +148,7 @@ impl From for ErrorReport { /// ``` #[inline] pub fn create_async_runtime() -> Result { - Builder::new_current_thread() - .enable_all() - .build() - .map_err(|_| CreateRuntimeError::FailedToCreateAsyncRuntime) + Ok(Builder::new_current_thread().enable_all().build()?) } /// Get required option value from the `options` map From 943533775adea44829d96ea55eef960a31771807 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 14:20:39 +0530 Subject: [PATCH 3/6] fix: failing doc tests --- supabase-wrappers/src/utils.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index 50ede305..5abc6c77 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -130,6 +130,8 @@ impl From for ErrorReport { /// For example, /// /// ```rust,no_run +/// # use supabase_wrappers::utils::CreateRuntimeError; +/// # fn main() -> Result<(), CreateRuntimeError> { /// # use supabase_wrappers::prelude::create_async_runtime; /// # struct Client { /// # } @@ -138,13 +140,15 @@ impl From for ErrorReport { /// # } /// # let client = Client {}; /// # let sql = ""; -/// let rt = create_async_runtime(); +/// let rt = create_async_runtime()?; /// /// // client.query() is an async function returning a Result /// match rt.block_on(client.query(&sql)) { /// Ok(result) => { } /// Err(err) => { } /// } +/// # Ok(()) +/// # } /// ``` #[inline] pub fn create_async_runtime() -> Result { From 51caff979610d961c432e345da6134c9a435edbe Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 13 Sep 2023 14:25:26 +0530 Subject: [PATCH 4/6] fix: failing clickhouse tests --- wrappers/src/fdw/clickhouse_fdw/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrappers/src/fdw/clickhouse_fdw/tests.rs b/wrappers/src/fdw/clickhouse_fdw/tests.rs index 44b6234b..88134299 100644 --- a/wrappers/src/fdw/clickhouse_fdw/tests.rs +++ b/wrappers/src/fdw/clickhouse_fdw/tests.rs @@ -11,7 +11,7 @@ mod tests { Spi::connect(|mut c| { let clickhouse_pool = ch::Pool::new("tcp://default:@localhost:9000/supa"); - let rt = create_async_runtime(); + let rt = create_async_runtime().expect("failed to create runtime"); let mut handle = rt .block_on(async { clickhouse_pool.get_handle().await }) .expect("handle"); From 5ee3e52a3b76f4e2d2da62b6a7082dd3ccc71e47 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 13 Sep 2023 23:41:55 +1000 Subject: [PATCH 5/6] chore(airtable_fdw): refactor error reporting --- wrappers/src/fdw/airtable_fdw/airtable_fdw.rs | 94 +++------ wrappers/src/fdw/airtable_fdw/mod.rs | 48 +++++ wrappers/src/fdw/airtable_fdw/result.rs | 182 ++++++++++++------ 3 files changed, 204 insertions(+), 120 deletions(-) diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index ec196182..f832e568 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -1,7 +1,5 @@ use crate::stats; use pgrx::pg_sys; -use pgrx::pg_sys::panic::ErrorReport; -use pgrx::prelude::PgSqlErrorCode; use reqwest::{self, header}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; @@ -11,6 +9,7 @@ use url::Url; use supabase_wrappers::prelude::*; use super::result::AirtableResponse; +use super::{AirtableFdwError, AirtableFdwResult}; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = header::HeaderMap::new(); @@ -55,13 +54,12 @@ impl AirtableFdw { } } - #[inline] fn set_limit_offset( &self, url: &str, page_size: Option, offset: Option<&str>, - ) -> Result { + ) -> AirtableFdwResult { let mut params = Vec::new(); if let Some(page_size) = page_size { params.push(("pageSize", format!("{}", page_size))); @@ -70,42 +68,29 @@ impl AirtableFdw { params.push(("offset", offset.to_string())); } - Url::parse_with_params(url, ¶ms).map(|x| x.into()) + Ok(Url::parse_with_params(url, ¶ms).map(|x| x.into())?) } // convert response body text to rows - fn parse_resp(&self, resp_body: &str, columns: &[Column]) -> (Vec, Option) { + fn parse_resp( + &self, + resp_body: &str, + columns: &[Column], + ) -> AirtableFdwResult<(Vec, Option)> { let response: AirtableResponse = serde_json::from_str(resp_body).unwrap(); let mut result = Vec::new(); for record in response.records.iter() { - result.push(record.to_row(columns)); + result.push(record.to_row(columns)?); } - (result, response.offset) - } -} - -macro_rules! report_fetch_error { - ($url:ident, $err:ident) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("fetch {} failed: {}", $url, $err), - ) - }; -} - -enum AirtableFdwError {} - -impl From for ErrorReport { - fn from(_value: AirtableFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + Ok((result, response.offset)) } } // TODO Add support for INSERT, UPDATE, DELETE impl ForeignDataWrapper for AirtableFdw { - fn new(options: &HashMap) -> Result { + fn new(options: &HashMap) -> AirtableFdwResult { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -135,7 +120,7 @@ impl ForeignDataWrapper for AirtableFdw { _sorts: &[Sort], // TODO: Propagate sort _limit: &Option, // TODO: maxRecords options: &HashMap, - ) -> Result<(), AirtableFdwError> { + ) -> AirtableFdwResult<()> { let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| { require_option("table_id", options) .map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id"))) @@ -153,38 +138,23 @@ impl ForeignDataWrapper for AirtableFdw { // Fetch all of the rows upfront. Arguably, this could be done in batches (and invoked each // time iter_scan() runs out of rows) to pipeline the I/O, but we'd have to manage more // state so starting with the simpler solution. - let url = match self.set_limit_offset(&url, None, offset.as_deref()) { - Ok(url) => url, - Err(err) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("internal error: {}", err), - ); - return Ok(()); - } - }; - - match self.rt.block_on(client.get(&url).send()) { - Ok(resp) => match resp.error_for_status() { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - let body = self.rt.block_on(resp.text()).unwrap(); - let (new_rows, new_offset) = self.parse_resp(&body, columns); - rows.extend(new_rows.into_iter()); - - if let Some(new_offset) = new_offset { - offset = Some(new_offset); - } else { - break; - } - } - Err(err) => report_fetch_error!(url, err), - }, - Err(err) => report_fetch_error!(url, err), + let url = self.set_limit_offset(&url, None, offset.as_deref())?; + + let body = self.rt.block_on(client.get(&url).send()).and_then(|resp| { + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.text())) + .map_err(reqwest_middleware::Error::from) + })?; + + let (new_rows, new_offset) = self.parse_resp(&body, columns)?; + rows.extend(new_rows); + + stats::inc_stats(Self::FDW_NAME, stats::Metric::BytesIn, body.len() as i64); + + if let Some(new_offset) = new_offset { + offset = Some(new_offset); + } else { + break; } } } @@ -196,7 +166,7 @@ impl ForeignDataWrapper for AirtableFdw { Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Result, AirtableFdwError> { + fn iter_scan(&mut self, row: &mut Row) -> AirtableFdwResult> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { return Ok(result @@ -208,7 +178,7 @@ impl ForeignDataWrapper for AirtableFdw { Ok(None) } - fn end_scan(&mut self) -> Result<(), AirtableFdwError> { + fn end_scan(&mut self) -> AirtableFdwResult<()> { self.scan_result.take(); Ok(()) } @@ -216,7 +186,7 @@ impl ForeignDataWrapper for AirtableFdw { fn validator( options: Vec>, catalog: Option, - ) -> Result<(), AirtableFdwError> { + ) -> AirtableFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "base_id"); diff --git a/wrappers/src/fdw/airtable_fdw/mod.rs b/wrappers/src/fdw/airtable_fdw/mod.rs index cb08ddf3..b8fdccbc 100644 --- a/wrappers/src/fdw/airtable_fdw/mod.rs +++ b/wrappers/src/fdw/airtable_fdw/mod.rs @@ -1,3 +1,51 @@ #![allow(clippy::module_inception)] mod airtable_fdw; mod result; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; + +enum AirtableFdwError { + UnsupportedColumnType(String), + ColumnTypeNotMatch(String), + UrlParseError(url::ParseError), + RequestError(reqwest_middleware::Error), +} + +impl From for ErrorReport { + fn from(value: AirtableFdwError) -> Self { + let error_message = match value { + AirtableFdwError::UnsupportedColumnType(s) => { + format!("column '{}' data type is not supported", s) + } + AirtableFdwError::ColumnTypeNotMatch(s) => { + format!("column '{}' data type not match", s) + } + AirtableFdwError::UrlParseError(err) => { + format!("parse url failed: {}", err) + } + AirtableFdwError::RequestError(err) => { + format!( + "fetch {} failed: {}", + err.url().map(|u| u.as_str()).unwrap_or_default(), + err + ) + } + }; + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "") + } +} + +impl From for AirtableFdwError { + fn from(source: url::ParseError) -> Self { + AirtableFdwError::UrlParseError(source) + } +} + +impl From for AirtableFdwError { + fn from(source: reqwest_middleware::Error) -> Self { + AirtableFdwError::RequestError(source) + } +} + +type AirtableFdwResult = Result; diff --git a/wrappers/src/fdw/airtable_fdw/result.rs b/wrappers/src/fdw/airtable_fdw/result.rs index 2aeee2ee..c5526f91 100644 --- a/wrappers/src/fdw/airtable_fdw/result.rs +++ b/wrappers/src/fdw/airtable_fdw/result.rs @@ -1,14 +1,15 @@ use pgrx::pg_sys; -use pgrx::prelude::PgSqlErrorCode; use serde::de::{MapAccess, Visitor}; use serde::{Deserialize, Deserializer}; -use serde_json::{value::Number, Value}; +use serde_json::Value; use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; use std::str::FromStr; use supabase_wrappers::prelude::*; +use super::{AirtableFdwError, AirtableFdwResult}; + #[derive(Deserialize, Debug)] pub struct AirtableResponse { pub records: Vec, @@ -84,21 +85,9 @@ impl<'de> Deserialize<'de> for AirtableFields { } impl AirtableRecord { - pub fn to_row(&self, columns: &[Column]) -> Row { + pub(super) fn to_row(&self, columns: &[Column]) -> AirtableFdwResult { let mut row = Row::new(); - macro_rules! col_to_cell { - ($col:ident, $src_type:ident, $conv:expr) => {{ - self.fields.0.get(&$col.name).and_then(|val| { - if let Value::$src_type(v) = val { - $conv(v) - } else { - panic!("column '{}' data type not match", $col.name) - } - }) - }}; - } - for col in columns.iter() { if col.name == "id" { row.push("id", Some(Cell::String(self.id.clone()))); @@ -106,52 +95,129 @@ impl AirtableRecord { } let cell = match col.type_oid { - pg_sys::BOOLOID => col_to_cell!(col, Bool, |v: &bool| Some(Cell::Bool(*v))), - pg_sys::CHAROID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I8(n as i8)) - }), - pg_sys::INT2OID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I16(n as i16)) - }), - pg_sys::FLOAT4OID => col_to_cell!(col, Number, |v: &Number| { - v.as_f64().map(|n| Cell::F32(n as f32)) - }), - pg_sys::INT4OID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I32(n as i32)) - }), - pg_sys::FLOAT8OID => { - col_to_cell!(col, Number, |v: &Number| { v.as_f64().map(Cell::F64) }) - } - pg_sys::INT8OID => { - col_to_cell!(col, Number, |v: &Number| { v.as_i64().map(Cell::I64) }) - } - pg_sys::NUMERICOID => col_to_cell!(col, Number, |v: &Number| { - v.as_f64() - .map(|n| Cell::Numeric(pgrx::AnyNumeric::try_from(n).unwrap())) - }), - pg_sys::TEXTOID => { - col_to_cell!(col, String, |v: &String| { Some(Cell::String(v.clone())) }) - } - pg_sys::DATEOID => col_to_cell!(col, String, |v: &String| { - pgrx::Date::from_str(v.as_str()).ok().map(Cell::Date) - }), - pg_sys::TIMESTAMPOID => col_to_cell!(col, String, |v: &String| { - pgrx::Timestamp::from_str(v.as_str()) - .ok() - .map(Cell::Timestamp) - }), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("column '{}' data type not supported", col.name), - ); - None - } - }; + pg_sys::BOOLOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Bool(v) = val { + Ok(Some(Cell::Bool(*v))) + } else { + Err(()) + } + }, + ), + pg_sys::CHAROID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I8(n as i8))) + } else { + Err(()) + } + }, + ), + pg_sys::INT2OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I16(n as i16))) + } else { + Err(()) + } + }, + ), + pg_sys::FLOAT4OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_f64().map(|n| Cell::F32(n as f32))) + } else { + Err(()) + } + }, + ), + pg_sys::INT4OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I32(n as i32))) + } else { + Err(()) + } + }, + ), + pg_sys::FLOAT8OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_f64().map(Cell::F64)) + } else { + Err(()) + } + }, + ), + pg_sys::INT8OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(Cell::I64)) + } else { + Err(()) + } + }, + ), + pg_sys::NUMERICOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + let n = v + .as_f64() + .map(|n| Cell::Numeric(pgrx::AnyNumeric::try_from(n).unwrap())); + Ok(n) + } else { + Err(()) + } + }, + ), + pg_sys::TEXTOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + Ok(Some(Cell::String(v.clone()))) + } else { + Err(()) + } + }, + ), + pg_sys::DATEOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + Ok(pgrx::Date::from_str(v.as_str()).ok().map(Cell::Date)) + } else { + Err(()) + } + }, + ), + pg_sys::TIMESTAMPOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + let n = pgrx::Timestamp::from_str(v.as_str()) + .ok() + .map(Cell::Timestamp); + Ok(n) + } else { + Err(()) + } + }, + ), + _ => return Err(AirtableFdwError::UnsupportedColumnType(col.name.clone())), + } + .map_err(|_| AirtableFdwError::ColumnTypeNotMatch(col.name.clone()))?; row.push(&col.name, cell); } - row + Ok(row) } } From 945c3dd417dae177e9c8b0f68299d98b5a68656f Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 13 Sep 2023 23:41:55 +1000 Subject: [PATCH 6/6] chore(airtable_fdw): refactor error reporting --- wrappers/src/fdw/airtable_fdw/airtable_fdw.rs | 101 +++------- wrappers/src/fdw/airtable_fdw/mod.rs | 32 +++ wrappers/src/fdw/airtable_fdw/result.rs | 182 ++++++++++++------ 3 files changed, 188 insertions(+), 127 deletions(-) diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index a14d5fd1..613dda63 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -1,7 +1,5 @@ use crate::stats; use pgrx::pg_sys; -use pgrx::pg_sys::panic::ErrorReport; -use pgrx::prelude::PgSqlErrorCode; use reqwest::{self, header}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; @@ -9,9 +7,9 @@ use std::collections::HashMap; use url::Url; use supabase_wrappers::prelude::*; -use thiserror::Error; use super::result::AirtableResponse; +use super::{AirtableFdwError, AirtableFdwResult}; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = header::HeaderMap::new(); @@ -56,13 +54,12 @@ impl AirtableFdw { } } - #[inline] fn set_limit_offset( &self, url: &str, page_size: Option, offset: Option<&str>, - ) -> Result { + ) -> AirtableFdwResult { let mut params = Vec::new(); if let Some(page_size) = page_size { params.push(("pageSize", format!("{}", page_size))); @@ -71,48 +68,29 @@ impl AirtableFdw { params.push(("offset", offset.to_string())); } - Url::parse_with_params(url, ¶ms).map(|x| x.into()) + Ok(Url::parse_with_params(url, ¶ms).map(|x| x.into())?) } // convert response body text to rows - fn parse_resp(&self, resp_body: &str, columns: &[Column]) -> (Vec, Option) { + fn parse_resp( + &self, + resp_body: &str, + columns: &[Column], + ) -> AirtableFdwResult<(Vec, Option)> { let response: AirtableResponse = serde_json::from_str(resp_body).unwrap(); let mut result = Vec::new(); for record in response.records.iter() { - result.push(record.to_row(columns)); + result.push(record.to_row(columns)?); } - (result, response.offset) - } -} - -macro_rules! report_fetch_error { - ($url:ident, $err:ident) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("fetch {} failed: {}", $url, $err), - ) - }; -} - -#[derive(Error, Debug)] -enum AirtableFdwError { - #[error("{0}")] - CreateRuntimeError(#[from] CreateRuntimeError), -} - -impl From for ErrorReport { - fn from(value: AirtableFdwError) -> Self { - match value { - AirtableFdwError::CreateRuntimeError(e) => e.into(), - } + Ok((result, response.offset)) } } // TODO Add support for INSERT, UPDATE, DELETE impl ForeignDataWrapper for AirtableFdw { - fn new(options: &HashMap) -> Result { + fn new(options: &HashMap) -> AirtableFdwResult { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -142,7 +120,7 @@ impl ForeignDataWrapper for AirtableFdw { _sorts: &[Sort], // TODO: Propagate sort _limit: &Option, // TODO: maxRecords options: &HashMap, - ) -> Result<(), AirtableFdwError> { + ) -> AirtableFdwResult<()> { let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| { require_option("table_id", options) .map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id"))) @@ -160,38 +138,23 @@ impl ForeignDataWrapper for AirtableFdw { // Fetch all of the rows upfront. Arguably, this could be done in batches (and invoked each // time iter_scan() runs out of rows) to pipeline the I/O, but we'd have to manage more // state so starting with the simpler solution. - let url = match self.set_limit_offset(&url, None, offset.as_deref()) { - Ok(url) => url, - Err(err) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("internal error: {}", err), - ); - return Ok(()); - } - }; - - match self.rt.block_on(client.get(&url).send()) { - Ok(resp) => match resp.error_for_status() { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - let body = self.rt.block_on(resp.text()).unwrap(); - let (new_rows, new_offset) = self.parse_resp(&body, columns); - rows.extend(new_rows.into_iter()); - - if let Some(new_offset) = new_offset { - offset = Some(new_offset); - } else { - break; - } - } - Err(err) => report_fetch_error!(url, err), - }, - Err(err) => report_fetch_error!(url, err), + let url = self.set_limit_offset(&url, None, offset.as_deref())?; + + let body = self.rt.block_on(client.get(&url).send()).and_then(|resp| { + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.text())) + .map_err(reqwest_middleware::Error::from) + })?; + + let (new_rows, new_offset) = self.parse_resp(&body, columns)?; + rows.extend(new_rows); + + stats::inc_stats(Self::FDW_NAME, stats::Metric::BytesIn, body.len() as i64); + + if let Some(new_offset) = new_offset { + offset = Some(new_offset); + } else { + break; } } } @@ -203,7 +166,7 @@ impl ForeignDataWrapper for AirtableFdw { Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Result, AirtableFdwError> { + fn iter_scan(&mut self, row: &mut Row) -> AirtableFdwResult> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { return Ok(result @@ -215,7 +178,7 @@ impl ForeignDataWrapper for AirtableFdw { Ok(None) } - fn end_scan(&mut self) -> Result<(), AirtableFdwError> { + fn end_scan(&mut self) -> AirtableFdwResult<()> { self.scan_result.take(); Ok(()) } @@ -223,7 +186,7 @@ impl ForeignDataWrapper for AirtableFdw { fn validator( options: Vec>, catalog: Option, - ) -> Result<(), AirtableFdwError> { + ) -> AirtableFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "base_id"); diff --git a/wrappers/src/fdw/airtable_fdw/mod.rs b/wrappers/src/fdw/airtable_fdw/mod.rs index cb08ddf3..5a571e28 100644 --- a/wrappers/src/fdw/airtable_fdw/mod.rs +++ b/wrappers/src/fdw/airtable_fdw/mod.rs @@ -1,3 +1,35 @@ #![allow(clippy::module_inception)] mod airtable_fdw; mod result; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; +use thiserror::Error; + +use supabase_wrappers::prelude::CreateRuntimeError; + +#[derive(Error, Debug)] +enum AirtableFdwError { + #[error("column '{0}' data type is not supported")] + UnsupportedColumnType(String), + + #[error("column '{0}' data type not match")] + ColumnTypeNotMatch(String), + + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), + + #[error("parse url failed: {0}")] + UrlParseError(#[from] url::ParseError), + + #[error("request failed: {0}")] + RequestError(#[from] reqwest_middleware::Error), +} + +impl From for ErrorReport { + fn from(value: AirtableFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + } +} + +type AirtableFdwResult = Result; diff --git a/wrappers/src/fdw/airtable_fdw/result.rs b/wrappers/src/fdw/airtable_fdw/result.rs index 2aeee2ee..c5526f91 100644 --- a/wrappers/src/fdw/airtable_fdw/result.rs +++ b/wrappers/src/fdw/airtable_fdw/result.rs @@ -1,14 +1,15 @@ use pgrx::pg_sys; -use pgrx::prelude::PgSqlErrorCode; use serde::de::{MapAccess, Visitor}; use serde::{Deserialize, Deserializer}; -use serde_json::{value::Number, Value}; +use serde_json::Value; use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; use std::str::FromStr; use supabase_wrappers::prelude::*; +use super::{AirtableFdwError, AirtableFdwResult}; + #[derive(Deserialize, Debug)] pub struct AirtableResponse { pub records: Vec, @@ -84,21 +85,9 @@ impl<'de> Deserialize<'de> for AirtableFields { } impl AirtableRecord { - pub fn to_row(&self, columns: &[Column]) -> Row { + pub(super) fn to_row(&self, columns: &[Column]) -> AirtableFdwResult { let mut row = Row::new(); - macro_rules! col_to_cell { - ($col:ident, $src_type:ident, $conv:expr) => {{ - self.fields.0.get(&$col.name).and_then(|val| { - if let Value::$src_type(v) = val { - $conv(v) - } else { - panic!("column '{}' data type not match", $col.name) - } - }) - }}; - } - for col in columns.iter() { if col.name == "id" { row.push("id", Some(Cell::String(self.id.clone()))); @@ -106,52 +95,129 @@ impl AirtableRecord { } let cell = match col.type_oid { - pg_sys::BOOLOID => col_to_cell!(col, Bool, |v: &bool| Some(Cell::Bool(*v))), - pg_sys::CHAROID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I8(n as i8)) - }), - pg_sys::INT2OID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I16(n as i16)) - }), - pg_sys::FLOAT4OID => col_to_cell!(col, Number, |v: &Number| { - v.as_f64().map(|n| Cell::F32(n as f32)) - }), - pg_sys::INT4OID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I32(n as i32)) - }), - pg_sys::FLOAT8OID => { - col_to_cell!(col, Number, |v: &Number| { v.as_f64().map(Cell::F64) }) - } - pg_sys::INT8OID => { - col_to_cell!(col, Number, |v: &Number| { v.as_i64().map(Cell::I64) }) - } - pg_sys::NUMERICOID => col_to_cell!(col, Number, |v: &Number| { - v.as_f64() - .map(|n| Cell::Numeric(pgrx::AnyNumeric::try_from(n).unwrap())) - }), - pg_sys::TEXTOID => { - col_to_cell!(col, String, |v: &String| { Some(Cell::String(v.clone())) }) - } - pg_sys::DATEOID => col_to_cell!(col, String, |v: &String| { - pgrx::Date::from_str(v.as_str()).ok().map(Cell::Date) - }), - pg_sys::TIMESTAMPOID => col_to_cell!(col, String, |v: &String| { - pgrx::Timestamp::from_str(v.as_str()) - .ok() - .map(Cell::Timestamp) - }), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("column '{}' data type not supported", col.name), - ); - None - } - }; + pg_sys::BOOLOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Bool(v) = val { + Ok(Some(Cell::Bool(*v))) + } else { + Err(()) + } + }, + ), + pg_sys::CHAROID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I8(n as i8))) + } else { + Err(()) + } + }, + ), + pg_sys::INT2OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I16(n as i16))) + } else { + Err(()) + } + }, + ), + pg_sys::FLOAT4OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_f64().map(|n| Cell::F32(n as f32))) + } else { + Err(()) + } + }, + ), + pg_sys::INT4OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I32(n as i32))) + } else { + Err(()) + } + }, + ), + pg_sys::FLOAT8OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_f64().map(Cell::F64)) + } else { + Err(()) + } + }, + ), + pg_sys::INT8OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(Cell::I64)) + } else { + Err(()) + } + }, + ), + pg_sys::NUMERICOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + let n = v + .as_f64() + .map(|n| Cell::Numeric(pgrx::AnyNumeric::try_from(n).unwrap())); + Ok(n) + } else { + Err(()) + } + }, + ), + pg_sys::TEXTOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + Ok(Some(Cell::String(v.clone()))) + } else { + Err(()) + } + }, + ), + pg_sys::DATEOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + Ok(pgrx::Date::from_str(v.as_str()).ok().map(Cell::Date)) + } else { + Err(()) + } + }, + ), + pg_sys::TIMESTAMPOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + let n = pgrx::Timestamp::from_str(v.as_str()) + .ok() + .map(Cell::Timestamp); + Ok(n) + } else { + Err(()) + } + }, + ), + _ => return Err(AirtableFdwError::UnsupportedColumnType(col.name.clone())), + } + .map_err(|_| AirtableFdwError::ColumnTypeNotMatch(col.name.clone()))?; row.push(&col.name, cell); } - row + Ok(row) } }