Skip to content

Commit

Permalink
fix: respect case sensitivity on operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda authored and rtyler committed Dec 12, 2023
1 parent 6d07bc5 commit b4c055d
Show file tree
Hide file tree
Showing 5 changed files with 365 additions and 15 deletions.
5 changes: 4 additions & 1 deletion crates/deltalake-core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use sqlparser::tokenizer::Tokenizer;

use crate::{DeltaResult, DeltaTableError};

use super::DeltaParserOptions;

pub(crate) struct DeltaContextProvider<'a> {
state: &'a SessionState,
}
Expand Down Expand Up @@ -97,7 +99,8 @@ pub(crate) fn parse_predicate_expression(
})?;

let context_provider = DeltaContextProvider { state: df_state };
let sql_to_rel = SqlToRel::new(&context_provider);
let sql_to_rel =
SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into());

Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?)
}
Expand Down
174 changes: 173 additions & 1 deletion crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion::datasource::physical_plan::{
};
use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
Expand All @@ -65,6 +65,7 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
use log::error;
use object_store::ObjectMeta;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -1494,6 +1495,111 @@ pub async fn find_files<'a>(
}
}

/// A wrapper for sql_parser's ParserOptions to capture sane default table defaults
pub struct DeltaParserOptions {
inner: ParserOptions,
}

impl Default for DeltaParserOptions {
fn default() -> Self {
DeltaParserOptions {
inner: ParserOptions {
enable_ident_normalization: false,
..ParserOptions::default()
},
}
}
}

impl From<DeltaParserOptions> for ParserOptions {
fn from(value: DeltaParserOptions) -> Self {
value.inner
}
}

/// A wrapper for Deltafusion's SessionConfig to capture sane default table defaults
pub struct DeltaSessionConfig {
inner: SessionConfig,
}

impl Default for DeltaSessionConfig {
fn default() -> Self {
DeltaSessionConfig {
inner: SessionConfig::default()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
}
}
}

impl From<DeltaSessionConfig> for SessionConfig {
fn from(value: DeltaSessionConfig) -> Self {
value.inner
}
}

/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
pub struct DeltaSessionContext {
inner: SessionContext,
}

impl Default for DeltaSessionContext {
fn default() -> Self {
DeltaSessionContext {
inner: SessionContext::new_with_config(DeltaSessionConfig::default().into()),
}
}
}

impl From<DeltaSessionContext> for SessionContext {
fn from(value: DeltaSessionContext) -> Self {
value.inner
}
}

/// A wrapper for Deltafusion's Column to preserve case-sensitivity during string conversion
pub struct DeltaColumn {
inner: Column,
}

impl From<&str> for DeltaColumn {
fn from(c: &str) -> Self {
DeltaColumn {
inner: Column::from_qualified_name_ignore_case(c),
}
}
}

/// Create a column, cloning the string
impl From<&String> for DeltaColumn {
fn from(c: &String) -> Self {
DeltaColumn {
inner: Column::from_qualified_name_ignore_case(c),
}
}
}

/// Create a column, reusing the existing string
impl From<String> for DeltaColumn {
fn from(c: String) -> Self {
DeltaColumn {
inner: Column::from_qualified_name_ignore_case(c),
}
}
}

impl From<DeltaColumn> for Column {
fn from(value: DeltaColumn) -> Self {
value.inner
}
}

/// Create a column, resuing the existing datafusion column
impl From<Column> for DeltaColumn {
fn from(c: Column) -> Self {
DeltaColumn { inner: c }
}
}

#[cfg(test)]
mod tests {
use crate::writer::test_utils::get_delta_schema;
Expand Down Expand Up @@ -1804,4 +1910,70 @@ mod tests {
];
assert_batches_sorted_eq!(&expected, &actual);
}

#[tokio::test]
async fn delta_scan_case_sensitive() {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("moDified", DataType::Utf8, true),
Field::new("ID", DataType::Utf8, true),
Field::new("vaLue", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-01",
"2021-02-01",
"2021-02-02",
"2021-02-02",
])),
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
],
)
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();
let log = table.log_store();

let provider = DeltaTableProvider::try_new(table.state, log, config).unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

let df = ctx
.sql("select ID, moDified, vaLue from test")
.await
.unwrap();
let actual = df.collect().await.unwrap();
let expected = vec![
"+----+------------+-------+",
"| ID | moDified | vaLue |",
"+----+------------+-------+",
"| A | 2021-02-01 | 1 |",
"| B | 2021-02-01 | 10 |",
"| C | 2021-02-02 | 20 |",
"| D | 2021-02-02 | 100 |",
"+----+------------+-------+",
];
assert_batches_sorted_eq!(&expected, &actual);

/* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
/*
let df = ctx
.table("test")
.await
.unwrap()
.select(vec![col("ID"), col("moDified"), col("vaLue")])
.unwrap();
let actual = df.collect().await.unwrap();
assert_batches_sorted_eq!(&expected, &actual);
*/
}
}
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use serde_json::Value;
use super::datafusion_utils::Expression;
use super::transaction::PROTOCOL;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder, DeltaSessionContext};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, Remove};
use crate::operations::transaction::commit;
Expand Down Expand Up @@ -280,7 +280,7 @@ impl std::future::IntoFuture for DeleteBuilder {
PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
let session = SessionContext::new();
let session: SessionContext = DeltaSessionContext::default().into();

// If a user provides their own their DF state then they must register the store themselves
register_store(this.log_store.clone(), session.runtime_env());
Expand Down
Loading

0 comments on commit b4c055d

Please sign in to comment.