feat: add support for prepared statements #600

Merged · 3 commits · Dec 5, 2024
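Most of this diff is mechanical: the prepared-statement support itself arrives via a patched `datafusion-flight-sql-server` dependency (see the `[patch]` section in Cargo.toml below), and the remaining changes adapt the workspace to the DataFusion 43 upgrade that patch requires.

Since the PR carries no usage docs, here is a hedged sketch of what the feature enables from the client side, written against arrow-flight's `FlightSqlServiceClient`. The endpoint address, table name, `$1` placeholder, and the parameter-column naming are illustrative assumptions, not code from this repository:

use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_flight::sql::client::FlightSqlServiceClient;
use futures::TryStreamExt;
use tonic::transport::Channel;

async fn query_prepared() -> Result<(), Box<dyn std::error::Error>> {
    let channel = Channel::from_static("http://127.0.0.1:5102").connect().await?;
    let mut client = FlightSqlServiceClient::new(channel);

    // The server prepares the statement once; it can then be executed
    // repeatedly with different parameter bindings.
    let mut stmt = client
        .prepare("SELECT * FROM event_states WHERE stream_cid = $1".into(), None)
        .await?;

    // Parameters travel as a RecordBatch via DoPut (the `feat/put-prepared`
    // branch referenced in Cargo.toml adds the server-side handling for this).
    // Binding by a placeholder-named column is an assumption here.
    let params = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("$1", DataType::Utf8, false)])),
        vec![Arc::new(StringArray::from(vec!["baeabeie..."]))],
    )?;
    stmt.set_parameters(params)?;

    // Execute and stream back result batches.
    let info = stmt.execute().await?;
    let ticket = info.endpoint[0].ticket.clone().expect("endpoint ticket");
    let mut stream = client.do_get(ticket).await?;
    while let Some(batch) = stream.try_next().await? {
        println!("{} rows", batch.num_rows());
    }
    stmt.close().await?;
    Ok(())
}

The payoff is the usual one for prepared statements: the server parses and plans the query once, and each subsequent execution only rebinds parameters.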
321 changes: 169 additions & 152 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
@@ -89,7 +89,7 @@ criterion2 = "0.7.0"
 crossterm = "0.25"
 ctrlc = "3.2.2"
 dag-jose = "0.2"
-datafusion = "42"
+datafusion = "43"
 datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
 deadqueue = "0.2.3"
 derivative = "2.2"
@@ -270,3 +270,7 @@ arrow-row = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }
 arrow-schema = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }
 arrow-select = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }
 arrow-string = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }
+
+[patch."https://github.com/datafusion-contrib/datafusion-federation.git"]
+# Can remove once https://github.com/datafusion-contrib/datafusion-federation/pull/81 merges
+datafusion-flight-sql-server = { git = "https://github.com/nathanielc/datafusion-federation.git", branch = "feat/put-prepared" }
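A note on the mechanics: a `[patch."<git-url>"]` section applies workspace-wide, so every dependency that resolves `datafusion-flight-sql-server` from the datafusion-federation git source is transparently redirected to the fork carrying the prepared-statement changes. Once the referenced upstream PR merges, deleting this section restores the original source without touching any dependent crate.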
4 changes: 2 additions & 2 deletions flight/src/server.rs
@@ -22,13 +22,13 @@ pub async fn run(
 /// Constructs a new server and can be started.
 pub fn new_server(ctx: SessionContext) -> anyhow::Result<Router> {
     let svc = FlightServiceServer::new(
-        FlightSqlService::new(ctx.state()).with_sql_options(Some(
+        FlightSqlService::new(ctx.state()).with_sql_options(

Contributor: nice

             // Disable all access except read only queries.
             SQLOptions::new()
                 .with_allow_dml(false)
                 .with_allow_ddl(false)
                 .with_allow_statements(false),
-        )),
+        ),
     );
     Ok(Server::builder().add_service(svc))
 }
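Two things to note here: the patched crate's `with_sql_options` now takes `SQLOptions` directly instead of `Option<SQLOptions>`, and the options still restrict the server to read-only queries. For reference, the same options type can guard ad hoc SQL on a `SessionContext`; a minimal sketch assuming DataFusion 43 (query string illustrative):

use datafusion::execution::context::{SQLOptions, SessionContext};

async fn read_only() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // The same read-only policy the server installs above.
    let opts = SQLOptions::new()
        .with_allow_dml(false)
        .with_allow_ddl(false)
        .with_allow_statements(false);
    // Plain SELECTs verify fine; DDL or DML would fail verification instead.
    ctx.sql_with_options("SELECT 1", opts).await?.show().await?;
    Ok(())
}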
2 changes: 1 addition & 1 deletion pipeline/Cargo.toml
@@ -18,7 +18,7 @@ ceramic-core.workspace = true
 ceramic-event.workspace = true
 cid.workspace = true
 datafusion.workspace = true
-datafusion-functions-json = "0.42.0"
+datafusion-functions-json = "0.43.0"
 expect-test.workspace = true
 futures.workspace = true
 int-enum.workspace = true
18 changes: 13 additions & 5 deletions pipeline/src/aggregator/ceramic_patch.rs
@@ -4,10 +4,12 @@ use arrow::{
     array::{Array as _, ArrayBuilder as _, ArrayRef, BinaryBuilder},
     datatypes::DataType,
 };
+use arrow_schema::Field;
 use datafusion::{
     common::{cast::as_binary_array, exec_datafusion_err, Result},
     logical_expr::{
-        PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
+        function::PartitionEvaluatorArgs, PartitionEvaluator, Signature, TypeSignature, Volatility,
+        WindowUDF, WindowUDFImpl,
     },
 };
 use json_patch::PatchOperation;
@@ -54,12 +56,18 @@ impl WindowUDFImpl for CeramicPatch {
         &self.signature
     }

-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Binary)
+    fn partition_evaluator(
+        &self,
+        _partition_evaluator_args: PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        Ok(Box::new(CeramicPatchEvaluator))
     }

-    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::new(CeramicPatchEvaluator))
+    fn field(
+        &self,
+        field_args: datafusion::logical_expr::function::WindowUDFFieldArgs,
+    ) -> Result<arrow_schema::Field> {
+        Ok(Field::new(field_args.name(), DataType::Binary, false))
     }
 }
 // Small wrapper container around the data/state fields to hold
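Migration note: DataFusion 43 removes `return_type` from `WindowUDFImpl` in favor of `field()`, which returns a complete output `Field` — so the UDF now declares its result name (via `WindowUDFFieldArgs::name()`) and nullability (non-null here) along with the `Binary` type. `partition_evaluator` likewise gains a `PartitionEvaluatorArgs` parameter describing the input expressions, which this evaluator does not need.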
6 changes: 3 additions & 3 deletions pipeline/src/aggregator/mod.rs
@@ -15,8 +15,8 @@ use datafusion::{
     functions_aggregate::min_max::max,
     functions_array::extract::array_element,
     logical_expr::{
-        col, expr::WindowFunction, lit, Expr, ExprFunctionExt as _, LogicalPlanBuilder,
-        WindowFunctionDefinition,
+        col, dml::InsertOp, expr::WindowFunction, lit, Expr, ExprFunctionExt as _,
+        LogicalPlanBuilder, WindowFunctionDefinition,
     },
     physical_plan::collect_partitioned,
 };
@@ -244,7 +244,7 @@ async fn process_conclusion_events_batch(
         ctx.read_batch(RecordBatch::new_empty(schemas::event_states()))?
             .write_table(
                 EVENT_STATES_MEM_TABLE,
-                DataFrameWriteOptions::new().with_overwrite(true),
+                DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite),
             )
             .await?;
     }
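For reference, the new write path in isolation — a minimal sketch assuming DataFusion 43, with made-up table and column names. It appends rather than overwrites because the stock `MemTable` only implements `InsertOp::Append`; handling `InsertOp::Overwrite` is exactly what this crate's `CacheTable` adds in the next file:

use std::sync::Arc;

use datafusion::arrow::array::{Int64Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::prelude::SessionContext;

async fn append_batch() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?))?;

    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
    // `with_insert_operation(InsertOp::...)` replaces the old `with_overwrite(bool)`.
    ctx.read_batch(batch)?
        .write_table("t", DataFrameWriteOptions::new().with_insert_operation(InsertOp::Append))
        .await?;
    Ok(())
}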
20 changes: 14 additions & 6 deletions pipeline/src/cache_table.rs
@@ -31,6 +31,8 @@ use std::sync::Arc;
 use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
+use datafusion::common::not_impl_err;
+use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::SortExpr;
 use datafusion::{
     catalog::Session,
@@ -156,7 +158,7 @@ impl TableProvider for CacheTable {
                 create_physical_sort_exprs(sort_exprs, &df_schema, state.execution_props())
             })
             .collect::<Result<Vec<_>>>()?;
-        exec = exec.with_sort_information(file_sort_order);
+        exec = exec.try_with_sort_information(file_sort_order)?;
     }

     Ok(Arc::new(exec))
@@ -178,7 +180,7 @@
         &self,
         _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        overwrite: bool,
+        insert_op: InsertOp,

Contributor: nice

     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If we are inserting into the table, any sort order may be messed up so reset it here
         *self.sort_order.lock() = vec![];
@@ -191,9 +193,15 @@
         {
             return plan_err!("Inserting query must have the same schema with the table.");
         }
-        if overwrite {
-            self.clear().await;
-        }
+        match insert_op {
+            InsertOp::Append => {}
+            InsertOp::Overwrite => {
+                self.clear().await;
+            }
+            InsertOp::Replace => {
+                return not_impl_err!("replace not implemented for CacheTable yet");
+            }
+        };
         let sink = Arc::new(CacheSink::new(self.batches.clone()));
         Ok(Arc::new(DataSinkExec::new(
             input,
@@ -540,7 +548,7 @@ mod tests {
         let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
         // Create an insert plan to insert the source data into the initial table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?.build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
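Migration note: the old `overwrite: bool` parameter of `TableProvider::insert_into` became the three-variant `InsertOp` enum in DataFusion 43, forcing providers to take an explicit position on `Replace`. `CacheTable` preserves its previous append/overwrite behavior and rejects `Replace` with `not_impl_err!` until there is a use for it.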
1 change: 1 addition & 0 deletions pipeline/src/conclusion/table.rs
@@ -46,6 +46,7 @@ impl<T: ConclusionFeed> ConclusionFeed for Arc<T> {

 // Implements the [`TableProvider`] trait producing a [`FeedExec`] instance when the table is
 // scanned, which in turn calls into the [`ConclusionFeed`] to get the actual events.
+#[derive(Debug)]
 pub struct FeedTable<T> {
     feed: Arc<T>,
     schema: SchemaRef,
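The added `#[derive(Debug)]` is likely required by the DataFusion 43 upgrade, which adds a `Debug` bound to the `TableProvider` trait that `FeedTable` implements.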