Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return Row from iter_scan instead of mutating in-place #155

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions supabase-wrappers/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::collections::HashMap;
use std::ffi::CStr;
use std::fmt;
use std::iter::Zip;
use std::mem;
use std::slice::Iter;

// fdw system catalog oids
Expand Down Expand Up @@ -231,12 +230,6 @@ impl Row {
self.cells.retain(|_| *iter.next().unwrap());
}

/// Replace `self` with the source row
#[inline]
pub fn replace_with(&mut self, src: Row) {
let _ = mem::replace(self, src);
}

/// Clear the row, removing all column names and cells
pub fn clear(&mut self) {
self.cols.clear();
Expand Down Expand Up @@ -517,7 +510,7 @@ pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
/// FDW must save fetched foreign data into the [`Row`], or return `None` if no more rows to read.
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, E>;
fn iter_scan(&mut self) -> Result<Option<Row>, E>;

/// Called when restart the scan from the beginning.
///
Expand Down
7 changes: 4 additions & 3 deletions supabase-wrappers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
//! Ok(())
//! }
//!
//! fn iter_scan(&mut self, row: &mut Row) -> HelloWorldFdwResult<Option<()>> {
//! fn iter_scan(&mut self) -> HelloWorldFdwResult<Option<Row>> {
//! // Return None when done
//! Ok(None)
//! }
Expand Down Expand Up @@ -195,7 +195,8 @@
//! Ok(())
//! }
//!
//! fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, HelloWorldFdwError> {
//! fn iter_scan(&mut self) -> Result<Option<Row>, HelloWorldFdwError> {
//! let mut row = Row::new();
//! // this is called on each row and we only return one row here
//! if self.row_cnt < 1 {
//! // add values to row if they are in target column list
Expand All @@ -210,7 +211,7 @@
//! self.row_cnt += 1;
//!
//! // return the 'Some(())' to Postgres and continue data scan
//! return Ok(Some(()));
//! return Ok(Some(row));
//! }
//!
//! // return 'None' to stop data scan
Expand Down
7 changes: 4 additions & 3 deletions supabase-wrappers/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwState<E, W> {
}

#[inline]
fn iter_scan(&mut self) -> Result<Option<()>, E> {
self.instance.iter_scan(&mut self.row)
fn iter_scan(&mut self) -> Result<Option<Row>, E> {
self.instance.iter_scan()
}

#[inline]
Expand Down Expand Up @@ -337,7 +337,8 @@ pub(super) extern "C" fn iterate_foreign_scan<E: Into<ErrorReport>, W: ForeignDa
polyfill::exec_clear_tuple(slot);

state.row.clear();
if state.iter_scan().report_unwrap().is_some() {
if let Some(row) = state.iter_scan().report_unwrap() {
let _ = std::mem::replace(&mut state.row, row);
if state.row.cols.len() != state.tgts.len() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER,
Expand Down
8 changes: 2 additions & 6 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::collections::HashMap;
use url::Url;

use supabase_wrappers::prelude::*;
use thiserror::Error;

use super::result::AirtableResponse;
use super::{AirtableFdwError, AirtableFdwResult};
Expand Down Expand Up @@ -164,13 +163,10 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> AirtableFdwResult<Option<()>> {
fn iter_scan(&mut self) -> AirtableFdwResult<Option<Row>> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return Ok(result
.drain(0..1)
.last()
.map(|src_row| row.replace_with(src_row)));
return Ok(result.drain(0..1).last());
}
}
Ok(None)
Expand Down
7 changes: 4 additions & 3 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, BigQueryFdwError> {
fn iter_scan(&mut self) -> Result<Option<Row>, BigQueryFdwError> {
let mut row = Row::new();
if let Some(client) = &self.client {
if let Some(ref mut rs) = self.scan_result {
let mut extract_row = |rs: &mut ResultSet| {
Expand All @@ -374,7 +375,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
};

if extract_row(rs) {
return Ok(Some(()));
return Ok(Some(row));
}

// deal with pagination
Expand All @@ -394,7 +395,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
// replace result set with data from the new page
*rs = ResultSet::new(QueryResponse::from(resp));
if extract_row(rs) {
return Ok(Some(()));
return Ok(Some(row));
}
}
Err(err) => {
Expand Down
5 changes: 3 additions & 2 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, ClickHouseFdwError> {
fn iter_scan(&mut self) -> Result<Option<Row>, ClickHouseFdwError> {
let mut row = Row::new();
if let Some(block) = &self.scan_blk {
let mut rows = block.rows();

Expand All @@ -312,7 +313,7 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
row.push(col_name, cell);
}
self.row_idx += 1;
return Ok(Some(()));
return Ok(Some(row));
}
}
Ok(None)
Expand Down
8 changes: 2 additions & 6 deletions wrappers/src/fdw/firebase_fdw/firebase_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,11 @@ impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> FirebaseFdwResult<Option<()>> {
fn iter_scan(&mut self) -> FirebaseFdwResult<Option<Row>> {
if self.scan_result.is_empty() {
Ok(None)
} else {
Ok(self
.scan_result
.drain(0..1)
.last()
.map(|src_row| row.replace_with(src_row)))
Ok(self.scan_result.drain(0..1).last())
}
}

Expand Down
5 changes: 3 additions & 2 deletions wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ impl ForeignDataWrapper<HelloWorldFdwError> for HelloWorldFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> HelloWorldFdwResult<Option<()>> {
fn iter_scan(&mut self) -> HelloWorldFdwResult<Option<Row>> {
let mut row = Row::new();
// this is called on each row and we only return one row here
if self.row_cnt < 1 {
// add values to row if they are in target column list
Expand All @@ -81,7 +82,7 @@ impl ForeignDataWrapper<HelloWorldFdwError> for HelloWorldFdw {
self.row_cnt += 1;

// return Some(()) to Postgres and continue data scan
return Ok(Some(()));
return Ok(Some(row));
}

// return 'None' to stop data scan
Expand Down
8 changes: 2 additions & 6 deletions wrappers/src/fdw/logflare_fdw/logflare_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,11 @@ impl ForeignDataWrapper<LogflareFdwError> for LogflareFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> LogflareFdwResult<Option<()>> {
fn iter_scan(&mut self) -> LogflareFdwResult<Option<Row>> {
if self.scan_result.is_empty() {
Ok(None)
} else {
Ok(self
.scan_result
.drain(0..1)
.last()
.map(|src_row| row.replace_with(src_row)))
Ok(self.scan_result.drain(0..1).last())
}
}

Expand Down
16 changes: 9 additions & 7 deletions wrappers/src/fdw/s3_fdw/s3_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,21 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> S3FdwResult<Option<()>> {
fn iter_scan(&mut self) -> S3FdwResult<Option<Row>> {
let mut row = Row::new();
// read parquet record
if let Parser::Parquet(ref mut s3parquet) = &mut self.parser {
if self.rt.block_on(s3parquet.refill())?.is_none() {
return Ok(None);
}
let ret = s3parquet.read_into_row(row, &self.tgt_cols)?;
if ret.is_some() {
let ret = s3parquet.read_into_row(&mut row, &self.tgt_cols)?;
return if ret.is_some() {
self.rows_out += 1;
Ok(Some(row))
} else {
stats::inc_stats(Self::FDW_NAME, stats::Metric::RowsOut, self.rows_out);
}
return Ok(ret);
Ok(None)
};
}

// read csv or jsonl record
Expand All @@ -315,7 +317,7 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
row.push(&col.name, cell);
}
self.rows_out += 1;
return Ok(Some(()));
return Ok(Some(row));
} else {
// no more records left in the local buffer, refill from remote
self.buf.clear();
Expand Down Expand Up @@ -349,7 +351,7 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
}
}
self.rows_out += 1;
return Ok(Some(()));
return Ok(Some(row));
}
None => {
// no more records left in the local buffer, refill from remote
Expand Down
7 changes: 2 additions & 5 deletions wrappers/src/fdw/stripe_fdw/stripe_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,13 +730,10 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> StripeFdwResult<Option<()>> {
fn iter_scan(&mut self) -> StripeFdwResult<Option<Row>> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return Ok(result
.drain(0..1)
.last()
.map(|src_row| row.replace_with(src_row)));
return Ok(result.drain(0..1).last());
}
}
Ok(None)
Expand Down
Loading