Skip to content

Commit

Permalink
Merge branch 'main' into fix/options-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 14, 2023
2 parents 85f0e65 + e3bb501 commit fc65429
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 129 deletions.
103 changes: 32 additions & 71 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::stats;
use pgrx::pg_sys;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;
use reqwest::{self, header};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
Expand All @@ -12,6 +10,7 @@ use supabase_wrappers::prelude::*;
use thiserror::Error;

use super::result::AirtableResponse;
use super::{AirtableFdwError, AirtableFdwResult};

fn create_client(api_key: &str) -> ClientWithMiddleware {
let mut headers = header::HeaderMap::new();
Expand Down Expand Up @@ -56,13 +55,12 @@ impl AirtableFdw {
}
}

#[inline]
fn set_limit_offset(
&self,
url: &str,
page_size: Option<usize>,
offset: Option<&str>,
) -> Result<String, url::ParseError> {
) -> AirtableFdwResult<String> {
let mut params = Vec::new();
if let Some(page_size) = page_size {
params.push(("pageSize", format!("{}", page_size)));
Expand All @@ -71,51 +69,29 @@ impl AirtableFdw {
params.push(("offset", offset.to_string()));
}

Url::parse_with_params(url, &params).map(|x| x.into())
Ok(Url::parse_with_params(url, &params).map(|x| x.into())?)
}

// convert response body text to rows
fn parse_resp(&self, resp_body: &str, columns: &[Column]) -> (Vec<Row>, Option<String>) {
fn parse_resp(
&self,
resp_body: &str,
columns: &[Column],
) -> AirtableFdwResult<(Vec<Row>, Option<String>)> {
let response: AirtableResponse = serde_json::from_str(resp_body).unwrap();
let mut result = Vec::new();

for record in response.records.iter() {
result.push(record.to_row(columns));
result.push(record.to_row(columns)?);
}

(result, response.offset)
}
}

macro_rules! report_fetch_error {
($url:ident, $err:ident) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("fetch {} failed: {}", $url, $err),
)
};
}

#[derive(Error, Debug)]
enum AirtableFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<AirtableFdwError> for ErrorReport {
fn from(value: AirtableFdwError) -> Self {
match value {
AirtableFdwError::CreateRuntimeError(e) => e.into(),
AirtableFdwError::Options(e) => e.into(),
}
Ok((result, response.offset))
}
}

// TODO Add support for INSERT, UPDATE, DELETE
impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, AirtableFdwError> {
fn new(options: &HashMap<String, String>) -> AirtableFdwResult<Self> {
let base_url = options
.get("api_url")
.map(|t| t.to_owned())
Expand Down Expand Up @@ -146,7 +122,7 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
_sorts: &[Sort], // TODO: Propagate sort
_limit: &Option<Limit>, // TODO: maxRecords
options: &HashMap<String, String>,
) -> Result<(), AirtableFdwError> {
) -> AirtableFdwResult<()> {
let base_id = require_option("base_id", options)?;
let table_id = require_option("table_id", options)?;
let view_id = options.get("view_id");
Expand All @@ -160,38 +136,23 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
// Fetch all of the rows upfront. Arguably, this could be done in batches (and invoked each
// time iter_scan() runs out of rows) to pipeline the I/O, but we'd have to manage more
// state so starting with the simpler solution.
let url = match self.set_limit_offset(&url, None, offset.as_deref()) {
Ok(url) => url,
Err(err) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("internal error: {}", err),
);
return Ok(());
}
};

match self.rt.block_on(client.get(&url).send()) {
Ok(resp) => match resp.error_for_status() {
Ok(resp) => {
stats::inc_stats(
Self::FDW_NAME,
stats::Metric::BytesIn,
resp.content_length().unwrap_or(0) as i64,
);
let body = self.rt.block_on(resp.text()).unwrap();
let (new_rows, new_offset) = self.parse_resp(&body, columns);
rows.extend(new_rows.into_iter());

if let Some(new_offset) = new_offset {
offset = Some(new_offset);
} else {
break;
}
}
Err(err) => report_fetch_error!(url, err),
},
Err(err) => report_fetch_error!(url, err),
let url = self.set_limit_offset(&url, None, offset.as_deref())?;

let body = self.rt.block_on(client.get(&url).send()).and_then(|resp| {
resp.error_for_status()
.and_then(|resp| self.rt.block_on(resp.text()))
.map_err(reqwest_middleware::Error::from)
})?;

let (new_rows, new_offset) = self.parse_resp(&body, columns)?;
rows.extend(new_rows);

stats::inc_stats(Self::FDW_NAME, stats::Metric::BytesIn, body.len() as i64);

if let Some(new_offset) = new_offset {
offset = Some(new_offset);
} else {
break;
}
}
}
Expand All @@ -203,7 +164,7 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, AirtableFdwError> {
fn iter_scan(&mut self, row: &mut Row) -> AirtableFdwResult<Option<()>> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return Ok(result
Expand All @@ -215,15 +176,15 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
Ok(None)
}

fn end_scan(&mut self) -> Result<(), AirtableFdwError> {
fn end_scan(&mut self) -> AirtableFdwResult<()> {
self.scan_result.take();
Ok(())
}

fn validator(
options: Vec<Option<String>>,
catalog: Option<pg_sys::Oid>,
) -> Result<(), AirtableFdwError> {
) -> AirtableFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "base_id")?;
Expand Down
39 changes: 39 additions & 0 deletions wrappers/src/fdw/airtable_fdw/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,42 @@
#![allow(clippy::module_inception)]
mod airtable_fdw;
mod result;

use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;
use thiserror::Error;

use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError};

#[derive(Error, Debug)]
enum AirtableFdwError {
#[error("column '{0}' data type is not supported")]
UnsupportedColumnType(String),

#[error("column '{0}' data type not match")]
ColumnTypeNotMatch(String),

#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),

#[error("parse url failed: {0}")]
UrlParseError(#[from] url::ParseError),

#[error("request failed: {0}")]
RequestError(#[from] reqwest_middleware::Error),

#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<AirtableFdwError> for ErrorReport {
fn from(value: AirtableFdwError) -> Self {
match value {
AirtableFdwError::CreateRuntimeError(e) => e.into(),
AirtableFdwError::Options(e) => e.into(),
_ => ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), ""),
}
}
}

type AirtableFdwResult<T> = Result<T, AirtableFdwError>;
Loading

0 comments on commit fc65429

Please sign in to comment.