Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for qdrant foreign data wrapper #149

Merged
merged 11 commits into from
Sep 15, 2023
8 changes: 3 additions & 5 deletions supabase-wrappers/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::prelude::*;
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::*;

use super::utils;

// create a fdw instance
pub(super) unsafe fn create_fdw_instance<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
ftable_id: pg_sys::Oid,
) -> W {
let ftable = pg_sys::GetForeignTable(ftable_id);
let fserver = pg_sys::GetForeignServer((*ftable).serverid);
let fserver_opts = utils::options_to_hashmap((*fserver).options);
let fserver_opts = options_to_hashmap((*fserver).options).report_unwrap();
let wrapper = W::new(&fserver_opts);
wrapper.map_err(|e| e.into()).report()
wrapper.report_unwrap()
}
2 changes: 2 additions & 0 deletions supabase-wrappers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,13 @@
//! - [Logflare](https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw): A FDW for [Logflare](https://logflare.app/) which supports data read only.

pub mod interface;
pub mod options;
pub mod utils;

/// The prelude includes all necessary imports to make Wrappers work
pub mod prelude {
pub use crate::interface::*;
pub use crate::options::*;
pub use crate::utils::*;
pub use crate::wrappers_fdw;
pub use ::tokio::runtime::Runtime;
Expand Down
25 changes: 9 additions & 16 deletions supabase-wrappers/src/modify.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::{
debug2, memcxt::PgMemoryContexts, pg_sys::Oid, prelude::*, rel::PgRelation,
tupdesc::PgTupleDesc, FromDatum, PgSqlErrorCode,
Expand Down Expand Up @@ -81,12 +81,8 @@ pub(super) extern "C" fn add_foreign_update_targets(
unsafe {
// get rowid column name from table options
let ftable = pg_sys::GetForeignTable((*target_relation).rd_id);
let opts = utils::options_to_hashmap((*ftable).options);
let rowid_name = if let Some(name) = require_option("rowid_column", &opts) {
name
} else {
return;
};
let opts = options_to_hashmap((*ftable).options).report_unwrap();
let rowid_name = require_option("rowid_column", &opts).report_unwrap();

// find rowid attribute
let tup_desc = PgTupleDesc::from_pg_copy((*target_relation).rd_att);
Expand Down Expand Up @@ -139,7 +135,7 @@ pub(super) extern "C" fn plan_foreign_modify<E: Into<ErrorReport>, W: ForeignDat

// get rowid column name from table options
let ftable = pg_sys::GetForeignTable(rel.oid());
let opts = utils::options_to_hashmap((*ftable).options);
let opts = options_to_hashmap((*ftable).options).report_unwrap();
let rowid_name = opts.get("rowid_column");
if rowid_name.is_none() {
report_error(
Expand Down Expand Up @@ -208,7 +204,7 @@ pub(super) extern "C" fn begin_foreign_modify<E: Into<ErrorReport>, W: ForeignDa
state.rowid_attno =
pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c);

state.begin_modify().map_err(|e| e.into()).report();
state.begin_modify().report_unwrap();

(*rinfo).ri_FdwState = state.into_pg() as _;
}
Expand All @@ -228,7 +224,7 @@ pub(super) extern "C" fn exec_foreign_insert<E: Into<ErrorReport>, W: ForeignDat
);

let row = utils::tuple_table_slot_to_row(slot);
state.insert(&row).map_err(|e| e.into()).report();
state.insert(&row).report_unwrap();
}

slot
Expand Down Expand Up @@ -258,7 +254,7 @@ pub(super) extern "C" fn exec_foreign_delete<E: Into<ErrorReport>, W: ForeignDat

let cell = get_rowid_cell(&state, plan_slot);
if let Some(rowid) = cell {
state.delete(&rowid).map_err(|e| e.into()).report();
state.delete(&rowid).report_unwrap();
}
}

Expand Down Expand Up @@ -292,10 +288,7 @@ pub(super) extern "C" fn exec_foreign_update<E: Into<ErrorReport>, W: ForeignDat
}) && state.rowid_name != col.as_str()
});

state
.update(&rowid, &new_row)
.map_err(|e| e.into())
.report();
state.update(&rowid, &new_row).report_unwrap();
}
}

Expand All @@ -312,7 +305,7 @@ pub(super) extern "C" fn end_foreign_modify<E: Into<ErrorReport>, W: ForeignData
let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState<E, W>;
if !fdw_state.is_null() {
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(fdw_state);
state.end_modify().map_err(|e| e.into()).report();
state.end_modify().report_unwrap();
}
}
}
121 changes: 121 additions & 0 deletions supabase-wrappers/src/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::{pg_sys, PgList, PgSqlErrorCode};
use std::collections::HashMap;
use std::ffi::CStr;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum OptionsError {
#[error("required option `{0}` is not specified")]
OptionNameNotFound(String),
#[error("option name `{0}` is not a valid UTF-8 string")]
OptionNameIsInvalidUtf8(String),
#[error("option value `{0}` is not a valid UTF-8 string")]
OptionValueIsInvalidUtf8(String),
}

impl From<OptionsError> for ErrorReport {
fn from(value: OptionsError) -> Self {
let error_message = format!("{value}");
match value {
OptionsError::OptionNameNotFound(_) => ErrorReport::new(
PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND,
error_message,
"",
),
OptionsError::OptionNameIsInvalidUtf8(_)
| OptionsError::OptionValueIsInvalidUtf8(_) => ErrorReport::new(
PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT,
error_message,
"",
),
}
}
}

/// Get required option value from the `options` map
///
/// Get the required option's value from `options` map, return None and report
/// error and stop current transaction if it does not exist.
///
/// For example,
///
/// ```rust,no_run
/// # use supabase_wrappers::prelude::require_option;
/// # use std::collections::HashMap;
/// # use supabase_wrappers::options::OptionsError;
/// # fn main() -> Result<(), OptionsError> {
/// # let options = &HashMap::new();
/// require_option("my_option", options)?;
/// # Ok(())
/// # }
/// ```
pub fn require_option<'map>(
opt_name: &str,
options: &'map HashMap<String, String>,
) -> Result<&'map str, OptionsError> {
options
.get(opt_name)
.map(|t| t.as_ref())
.ok_or(OptionsError::OptionNameNotFound(opt_name.to_string()))
}

/// Get required option value from the `options` map or a provided default
///
/// Get the required option's value from `options` map, return default if it does not exist.
///
/// For example,
///
/// ```rust,no_run
/// # use supabase_wrappers::prelude::require_option_or;
/// # use std::collections::HashMap;
/// # let options = &HashMap::new();
/// require_option_or("my_option", options, "default value");
/// ```
pub fn require_option_or<'a>(
opt_name: &str,
options: &'a HashMap<String, String>,
default: &'a str,
) -> &'a str {
options.get(opt_name).map(|t| t.as_ref()).unwrap_or(default)
}

/// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator)
pub fn check_options_contain(opt_list: &[Option<String>], tgt: &str) -> Result<(), OptionsError> {
let search_key = tgt.to_owned() + "=";
if !opt_list.iter().any(|opt| {
if let Some(s) = opt {
s.starts_with(&search_key)
} else {
false
}
}) {
Err(OptionsError::OptionNameNotFound(tgt.to_string()))
} else {
Ok(())
}
}

// convert options definition to hashmap
pub(super) unsafe fn options_to_hashmap(
options: *mut pg_sys::List,
) -> Result<HashMap<String, String>, OptionsError> {
let mut ret = HashMap::new();
let options: PgList<pg_sys::DefElem> = PgList::from_pg(options);
for option in options.iter_ptr() {
let name = CStr::from_ptr((*option).defname);
let value = CStr::from_ptr(pg_sys::defGetString(option));
let name = name.to_str().map_err(|_| {
OptionsError::OptionNameIsInvalidUtf8(
String::from_utf8_lossy(name.to_bytes()).to_string(),
)
})?;
let value = value.to_str().map_err(|_| {
OptionsError::OptionValueIsInvalidUtf8(
String::from_utf8_lossy(value.to_bytes()).to_string(),
)
})?;
ret.insert(name.to_string(), value.to_string());
}
Ok(ret)
}
17 changes: 9 additions & 8 deletions supabase-wrappers/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ use pgrx::{
use std::collections::HashMap;
use std::marker::PhantomData;

use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::pg_sys::panic::ErrorReport;
use std::os::raw::c_int;
use std::ptr;

use crate::instance;
use crate::interface::{Cell, Column, Limit, Qual, Row, Sort, Value};
use crate::limit::*;
use crate::memctx;
use crate::options::options_to_hashmap;
use crate::polyfill;
use crate::prelude::ForeignDataWrapper;
use crate::qual::*;
use crate::sort::*;
use crate::utils::{self, report_error, SerdeList};
use crate::utils::{self, report_error, ReportableError, SerdeList};

// Fdw private state for scan
struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
Expand Down Expand Up @@ -137,10 +138,10 @@ pub(super) extern "C" fn get_foreign_rel_size<E: Into<ErrorReport>, W: ForeignDa

// get foreign table options
let ftable = pg_sys::GetForeignTable(foreigntableid);
state.opts = utils::options_to_hashmap((*ftable).options);
state.opts = options_to_hashmap((*ftable).options).report_unwrap();

// get estimate row count and mean row width
let (rows, width) = state.get_rel_size().map_err(|e| e.into()).report();
let (rows, width) = state.get_rel_size().report_unwrap();
(*baserel).rows = rows as f64;
(*(*baserel).reltarget).width = width;

Expand Down Expand Up @@ -305,7 +306,7 @@ pub(super) extern "C" fn begin_foreign_scan<E: Into<ErrorReport>, W: ForeignData

// begin scan if it is not EXPLAIN statement
if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 {
state.begin_scan().map_err(|e| e.into()).report();
state.begin_scan().report_unwrap();

let rel = scan_state.ss_currentRelation;
let tup_desc = (*rel).rd_att;
Expand Down Expand Up @@ -336,7 +337,7 @@ pub(super) extern "C" fn iterate_foreign_scan<E: Into<ErrorReport>, W: ForeignDa
polyfill::exec_clear_tuple(slot);

state.row.clear();
if state.iter_scan().map_err(|e| e.into()).report().is_some() {
if state.iter_scan().report_unwrap().is_some() {
if state.row.cols.len() != state.tgts.len() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER,
Expand Down Expand Up @@ -375,7 +376,7 @@ pub(super) extern "C" fn re_scan_foreign_scan<E: Into<ErrorReport>, W: ForeignDa
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
if !fdw_state.is_null() {
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
state.re_scan().map_err(|e| e.into()).report();
state.re_scan().report_unwrap();
}
}
}
Expand All @@ -392,6 +393,6 @@ pub(super) extern "C" fn end_foreign_scan<E: Into<ErrorReport>, W: ForeignDataWr
}

let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
state.end_scan().map_err(|e| e.into()).report();
state.end_scan().report_unwrap();
}
}
Loading