Skip to content

Commit

Permalink
Merge pull request #169 from supabase/chore/stripe-fdw-error
Browse files Browse the repository at this point in the history
 chore(stripe_fdw): remove a few unwrap calls
  • Loading branch information
imor authored Oct 4, 2023
2 parents 0b36187 + 208d60a commit f256e95
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
15 changes: 14 additions & 1 deletion wrappers/src/fdw/stripe_fdw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod stripe_fdw;
mod tests;

use http::header::InvalidHeaderValue;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;
use thiserror::Error;
Expand All @@ -26,10 +27,22 @@ enum StripeFdwError {
UrlParseError(#[from] url::ParseError),

#[error("request failed: {0}")]
RequestError(#[from] reqwest_middleware::Error),
RequestError(#[from] reqwest::Error),

#[error("request middleware failed: {0}")]
RequestMiddlewareError(#[from] reqwest_middleware::Error),

#[error("parse JSON response failed: {0}")]
JsonParseError(#[from] serde_json::Error),

#[error("invalid api_key header: {0}")]
InvalidApiKeyHeader(#[from] InvalidHeaderValue),

#[error("invalid response")]
InvalidResponse,

#[error("invalid stats: {0}")]
InvalidStats(String),
}

impl From<StripeFdwError> for ErrorReport {
Expand Down
63 changes: 29 additions & 34 deletions wrappers/src/fdw/stripe_fdw/stripe_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@ use supabase_wrappers::prelude::*;

use super::{StripeFdwError, StripeFdwResult};

fn create_client(api_key: &str) -> ClientWithMiddleware {
fn create_client(api_key: &str) -> StripeFdwResult<ClientWithMiddleware> {
let mut headers = header::HeaderMap::new();
let value = format!("Bearer {}", api_key);
let mut auth_value = header::HeaderValue::from_str(&value).unwrap();
let mut auth_value = header::HeaderValue::from_str(&value)?;
auth_value.set_sensitive(true);
headers.insert(header::AUTHORIZATION, auth_value);
let client = reqwest::Client::builder()
.default_headers(headers)
.build()
.unwrap();
.build()?;
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
ClientBuilder::new(client)
Ok(ClientBuilder::new(client)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build()
.build())
}

fn body_to_rows(
Expand Down Expand Up @@ -57,8 +56,10 @@ fn body_to_rows(
.as_object()
.and_then(|v| v.get(*bal_type))
.and_then(|v| v.as_array())
.map(|v| v[0].as_object().unwrap().clone())
.unwrap();
.map(|v| v[0].as_object().ok_or(StripeFdwError::InvalidResponse))
.transpose()?
.ok_or(StripeFdwError::InvalidResponse)?
.clone();
obj.insert(
"balance_type".to_string(),
JsonValue::String(bal_type.to_string()),
Expand All @@ -71,14 +72,14 @@ fn body_to_rows(
value
.as_object()
.map(|v| vec![JsonValue::Object(v.clone())])
.unwrap()
.ok_or(StripeFdwError::InvalidResponse)?
};
let objs = if is_list {
value
.as_object()
.and_then(|v| v.get("data"))
.and_then(|v| v.as_array())
.unwrap()
.ok_or(StripeFdwError::InvalidResponse)?
} else {
&single_wrapped
};
Expand Down Expand Up @@ -237,10 +238,14 @@ fn set_stats_metadata(stats_metadata: JsonB) {

// increase stats metadata 'request_cnt' by 1
#[inline]
fn inc_stats_request_cnt(stats_metadata: &mut JsonB) {
fn inc_stats_request_cnt(stats_metadata: &mut JsonB) -> StripeFdwResult<()> {
if let Some(v) = stats_metadata.0.get_mut("request_cnt") {
*v = (v.as_i64().unwrap() + 1).into();
*v = (v.as_i64().ok_or(StripeFdwError::InvalidStats(
"`request_cnt` is not a number".to_string(),
))? + 1)
.into();
};
Ok(())
}

#[wrappers_fdw(
Expand Down Expand Up @@ -628,7 +633,8 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {
let key_id = require_option("api_key_id", options)?;
get_vault_secret(key_id).map(|api_key| create_client(&api_key))
}
};
}
.transpose()?;

stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);

Expand Down Expand Up @@ -671,12 +677,11 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {
while page < page_cnt {
// build url
let url = self.build_url(obj, quals, page_size, &cursor)?;
if url.is_none() {
let Some(url) = url else {
return Ok(());
}
let url = url.unwrap();
};

inc_stats_request_cnt(&mut stats_metadata);
inc_stats_request_cnt(&mut stats_metadata)?;

// make api call
let body = self.rt.block_on(client.get(url).send()).and_then(|resp| {
Expand Down Expand Up @@ -755,15 +760,15 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {

fn insert(&mut self, src: &Row) -> StripeFdwResult<()> {
if let Some(ref mut client) = self.client {
let url = self.base_url.join(&self.obj).unwrap();
let url = self.base_url.join(&self.obj)?;
let body = row_to_body(src)?;
if body.is_null() {
return Ok(());
}

let mut stats_metadata = get_stats_metadata();

inc_stats_request_cnt(&mut stats_metadata);
inc_stats_request_cnt(&mut stats_metadata)?;

// call Stripe API
let body = self
Expand Down Expand Up @@ -798,18 +803,13 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {

match rowid {
Cell::String(rowid) => {
let url = self
.base_url
.join(&format!("{}/", self.obj))
.unwrap()
.join(rowid)
.unwrap();
let url = self.base_url.join(&format!("{}/", self.obj))?.join(rowid)?;
let body = row_to_body(new_row)?;
if body.is_null() {
return Ok(());
}

inc_stats_request_cnt(&mut stats_metadata);
inc_stats_request_cnt(&mut stats_metadata)?;

// call Stripe API
let body = self
Expand Down Expand Up @@ -847,14 +847,9 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {

match rowid {
Cell::String(rowid) => {
let url = self
.base_url
.join(&format!("{}/", self.obj))
.unwrap()
.join(rowid)
.unwrap();

inc_stats_request_cnt(&mut stats_metadata);
let url = self.base_url.join(&format!("{}/", self.obj))?.join(rowid)?;

inc_stats_request_cnt(&mut stats_metadata)?;

// call Stripe API
let body = self
Expand Down

0 comments on commit f256e95

Please sign in to comment.