Skip to content

Commit

Permalink
chore(airtable_fdw): refactor error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
burmecia committed Sep 13, 2023
1 parent 5306290 commit 5ee3e52
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 120 deletions.
94 changes: 32 additions & 62 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::stats;
use pgrx::pg_sys;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;
use reqwest::{self, header};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
Expand All @@ -11,6 +9,7 @@ use url::Url;
use supabase_wrappers::prelude::*;

use super::result::AirtableResponse;
use super::{AirtableFdwError, AirtableFdwResult};

fn create_client(api_key: &str) -> ClientWithMiddleware {
let mut headers = header::HeaderMap::new();
Expand Down Expand Up @@ -55,13 +54,12 @@ impl AirtableFdw {
}
}

#[inline]
fn set_limit_offset(
&self,
url: &str,
page_size: Option<usize>,
offset: Option<&str>,
) -> Result<String, url::ParseError> {
) -> AirtableFdwResult<String> {
let mut params = Vec::new();
if let Some(page_size) = page_size {
params.push(("pageSize", format!("{}", page_size)));
Expand All @@ -70,42 +68,29 @@ impl AirtableFdw {
params.push(("offset", offset.to_string()));
}

Url::parse_with_params(url, &params).map(|x| x.into())
Ok(Url::parse_with_params(url, &params).map(|x| x.into())?)
}

// convert response body text to rows
fn parse_resp(&self, resp_body: &str, columns: &[Column]) -> (Vec<Row>, Option<String>) {
fn parse_resp(
&self,
resp_body: &str,
columns: &[Column],
) -> AirtableFdwResult<(Vec<Row>, Option<String>)> {
let response: AirtableResponse = serde_json::from_str(resp_body).unwrap();
let mut result = Vec::new();

for record in response.records.iter() {
result.push(record.to_row(columns));
result.push(record.to_row(columns)?);
}

(result, response.offset)
}
}

macro_rules! report_fetch_error {
($url:ident, $err:ident) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("fetch {} failed: {}", $url, $err),
)
};
}

enum AirtableFdwError {}

impl From<AirtableFdwError> for ErrorReport {
fn from(_value: AirtableFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
Ok((result, response.offset))
}
}

// TODO Add support for INSERT, UPDATE, DELETE
impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, AirtableFdwError> {
fn new(options: &HashMap<String, String>) -> AirtableFdwResult<Self> {
let base_url = options
.get("api_url")
.map(|t| t.to_owned())
Expand Down Expand Up @@ -135,7 +120,7 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
_sorts: &[Sort], // TODO: Propagate sort
_limit: &Option<Limit>, // TODO: maxRecords
options: &HashMap<String, String>,
) -> Result<(), AirtableFdwError> {
) -> AirtableFdwResult<()> {
let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| {
require_option("table_id", options)
.map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id")))
Expand All @@ -153,38 +138,23 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
// Fetch all of the rows upfront. Arguably, this could be done in batches (and invoked each
// time iter_scan() runs out of rows) to pipeline the I/O, but we'd have to manage more
// state so starting with the simpler solution.
let url = match self.set_limit_offset(&url, None, offset.as_deref()) {
Ok(url) => url,
Err(err) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("internal error: {}", err),
);
return Ok(());
}
};

match self.rt.block_on(client.get(&url).send()) {
Ok(resp) => match resp.error_for_status() {
Ok(resp) => {
stats::inc_stats(
Self::FDW_NAME,
stats::Metric::BytesIn,
resp.content_length().unwrap_or(0) as i64,
);
let body = self.rt.block_on(resp.text()).unwrap();
let (new_rows, new_offset) = self.parse_resp(&body, columns);
rows.extend(new_rows.into_iter());

if let Some(new_offset) = new_offset {
offset = Some(new_offset);
} else {
break;
}
}
Err(err) => report_fetch_error!(url, err),
},
Err(err) => report_fetch_error!(url, err),
let url = self.set_limit_offset(&url, None, offset.as_deref())?;

let body = self.rt.block_on(client.get(&url).send()).and_then(|resp| {
resp.error_for_status()
.and_then(|resp| self.rt.block_on(resp.text()))
.map_err(reqwest_middleware::Error::from)
})?;

let (new_rows, new_offset) = self.parse_resp(&body, columns)?;
rows.extend(new_rows);

stats::inc_stats(Self::FDW_NAME, stats::Metric::BytesIn, body.len() as i64);

if let Some(new_offset) = new_offset {
offset = Some(new_offset);
} else {
break;
}
}
}
Expand All @@ -196,7 +166,7 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, AirtableFdwError> {
fn iter_scan(&mut self, row: &mut Row) -> AirtableFdwResult<Option<()>> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return Ok(result
Expand All @@ -208,15 +178,15 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
Ok(None)
}

fn end_scan(&mut self) -> Result<(), AirtableFdwError> {
fn end_scan(&mut self) -> AirtableFdwResult<()> {
self.scan_result.take();
Ok(())
}

fn validator(
options: Vec<Option<String>>,
catalog: Option<pg_sys::Oid>,
) -> Result<(), AirtableFdwError> {
) -> AirtableFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "base_id");
Expand Down
48 changes: 48 additions & 0 deletions wrappers/src/fdw/airtable_fdw/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,51 @@
#![allow(clippy::module_inception)]
mod airtable_fdw;
mod result;

use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;

enum AirtableFdwError {
UnsupportedColumnType(String),
ColumnTypeNotMatch(String),
UrlParseError(url::ParseError),
RequestError(reqwest_middleware::Error),
}

impl From<AirtableFdwError> for ErrorReport {
fn from(value: AirtableFdwError) -> Self {
let error_message = match value {
AirtableFdwError::UnsupportedColumnType(s) => {
format!("column '{}' data type is not supported", s)
}
AirtableFdwError::ColumnTypeNotMatch(s) => {
format!("column '{}' data type not match", s)
}
AirtableFdwError::UrlParseError(err) => {
format!("parse url failed: {}", err)
}
AirtableFdwError::RequestError(err) => {
format!(
"fetch {} failed: {}",
err.url().map(|u| u.as_str()).unwrap_or_default(),
err
)
}
};
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "")
}
}

impl From<url::ParseError> for AirtableFdwError {
fn from(source: url::ParseError) -> Self {
AirtableFdwError::UrlParseError(source)
}
}

impl From<reqwest_middleware::Error> for AirtableFdwError {
fn from(source: reqwest_middleware::Error) -> Self {
AirtableFdwError::RequestError(source)
}
}

type AirtableFdwResult<T> = Result<T, AirtableFdwError>;
Loading

0 comments on commit 5ee3e52

Please sign in to comment.