Skip to content

Commit

Permalink
Implement StreamTable and StreamTableProvider (#7994) (#8021)
Browse files Browse the repository at this point in the history
* Implement FIFO using extension points (#7994)

* Clippy

* Rename to StreamTable and make public

* Add StreamEncoding

* Rework sort order

* Fix logical conflicts

* Format

* Add DefaultTableProvider

* Fix doc

* Fix project sort keys and CSV headers

* Respect batch size on read

* Tests are updated

* Resolving clippy

---------

Co-authored-by: metesynnada <[email protected]>
  • Loading branch information
tustvold and metesynnada authored Nov 15, 2023
1 parent 77a6326 commit 020b8fc
Show file tree
Hide file tree
Showing 11 changed files with 553 additions and 190 deletions.
37 changes: 5 additions & 32 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::PartitionedFile;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{
create_ordering,
file_format::{
arrow::ArrowFormat,
avro::AvroFormat,
Expand All @@ -40,15 +41,13 @@ use crate::datasource::{
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use datafusion_common::{
Expand All @@ -57,10 +56,9 @@ use datafusion_common::{
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -677,34 +675,7 @@ impl ListingTable {

/// If file_sort_order is specified, creates the appropriate physical expressions
///
/// Returns one `LexOrdering` per entry of `self.options.file_sort_order`,
/// resolved against `self.table_schema`.
// NOTE(review): this listing looks like a rendered diff without +/- markers.
// The loop-based body below and the trailing `create_ordering(..)` call appear
// to be the before and after versions of the same function body, not code
// meant to coexist. Confirm against the repository before relying on it.
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];

for exprs in &self.options.file_sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let sort_exprs = exprs
.iter()
.map(|expr| {
// Only `Expr::Sort` wrapping a plain column reference is accepted.
if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
if let Expr::Column(col) = expr.as_ref() {
// Resolve the column by name; a lookup failure is propagated by `?`.
let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
Ok(PhysicalSortExpr {
expr,
options: SortOptions {
// `asc` is the logical direction; the physical option stores its negation.
descending: !asc,
nulls_first: *nulls_first,
},
})
} else {
plan_err!("Expected single column references in output_ordering, got {expr}")
}
} else {
plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
}
})
// Stops at the first error and returns it from the function.
.collect::<Result<Vec<_>>>()?;
all_sort_orders.push(sort_exprs);
}
Ok(all_sort_orders)
// Delegates to the shared `create_ordering` helper with the same schema and sort order.
create_ordering(&self.table_schema, &self.options.file_sort_order)
}
}

Expand Down Expand Up @@ -1040,9 +1011,11 @@ mod tests {

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow_schema::SortOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{assert_contains, GetExt, ScalarValue};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
use datafusion_physical_expr::PhysicalSortExpr;
use rstest::*;
use tempfile::TempDir;

Expand Down
9 changes: 2 additions & 7 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,13 @@ use datafusion_expr::CreateExternalTable;
use async_trait::async_trait;

// NOTE(review): this listing looks like a rendered diff without +/- markers
// (its header reports 2 additions and 7 deletions). The derived `Default`
// and `Self::default()` appear to replace the hand-written `impl Default`
// and `Self {}` shown alongside them; the lines are not meant to coexist.
// Confirm against the repository.
/// A `TableProviderFactory` capable of creating new `ListingTable`s
#[derive(Debug, Default)]
pub struct ListingTableFactory {}

impl ListingTableFactory {
/// Creates a new `ListingTableFactory`
pub fn new() -> Self {
Self {}
}
}

impl Default for ListingTableFactory {
fn default() -> Self {
Self::new()
Self::default()
}
}

Expand Down
44 changes: 44 additions & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod memory;
pub mod physical_plan;
pub mod provider;
mod statistics;
pub mod stream;
pub mod streaming;
pub mod view;

Expand All @@ -43,3 +44,46 @@ pub use self::provider::TableProvider;
pub use self::view::ViewTable;
pub use crate::logical_expr::TableType;
pub use statistics::get_statistics_with_limit;

use arrow_schema::{Schema, SortOptions};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::Expr;
use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};

/// Converts logical sort expressions into physical orderings over `schema`.
///
/// Each inner `Vec<Expr>` of `sort_order` describes one lexicographic ordering
/// and yields at most one `LexOrdering` in the result.
///
/// Every expression must be an `Expr::Sort` wrapping an `Expr::Column`;
/// anything else produces a plan error. If a referenced column cannot be
/// resolved in `schema`, the current ordering is truncated at that point and
/// only the keys collected so far are kept. Orderings that end up with no
/// keys are omitted from the result.
fn create_ordering(
    schema: &Schema,
    sort_order: &[Vec<Expr>],
) -> Result<Vec<LexOrdering>> {
    let mut orderings = Vec::new();

    for keys in sort_order {
        let mut physical_keys = Vec::new();

        for expr in keys {
            let sort = match expr {
                Expr::Sort(sort) => sort,
                expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
            };
            let column = match sort.expr.as_ref() {
                Expr::Column(column) => column,
                expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
            };

            // A key that is missing from the schema invalidates every key
            // after it, so keep only the prefix resolved so far.
            let Ok(physical) = expressions::col(&column.name, schema) else {
                break;
            };

            physical_keys.push(PhysicalSortExpr {
                expr: physical,
                options: SortOptions {
                    descending: !sort.asc,
                    nulls_first: sort.nulls_first,
                },
            });
        }

        if physical_keys.is_empty() {
            continue;
        }
        orderings.push(physical_keys);
    }

    Ok(orderings)
}
40 changes: 40 additions & 0 deletions datafusion/core/src/datasource/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use datafusion_expr::{CreateExternalTable, LogicalPlan};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::listing_table_factory::ListingTableFactory;
use crate::datasource::stream::StreamTableFactory;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
Expand Down Expand Up @@ -214,3 +216,41 @@ pub trait TableProviderFactory: Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// The default [`TableProviderFactory`]
///
/// Dispatches on whether the [`CreateExternalTable`] command is unbounded:
/// unbounded tables go to [`StreamTableFactory::create`], everything else
/// goes to [`ListingTableFactory::create`].
#[derive(Debug, Default)]
pub struct DefaultTableFactory {
    // Handles commands that describe an unbounded table.
    stream: StreamTableFactory,
    // Handles all remaining commands.
    listing: ListingTableFactory,
}

impl DefaultTableFactory {
    /// Creates a new [`DefaultTableFactory`], equivalent to its `Default` value
    pub fn new() -> Self {
        Default::default()
    }
}

#[async_trait]
impl TableProviderFactory for DefaultTableFactory {
    /// Creates a table provider for `cmd` using the matching inner factory.
    ///
    /// The table is treated as unbounded when `cmd.unbounded` is set, or when
    /// `cmd.options` contains an `unbounded` key with the value `true` (both
    /// compared ASCII case-insensitively). Unbounded tables are created by the
    /// stream factory, all others by the listing factory.
    async fn create(
        &self,
        state: &SessionState,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        let requested_via_option = cmd.options.iter().any(|(key, value)| {
            key.eq_ignore_ascii_case("unbounded") && value.eq_ignore_ascii_case("true")
        });

        if cmd.unbounded || requested_via_option {
            self.stream.create(state, cmd).await
        } else {
            self.listing.create(state, cmd).await
        }
    }
}
Loading

0 comments on commit 020b8fc

Please sign in to comment.